"""
Visible Learning – network analysis (system levels × thermometer items).

CI: same as in the existing scripts (plotly_template).
Data: Thermometer.csv (required columns: Thermometer_ID, Stichwort,
Effektstärke, Subkapitel, Kapitelname, Systemebene).

Model:
- Bipartite network: system level (psychisch/sozial) <-> item (thermometer).
- Edge weight = effect size d (sign kept); edges are drawn as one trace
  per sign.
- Item marker size scales with the absolute weighted degree (|d|).
- Node hover info: ID, keyword, chapter/sub-chapter, d.
- Optional filters: |d| (min_abs_d) and chapters/sub-chapters.

Exports:
- PNG/HTML (as configured)
- JSON: nodes/edges plus simple centralities (weighted degree)
"""

from __future__ import annotations

# -----------------------------------------
# Imports
# -----------------------------------------
import os
import json
import math

import pandas as pd
import numpy as np
import plotly.graph_objs as go
import plotly.io as pio
import networkx as nx

# -----------------------------------------
# Load configuration
# -----------------------------------------
from config_visible_learning import (
    csv_file,
    export_fig_visual,
    export_fig_png,
    theme,
)

# -----------------------------------------
# Template/CI
# -----------------------------------------
try:
    from ci_template import plotly_template

    plotly_template.set_theme(theme)
    _styles = plotly_template.get_plot_styles()
    _colors = plotly_template.get_colors()

    def _ci_layout(title):
        """Return the CI standard layout for *title* with empty axis titles."""
        return plotly_template.get_standard_layout(title=title, x_title="", y_title="")

except Exception:
    # Minimal fallback when the CI template is missing or fails to initialise;
    # the script must stay usable without the corporate styling.
    _styles = {}
    _colors = {}

    def _ci_layout(title):
        """Return a bare layout that only sets the figure title."""
        return dict(title=title)


# -----------------------------------------
# Export helpers
# -----------------------------------------
EXPORT_DIR = os.path.join(os.path.dirname(__file__), "export")
os.makedirs(EXPORT_DIR, exist_ok=True)


def export_figure(fig, name: str):
    """Write *fig* to EXPORT_DIR as HTML and/or PNG, depending on the config flags.

    The PNG export is best-effort: a failure is reported on stdout but never
    aborts the analysis.
    """
    base = os.path.join(EXPORT_DIR, name)
    if export_fig_visual:
        pio.write_html(fig, file=f"{base}.html", auto_open=False, include_plotlyjs="cdn")
    if export_fig_png:
        try:
            pio.write_image(fig, f"{base}.png", scale=2)
        except Exception as exc:
            # Deliberately broad: static image export depends on an optional
            # rendering backend whose failure modes vary. Report, don't crash.
            print(f"WARNUNG: PNG-Export fehlgeschlagen ({base}.png): {exc}")


def export_json(obj: dict, name: str):
    """Dump *obj* as UTF-8 JSON into EXPORT_DIR/*name* (best-effort).

    File-system and serialisation errors are reported on stdout instead of
    being swallowed silently.
    """
    target = os.path.join(EXPORT_DIR, name)
    try:
        with open(target, "w", encoding="utf-8") as f:
            json.dump(obj, f, ensure_ascii=False, indent=2)
    except (OSError, TypeError, ValueError) as exc:
        print(f"WARNUNG: JSON-Export fehlgeschlagen ({target}): {exc}")


# -----------------------------------------
# Load data
# -----------------------------------------
REQUIRED_COLS = ["Thermometer_ID", "Stichwort", "Effektstärke", "Subkapitel", "Kapitelname", "Systemebene"]

# System levels (lower-cased) that are allowed to become system nodes.
_VALID_SYSTEMS = ["psychisch", "sozial"]


