from __future__ import annotations
"""
Visible Learning – Netzwerkanalyse (Systemebenen × Thermometer)
---------------------------------------------------------------
CI: wie in den bestehenden Skripten (plotly_template)
Daten: Thermometer.csv (Pflichtspalten: Thermometer_ID, Stichwort, Effektstärke, Subkapitel, Kapitelname, Systemebene)
Modell:
- Bipartites Netzwerk: Systemebene (psychisch/sozial) ↔ Item (Thermometer)
- Kantengewicht = Effektstärke (Vorzeichen beibehalten), Breite ~ |d|
- Knoten-Infos im Hover: ID, Stichwort, Kapitel/Subkapitel, d
- Optional: Filter nach |d| (min_abs_d) und Kapiteln/Subkapiteln
Exports:
- PNG/HTML (gemäß config)
- JSON: nodes/edges + einfache Zentralitäten (weighted degree)
"""
# -----------------------------------------
# Imports
# -----------------------------------------
import os
import json
import math
import pandas as pd
import numpy as np
import plotly.graph_objs as go
import plotly.io as pio
import networkx as nx
# -----------------------------------------
# Konfiguration laden
# -----------------------------------------
from config_visible_learning import (
csv_file,
export_fig_visual,
export_fig_png,
theme,
)
# -----------------------------------------
# Template/CI
# -----------------------------------------
try:
    from ci_template import plotly_template

    plotly_template.set_theme(theme)

    def _ci_layout(title):
        """Delegate to the template's standard layout for *title* with empty axis titles."""
        return plotly_template.get_standard_layout(title=title, x_title="", y_title="")

    _styles = plotly_template.get_plot_styles()
    _colors = plotly_template.get_colors()
except Exception:
    # Minimal fallback when the template cannot be imported or configured:
    # a layout that only carries the title, and no styles or colours.
    def _ci_layout(title):
        """Return a bare layout dict that only contains the title."""
        return dict(title=title)

    _styles = {}
    _colors = {}
# -----------------------------------------
# Export-Helfer
# -----------------------------------------
# Exports go to an "export" folder next to this module; the folder is created at import time.
EXPORT_DIR = os.path.join(os.path.dirname(__file__), "export")
if not os.path.isdir(EXPORT_DIR):
    os.makedirs(EXPORT_DIR, exist_ok=True)
def export_figure(fig, name: str):
    """Write *fig* into ``EXPORT_DIR`` as HTML and/or PNG, depending on the config flags.

    Args:
        fig: Figure object handed to ``pio.write_html`` / ``pio.write_image``.
        name: File name stem (without extension) inside ``EXPORT_DIR``.

    The PNG export is best-effort: a failure is reported on stdout but never
    aborts the run.
    """
    base = os.path.join(EXPORT_DIR, name)
    if export_fig_visual:
        pio.write_html(fig, file=f"{base}.html", auto_open=False, include_plotlyjs="cdn")
    if export_fig_png:
        try:
            pio.write_image(fig, f"{base}.png", scale=2)
        except Exception as exc:
            # Keep going, but say why no PNG was produced instead of failing silently.
            print(f"WARNUNG: PNG-Export fehlgeschlagen ({base}.png): {exc}")
def export_json(obj: dict, name: str):
    """Serialise *obj* as pretty-printed UTF-8 JSON to ``EXPORT_DIR/name``.

    Args:
        obj: JSON-serialisable mapping.
        name: Target file name (including extension) inside ``EXPORT_DIR``.

    The export is best-effort: a failure is reported on stdout but never
    aborts the run.
    """
    target = os.path.join(EXPORT_DIR, name)
    try:
        # Serialise before opening the file so that a serialisation error
        # cannot leave a truncated JSON file behind.
        text = json.dumps(obj, ensure_ascii=False, indent=2)
        with open(target, "w", encoding="utf-8") as f:
            f.write(text)
    except Exception as exc:
        print(f"WARNUNG: JSON-Export fehlgeschlagen ({target}): {exc}")
