This repository has been archived on 2025-10-27. You can view files and clone it, but cannot push or open issues or pull requests.
Files
visible-learning/visible-learning netzwerkanalyse.py
Jochen Hanisch-Johannsen aab5c683b9 Netzwerkanalyse erweitert:
- 3D-Visualisierung mit Effekt-Achse (z-Modi: Effekt, System, Semantik)
- Top-Listen (je 15 positive/negative Effektstärken) ergänzt
- Item-Projektion mit Community-Labels
- Config um Toggle für z-Modi mit sprechenden Achsentiteln erweitert
- Keys konsistent (top_n_extremes, show_item_projection)
2025-09-03 23:53:41 +02:00

741 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
"""
Visible Learning Netzwerkanalyse (Systemebenen × Thermometer)
---------------------------------------------------------------
CI: wie in den bestehenden Skripten (plotly_template)
Daten: Thermometer.csv (Pflichtspalten: Thermometer_ID, Stichwort, Effektstärke, Subkapitel, Kapitelname, Systemebene)
Modell:
- Bipartites Netzwerk: Systemebene (psychisch/sozial) ↔ Item (Thermometer)
- Kantengewicht = Effektstärke (Vorzeichen beibehalten), Breite ~ |d|
- Knoten-Infos im Hover: ID, Stichwort, Kapitel/Subkapitel, d
- Optional: Filter nach |d| (min_abs_d) und Kapiteln/Subkapiteln
Exports:
- PNG/HTML (gemäß config)
- JSON: nodes/edges + einfache Zentralitäten (weighted degree)
"""
# -----------------------------------------
# Imports
# -----------------------------------------
import os
import json
import math
import pandas as pd
import numpy as np
import plotly.graph_objs as go
import plotly.io as pio
import networkx as nx
# -----------------------------------------
# Konfiguration laden
# -----------------------------------------
from config_visible_learning import (
csv_file,
export_fig_visual,
export_fig_png,
theme,
z_mode,
z_axis_labels,
show_item_projection,
show_community_labels,
top_n_extremes,
)
# -----------------------------------------
# Template/CI
# -----------------------------------------
try:
from ci_template import plotly_template
plotly_template.set_theme(theme)
_ci_layout = lambda title: plotly_template.get_standard_layout(title=title, x_title="", y_title="")
_styles = plotly_template.get_plot_styles()
_colors = plotly_template.get_colors()
except Exception:
# Minimaler Fallback, falls Template nicht verfügbar ist
_ci_layout = lambda title: dict(title=title)
_styles = {}
_colors = {}
# -----------------------------------------
# Config-Fallbacks (falls Keys fehlen)
# -----------------------------------------
try:
_Z_MODE = z_mode
except Exception:
_Z_MODE = "effekt"
try:
_Z_AXIS_LABELS = z_axis_labels
except Exception:
_Z_AXIS_LABELS = {"effekt": "Effektstärke (Cohen d)", "kapitel": "Kapitel (numerischer Index)", "system": "Systemebene (0 = Psychisch, 1 = Sozial)"}
try:
_SHOW_ITEM_PROJECTION = show_item_projection
except Exception:
_SHOW_ITEM_PROJECTION = True
try:
_SHOW_COMMUNITY_LABELS = show_community_labels
except Exception:
_SHOW_COMMUNITY_LABELS = True
try:
_TOP_N_EXTREMES = int(top_n_extremes)
except Exception:
_TOP_N_EXTREMES = 15
# -----------------------------------------
# Export-Helfer
# -----------------------------------------
EXPORT_DIR = os.path.join(os.path.dirname(__file__), "export")
os.makedirs(EXPORT_DIR, exist_ok=True)
def export_figure(fig, name: str):
base = os.path.join(EXPORT_DIR, name)
if export_fig_visual:
pio.write_html(fig, file=f"{base}.html", auto_open=False, include_plotlyjs="cdn")
if export_fig_png:
try:
pio.write_image(fig, f"{base}.png", scale=2)
except Exception:
pass
def export_json(obj: dict, name: str):
try:
with open(os.path.join(EXPORT_DIR, name), "w", encoding="utf-8") as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
except Exception:
pass
# -----------------------------------------
# Daten laden
# -----------------------------------------
REQUIRED_COLS = ["Thermometer_ID", "Stichwort", "Effektstärke", "Subkapitel", "Kapitelname", "Systemebene"]
def load_data(path: str) -> pd.DataFrame:
df = pd.read_csv(path)
missing = [c for c in REQUIRED_COLS if c not in df.columns]
if missing:
raise ValueError(f"Fehlende Spalten in CSV: {missing}")
# Effektstärke robust nach float
df["Effektstärke"] = (
df["Effektstärke"].astype(str).str.replace(",", ".", regex=False).str.strip()
)
df["Effektstärke"] = pd.to_numeric(df["Effektstärke"], errors="coerce")
df = df.dropna(subset=["Effektstärke"])
# Prüfung: unspezifische Systemebenen
invalid_systems = df[~df["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
if not invalid_systems.empty:
print("WARNUNG: Unspezifische Systemebenen gefunden:")
print(invalid_systems[["Thermometer_ID", "Stichwort", "Systemebene"]].to_string(index=False))
# Kapitelnummer aus ID (optional nützlich)
try:
df["Kapitel"] = df["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
except Exception:
df["Kapitel"] = None
return df
# -----------------------------------------
# Top-Listen (positiv/negativ)
# -----------------------------------------
def top_extremes(df: pd.DataFrame, n: int = 15) -> dict:
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
data = data.dropna(subset=["Effektstärke"]) # Sicherheit
pos = data.sort_values("Effektstärke", ascending=False).head(n)
neg = data.sort_values("Effektstärke", ascending=True).head(n)
# Konsole
print(f"\nTop +{n} (positiv):")
for _, r in pos.iterrows():
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
print(f"\nTop -{n} (negativ):")
for _, r in neg.iterrows():
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
return {
"top_positive": pos[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Effektstärke","Systemebene"]].to_dict(orient="records"),
"top_negative": neg[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Effektstärke","Systemebene"]].to_dict(orient="records"),
}
# -----------------------------------------
# Netzwerk bauen
# -----------------------------------------
def build_bipartite_graph(
df: pd.DataFrame,
min_abs_d: float = 0.00,
kapitel_filter: list[int] | None = None,
subkapitel_filter: list[str] | None = None,
) -> nx.Graph:
data = df.copy()
# Filter
if kapitel_filter:
data = data[data["Kapitel"].isin(kapitel_filter)]
if subkapitel_filter:
data = data[data["Subkapitel"].isin(subkapitel_filter)]
if min_abs_d > 0:
data = data[data["Effektstärke"].abs() >= float(min_abs_d)]
# Nur gültige Systemebenen
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
G = nx.Graph()
# Systemknoten (part A)
systems = sorted(data["Systemebene"].str.lower().unique().tolist())
for s in systems:
G.add_node(
f"system::{s}",
bipartite="system",
label=s.capitalize(),
typ="System",
)
# Itemknoten + Kanten (part B)
for _, r in data.iterrows():
sys_key = f"system::{str(r['Systemebene']).lower()}"
item_key = f"item::{r['Thermometer_ID']}"
# Item node
G.add_node(
item_key,
bipartite="item",
label=str(r["Stichwort"]),
id=str(r["Thermometer_ID"]),
d=float(r["Effektstärke"]),
kapitelname=str(r["Kapitelname"]),
subkapitel=str(r["Subkapitel"]),
)
# Edge: Gewicht = Effektstärke (Vorzeichen beibehalten)
G.add_edge(
sys_key, item_key,
weight=float(r["Effektstärke"]),
sign="pos" if r["Effektstärke"] >= 0 else "neg"
)
return G
# -----------------------------------------
# Item-Projektion (bipartit -> Item-Item) + Communities
# -----------------------------------------
from networkx.algorithms import community as nx_comm
def build_item_projection(G: nx.Graph) -> tuple[nx.Graph, dict[str,int], list[set]]:
"""Projiziert das bipartite Netz auf die Item-Seite. Zwei Items werden verbunden,
wenn sie dasselbe System teilen. Kanten-Gewicht = min(|w_i|, |w_j|).
Liefert das Item-Graph, ein Mapping node->community_id und die Community-Mengen.
"""
# Item- und System-Knoten bestimmen
items = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
systems = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
# Zuordnung: System -> Liste (item, |weight|)
sys_to_items: dict[str, list[tuple[str,float]]] = {}
for s in systems:
sys_to_items[s] = []
for u, v, d in G.edges(data=True):
if u in systems and v in items:
sys_to_items[u].append((v, abs(float(d.get("weight",0.0)))))
elif v in systems and u in items:
sys_to_items[v].append((u, abs(float(d.get("weight",0.0)))))
# Item-Graph aufbauen
Gi = nx.Graph()
for it in items:
nd = G.nodes[it]
Gi.add_node(it, **nd)
for s, lst in sys_to_items.items():
# Alle Paare innerhalb desselben Systems verbinden
for i in range(len(lst)):
for j in range(i+1, len(lst)):
a, wa = lst[i]
b, wb = lst[j]
w = min(wa, wb)
if Gi.has_edge(a,b):
Gi[a][b]["weight"] += w
else:
Gi.add_edge(a, b, weight=w)
if Gi.number_of_edges() == 0:
return Gi, {}, []
# Communities (gewichtete Modularity, Greedy)
coms = nx_comm.greedy_modularity_communities(Gi, weight="weight")
node2com: dict[str,int] = {}
for cid, members in enumerate(coms):
for n in members:
node2com[n] = cid
return Gi, node2com, [set(c) for c in coms]
def plot_item_projection(Gi: nx.Graph, node2com: dict[str,int], title: str = "Item-Projektion (Communities)"):
if Gi.number_of_nodes() == 0:
print("Hinweis: Item-Projektion leer (zu wenig Überlappung).")
return
pos = nx.spring_layout(Gi, seed=42, weight="weight")
# Communities zu Traces gruppieren
com_to_nodes: dict[int, list[str]] = {}
for n in Gi.nodes():
cid = node2com.get(n, -1)
com_to_nodes.setdefault(cid, []).append(n)
traces = []
# Farb-/Markerstile aus CI (zyklisch)
style_keys = [
"marker_accent", "marker_brightArea", "marker_depthArea",
"marker_positiveHighlight", "marker_negativeHighlight",
"marker_primaryLine", "marker_secondaryLine"
]
keys_cycle = style_keys * 10
for idx, (cid, nodes) in enumerate(sorted(com_to_nodes.items(), key=lambda t: t[0])):
xs = [pos[n][0] for n in nodes]
ys = [pos[n][1] for n in nodes]
htxt = []
for n in nodes:
nd = Gi.nodes[n]
htxt.append(
"Thermometer: " + str(nd.get("id","")) +
"<br>Stichwort: " + str(nd.get("label","")) +
"<br>Kapitel: " + str(nd.get("kapitelname","")) +
"<br>Subkapitel: " + str(nd.get("subkapitel","")) +
"<br>d: " + f"{nd.get('d',np.nan):.2f}"
)
mk = _styles.get(keys_cycle[idx], dict(size=8))
traces.append(go.Scatter(
x=xs, y=ys, mode="markers+text" if _SHOW_COMMUNITY_LABELS else "markers",
marker={**mk, "size": 9},
text=[str(node2com.get(n, -1)) if _SHOW_COMMUNITY_LABELS else None for n in nodes],
textposition="top center",
hovertext=htxt,
hovertemplate="%{hovertext}<extra></extra>",
name=f"Community {cid}"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title))
fig.update_xaxes(title_text="Semantische Position X (Projektion)", showticklabels=False, showgrid=False, zeroline=False)
fig.update_yaxes(title_text="Semantische Position Y (Projektion)", showticklabels=False, showgrid=False, zeroline=False)
fig.show()
export_figure(fig, "vl-network-item-projection")
# -----------------------------------------
# Layout & Visualisierung (Plotly)
# -----------------------------------------
def _edge_segments(G: nx.Graph, pos: dict[str, tuple[float, float]], sign: str | None = None):
"""Erzeugt x,y-Koordinaten-Listen für Liniensegmente (mit None-Trennern). Optional nach Vorzeichen filtern."""
xs, ys = [], []
for u, v, d in G.edges(data=True):
if sign and d.get("sign") != sign:
continue
x0, y0 = pos[u]
x1, y1 = pos[v]
xs += [x0, x1, None]
ys += [y0, y1, None]
return xs, ys
def plot_network(G: nx.Graph, title: str = "Netzwerk: Systemebenen × Thermometer", seed: int = 42):
# Spring-Layout (reproduzierbar über seed)
pos = nx.spring_layout(G, seed=seed, k=None, weight="weight")
# Knoten nach Typ trennen
system_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
item_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
# Edges (pos/neg) als eigene Traces (Linienstile aus CI)
x_pos, y_pos = _edge_segments(G, pos, sign="pos")
x_neg, y_neg = _edge_segments(G, pos, sign="neg")
line_positive = _styles.get("linie_positiveHighlight", dict(width=1))
line_negative = _styles.get("linie_negativeHighlight", dict(width=1))
edge_pos = go.Scatter(
x=x_pos, y=y_pos,
mode="lines",
line=line_positive,
hoverinfo="skip",
showlegend=True,
name="Kanten (d ≥ 0)"
)
edge_neg = go.Scatter(
x=x_neg, y=y_neg,
mode="lines",
line=line_negative,
hoverinfo="skip",
showlegend=True,
name="Kanten (d < 0)"
)
# System-Knoten: Marker aus CI (z. B. accent)
sys_marker = _styles.get("marker_primaryLine", dict(size=18))
sys_x = [pos[n][0] for n in system_nodes]
sys_y = [pos[n][1] for n in system_nodes]
sys_text = [G.nodes[n].get("label", n) for n in system_nodes]
sys_hover = [f"Systemebene: {G.nodes[n].get('label','')}" for n in system_nodes]
systems_trace = go.Scatter(
x=sys_x, y=sys_y, mode="markers",
marker={**sys_marker, "size": 18},
text=sys_text,
hovertext=sys_hover,
hovertemplate="%{hovertext}<extra></extra>",
name="System"
)
# Item-Knoten: Marker aus CI (z. B. brightArea); Größe ~ |degree_weight|
item_marker = _styles.get("marker_secondaryLine", dict(size=10))
it_x = [pos[n][0] for n in item_nodes]
it_y = [pos[n][1] for n in item_nodes]
# Gewichtete Degree als Größe
wdeg = []
htxt = []
for n in item_nodes:
dsum = 0.0
for nbr in G[n]:
dsum += abs(G[n][nbr].get("weight", 0.0))
wdeg.append(dsum)
nd = G.nodes[n]
htxt.append(
"Thermometer: "
+ str(nd.get("id",""))
+ "<br>Stichwort: "
+ str(nd.get("label",""))
+ "<br>Kapitel: "
+ str(nd.get("kapitelname",""))
+ "<br>Subkapitel: "
+ str(nd.get("subkapitel",""))
+ "<br>d: "
+ f"{nd.get('d',np.nan):.2f}"
)
# Größen skalieren
wdeg = np.asarray(wdeg, dtype=float)
if wdeg.size and np.nanmax(wdeg) > 0:
sizes = 8 + 12 * (wdeg / np.nanmax(wdeg))
else:
sizes = np.full_like(wdeg, 10)
items_trace = go.Scatter(
x=it_x, y=it_y, mode="markers",
marker={**item_marker, "size": sizes},
hovertext=htxt,
hovertemplate="%{hovertext}<extra></extra>",
name="Thermometer"
)
fig = go.Figure(data=[edge_pos, edge_neg, systems_trace, items_trace])
# CI-Layout und inhaltliche Achsentitel (2D: Semantische Position aus Layout)
fig.update_layout(_ci_layout(title))
fig.update_xaxes(
title_text="Semantische Position X (Layout)",
showticklabels=False, showgrid=False, zeroline=False
)
fig.update_yaxes(
title_text="Semantische Position Y (Layout)",
showticklabels=False, showgrid=False, zeroline=False
)
fig.show()
export_figure(fig, "vl-network")
def _edge_segments_3d(G: nx.Graph, pos_xy: dict[str, tuple[float, float]], z_map: dict[str, float], sign: str | None = None):
"""Erzeugt x,y,z-Koordinaten-Listen für 3D-Liniensegmente (mit None-Trennern). Optional nach Vorzeichen filtern."""
xs, ys, zs = [], [], []
for u, v, d in G.edges(data=True):
if sign and d.get("sign") != sign:
continue
x0, y0 = pos_xy[u]
x1, y1 = pos_xy[v]
z0 = float(z_map.get(u, 0.0))
z1 = float(z_map.get(v, 0.0))
xs += [x0, x1, None]
ys += [y0, y1, None]
zs += [z0, z1, None]
return xs, ys, zs
def plot_network_3d(G: nx.Graph, z_mode: str = "effekt", title: str = "3D: Systemebenen × Thermometer", seed: int = 42):
"""
Semantische 3D-Ansicht:
- z_mode = "effekt": z = Effektstärke (Items), Systeme z=0
- z_mode = "kapitel": z = Kapitelnummer (Items), Systeme unterhalb der Items (min_z - 0.5)
- z_mode = "system": z = 0 (psychisch), 1 (sozial), Items = Mittelwert ihrer Systemnachbarn
x/y stammen aus einem 2D-Spring-Layout (stabile, gut lesbare Projektion), z ist semantisch belegt.
"""
styles = _styles
colors = _colors
# 2D-Layout für X/Y (stabile Projektion)
pos_xy = nx.spring_layout(G, seed=seed, k=None, weight="weight", dim=2)
# Z-Koordinaten je Knoten ermitteln
z_map: dict[str, float] = {}
if z_mode == "effekt":
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
z_map[n] = float(d.get("d", 0.0))
else:
z_map[n] = 0.0
elif z_mode == "kapitel":
item_z_vals = []
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
try:
# Kapitelnummer aus Kapitelname kann alphanumerisch sein; wir nutzen, wenn vorhanden, numerische "Kapitel"
# Falls keine numerische Kapitelspalte existiert, wird 0 gesetzt.
kap = d.get("kapitelname", "")
# Fallback: im Nodes-Attribut existiert keine numerische Kapitelnummer; daher 0
z_map[n] = float(d.get("kapitel", 0.0)) if "kapitel" in d else 0.0
except Exception:
z_map[n] = 0.0
item_z_vals.append(z_map[n])
min_z = min(item_z_vals) if item_z_vals else 0.0
for n, d in G.nodes(data=True):
if d.get("bipartite") == "system":
z_map[n] = float(min_z) - 0.5
elif z_mode == "system":
# Systeme klar trennen
for n, d in G.nodes(data=True):
if d.get("bipartite") == "system":
lbl = str(d.get("label", "")).strip().lower()
z_map[n] = 0.0 if "psych" in lbl else 1.0
# Items: Mittelwert der z-Werte ihrer System-Nachbarn (im bipartiten Graphen genau einer)
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
zs = []
for nbr in G[n]:
zs.append(z_map.get(nbr, 0.0))
z_map[n] = float(np.mean(zs)) if zs else 0.0
else:
# Unbekannter Modus -> alle 0
z_map = {n: 0.0 for n in G.nodes()}
# Knotenlisten
system_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
item_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
# Kanten (pos/neg) vorbereiten
x_pos, y_pos, z_pos = _edge_segments_3d(G, pos_xy, z_map, sign="pos")
x_neg, y_neg, z_neg = _edge_segments_3d(G, pos_xy, z_map, sign="neg")
line_positive = styles.get("linie_positiveHighlight", dict(width=1))
line_negative = styles.get("linie_negativeHighlight", dict(width=1))
edge_pos = go.Scatter3d(
x=x_pos, y=y_pos, z=z_pos,
mode="lines",
line=line_positive,
hoverinfo="skip",
showlegend=True,
name="Kanten (d ≥ 0)"
)
edge_neg = go.Scatter3d(
x=x_neg, y=y_neg, z=z_neg,
mode="lines",
line=line_negative,
hoverinfo="skip",
showlegend=True,
name="Kanten (d &lt; 0)"
)
# System-Knoten
sys_marker = styles.get("marker_primaryLine", dict(size=18))
sys_x = [pos_xy[n][0] for n in system_nodes]
sys_y = [pos_xy[n][1] for n in system_nodes]
sys_z = [z_map[n] for n in system_nodes]
sys_text = [G.nodes[n].get("label", n) for n in system_nodes]
sys_hover = [f"Systemebene: {G.nodes[n].get('label','')}" for n in system_nodes]
systems_trace = go.Scatter3d(
x=sys_x, y=sys_y, z=sys_z, mode="markers",
marker={**sys_marker, "size": 10},
text=sys_text,
hovertext=sys_hover,
hovertemplate="%{hovertext}<extra></extra>",
name="System"
)
# Item-Knoten: Thermometer im Sekundärstil (gleiches Marker-Design für +/-); Kanten behalten Vorzeichenfarben
pos_marker = styles.get("marker_secondaryLine", dict(size=6))
neg_marker = styles.get("marker_secondaryLine", dict(size=6))
pos_x, pos_y, pos_z, pos_hover = [], [], [], []
neg_x, neg_y, neg_z, neg_hover = [], [], [], []
for n in item_nodes:
x, y = pos_xy[n]
z = z_map[n]
nd = G.nodes[n]
hover = (
"Thermometer: " + str(nd.get("id","")) +
"<br>Stichwort: " + str(nd.get("label","")) +
"<br>Kapitel: " + str(nd.get("kapitelname","")) +
"<br>Subkapitel: " + str(nd.get("subkapitel","")) +
"<br>d: " + f"{nd.get('d',np.nan):.2f}"
)
if float(nd.get("d", 0.0)) >= 0:
pos_x.append(x); pos_y.append(y); pos_z.append(z); pos_hover.append(hover)
else:
neg_x.append(x); neg_y.append(y); neg_z.append(z); neg_hover.append(hover)
items_pos_trace = go.Scatter3d(
x=pos_x, y=pos_y, z=pos_z, mode="markers",
marker=pos_marker,
hovertext=pos_hover,
hovertemplate="%{hovertext}<extra></extra>",
name="Thermometer (d ≥ 0)"
)
items_neg_trace = go.Scatter3d(
x=neg_x, y=neg_y, z=neg_z, mode="markers",
marker=neg_marker,
hovertext=neg_hover,
hovertemplate="%{hovertext}<extra></extra>",
name="Thermometer (d &lt; 0)"
)
fig = go.Figure(data=[edge_pos, edge_neg, systems_trace, items_pos_trace, items_neg_trace])
fig.update_layout(_ci_layout(f"{title} z: {z_mode}"))
# Achsentitel mit inhaltlicher Bedeutung setzen
z_title = _Z_AXIS_LABELS.get(z_mode, "Z")
fig.update_scenes(
xaxis=dict(
title="Semantische Position X (Layout)",
showticklabels=False, showgrid=False, zeroline=False
),
yaxis=dict(
title="Semantische Position Y (Layout)",
showticklabels=False, showgrid=False, zeroline=False
),
zaxis=dict(
title=z_title,
showticklabels=False, showgrid=False, zeroline=False
),
)
fig.show()
export_figure(fig, f"vl-network-3d-{z_mode}")
# -----------------------------------------
# Einfache Metriken & Export
# -----------------------------------------
def summarize_network(G: nx.Graph) -> dict:
# weighted degree je Knoten
wdeg = {}
for n in G.nodes():
s = 0.0
for nbr in G[n]:
s += abs(G[n][nbr].get("weight", 0.0))
wdeg[n] = float(s)
# Top-Items nach gewichteter Degree
items = [(n, wdeg[n]) for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
items_sorted = sorted(items, key=lambda t: t[1], reverse=True)[:15]
top_items = []
for n, val in items_sorted:
nd = G.nodes[n]
top_items.append({
"Thermometer_ID": nd.get("id"),
"Stichwort": nd.get("label"),
"Kapitelname": nd.get("kapitelname"),
"Subkapitel": nd.get("subkapitel"),
"Effektstärke": nd.get("d"),
"weighted_degree_abs": val
})
# Systemseiten-Summe
systems = [(n, wdeg[n]) for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
system_summary = {G.nodes[n].get("label", n): float(val) for n, val in systems}
return {"top_items_by_weighted_degree": top_items, "system_weight_sums": system_summary}
# -----------------------------------------
# Pipeline
# -----------------------------------------
def run_network_analysis(
csv_path: str,
min_abs_d: float = 0.00,
kapitel_filter: list[int] | None = None,
subkapitel_filter: list[str] | None = None,
seed: int = 42,
z_mode: str = "effekt"
):
df = load_data(csv_path)
# Datenqualität knapp loggen
print(f"Rows: {len(df)} | min d = {df['Effektstärke'].min():.2f} | max d = {df['Effektstärke'].max():.2f}")
print("Systemebenen:", df["Systemebene"].dropna().unique().tolist())
if kapitel_filter:
print("Kapitel-Filter:", kapitel_filter)
if subkapitel_filter:
print("Subkapitel-Filter:", subkapitel_filter)
if min_abs_d > 0:
print(f"Filter |d| ≥ {min_abs_d:.2f}")
G = build_bipartite_graph(df, min_abs_d=min_abs_d,
kapitel_filter=kapitel_filter,
subkapitel_filter=subkapitel_filter)
if G.number_of_nodes() == 0 or G.number_of_edges() == 0:
print("Hinweis: Nach Filtern keine Knoten/Kanten bitte Filter anpassen.")
return
plot_network(G, title="Netzwerk: Systemebenen × Thermometer (Kanten: Effektstärke)", seed=seed)
# 3D-Ansicht mit semantischer z-Achse
plot_network_3d(G, z_mode=z_mode, title="Netzwerk (3D): semantische z-Achse", seed=seed)
summary = summarize_network(G)
print("\nSystemgewicht-Summen:", summary["system_weight_sums"])
print("\nTop-Items (weighted degree):")
for r in summary["top_items_by_weighted_degree"]:
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={r['Effektstärke']:.2f} | wd={r['weighted_degree_abs']:.2f}")
# Top-Listen exportieren
extremes = top_extremes(df, n=_TOP_N_EXTREMES)
export_json(extremes, "network_top_extremes.json")
# Item-Projektion + Communities (optional)
item_proj_summary = {}
if _SHOW_ITEM_PROJECTION:
Gi, node2com, coms = build_item_projection(G)
plot_item_projection(Gi, node2com, title="Item-Projektion (Communities)")
item_proj_summary = {
"n_nodes": Gi.number_of_nodes(),
"n_edges": Gi.number_of_edges(),
"n_communities": len(coms),
}
# Export JSON
payload = {
"extremes": extremes,
"item_projection": item_proj_summary,
"meta": {
"theme": theme,
"min_abs_d": float(min_abs_d),
"kapitel_filter": kapitel_filter,
"subkapitel_filter": subkapitel_filter
},
"nodes": [
{
"id": n,
"label": G.nodes[n].get("label", ""),
"type": G.nodes[n].get("bipartite", ""),
"Thermometer_ID": G.nodes[n].get("id"),
"Kapitelname": G.nodes[n].get("kapitelname"),
"Subkapitel": G.nodes[n].get("subkapitel"),
"Effektstärke": G.nodes[n].get("d")
}
for n in G.nodes()
],
"edges": [
{
"source": u,
"target": v,
"weight": float(d.get("weight", 0.0)),
"sign": d.get("sign", "")
}
for u, v, d in G.edges(data=True)
],
"summary": summary
}
export_json(payload, "network_systemebenen.json")
# -----------------------------------------
# Main
# -----------------------------------------
if __name__ == "__main__":
# Beispiel: keine Filter, aber du kannst unten einfach drehen:
# - min_abs_d=0.10 (macht das Netz ruhiger)
# - kapitel_filter=[5,6,7] oder subkapitel_filter=["Fähigkeiten", ...]
run_network_analysis(
csv_path=os.path.join(os.path.dirname(__file__), csv_file),
min_abs_d=0.00,
kapitel_filter=None,
subkapitel_filter=None,
seed=42,
z_mode=_Z_MODE
)