def load_data(path: str) -> pd.DataFrame:
    """Read the thermometer CSV and return a cleaned DataFrame.

    - Raises ValueError when one of REQUIRED_COLS is missing.
    - Converts "Effektstärke" to float (decimal comma accepted) and drops rows
      whose effect size cannot be parsed.
    - Prints a warning listing rows whose "Systemebene" is neither
      "psychisch" nor "sozial" (case-insensitive); the rows are kept.
    - Adds a "Kapitel" column holding the integer part of "Thermometer_ID"
      before the first dot.
    """
    df = pd.read_csv(path)
    missing = [c for c in REQUIRED_COLS if c not in df.columns]
    if missing:
        raise ValueError(f"Fehlende Spalten in CSV: {missing}")

    # Robust float conversion: accept "0,40" as well as "0.40".
    effect_text = df["Effektstärke"].astype(str).str.replace(",", ".", regex=False).str.strip()
    df["Effektstärke"] = pd.to_numeric(effect_text, errors="coerce")
    # Copy so that the column assignment below works on an independent frame.
    df = df.dropna(subset=["Effektstärke"]).copy()

    # Check for unspecific system levels.
    invalid_systems = df[~df["Systemebene"].astype(str).str.lower().isin(_VALID_SYSTEMS)]
    if not invalid_systems.empty:
        print("WARNUNG: Unspezifische Systemebenen gefunden:")
        print(invalid_systems[["Thermometer_ID", "Stichwort", "Systemebene"]].to_string(index=False))

    # Chapter number derived from the ID (used by the optional chapter filter).
    chapter_text = df["Thermometer_ID"].astype(str).str.split(".").str[0]
    try:
        df["Kapitel"] = chapter_text.astype(int)
    except (ValueError, TypeError):
        # A single malformed ID must not wipe the chapter of every row:
        # convert row by row and leave only the unparsable ones as <NA>.
        numeric = pd.to_numeric(chapter_text, errors="coerce")
        df["Kapitel"] = numeric.where(numeric % 1 == 0).astype("Int64")
        n_missing = int(df["Kapitel"].isna().sum())
        print(f"WARNUNG: Kapitel konnte für {n_missing} Zeile(n) nicht aus Thermometer_ID abgeleitet werden.")

    return df


# -----------------------------------------
# Build network
# -----------------------------------------
def build_bipartite_graph(
    df: pd.DataFrame,
    min_abs_d: float = 0.00,
    kapitel_filter: list[int] | None = None,
    subkapitel_filter: list[str] | None = None,
) -> nx.Graph:
    """Build the bipartite graph system level <-> thermometer item.

    Parameters
    ----------
    df : frame containing REQUIRED_COLS (and "Kapitel" when kapitel_filter is used).
    min_abs_d : keep only rows with |Effektstärke| >= min_abs_d (applied when > 0).
    kapitel_filter : keep only rows whose "Kapitel" is in this list (when non-empty).
    subkapitel_filter : keep only rows whose "Subkapitel" is in this list (when non-empty).

    Returns
    -------
    nx.Graph with nodes "system::<level>" (bipartite="system") and
    "item::<Thermometer_ID>" (bipartite="item"); each edge carries
    weight=<signed effect size> and sign="pos"/"neg".
    """
    data = df.copy()

    # Filters
    if kapitel_filter:
        data = data[data["Kapitel"].isin(kapitel_filter)]
    if subkapitel_filter:
        data = data[data["Subkapitel"].isin(subkapitel_filter)]
    if min_abs_d > 0:
        data = data[data["Effektstärke"].abs() >= float(min_abs_d)]

    # Only valid system levels. Normalise once via astype(str) so that a
    # non-string column (e.g. all NaN) cannot break the .str accessor later.
    system_level = data["Systemebene"].astype(str).str.lower()
    is_valid = system_level.isin(_VALID_SYSTEMS)
    data = data[is_valid]
    system_level = system_level[is_valid]

    G = nx.Graph()

    # System nodes (part A)
    for s in sorted(system_level.unique().tolist()):
        G.add_node(
            f"system::{s}",
            bipartite="system",
            label=s.capitalize(),
            typ="System",
        )

    # Item nodes + edges (part B)
    rows = zip(
        system_level,
        data["Thermometer_ID"],
        data["Stichwort"],
        data["Effektstärke"],
        data["Kapitelname"],
        data["Subkapitel"],
    )
    for system, thermometer_id, keyword, effect, chapter_name, sub_chapter in rows:
        sys_key = f"system::{system}"
        item_key = f"item::{thermometer_id}"
        effect = float(effect)

        G.add_node(
            item_key,
            bipartite="item",
            label=str(keyword),
            id=str(thermometer_id),
            d=effect,
            kapitelname=str(chapter_name),
            subkapitel=str(sub_chapter),
        )
        # Edge: weight = effect size (sign kept).
        G.add_edge(
            sys_key,
            item_key,
            weight=effect,
            sign="pos" if effect >= 0 else "neg",
        )

    return G


# -----------------------------------------
# Layout & visualisation (Plotly)
# -----------------------------------------
def _abs_weighted_degree(G: nx.Graph, node) -> float:
    """Return the sum of |weight| over all edges incident to *node*."""
    return float(sum(abs(attrs.get("weight", 0.0)) for attrs in G[node].values()))


def _edge_segments(G: nx.Graph, pos: dict[str, tuple[float, float]], sign: str | None = None):
    """Return x/y coordinate lists for line segments (separated by None).

    When *sign* is given, only edges whose "sign" attribute equals it are
    included.
    """
    xs, ys = [], []
    for u, v, attrs in G.edges(data=True):
        if sign and attrs.get("sign") != sign:
            continue
        x0, y0 = pos[u]
        x1, y1 = pos[v]
        xs += [x0, x1, None]
        ys += [y0, y1, None]
    return xs, ys