# -----------------------------------------
# Daten laden
# -----------------------------------------
REQUIRED_COLS = ["Thermometer_ID", "Stichwort", "Effektstärke", "Subkapitel", "Kapitelname", "Systemebene"]


def load_data(path: str) -> pd.DataFrame:
    """Load the thermometer CSV, validate its columns and normalise the effect sizes.

    Args:
        path: Location of the CSV file.

    Returns:
        DataFrame without rows whose effect size is not numeric, with
        ``Effektstärke`` as float and an added ``Kapitel`` column holding the
        integer in front of the first "." of ``Thermometer_ID`` (missing where
        that prefix is not an integer).

    Raises:
        ValueError: If one of ``REQUIRED_COLS`` is absent from the CSV.
    """
    # Read IDs as text so that an ID such as "5.10" is not parsed as the float 5.1.
    df = pd.read_csv(path, dtype={"Thermometer_ID": str})
    missing = [c for c in REQUIRED_COLS if c not in df.columns]
    if missing:
        raise ValueError(f"Fehlende Spalten in CSV: {missing}")

    # Accept decimal commas ("0,45") as well as decimal points.
    effect_text = df["Effektstärke"].astype(str).str.replace(",", ".", regex=False).str.strip()
    df["Effektstärke"] = pd.to_numeric(effect_text, errors="coerce")
    # Explicit copy: the frame gets a new column below.
    df = df.dropna(subset=["Effektstärke"]).copy()

    # Report rows whose system level is neither "psychisch" nor "sozial".
    known_level = df["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])
    invalid_systems = df[~known_level]
    if not invalid_systems.empty:
        print("WARNUNG: Unspezifische Systemebenen gefunden:")
        print(invalid_systems[["Thermometer_ID", "Stichwort", "Systemebene"]].to_string(index=False))

    # Chapter number per row: a malformed ID only blanks its own row
    # instead of discarding the chapter number of every row.
    chapter_text = df["Thermometer_ID"].astype(str).str.split(".").str[0].str.strip()
    is_integer = chapter_text.str.fullmatch(r"[+-]?\d+").astype(bool)
    chapter = pd.to_numeric(chapter_text.where(is_integer), errors="coerce")
    # Plain int when every ID parsed; nullable Int64 only when some did not.
    df["Kapitel"] = chapter.astype(int) if chapter.notna().all() else chapter.astype("Int64")
    return df
# -----------------------------------------
# Netzwerk bauen
# -----------------------------------------
def build_bipartite_graph(
    df: pd.DataFrame,
    min_abs_d: float = 0.00,
    kapitel_filter: list[int] | None = None,
    subkapitel_filter: list[str] | None = None,
) -> nx.Graph:
    """Build the system-level / thermometer-item graph from the filtered rows of *df*.

    Args:
        df: Table with the columns Thermometer_ID, Stichwort, Effektstärke,
            Subkapitel, Kapitelname and Systemebene (plus Kapitel when
            *kapitel_filter* is given). The input frame is not modified.
        min_abs_d: Keep only rows with ``|Effektstärke| >= min_abs_d``;
            no filtering when the value is not positive.
        kapitel_filter: Optional chapter numbers to keep.
        subkapitel_filter: Optional sub-chapter names to keep.

    Returns:
        Undirected graph with ``system::<level>`` and ``item::<id>`` nodes.
        Every item is linked to its system level by one edge whose ``weight``
        is the signed effect size and whose ``sign`` is "pos" or "neg".
    """
    allowed_levels = ["psychisch", "sozial"]

    rows = df.copy()
    if kapitel_filter:
        rows = rows.loc[rows["Kapitel"].isin(kapitel_filter)]
    if subkapitel_filter:
        rows = rows.loc[rows["Subkapitel"].isin(subkapitel_filter)]
    if min_abs_d > 0:
        rows = rows.loc[rows["Effektstärke"].abs() >= float(min_abs_d)]

    # Rows with any other system level are left out of the graph.
    has_known_level = rows["Systemebene"].astype(str).str.lower().isin(allowed_levels)
    rows = rows.loc[has_known_level]

    graph = nx.Graph()

    # One node per system level that actually occurs (side A).
    level_names = sorted(rows["Systemebene"].str.lower().unique().tolist())
    for level in level_names:
        graph.add_node(f"system::{level}", bipartite="system", label=level.capitalize(), typ="System")

    # One node per item plus its edge to the system level (side B).
    for _, row in rows.iterrows():
        effect = row["Effektstärke"]
        system_node = f"system::{str(row['Systemebene']).lower()}"
        item_node = f"item::{row['Thermometer_ID']}"
        graph.add_node(
            item_node,
            bipartite="item",
            label=str(row["Stichwort"]),
            id=str(row["Thermometer_ID"]),
            d=float(effect),
            kapitelname=str(row["Kapitelname"]),
            subkapitel=str(row["Subkapitel"]),
        )
        # The edge keeps the sign of the effect size in its weight.
        graph.add_edge(
            system_node,
            item_node,
            weight=float(effect),
            sign="pos" if effect >= 0 else "neg",
        )
    return graph
# -----------------------------------------
# Layout & Visualisierung (Plotly)
# -----------------------------------------
def _edge_segments(G: nx.Graph, pos: dict[str, tuple[float, float]], sign: str | None = None):
    """Return x and y coordinate lists for the edges of *G*, one line segment per edge.

    Each edge contributes ``[start, end, None]`` to both lists; the ``None``
    separates consecutive segments. When *sign* is given, only edges whose
    ``sign`` attribute equals it are included.
    """
    xs, ys = [], []
    for src, dst, attrs in G.edges(data=True):
        if not sign or attrs.get("sign") == sign:
            (x0, y0), (x1, y1) = pos[src], pos[dst]
            xs.extend((x0, x1, None))
            ys.extend((y0, y1, None))
    return xs, ys
def plot_network(G: nx.Graph, title: str = "Netzwerk: Systemebenen × Thermometer", seed: int = 42):
    """Draw the network with Plotly, show it and export it as "vl-network".

    Args:
        G: Graph whose nodes carry a ``bipartite`` attribute ("system" or
            "item") and whose edges carry ``weight`` and ``sign`` attributes.
        title: Figure title passed to the layout helper.
        seed: Seed for the spring layout, so repeated runs give the same positions.
    """
    # spring_layout treats the edge weight as attraction strength, so it is fed
    # |weight|; a negative effect would otherwise push an item away from its system node.
    # Edges without a weight count as 1, matching the layout's own default.
    layout_graph = nx.Graph()
    layout_graph.add_nodes_from(G.nodes())
    layout_graph.add_weighted_edges_from(
        (u, v, abs(attrs.get("weight", 1.0))) for u, v, attrs in G.edges(data=True)
    )
    pos = nx.spring_layout(layout_graph, seed=seed, k=None, weight="weight")

    # Split the nodes by type.
    system_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
    item_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]

    # Positive and negative edges get their own traces (line styles from the template).
    x_pos, y_pos = _edge_segments(G, pos, sign="pos")
    x_neg, y_neg = _edge_segments(G, pos, sign="neg")
    line_primary = _styles.get("linie_primaryLine", dict(width=1))
    line_secondary = _styles.get("linie_secondaryLine", dict(width=1))
    edge_pos = go.Scatter(
        x=x_pos, y=y_pos,
        mode="lines",
        line=line_primary,
        hoverinfo="skip",
        showlegend=True,
        name="Kanten (d ≥ 0)",
    )
    edge_neg = go.Scatter(
        x=x_neg, y=y_neg,
        mode="lines",
        line=line_secondary,
        hoverinfo="skip",
        showlegend=True,
        name="Kanten (d < 0)",
    )

    # System nodes: template marker with a fixed size.
    sys_marker = _styles.get("marker_accent", dict(size=18))
    sys_x = [pos[n][0] for n in system_nodes]
    sys_y = [pos[n][1] for n in system_nodes]
    sys_text = [G.nodes[n].get("label", n) for n in system_nodes]
    sys_hover = [f"Systemebene: {G.nodes[n].get('label','')}" for n in system_nodes]
    systems_trace = go.Scatter(
        x=sys_x, y=sys_y, mode="markers",
        marker={**sys_marker, "size": 18},
        text=sys_text,
        hovertext=sys_hover,
        hovertemplate="%{hovertext}",
        name="System",
    )

    # Item nodes: template marker; size follows the absolute weighted degree.
    item_marker = _styles.get("marker_brightArea", dict(size=10))
    it_x = [pos[n][0] for n in item_nodes]
    it_y = [pos[n][1] for n in item_nodes]
    wdeg = []
    htxt = []
    for n in item_nodes:
        wdeg.append(sum(abs(G[n][nbr].get("weight", 0.0)) for nbr in G[n]))
        nd = G.nodes[n]
        # "<br>" is the line break inside Plotly hover text.
        htxt.append(
            f"Thermometer: {nd.get('id', '')}"
            f"<br>Stichwort: {nd.get('label', '')}"
            f"<br>Kapitel: {nd.get('kapitelname', '')}"
            f"<br>Subkapitel: {nd.get('subkapitel', '')}"
            f"<br>d: {nd.get('d', np.nan):.2f}"
        )

    # Scale marker sizes to 8..20; fall back to 10 when every degree is zero.
    wdeg = np.asarray(wdeg, dtype=float)
    if wdeg.size and np.nanmax(wdeg) > 0:
        sizes = 8 + 12 * (wdeg / np.nanmax(wdeg))
    else:
        sizes = np.full_like(wdeg, 10)
    items_trace = go.Scatter(
        x=it_x, y=it_y, mode="markers",
        marker={**item_marker, "size": sizes},
        hovertext=htxt,
        hovertemplate="%{hovertext}",
        name="Thermometer",
    )

    fig = go.Figure(data=[edge_pos, edge_neg, systems_trace, items_trace])
    fig.update_layout(_ci_layout(title))
    # Keep the axes neutral: no tick labels, grid or zero line (details live in the hover).
    fig.update_xaxes(showticklabels=False, showgrid=False, zeroline=False)
    fig.update_yaxes(showticklabels=False, showgrid=False, zeroline=False)
    fig.show()
    export_figure(fig, "vl-network")