def plot_network(G: nx.Graph, title: str = "Netzwerk: Systemebenen × Thermometer", seed: int = 42):
    """Draw *G* with Plotly, show the figure and export it as "vl-network".

    Parameters
    ----------
    G : graph as produced by build_bipartite_graph.
    title : figure title.
    seed : seed for the spring layout, making positions reproducible.
    """
    # Spring layout (reproducible via seed). The layout interprets the edge
    # weight as attraction strength, so it is computed on |d|: a negative
    # effect size must not turn the attraction into a repulsion. G itself is
    # left untouched.
    layout_graph = nx.Graph()
    layout_graph.add_nodes_from(G.nodes())
    layout_graph.add_weighted_edges_from(
        (u, v, abs(w)) for u, v, w in G.edges(data="weight", default=0.0)
    )
    pos = nx.spring_layout(layout_graph, seed=seed, weight="weight")

    # Split nodes by type.
    system_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
    item_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]

    # Edges (pos/neg) as separate traces (line styles from CI).
    x_pos, y_pos = _edge_segments(G, pos, sign="pos")
    x_neg, y_neg = _edge_segments(G, pos, sign="neg")

    line_primary = _styles.get("linie_primaryLine", dict(width=1))
    line_secondary = _styles.get("linie_secondaryLine", dict(width=1))

    edge_pos = go.Scatter(
        x=x_pos, y=y_pos,
        mode="lines",
        line=line_primary,
        hoverinfo="skip",
        showlegend=True,
        name="Kanten (d ≥ 0)",
    )
    edge_neg = go.Scatter(
        x=x_neg, y=y_neg,
        mode="lines",
        line=line_secondary,
        hoverinfo="skip",
        showlegend=True,
        name="Kanten (d < 0)",
    )

    # System nodes: marker from CI (e.g. accent).
    sys_marker = _styles.get("marker_accent", dict(size=18))
    sys_x = [pos[n][0] for n in system_nodes]
    sys_y = [pos[n][1] for n in system_nodes]
    sys_text = [G.nodes[n].get("label", n) for n in system_nodes]
    sys_hover = [f"Systemebene: {G.nodes[n].get('label','')}" for n in system_nodes]

    systems_trace = go.Scatter(
        x=sys_x, y=sys_y,
        mode="markers",
        marker={**sys_marker, "size": 18},
        text=sys_text,
        hovertext=sys_hover,
        hovertemplate="%{hovertext}",
        name="System",
    )

    # Item nodes: marker from CI (e.g. brightArea); size ~ |weighted degree|.
    item_marker = _styles.get("marker_brightArea", dict(size=10))
    it_x = [pos[n][0] for n in item_nodes]
    it_y = [pos[n][1] for n in item_nodes]

    wdeg = [_abs_weighted_degree(G, n) for n in item_nodes]
    htxt = []
    for n in item_nodes:
        nd = G.nodes[n]
        # Plotly hover labels need "<br>" for line breaks; "\n" is not rendered.
        htxt.append(
            "Thermometer: " + str(nd.get("id", ""))
            + "<br>Stichwort: " + str(nd.get("label", ""))
            + "<br>Kapitel: " + str(nd.get("kapitelname", ""))
            + "<br>Subkapitel: " + str(nd.get("subkapitel", ""))
            + "<br>d: " + f"{nd.get('d', np.nan):.2f}"
        )

    # Scale sizes into the range 8..20; fall back to a constant size when
    # there is nothing to scale by.
    wdeg = np.asarray(wdeg, dtype=float)
    if wdeg.size and np.nanmax(wdeg) > 0:
        sizes = 8 + 12 * (wdeg / np.nanmax(wdeg))
    else:
        sizes = np.full_like(wdeg, 10)

    items_trace = go.Scatter(
        x=it_x, y=it_y,
        mode="markers",
        marker={**item_marker, "size": sizes},
        hovertext=htxt,
        hovertemplate="%{hovertext}",
        name="Thermometer",
    )

    fig = go.Figure(data=[edge_pos, edge_neg, systems_trace, items_trace])
    fig.update_layout(_ci_layout(title))
    # Keep axes and grid neutral, no labels in the plot (everything is in the hover).
    fig.update_xaxes(showticklabels=False, showgrid=False, zeroline=False)
    fig.update_yaxes(showticklabels=False, showgrid=False, zeroline=False)

    fig.show()
    export_figure(fig, "vl-network")


# -----------------------------------------
# Simple metrics & export
# -----------------------------------------
def summarize_network(G: nx.Graph) -> dict:
    """Return simple centrality summaries for *G*.

    The result has two keys:
    - "top_items_by_weighted_degree": up to 15 item nodes, sorted by the
      absolute weighted degree in descending order.
    - "system_weight_sums": absolute weighted degree per system node,
      keyed by the node label.
    """
    # Absolute weighted degree per node.
    wdeg = {n: _abs_weighted_degree(G, n) for n in G.nodes()}

    # Top items by weighted degree.
    items = [(n, wdeg[n]) for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
    items_sorted = sorted(items, key=lambda t: t[1], reverse=True)[:15]
    top_items = []
    for n, val in items_sorted:
        nd = G.nodes[n]
        top_items.append({
            "Thermometer_ID": nd.get("id"),
            "Stichwort": nd.get("label"),
            "Kapitelname": nd.get("kapitelname"),
            "Subkapitel": nd.get("subkapitel"),
            "Effektstärke": nd.get("d"),
            "weighted_degree_abs": val,
        })

    # Sum per system side.
    systems = [(n, wdeg[n]) for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
    system_summary = {G.nodes[n].get("label", n): float(val) for n, val in systems}

    return {"top_items_by_weighted_degree": top_items, "system_weight_sums": system_summary}


# -----------------------------------------
# Pipeline
# -----------------------------------------
def run_network_analysis(
    csv_path: str,
    min_abs_d: float = 0.00,
    kapitel_filter: list[int] | None = None,
    subkapitel_filter: list[str] | None = None,
    seed: int = 42
):
    """Run the full pipeline: load, filter, plot, summarise and export.

    Parameters
    ----------
    csv_path : path to the thermometer CSV.
    min_abs_d : minimum |d| an item needs to be included (applied when > 0).
    kapitel_filter : optional list of chapter numbers to keep.
    subkapitel_filter : optional list of sub-chapter names to keep.
    seed : seed for the network layout.

    Prints a short data-quality log and the summary, shows/exports the
    figure and writes "network_systemebenen.json". Returns None; when the
    filters leave no nodes or edges, a hint is printed and nothing is
    plotted or exported.
    """
    df = load_data(csv_path)

    # Brief data-quality log.
    print(f"Rows: {len(df)} | min d = {df['Effektstärke'].min():.2f} | max d = {df['Effektstärke'].max():.2f}")
    print("Systemebenen:", df["Systemebene"].dropna().unique().tolist())
    if kapitel_filter:
        print("Kapitel-Filter:", kapitel_filter)
    if subkapitel_filter:
        print("Subkapitel-Filter:", subkapitel_filter)
    if min_abs_d > 0:
        print(f"Filter |d| ≥ {min_abs_d:.2f}")

    G = build_bipartite_graph(
        df,
        min_abs_d=min_abs_d,
        kapitel_filter=kapitel_filter,
        subkapitel_filter=subkapitel_filter,
    )
    if G.number_of_nodes() == 0 or G.number_of_edges() == 0:
        print("Hinweis: Nach Filtern keine Knoten/Kanten – bitte Filter anpassen.")
        return

    plot_network(G, title="Netzwerk: Systemebenen × Thermometer (Kanten: Effektstärke)", seed=seed)

    summary = summarize_network(G)
    print("\nSystemgewicht-Summen:", summary["system_weight_sums"])
    print("\nTop-Items (weighted degree):")
    for r in summary["top_items_by_weighted_degree"]:
        print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={r['Effektstärke']:.2f} | wd={r['weighted_degree_abs']:.2f}")

    # Export JSON
    payload = {
        "meta": {
            "theme": theme,
            "min_abs_d": float(min_abs_d),
            "kapitel_filter": kapitel_filter,
            "subkapitel_filter": subkapitel_filter,
        },
        "nodes": [
            {
                "id": n,
                "label": G.nodes[n].get("label", ""),
                "type": G.nodes[n].get("bipartite", ""),
                "Thermometer_ID": G.nodes[n].get("id"),
                "Kapitelname": G.nodes[n].get("kapitelname"),
                "Subkapitel": G.nodes[n].get("subkapitel"),
                "Effektstärke": G.nodes[n].get("d"),
            }
            for n in G.nodes()
        ],
        "edges": [
            {
                "source": u,
                "target": v,
                "weight": float(d.get("weight", 0.0)),
                "sign": d.get("sign", ""),
            }
            for u, v, d in G.edges(data=True)
        ],
        "summary": summary,
    }
    export_json(payload, "network_systemebenen.json")


# -----------------------------------------
# Main
# -----------------------------------------
if __name__ == "__main__":
    # Example: no filters. Easy knobs to turn below:
    # - min_abs_d=0.10 (makes the network calmer)
    # - kapitel_filter=[5,6,7] or subkapitel_filter=["Fähigkeiten", ...]
    run_network_analysis(
        csv_path=os.path.join(os.path.dirname(__file__), csv_file),
        min_abs_d=0.00,
        kapitel_filter=None,
        subkapitel_filter=None,
        seed=42,
    )