# -----------------------------------------
# Einfache Metriken & Export
# -----------------------------------------
def summarize_network(G: nx.Graph) -> dict:
    """Compute absolute weighted degrees and condense them into a small report.

    Returns:
        Dict with two entries: ``top_items_by_weighted_degree`` (up to 15 item
        nodes, strongest first, each with its node attributes and degree) and
        ``system_weight_sums`` (system label -> absolute weighted degree).
    """
    def _abs_strength(node) -> float:
        # Sum of |weight| over all edges of *node*; a missing weight counts as 0.
        return float(sum((abs(G[node][nbr].get("weight", 0.0)) for nbr in G[node]), 0.0))

    strength = {node: _abs_strength(node) for node in G.nodes()}

    # Item nodes, strongest first, limited to the top 15.
    item_nodes = [node for node, attrs in G.nodes(data=True) if attrs.get("bipartite") == "item"]
    ranked = sorted(item_nodes, key=lambda node: strength[node], reverse=True)[:15]
    top_items = [
        {
            "Thermometer_ID": G.nodes[node].get("id"),
            "Stichwort": G.nodes[node].get("label"),
            "Kapitelname": G.nodes[node].get("kapitelname"),
            "Subkapitel": G.nodes[node].get("subkapitel"),
            "Effektstärke": G.nodes[node].get("d"),
            "weighted_degree_abs": strength[node],
        }
        for node in ranked
    ]

    # Total absolute weight attached to each system node.
    system_summary = {
        G.nodes[node].get("label", node): float(strength[node])
        for node, attrs in G.nodes(data=True)
        if attrs.get("bipartite") == "system"
    }
    return {"top_items_by_weighted_degree": top_items, "system_weight_sums": system_summary}
# -----------------------------------------
# Pipeline
# -----------------------------------------
def run_network_analysis(
    csv_path: str,
    min_abs_d: float = 0.00,
    kapitel_filter: list[int] | None = None,
    subkapitel_filter: list[str] | None = None,
    seed: int = 42
):
    """Run the whole pipeline: load, filter, plot, summarise and export as JSON.

    Args:
        csv_path: CSV file handed to ``load_data``.
        min_abs_d: Minimum absolute effect size passed to the graph builder.
        kapitel_filter: Optional chapter numbers passed to the graph builder.
        subkapitel_filter: Optional sub-chapter names passed to the graph builder.
        seed: Layout seed passed to ``plot_network``.

    Returns nothing. When the filtered graph has no nodes or no edges, a hint
    is printed and neither plot nor JSON export is produced.
    """
    df = load_data(csv_path)

    # Short data-quality log plus the active filters.
    effects = df["Effektstärke"]
    print(f"Rows: {len(df)} | min d = {effects.min():.2f} | max d = {effects.max():.2f}")
    print("Systemebenen:", df["Systemebene"].dropna().unique().tolist())
    if kapitel_filter:
        print("Kapitel-Filter:", kapitel_filter)
    if subkapitel_filter:
        print("Subkapitel-Filter:", subkapitel_filter)
    if min_abs_d > 0:
        print(f"Filter |d| ≥ {min_abs_d:.2f}")

    G = build_bipartite_graph(
        df,
        min_abs_d=min_abs_d,
        kapitel_filter=kapitel_filter,
        subkapitel_filter=subkapitel_filter,
    )
    if not (G.number_of_nodes() and G.number_of_edges()):
        print("Hinweis: Nach Filtern keine Knoten/Kanten – bitte Filter anpassen.")
        return

    plot_network(G, title="Netzwerk: Systemebenen × Thermometer (Kanten: Effektstärke)", seed=seed)

    summary = summarize_network(G)
    print("\nSystemgewicht-Summen:", summary["system_weight_sums"])
    print("\nTop-Items (weighted degree):")
    for r in summary["top_items_by_weighted_degree"]:
        print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={r['Effektstärke']:.2f} | wd={r['weighted_degree_abs']:.2f}")

    # Assemble the JSON payload: run metadata, node and edge lists, summary.
    node_records = []
    for node_id in G.nodes():
        attrs = G.nodes[node_id]
        node_records.append({
            "id": node_id,
            "label": attrs.get("label", ""),
            "type": attrs.get("bipartite", ""),
            "Thermometer_ID": attrs.get("id"),
            "Kapitelname": attrs.get("kapitelname"),
            "Subkapitel": attrs.get("subkapitel"),
            "Effektstärke": attrs.get("d"),
        })

    edge_records = []
    for src, dst, attrs in G.edges(data=True):
        edge_records.append({
            "source": src,
            "target": dst,
            "weight": float(attrs.get("weight", 0.0)),
            "sign": attrs.get("sign", ""),
        })

    payload = {
        "meta": {
            "theme": theme,
            "min_abs_d": float(min_abs_d),
            "kapitel_filter": kapitel_filter,
            "subkapitel_filter": subkapitel_filter,
        },
        "nodes": node_records,
        "edges": edge_records,
        "summary": summary,
    }
    export_json(payload, "network_systemebenen.json")
# -----------------------------------------
# Main
# -----------------------------------------
if __name__ == "__main__":
    # Example run without filters. Knobs to turn:
    # - min_abs_d=0.10 makes the network calmer
    # - kapitel_filter=[5,6,7] or subkapitel_filter=["Fähigkeiten", ...]
    script_dir = os.path.dirname(__file__)
    run_network_analysis(
        csv_path=os.path.join(script_dir, csv_file),
        min_abs_d=0.00,
        kapitel_filter=None,
        subkapitel_filter=None,
        seed=42,
    )