# Changelog (commit 7e75ed790c, Jochen Hanisch-Johannsen, 2025-09-08 02:43:28 +02:00)
# Visible Learning: comprehensive extensions
# - compute the coupling potential per need (Young/Roediger)
# - export coupling_potential_per_need.csv incl. bridge_energy
# - aggregate and export coupling indices (per item, per need)
# - consolidate the Young ↔ Hattie mapping (werte_mapping.csv)
# - new visualizations:
#   • coupling potential (2D, color-coded by balance)
#   • item projection with communities
#   • 3D network view (system levels × thermometer)
#   • 3D triangulation: effect × need × semantics
#   • thermo dashboard (energy, entropy, modularity)
# - improved scaling (normalized effect sizes, marker sizes, edge widths)
# - consistent export paths (export/*.csv, *.html, *.png)
# - fixes:
#   • added the missing os import
#   • more robust merge strategy for the CSV export
#   • guards against empty/inconsistent columns
# - CI styling & Plotly template wired into all new plots

"""
Visible Learning network analysis (system levels × thermometer)
---------------------------------------------------------------
CI: as in the existing scripts (plotly_template)
Data: Thermometer.csv (required columns: Thermometer_ID, Stichwort, Effektstärke, Subkapitel, Kapitelname, Systemebene)
Model:
- bipartite network: system level (psychisch/sozial) ↔ item (thermometer)
- edge weight = effect size (sign preserved), width ~ |d|
- node info on hover: ID, keyword, chapter/sub-chapter, d
- optional: filtering by |d| (min_abs_d) and by chapters/sub-chapters
Exports:
- PNG/HTML (per config)
- JSON: nodes/edges + simple centralities (weighted degree)
"""
from __future__ import annotations
def compute_and_export_coupling_potential(df: pd.DataFrame) -> dict:
"""
Berechnet das Kopplungspotenzial je Young/Roediger-Bedürfnis:
- n_items: Anzahl Items pro Bedürfnis
- E_sum: Σ|d| (alle Items)
- E_psych: Σ|d| für psychisch
- E_sozial: Σ|d| für sozial
- balance: 1 - |(E_psych/E_sum) - (E_sozial/E_sum)|, 0 = einseitig, 1 = balanciert
- coupling_potential = E_sum × balance
Exportiert als CSV und gibt dict zurück. Loggt eine Zusammenfassung.
"""
need_col = OPTIONAL_NEED_COL if OPTIONAL_NEED_COL in df.columns else None
if need_col is None:
df[OPTIONAL_NEED_COL] = np.nan
need_col = OPTIONAL_NEED_COL
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
data = data.dropna(subset=["Effektstärke"])
data[need_col] = data[need_col].astype(str).str.strip()
# column with the absolute effect size
data["abs_d"] = data["Effektstärke"].abs()
groups = []
for need, g in data.groupby(need_col, dropna=False):
n_items = len(g)
E_sum = g["abs_d"].sum()
E_psych = g.loc[g["Systemebene"].str.lower() == "psychisch", "abs_d"].sum()
E_sozial = g.loc[g["Systemebene"].str.lower() == "sozial", "abs_d"].sum()
# guard against division by zero
if E_sum > 0:
share_psych = E_psych / E_sum
share_sozial = E_sozial / E_sum
balance = 1.0 - abs(share_psych - share_sozial)
else:
share_psych = 0.0
share_sozial = 0.0
balance = 0.0
coupling_potential = E_sum * balance
bridge_energy = min(E_psych, E_sozial)
groups.append({
"Beduerfnis": need if (isinstance(need, str) and need.strip()) else "",
"n_items": n_items,
"E_sum": E_sum,
"E_psych": E_psych,
"E_sozial": E_sozial,
"balance": balance,
"coupling_potential": coupling_potential,
"bridge_energy": bridge_energy
})
# Export
df_out = pd.DataFrame(groups)
out_path = os.path.join(EXPORT_DIR, "coupling_potential_per_need.csv")
try:
# if the file already exists, merge carefully (preserve old extra columns)
if os.path.exists(out_path):
try:
old = pd.read_csv(out_path)
except Exception:
old = pd.DataFrame(columns=["Beduerfnis"])
# outer join on Beduerfnis
merged = old.merge(df_out, on="Beduerfnis", how="outer", suffixes=("_old", ""))
# prefer new values for the core columns; keep old ones only as fallback
core_cols = ["n_items", "E_sum", "E_psych", "E_sozial", "balance", "coupling_potential", "bridge_energy"]
for c in core_cols:
if c in merged.columns and f"{c}_old" in merged.columns:
merged[c] = np.where(merged[c].notna(), merged[c], merged[f"{c}_old"])
merged.drop(columns=[f"{c}_old"], inplace=True, errors=True)
# Eventuelle doppelte Altspalten bereinigen
for col in list(merged.columns):
if col.endswith("_old"):
base = col[:-4]
if base not in merged.columns:
merged.rename(columns={col: base}, inplace=True)
else:
merged.drop(columns=[col], inplace=True, errors=True)
merged.to_csv(out_path, index=False, encoding="utf-8")
else:
# write a fresh file
df_out.to_csv(out_path, index=False, encoding="utf-8")
except Exception:
# fail silently so the pipeline is not interrupted
pass
# Log
print("\nKOPPLUNGSPOTENZIAL je Bedürfnis (Young/Roediger):")
for row in groups:
print(
f" {row['Beduerfnis']}: n={row['n_items']}, E_sum={row['E_sum']:.2f}, "
f"E_psych={row['E_psych']:.2f}, E_sozial={row['E_sozial']:.2f}, "
f"balance={row['balance']:.3f}, coupling_potential={row['coupling_potential']:.2f}, "
f"bridge_energy={row['bridge_energy']:.2f}"
)
return {r["Beduerfnis"]: r for r in groups}
# Optional: visualization of the coupling potential
def plot_coupling_potential(summary: dict, title: str = "Kopplungspotenzial je Bedürfnis"):
"""
Balkendiagramm: X = Bedürfnis, Y = coupling_potential, Farbe = Balance (rot→grün).
"""
if not summary:
print("Keine Kopplungspotenzialdaten für Plot.")
return
import plotly.graph_objs as go
# DataFrame for the plot
df = pd.DataFrame(list(summary.values()))
# sort by coupling_potential (descending)
df = df.sort_values("coupling_potential", ascending=False)
# colors: balance from red (0) to green (1)
import matplotlib
import matplotlib.colors
cmap = matplotlib.colormaps["RdYlGn"]
balances = df["balance"].clip(0, 1).fillna(0.0).to_numpy()
colors = [matplotlib.colors.rgb2hex(cmap(b)[:3]) for b in balances]
fig = go.Figure(go.Bar(
x=df["Beduerfnis"],
y=df["coupling_potential"],
marker=dict(color=colors),
text=[f"Balance={bal:.2f}" for bal in df["balance"]],
hovertemplate=(
"Bedürfnis: %{x}<br>Kopplungspotenzial: %{y:.2f}<br>Balance: %{text}<extra></extra>"
),
))
fig.update_layout(
_ci_layout(title),
xaxis_title="Bedürfnis (Young/Roediger)",
yaxis_title="Kopplungspotenzial (Σ|d| × Balance)",
autosize=True,
height=440,
margin=dict(l=60, r=40, t=70, b=100),
)
fig.show()
export_figure(fig, "vl-coupling-potential")
"""
Visible Learning Netzwerkanalyse (Systemebenen × Thermometer)
---------------------------------------------------------------
CI: wie in den bestehenden Skripten (plotly_template)
Daten: Thermometer.csv (Pflichtspalten: Thermometer_ID, Stichwort, Effektstärke, Subkapitel, Kapitelname, Systemebene)
Modell:
- Bipartites Netzwerk: Systemebene (psychisch/sozial) ↔ Item (Thermometer)
- Kantengewicht = Effektstärke (Vorzeichen beibehalten), Breite ~ |d|
- Knoten-Infos im Hover: ID, Stichwort, Kapitel/Subkapitel, d
- Optional: Filter nach |d| (min_abs_d) und Kapiteln/Subkapiteln
Exports:
- PNG/HTML (gemäß config)
- JSON: nodes/edges + einfache Zentralitäten (weighted degree)
"""
# -----------------------------------------
# Imports
# -----------------------------------------
import os
import json
import math
import pandas as pd
import numpy as np
import plotly.graph_objs as go
import plotly.io as pio
from plotly.subplots import make_subplots
import networkx as nx
# Helper import for file existence checks
from pathlib import Path
# --- Optional: clustering (K-Means); falls back to a NumPy implementation ---
try:
from sklearn.cluster import KMeans # type: ignore
_HAS_SKLEARN = True
except Exception:
_HAS_SKLEARN = False
# additional clustering algorithms (if available)
try:
from sklearn.cluster import AgglomerativeClustering, SpectralClustering, DBSCAN
from sklearn.mixture import GaussianMixture
_HAS_SKLEARN_EXTRA = True
except Exception:
_HAS_SKLEARN_EXTRA = False
# -----------------------------------------
# Load configuration
# -----------------------------------------
from config_visible_learning import (
csv_file,
export_fig_visual,
export_fig_png,
theme,
z_mode,
z_axis_labels,
show_item_projection,
show_community_labels,
top_n_extremes,
cluster_algo, n_clusters, dbscan_eps, dbscan_min_samples, spectral_k
)
# -----------------------------------------
# Template/CI
# -----------------------------------------
try:
from ci_template import plotly_template
plotly_template.set_theme(theme)
_ci_layout = lambda title: plotly_template.get_standard_layout(title=title, x_title="", y_title="")
_styles = plotly_template.get_plot_styles()
_colors = plotly_template.get_colors()
except Exception:
# minimal fallback if the template is unavailable
_ci_layout = lambda title: dict(title=title)
_styles = {}
_colors = {}
# -----------------------------------------
# Config fallbacks (if keys are missing)
# -----------------------------------------
try:
_Z_MODE = z_mode
except Exception:
_Z_MODE = "effekt"
try:
_Z_AXIS_LABELS = z_axis_labels
except Exception:
_Z_AXIS_LABELS = {"effekt": "Effektstärke (Cohen d)", "kapitel": "Kapitel (numerischer Index)", "system": "Systemebene (0 = Psychisch, 1 = Sozial)"}
try:
_SHOW_ITEM_PROJECTION = show_item_projection
except Exception:
_SHOW_ITEM_PROJECTION = True
try:
_SHOW_COMMUNITY_LABELS = show_community_labels
except Exception:
_SHOW_COMMUNITY_LABELS = True
try:
_TOP_N_EXTREMES = int(top_n_extremes)
except Exception:
_TOP_N_EXTREMES = 15
try:
_CLUSTER_ALGO = str(cluster_algo)
except Exception:
_CLUSTER_ALGO = "kmeans" # Optionen: kmeans, gmm, agglomerative, spectral, dbscan
try:
_N_CLUSTERS = int(n_clusters)
except Exception:
_N_CLUSTERS = 5
try:
_DBSCAN_EPS = float(dbscan_eps)
except Exception:
_DBSCAN_EPS = 0.15
try:
_DBSCAN_MIN_SAMPLES = int(dbscan_min_samples)
except Exception:
_DBSCAN_MIN_SAMPLES = 5
try:
_SPECTRAL_K = int(spectral_k)
except Exception:
_SPECTRAL_K = _N_CLUSTERS
# -----------------------------------------
# Export helpers
# -----------------------------------------
EXPORT_DIR = os.path.join(os.path.dirname(__file__), "export")
os.makedirs(EXPORT_DIR, exist_ok=True)
def export_figure(fig, name: str):
base = os.path.join(EXPORT_DIR, name)
if export_fig_visual:
pio.write_html(fig, file=f"{base}.html", auto_open=False, include_plotlyjs="cdn")
if export_fig_png:
try:
pio.write_image(fig, f"{base}.png", scale=2)
except Exception:
# static image export needs an engine such as kaleido; skip silently if unavailable
pass
def export_json(obj: dict, name: str):
try:
with open(os.path.join(EXPORT_DIR, name), "w", encoding="utf-8") as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
except Exception:
pass
# -----------------------------------------
# Load data
# -----------------------------------------
REQUIRED_COLS = ["Thermometer_ID", "Stichwort", "Effektstärke", "Subkapitel", "Kapitelname", "Systemebene"]
# Optional needs column for Young/Roediger
OPTIONAL_NEED_COL = "Young_Beduerfnis"
def load_data(path: str) -> pd.DataFrame:
df = pd.read_csv(path)
missing = [c for c in REQUIRED_COLS if c not in df.columns]
if missing:
raise ValueError(f"Fehlende Spalten in CSV: {missing}")
# robustly convert Effektstärke to float (handles decimal commas)
df["Effektstärke"] = (
df["Effektstärke"].astype(str).str.replace(",", ".", regex=False).str.strip()
)
df["Effektstärke"] = pd.to_numeric(df["Effektstärke"], errors="coerce")
df = df.dropna(subset=["Effektstärke"])
# Young/Roediger need (optional column); if absent, try merging it in from werte_mapping.csv
if OPTIONAL_NEED_COL not in df.columns:
map_path = Path(os.path.dirname(__file__)) / "werte_mapping.csv"
if map_path.exists():
try:
m = pd.read_csv(map_path)
if "Thermometer_ID" in m.columns and OPTIONAL_NEED_COL in m.columns:
df = df.merge(m[["Thermometer_ID", OPTIONAL_NEED_COL]], on="Thermometer_ID", how="left")
except Exception:
# if the mapping is unreadable, create an empty column
df[OPTIONAL_NEED_COL] = np.nan
else:
df[OPTIONAL_NEED_COL] = np.nan
else:
# normalize the column (trim strings)
df[OPTIONAL_NEED_COL] = df[OPTIONAL_NEED_COL].astype(str).str.strip()
# check for unspecific system levels
invalid_systems = df[~df["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
if not invalid_systems.empty:
print("WARNUNG: Unspezifische Systemebenen gefunden:")
print(invalid_systems[["Thermometer_ID", "Stichwort", "Systemebene"]].to_string(index=False))
# chapter number from the ID (optionally useful)
try:
df["Kapitel"] = df["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
except Exception:
df["Kapitel"] = None
return df
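# Illustrative sketch (hypothetical values) of the decimal-comma handling in
# load_data(): German CSVs often write 0.40 as "0,40"; the replace + to_numeric
# combination mirrors the cleaning step above.
def _demo_parse_effect_sizes() -> pd.Series:
    raw = pd.Series(["0,40", " 1.2 ", "n/a"])
    cleaned = raw.astype(str).str.replace(",", ".", regex=False).str.strip()
    return pd.to_numeric(cleaned, errors="coerce")  # -> 0.40, 1.20, NaN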
# -----------------------------------------
# Triangulation: feature construction & k-means
# -----------------------------------------
def _encode_system(val: str) -> float:
v = str(val).strip().lower()
if v.startswith("psych"):
return 0.0
if v.startswith("sozi"):
return 1.0
return np.nan
def _minmax(series: pd.Series) -> np.ndarray:
arr = series.to_numpy(dtype=float)
if np.all(~np.isfinite(arr)):
return np.zeros_like(arr)
mn = np.nanmin(arr)
mx = np.nanmax(arr)
if not np.isfinite(mn) or not np.isfinite(mx) or mx == mn:
return np.zeros_like(arr)
return (arr - mn) / (mx - mn)
def build_triangulation_features(df: pd.DataFrame) -> pd.DataFrame:
"""Erzeugt den 3D-Feature-Raum pro Item:
- x: Kapitel (numerisch, min-max skaliert)
- y: Systemebene (0=psychisch, 1=sozial)
- z: Effektstärke (Cohen d, min-max skaliert mit Vorzeichen beibehalten über separate Skalierung)
Rückgabe: DataFrame mit [key,item_id,stichwort,kapitel,system,d,x,y,z].
"""
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch","sozial"])]
# ensure a Kapitel column exists
if "Kapitel" not in data.columns or data["Kapitel"].isna().all():
try:
data["Kapitel"] = data["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
except Exception:
data["Kapitel"] = np.nan
# numeric axes
data["sys_bin"] = data["Systemebene"].map(_encode_system)
# min-max scale effects separately for positive and negative values so the sign structure is preserved
eff = data["Effektstärke"].astype(float)
pos = eff.clip(lower=0)
neg = (-eff.clip(upper=0))
pos_s = _minmax(pos)
neg_s = _minmax(neg)
eff_scaled = np.where(eff >= 0, pos_s, -neg_s)
data["x"] = _minmax(data["Kapitel"].astype(float))
data["y"] = data["sys_bin"].astype(float)
data["z"] = eff_scaled
# node key
data["key"] = "item::" + data["Thermometer_ID"].astype(str)
return data[["key","Thermometer_ID","Stichwort","Kapitel","Kapitelname","Subkapitel","Systemebene","Effektstärke","x","y","z"]]
# ---------------------------------------------------
# NEW: triangulation (3D) effect × need × semantics
# ---------------------------------------------------
# Fixed ordering of the needs (Young/Roediger)
NEED_ORDER = [
"Bindung",
"Orientierung",
"Stabilität",
"Emotion/SC",
"Struktur/Kompetenz",
]
def _encode_need_series(series: pd.Series) -> tuple[pd.Series, dict]:
"""Kodiert die Bedürfnis-Spalte deterministisch in numerische Codes.
Reihenfolge: NEED_ORDER, unbekannte Labels danach alphabetisch.
Rückgabe: (codes_series 0..k-1, mapping_dict {label:code}).
"""
lab = series.fillna("").astype(str).str.strip()
# known labels in fixed order, unknown labels appended alphabetically
known = [n for n in NEED_ORDER if n in set(lab)]
unknown = sorted(n for n in set(lab) if n and n not in NEED_ORDER)
order = known + unknown
mapping = {name: idx for idx, name in enumerate(order)}
codes = lab.map(lambda v: mapping.get(v, np.nan))
return codes, mapping
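# Tiny usage example (hypothetical labels): known needs keep their NEED_ORDER
# position, unknown labels are appended alphabetically.
def _demo_encode_needs() -> dict:
    labels = pd.Series(["Orientierung", "Bindung", "Neugier", "Bindung"])
    _codes, mapping = _encode_need_series(labels)
    return mapping  # -> {"Bindung": 0, "Orientierung": 1, "Neugier": 2}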
def build_triangulation_features_needs(df: pd.DataFrame) -> pd.DataFrame:
"""Erzeugt 3D-Features mit Achsen:
X = Effektstärke (minmax, Vorzeichen beibehalten),
Y = Bedürfnis-Code (Young/Roediger, deterministisch kodiert),
Z = Semantik (Kapitel/Subkapitel → numerische Indizes; zuerst Kapitel, dann Subkapitel fein)
Rückgabe: DataFrame mit Spalten [Thermometer_ID, Stichwort, Systemebene, Effektstärke, Young_Beduerfnis,
Kapitelname, Subkapitel, x, y, z].
"""
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch","sozial"])]
data = data.dropna(subset=["Effektstärke"]).copy()
# X: min-max the effect separately for pos/neg, keeping the sign
d = pd.to_numeric(data["Effektstärke"], errors="coerce").astype(float)
pos = d.clip(lower=0)
neg = (-d.clip(upper=0))
def _mm(a: pd.Series) -> np.ndarray:
arr = a.to_numpy(dtype=float)
if arr.size == 0:
return arr
mn, mx = np.nanmin(arr), np.nanmax(arr)
if not np.isfinite(mn) or not np.isfinite(mx) or mx <= mn:
return np.zeros_like(arr)
return (arr - mn) / (mx - mn)
x = np.where(d >= 0, _mm(pos), -_mm(neg))
# Y: need codes (deterministic)
if OPTIONAL_NEED_COL not in data.columns:
data[OPTIONAL_NEED_COL] = np.nan
y, need_map = _encode_need_series(data[OPTIONAL_NEED_COL])
# Z: map chapter and sub-chapter semantics onto a robust numeric axis
# chapter index (more stable when available)
if "Kapitel" not in data.columns or data["Kapitel"].isna().all():
try:
data["Kapitel"] = data["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
except Exception:
data["Kapitel"] = np.nan
# sub-chapter as a fine-grained rank within the chapter (deterministic sort by label)
data["_sub_key"] = (
data[["Kapitel","Subkapitel"]]
.astype(str)
.fillna("")
.agg("::".join, axis=1)
)
# rank in [0..1] per chapter
def _rank_within_kap(group: pd.Series) -> pd.Series:
vals = group.sort_values().unique().tolist()
idx = {v: i for i, v in enumerate(vals)}
if len(vals) <= 1:
return pd.Series(np.zeros(len(group)), index=group.index)
return group.map(lambda v: idx.get(v, 0) / (len(vals)-1))
sub_rank = data.groupby("Kapitel")["_sub_key"].transform(_rank_within_kap)
# Z = chapter (min-max) + 0.01 * sub-rank (fine offset)
kap_scaled = pd.Series(_minmax(data["Kapitel"].astype(float)), index=data.index)
z = kap_scaled + 0.01 * sub_rank.fillna(0.0)
out = pd.DataFrame({
"Thermometer_ID": data["Thermometer_ID"].astype(str),
"Stichwort": data["Stichwort"].astype(str),
"Kapitelname": data["Kapitelname"].astype(str),
"Subkapitel": data["Subkapitel"].astype(str),
"Systemebene": data["Systemebene"].astype(str),
"Effektstärke": d,
OPTIONAL_NEED_COL: data[OPTIONAL_NEED_COL].astype(str),
"x": x,
"y": y,
"z": z,
})
# export the mapping (for traceability)
try:
mp_df = pd.DataFrame({"Beduerfnis": list(need_map.keys()), "code": list(need_map.values())})
mp_df.to_csv(os.path.join(EXPORT_DIR, "needs_mapping_codes.csv"), index=False, encoding="utf-8")
except Exception:
pass
# export the features
try:
out.to_csv(os.path.join(EXPORT_DIR, "triangulation_needs_3d.csv"), index=False, encoding="utf-8")
except Exception:
pass
return out
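# Worked example for the semantic Z axis (illustrative numbers): a chapter
# scaled to 0.5 whose sub-chapter ranks 0.5 within it lands at
# z = 0.5 + 0.01 * 0.5 = 0.505, so sub-chapters only nudge items inside their
# chapter band instead of reordering chapters.
def _demo_semantic_z(kap_scaled: float = 0.5, sub_rank: float = 0.5) -> float:
    return kap_scaled + 0.01 * sub_rank  # -> 0.505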
def plot_triangulation_needs_3d(feats: pd.DataFrame, title: str = "Triangulation (3D): Effekt × Bedürfnis × Semantik"):
"""3D-Streudiagramm:
X = Effekt (normiert, Vorzeichen erhalten),
Y = Bedürfnis-Code (beschriftet nach Mapping),
Z = Semantik (Kapitel + Subkapitel-Rang).
Farbkanal: Vorzeichen von d (CI-konform: pos/neg)."""
if feats is None or feats.empty:
print("Hinweis: Keine Daten für Triangulation (Needs).")
return
# colors from the CI template
pos_marker = _styles.get("marker_positiveHighlight", {"size": 6})
neg_marker = _styles.get("marker_negativeHighlight", {"size": 6})
feats = feats.copy()
feats["d"] = pd.to_numeric(feats["Effektstärke"], errors="coerce")
pos = feats[feats["d"] >= 0]
neg = feats[feats["d"] < 0]
# Y-axis ticks: derive the labels from the data, reconstructing the ordering
# consistently with the encoding in build_triangulation_features_needs
labels_present = [lab for lab in NEED_ORDER if lab in feats[OPTIONAL_NEED_COL].unique().tolist()]
# append unknown labels if present (alphabetically)
others = sorted([lab for lab in feats[OPTIONAL_NEED_COL].unique().tolist() if lab and lab not in NEED_ORDER])
y_labels = labels_present + others
y_codes = list(range(len(y_labels)))
def _hover(group: pd.DataFrame) -> pd.Series:
return (
"Thermometer: " + group["Thermometer_ID"].astype(str) +
"<br>Stichwort: " + group["Stichwort"].astype(str) +
"<br>Bedürfnis: " + group[OPTIONAL_NEED_COL].astype(str) +
"<br>Kapitel: " + group["Kapitelname"].astype(str) +
"<br>Subkapitel: " + group["Subkapitel"].astype(str) +
"<br>d: " + group["d"].map(lambda v: f"{float(v):.2f}")
)
traces = []
if len(pos):
traces.append(go.Scatter3d(
x=pos["x"], y=pos["y"], z=pos["z"],
mode="markers",
marker=pos_marker,
hovertext=_hover(pos),
hovertemplate="%{hovertext}<extra></extra>",
name="d ≥ 0"
))
if len(neg):
traces.append(go.Scatter3d(
x=neg["x"], y=neg["y"], z=neg["z"],
mode="markers",
marker=neg_marker,
hovertext=_hover(neg),
hovertemplate="%{hovertext}<extra></extra>",
name="d < 0"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_scenes(
xaxis=dict(title="Effekt (normiert, Vorzeichen erhalten)", showgrid=False, showticklabels=False),
yaxis=dict(title="Bedürfnis (Code)", showgrid=False, showticklabels=True, tickvals=y_codes, ticktext=y_labels),
zaxis=dict(title="Semantik (Kapitel + Subrang)", showgrid=False, showticklabels=False),
)
fig.show()
export_figure(fig, "vl-triangulation-needs-3d")
def _kmeans_numpy(X: np.ndarray, k: int = 5, iters: int = 100, seed: int = 42) -> tuple[np.ndarray, np.ndarray]:
rng = np.random.default_rng(seed)
# random initial centers drawn from the data
idx = rng.choice(len(X), size=k, replace=False)
C = X[idx].copy()
for _ in range(iters):
# assignment step
dists = ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2)
labels = dists.argmin(axis=1)
# updated centers
C_new = np.vstack([X[labels==j].mean(axis=0) if np.any(labels==j) else C[j] for j in range(k)])
if np.allclose(C_new, C, atol=1e-6):
C = C_new
break
C = C_new
# final labels
dists = ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2)
labels = dists.argmin(axis=1)
return labels, C
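# Usage sketch for the NumPy fallback (random data, assumed shapes): X has one
# row per item and three feature columns, as produced by
# build_triangulation_features.
def _demo_kmeans_numpy() -> np.ndarray:
    rng = np.random.default_rng(0)
    X = rng.normal(size=(60, 3))
    labels, centers = _kmeans_numpy(X, k=3, iters=50, seed=0)
    assert centers.shape == (3, 3)
    return labels  # one integer label in [0, 3) per row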
def triangulate_clusters(
df: pd.DataFrame,
algo: str = "kmeans",
n_clusters: int = 5,
seed: int = 42,
dbscan_eps: float = 0.15,
dbscan_min_samples: int = 5,
spectral_k: int | None = None
) -> tuple[pd.DataFrame, np.ndarray]:
"""
Clustert den 3D-Feature-Raum (Kapitel × System × Effekt).
Unterstützte Algorithmen:
- "kmeans" (sphärische Cluster, schnell, baseline)
- "gmm" (Gaussian Mixture; ellipsoide Cluster)
- "agglomerative" (hier: Ward-Linkage, euklidisch)
- "spectral" (graphbasierte Struktur, nicht-konvex)
- "dbscan" (dichtebasiert; entdeckt Bänder, Noise möglich)
Falls sklearn nicht verfügbar, wird auf eine reine Numpy-KMeans-Variante zurückgefallen.
"""
feats = build_triangulation_features(df)
X = feats[["x","y","z"]].to_numpy(dtype=float)
algo = str(algo).lower()
# NEW: raw mode without clustering
if algo == "none":
feats["cluster"] = 0
labels = np.zeros(len(feats), dtype=int)
return feats, labels
labels: np.ndarray
if not _HAS_SKLEARN and algo != "kmeans":
# hard fallback strategy
labels, _ = _kmeans_numpy(X, k=n_clusters, iters=150, seed=seed)
else:
if algo == "kmeans" or not _HAS_SKLEARN:
if _HAS_SKLEARN:
km = KMeans(n_clusters=n_clusters, random_state=seed, n_init=10)
labels = km.fit_predict(X)
else:
labels, _ = _kmeans_numpy(X, k=n_clusters, iters=150, seed=seed)
elif algo == "gmm" and _HAS_SKLEARN_EXTRA:
gmm = GaussianMixture(n_components=n_clusters, random_state=seed)
labels = gmm.fit_predict(X)
elif algo == "agglomerative" and _HAS_SKLEARN_EXTRA:
agg = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
labels = agg.fit_predict(X)
elif algo == "spectral" and _HAS_SKLEARN_EXTRA:
k = spectral_k if spectral_k is not None else n_clusters
spec = SpectralClustering(n_clusters=k, affinity="rbf", random_state=seed, assign_labels="kmeans")
labels = spec.fit_predict(X)
elif algo == "dbscan" and _HAS_SKLEARN_EXTRA:
db = DBSCAN(eps=dbscan_eps, min_samples=dbscan_min_samples)
labels = db.fit_predict(X)
# DBSCAN may return -1 for noise → map to consecutive IDs
uniq = np.unique(labels)
mapping = {v: i for i, v in enumerate(uniq)}
labels = np.vectorize(mapping.get)(labels)
else:
# safety net
if _HAS_SKLEARN:
km = KMeans(n_clusters=n_clusters, random_state=seed, n_init=10)
labels = km.fit_predict(X)
else:
labels, _ = _kmeans_numpy(X, k=n_clusters, iters=150, seed=seed)
feats["cluster"] = labels.astype(int)
return feats, labels
# -----------------------------------------
# Top lists (positive/negative)
# -----------------------------------------
def top_extremes(df: pd.DataFrame, n: int = 15) -> dict:
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
data = data.dropna(subset=["Effektstärke"]) # safety
pos = data.sort_values("Effektstärke", ascending=False).head(n)
neg = data.sort_values("Effektstärke", ascending=True).head(n)
# console output
print(f"\nTop +{n} (positiv):")
for _, r in pos.iterrows():
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
print(f"\nTop -{n} (negativ):")
for _, r in neg.iterrows():
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
return {
"top_positive": pos[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Effektstärke","Systemebene"]].to_dict(orient="records"),
"top_negative": neg[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Effektstärke","Systemebene"]].to_dict(orient="records"),
}
# -----------------------------------------
# Build the network
# -----------------------------------------
def build_bipartite_graph(
df: pd.DataFrame,
min_abs_d: float = 0.00,
kapitel_filter: list[int] | None = None,
subkapitel_filter: list[str] | None = None,
) -> nx.Graph:
data = df.copy()
# Filter
if kapitel_filter:
data = data[data["Kapitel"].isin(kapitel_filter)]
if subkapitel_filter:
data = data[data["Subkapitel"].isin(subkapitel_filter)]
if min_abs_d > 0:
data = data[data["Effektstärke"].abs() >= float(min_abs_d)]
# only valid system levels
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
G = nx.Graph()
# system nodes (part A)
systems = sorted(data["Systemebene"].str.lower().unique().tolist())
for s in systems:
G.add_node(
f"system::{s}",
bipartite="system",
label=s.capitalize(),
typ="System",
)
# item nodes + edges (part B)
for _, r in data.iterrows():
sys_key = f"system::{str(r['Systemebene']).lower()}"
item_key = f"item::{r['Thermometer_ID']}"
# Item node
G.add_node(
item_key,
bipartite="item",
label=str(r["Stichwort"]),
id=str(r["Thermometer_ID"]),
d=float(r["Effektstärke"]),
kapitelname=str(r["Kapitelname"]),
subkapitel=str(r["Subkapitel"]),
)
# edge: weight = effect size (sign preserved)
G.add_edge(
sys_key, item_key,
weight=float(r["Effektstärke"]),
sign="pos" if r["Effektstärke"] >= 0 else "neg"
)
return G
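# Illustrative construction on a toy frame (hypothetical IDs and values) to
# show the node/edge schema produced by build_bipartite_graph: one system node,
# two item nodes, and one signed edge per item.
def _demo_bipartite_graph() -> nx.Graph:
    toy = pd.DataFrame({
        "Thermometer_ID": ["1.1", "1.2"],
        "Stichwort": ["A", "B"],
        "Effektstärke": [0.7, -0.3],
        "Subkapitel": ["S1", "S1"],
        "Kapitelname": ["K1", "K1"],
        "Systemebene": ["sozial", "sozial"],
    })
    G = build_bipartite_graph(toy)
    return G  # edge signs: item 1.1 -> "pos", item 1.2 -> "neg"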
# -----------------------------------------
# Item projection (bipartite -> item-item) + communities
# -----------------------------------------
from networkx.algorithms import community as nx_comm
def build_item_projection(G: nx.Graph) -> tuple[nx.Graph, dict[str,int], list[set]]:
"""Projiziert das bipartite Netz auf die Item-Seite. Zwei Items werden verbunden,
wenn sie dasselbe System teilen. Kanten-Gewicht = min(|w_i|, |w_j|).
Liefert das Item-Graph, ein Mapping node->community_id und die Community-Mengen.
"""
# identify item and system nodes
items = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
systems = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
# map: system -> list of (item, |weight|)
sys_to_items: dict[str, list[tuple[str,float]]] = {}
for s in systems:
sys_to_items[s] = []
for u, v, d in G.edges(data=True):
if u in systems and v in items:
sys_to_items[u].append((v, abs(float(d.get("weight",0.0)))))
elif v in systems and u in items:
sys_to_items[v].append((u, abs(float(d.get("weight",0.0)))))
# build the item graph
Gi = nx.Graph()
for it in items:
nd = G.nodes[it]
Gi.add_node(it, **nd)
for s, lst in sys_to_items.items():
# connect all pairs within the same system
for i in range(len(lst)):
for j in range(i+1, len(lst)):
a, wa = lst[i]
b, wb = lst[j]
w = min(wa, wb)
if Gi.has_edge(a,b):
Gi[a][b]["weight"] += w
else:
Gi.add_edge(a, b, weight=w)
if Gi.number_of_edges() == 0:
return Gi, {}, []
# communities (weighted modularity, greedy)
coms = nx_comm.greedy_modularity_communities(Gi, weight="weight")
node2com: dict[str,int] = {}
for cid, members in enumerate(coms):
for n in members:
node2com[n] = cid
return Gi, node2com, [set(c) for c in coms]
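# Minimal example of the projection rule (illustrative weights): two items
# attached to the same system with |w| = 0.5 and 0.3 receive an item-item edge
# of weight min(0.5, 0.3) = 0.3.
def _demo_item_projection() -> float:
    G = nx.Graph()
    G.add_node("system::sozial", bipartite="system", label="Sozial")
    G.add_node("item::a", bipartite="item", label="a")
    G.add_node("item::b", bipartite="item", label="b")
    G.add_edge("system::sozial", "item::a", weight=0.5)
    G.add_edge("system::sozial", "item::b", weight=-0.3)
    Gi, _node2com, _coms = build_item_projection(G)
    return Gi["item::a"]["item::b"]["weight"]  # -> 0.3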
# -----------------------------------------
# Coupling indices (needs × bridges)
# -----------------------------------------
def _norm01(series: pd.Series) -> pd.Series:
arr = pd.to_numeric(series, errors="coerce").astype(float)
if len(arr) == 0:
return pd.Series(np.zeros(0), index=series.index)
mn, mx = np.nanmin(arr), np.nanmax(arr)
if not np.isfinite(mn) or not np.isfinite(mx) or mx <= mn:
return pd.Series(np.zeros(len(arr)), index=series.index)
return (arr - mn) / (mx - mn)
def compute_and_export_coupling_indices(df: pd.DataFrame, G: nx.Graph) -> dict:
"""
Berechnet Kopplungsindizes je Item und aggregiert nach Young/RoedigerBedürfnissen.
Definition:
- |d|_norm: minmax normierte Effektmasse pro Item
- bc_norm: betweenness centrality im ItemProjektnetz (Gewicht = 1/edge_weight)
- coupling_index = |d|_norm * bc_norm (betont hohe Wirkung + Brückenlage)
Export:
- CSV "coupling_per_item.csv"
- CSV "coupling_per_need.csv"
Rückgabe: dict mit NeedSummaries (für JSON/Log).
"""
# only valid items
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
data = data.dropna(subset=["Effektstärke"]).copy()
# item projection (for bridge metrics)
Gi, node2com, _ = build_item_projection(G)
# betweenness centrality (weighted, normalized to [0,1])
bc = {n: 0.0 for n in Gi.nodes()}
if Gi.number_of_edges() > 0:
H = Gi.copy()
# NetworkX interprets 'weight' as a distance -> high weights should mean short distances: length = 1/max(eps, w)
eps = 1e-9
for u, v, d in H.edges(data=True):
w = float(d.get("weight", 0.0))
d["length"] = 1.0 / max(eps, w)
bc = nx.betweenness_centrality(H, weight="length", normalized=True)
# map onto the DataFrame
item_keys = [f"item::{tid}" for tid in data["Thermometer_ID"].astype(str)]
betw_vals = [bc.get(k, 0.0) for k in item_keys]
# normalized |d|
data["abs_d"] = data["Effektstärke"].abs()
data["abs_d_norm"] = _norm01(data["abs_d"]) # 0..1
data["bc_norm"] = pd.Series(betw_vals, index=data.index)
data["coupling_index"] = data["abs_d_norm"] * data["bc_norm"]
# aggregate by need
need_col = OPTIONAL_NEED_COL if OPTIONAL_NEED_COL in data.columns else None
if need_col is None:
data[OPTIONAL_NEED_COL] = np.nan
need_col = OPTIONAL_NEED_COL
grp = data.groupby(need_col, dropna=False)
summary = grp.agg(
n_items=("Thermometer_ID", "count"),
energy_sum=("abs_d", "sum"),
energy_mean=("abs_d", "mean"),
coupling_sum=("coupling_index", "sum"),
coupling_mean=("coupling_index", "mean"),
bc_mean=("bc_norm", "mean")
).reset_index().rename(columns={need_col: "Beduerfnis"})
# exports
per_item_cols = [
"Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Systemebene",
"Effektstärke", OPTIONAL_NEED_COL, "abs_d","abs_d_norm","bc_norm","coupling_index"
]
per_item = data[per_item_cols].copy()
try:
per_item.to_csv(os.path.join(EXPORT_DIR, "coupling_per_item.csv"), index=False, encoding="utf-8")
summary.to_csv(os.path.join(EXPORT_DIR, "coupling_per_need.csv"), index=False, encoding="utf-8")
except Exception:
pass
# convert to dict for JSON/console
need_dict = summary.to_dict(orient="records")
# short log output
print("\nKOPPLUNGSINDEX Aggregat je Bedürfnis (Young/Roediger):")
for row in need_dict:
print(
f" {row.get('Beduerfnis', '')}: n={row['n_items']}, E_sum={row['energy_sum']:.2f}, "
f"CI_mean={row['coupling_mean']:.3f}, BC_mean={row['bc_mean']:.3f}"
)
return {
"per_need": need_dict,
"per_item_csv": "coupling_per_item.csv",
"per_need_csv": "coupling_per_need.csv"
}
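# Sketch of the weight-to-distance inversion used for the betweenness
# computation above: a strong co-occurrence (large weight) should act as a
# short path, hence length = 1 / max(eps, w).
def _demo_weight_to_length(w: float = 4.0, eps: float = 1e-9) -> float:
    return 1.0 / max(eps, w)  # w = 4.0 -> length 0.25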
def plot_item_projection(Gi: nx.Graph, node2com: dict[str,int], title: str = "Item-Projektion (Communities)"):
if Gi.number_of_nodes() == 0:
print("Hinweis: Item-Projektion leer (zu wenig Überlappung).")
return
pos = nx.spring_layout(Gi, seed=42, weight="weight")
# group communities into traces
com_to_nodes: dict[int, list[str]] = {}
for n in Gi.nodes():
cid = node2com.get(n, -1)
com_to_nodes.setdefault(cid, []).append(n)
traces = []
# color/marker styles from the CI template (cycled)
style_keys = [
"marker_accent", "marker_brightArea", "marker_depthArea",
"marker_positiveHighlight", "marker_negativeHighlight",
"marker_primaryLine", "marker_secondaryLine"
]
keys_cycle = style_keys * 10
for idx, (cid, nodes) in enumerate(sorted(com_to_nodes.items(), key=lambda t: t[0])):
xs = [pos[n][0] for n in nodes]
ys = [pos[n][1] for n in nodes]
htxt = []
for n in nodes:
nd = Gi.nodes[n]
htxt.append(
"Thermometer: " + str(nd.get("id","")) +
"<br>Stichwort: " + str(nd.get("label","")) +
"<br>Kapitel: " + str(nd.get("kapitelname","")) +
"<br>Subkapitel: " + str(nd.get("subkapitel","")) +
"<br>d: " + f"{nd.get('d',np.nan):.2f}"
)
mk = _styles.get(keys_cycle[idx], dict(size=8))
traces.append(go.Scatter(
x=xs, y=ys, mode="markers+text" if _SHOW_COMMUNITY_LABELS else "markers",
marker={**mk, "size": 9},
text=[str(node2com.get(n, -1)) if _SHOW_COMMUNITY_LABELS else None for n in nodes],
textposition="top center",
hovertext=htxt,
hovertemplate="%{hovertext}<extra></extra>",
name=f"Community {cid}"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_xaxes(title_text="Semantische Position X (Projektion)", showticklabels=False, showgrid=False, zeroline=False)
fig.update_yaxes(title_text="Semantische Position Y (Projektion)", showticklabels=False, showgrid=False, zeroline=False)
fig.show()
export_figure(fig, "vl-network-item-projection")
# -----------------------------------------
# Layout & visualization (Plotly)
# -----------------------------------------
def _edge_segments(G: nx.Graph, pos: dict[str, tuple[float, float]], sign: str | None = None):
"""Erzeugt x,y-Koordinaten-Listen für Liniensegmente (mit None-Trennern). Optional nach Vorzeichen filtern."""
xs, ys = [], []
for u, v, d in G.edges(data=True):
if sign and d.get("sign") != sign:
continue
x0, y0 = pos[u]
x1, y1 = pos[v]
xs += [x0, x1, None]
ys += [y0, y1, None]
return xs, ys
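# Tiny example of the None-separator convention Plotly expects for
# multi-segment line traces: every edge contributes (x0, x1, None), keeping
# the segments visually disjoint within a single trace.
def _demo_edge_segments() -> tuple[list, list]:
    G = nx.path_graph(3)  # edges (0, 1) and (1, 2)
    pos = {0: (0.0, 0.0), 1: (1.0, 0.0), 2: (1.0, 1.0)}
    return _edge_segments(G, pos)
    # -> ([0.0, 1.0, None, 1.0, 1.0, None], [0.0, 0.0, None, 0.0, 1.0, None])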
def plot_network(G: nx.Graph, title: str = "Netzwerk: Systemebenen × Thermometer", seed: int = 42):
# spring layout (reproducible via seed)
pos = nx.spring_layout(G, seed=seed, k=None, weight="weight")
# Knoten nach Typ trennen
system_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
item_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
# edges (pos/neg) as separate traces (line styles from CI)
x_pos, y_pos = _edge_segments(G, pos, sign="pos")
x_neg, y_neg = _edge_segments(G, pos, sign="neg")
line_positive = _styles.get("linie_positiveHighlight", dict(width=1))
line_negative = _styles.get("linie_negativeHighlight", dict(width=1))
edge_pos = go.Scatter(
x=x_pos, y=y_pos,
mode="lines",
line=line_positive,
hoverinfo="skip",
showlegend=True,
name="Kanten (d ≥ 0)"
)
edge_neg = go.Scatter(
x=x_neg, y=y_neg,
mode="lines",
line=line_negative,
hoverinfo="skip",
showlegend=True,
name="Kanten (d < 0)"
)
# system nodes: marker from CI (e.g. accent)
sys_marker = _styles.get("marker_primaryLine", dict(size=18))
sys_x = [pos[n][0] for n in system_nodes]
sys_y = [pos[n][1] for n in system_nodes]
sys_text = [G.nodes[n].get("label", n) for n in system_nodes]
sys_hover = [f"Systemebene: {G.nodes[n].get('label','')}" for n in system_nodes]
systems_trace = go.Scatter(
x=sys_x, y=sys_y, mode="markers",
marker={**sys_marker, "size": 18},
text=sys_text,
hovertext=sys_hover,
hovertemplate="%{hovertext}<extra></extra>",
name="System"
)
# item nodes: marker from CI (e.g. brightArea); size ~ weighted degree
item_marker = _styles.get("marker_secondaryLine", dict(size=10))
it_x = [pos[n][0] for n in item_nodes]
it_y = [pos[n][1] for n in item_nodes]
# weighted degree as marker size
wdeg = []
htxt = []
for n in item_nodes:
dsum = 0.0
for nbr in G[n]:
dsum += abs(G[n][nbr].get("weight", 0.0))
wdeg.append(dsum)
nd = G.nodes[n]
htxt.append(
"Thermometer: "
+ str(nd.get("id",""))
+ "<br>Stichwort: "
+ str(nd.get("label",""))
+ "<br>Kapitel: "
+ str(nd.get("kapitelname",""))
+ "<br>Subkapitel: "
+ str(nd.get("subkapitel",""))
+ "<br>d: "
+ f"{nd.get('d',np.nan):.2f}"
)
# scale sizes
wdeg = np.asarray(wdeg, dtype=float)
if wdeg.size and np.nanmax(wdeg) > 0:
sizes = 8 + 12 * (wdeg / np.nanmax(wdeg))
else:
sizes = np.full_like(wdeg, 10)
items_trace = go.Scatter(
x=it_x, y=it_y, mode="markers",
marker={**item_marker, "size": sizes},
hovertext=htxt,
hovertemplate="%{hovertext}<extra></extra>",
name="Thermometer"
)
fig = go.Figure(data=[edge_pos, edge_neg, systems_trace, items_trace])
# CI layout and semantic axis titles (2D: positions come from the layout)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_xaxes(
title_text="Semantische Position X (Layout)",
showticklabels=False, showgrid=False, zeroline=False
)
fig.update_yaxes(
title_text="Semantische Position Y (Layout)",
showticklabels=False, showgrid=False, zeroline=False
)
fig.show()
export_figure(fig, "vl-network")
def _edge_segments_3d(G: nx.Graph, pos_xy: dict[str, tuple[float, float]], z_map: dict[str, float], sign: str | None = None):
"""Erzeugt x,y,z-Koordinaten-Listen für 3D-Liniensegmente (mit None-Trennern). Optional nach Vorzeichen filtern."""
xs, ys, zs = [], [], []
for u, v, d in G.edges(data=True):
if sign and d.get("sign") != sign:
continue
x0, y0 = pos_xy[u]
x1, y1 = pos_xy[v]
z0 = float(z_map.get(u, 0.0))
z1 = float(z_map.get(v, 0.0))
xs += [x0, x1, None]
ys += [y0, y1, None]
zs += [z0, z1, None]
return xs, ys, zs
def plot_network_3d(G: nx.Graph, z_mode: str = "effekt", title: str = "3D: Systemebenen × Thermometer", seed: int = 42):
"""
Semantische 3D-Ansicht:
- z_mode = "effekt": z = Effektstärke (Items), Systeme z=0
- z_mode = "kapitel": z = Kapitelnummer (Items), Systeme unterhalb der Items (min_z - 0.5)
- z_mode = "system": z = 0 (psychisch), 1 (sozial), Items = Mittelwert ihrer Systemnachbarn
x/y stammen aus einem 2D-Spring-Layout (stabile, gut lesbare Projektion), z ist semantisch belegt.
"""
styles = _styles
colors = _colors
# 2D layout for X/Y (stable projection)
pos_xy = nx.spring_layout(G, seed=seed, k=None, weight="weight", dim=2)
# determine the z coordinate per node
z_map: dict[str, float] = {}
if z_mode == "effekt":
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
z_map[n] = float(d.get("d", 0.0))
else:
z_map[n] = 0.0
elif z_mode == "kapitel":
item_z_vals = []
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
try:
# the chapter number in Kapitelname may be alphanumeric; use the numeric
# "kapitel" node attribute if present, otherwise fall back to 0
z_map[n] = float(d.get("kapitel", 0.0)) if "kapitel" in d else 0.0
except Exception:
z_map[n] = 0.0
item_z_vals.append(z_map[n])
min_z = min(item_z_vals) if item_z_vals else 0.0
for n, d in G.nodes(data=True):
if d.get("bipartite") == "system":
z_map[n] = float(min_z) - 0.5
elif z_mode == "system":
# separate the systems clearly
for n, d in G.nodes(data=True):
if d.get("bipartite") == "system":
lbl = str(d.get("label", "")).strip().lower()
z_map[n] = 0.0 if "psych" in lbl else 1.0
# items: mean z of their system neighbors (exactly one in the bipartite graph)
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
zs = []
for nbr in G[n]:
zs.append(z_map.get(nbr, 0.0))
z_map[n] = float(np.mean(zs)) if zs else 0.0
else:
# unknown mode -> all zeros
z_map = {n: 0.0 for n in G.nodes()}
# node lists
system_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
item_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
# prepare the edges (pos/neg)
x_pos, y_pos, z_pos = _edge_segments_3d(G, pos_xy, z_map, sign="pos")
x_neg, y_neg, z_neg = _edge_segments_3d(G, pos_xy, z_map, sign="neg")
line_positive = styles.get("linie_positiveHighlight", dict(width=1))
line_negative = styles.get("linie_negativeHighlight", dict(width=1))
edge_pos = go.Scatter3d(
x=x_pos, y=y_pos, z=z_pos,
mode="lines",
line=line_positive,
hoverinfo="skip",
showlegend=True,
name="Kanten (d ≥ 0)"
)
edge_neg = go.Scatter3d(
x=x_neg, y=y_neg, z=z_neg,
mode="lines",
line=line_negative,
hoverinfo="skip",
showlegend=True,
name="Kanten (d &lt; 0)"
)
# system nodes
sys_marker = styles.get("marker_primaryLine", dict(size=18))
sys_x = [pos_xy[n][0] for n in system_nodes]
sys_y = [pos_xy[n][1] for n in system_nodes]
sys_z = [z_map[n] for n in system_nodes]
sys_text = [G.nodes[n].get("label", n) for n in system_nodes]
sys_hover = [f"Systemebene: {G.nodes[n].get('label','')}" for n in system_nodes]
systems_trace = go.Scatter3d(
x=sys_x, y=sys_y, z=sys_z, mode="markers",
marker={**sys_marker, "size": 10},
text=sys_text,
hovertext=sys_hover,
hovertemplate="%{hovertext}<extra></extra>",
name="System"
)
# item nodes: thermometers in the secondary style (same marker design for +/-); edges keep their sign colors
pos_marker = styles.get("marker_secondaryLine", dict(size=6))
neg_marker = styles.get("marker_secondaryLine", dict(size=6))
pos_x, pos_y, pos_z, pos_hover = [], [], [], []
neg_x, neg_y, neg_z, neg_hover = [], [], [], []
for n in item_nodes:
x, y = pos_xy[n]
z = z_map[n]
nd = G.nodes[n]
hover = (
"Thermometer: " + str(nd.get("id","")) +
"<br>Stichwort: " + str(nd.get("label","")) +
"<br>Kapitel: " + str(nd.get("kapitelname","")) +
"<br>Subkapitel: " + str(nd.get("subkapitel","")) +
"<br>d: " + f"{nd.get('d',np.nan):.2f}"
)
if float(nd.get("d", 0.0)) >= 0:
pos_x.append(x); pos_y.append(y); pos_z.append(z); pos_hover.append(hover)
else:
neg_x.append(x); neg_y.append(y); neg_z.append(z); neg_hover.append(hover)
items_pos_trace = go.Scatter3d(
x=pos_x, y=pos_y, z=pos_z, mode="markers",
marker=pos_marker,
hovertext=pos_hover,
hovertemplate="%{hovertext}<extra></extra>",
name="Thermometer (d ≥ 0)"
)
items_neg_trace = go.Scatter3d(
x=neg_x, y=neg_y, z=neg_z, mode="markers",
marker=neg_marker,
hovertext=neg_hover,
hovertemplate="%{hovertext}<extra></extra>",
name="Thermometer (d &lt; 0)"
)
fig = go.Figure(data=[edge_pos, edge_neg, systems_trace, items_pos_trace, items_neg_trace])
fig.update_layout(_ci_layout(f"{title} z: {z_mode}"), autosize=True, height=None, width=None)
# set semantically meaningful axis titles
z_title = _Z_AXIS_LABELS.get(z_mode, "Z")
fig.update_scenes(
xaxis=dict(
title="Semantische Position X (Layout)",
showticklabels=False, showgrid=False, zeroline=False
),
yaxis=dict(
title="Semantische Position Y (Layout)",
showticklabels=False, showgrid=False, zeroline=False
),
zaxis=dict(
title=z_title,
showticklabels=False, showgrid=False, zeroline=False
),
)
fig.show()
export_figure(fig, f"vl-network-3d-{z_mode}")
# -----------------------------------------
# Triangulation: 3D plot (chapter × system × effect)
# -----------------------------------------
def plot_triangulation_3d(feats: pd.DataFrame, title: str = "Triangulation (3D): Kapitel × System × Effekt"):
if feats.empty:
print("Hinweis: Keine Daten für Triangulation.")
return
# colors/markers per cluster (cycled through the CI styles)
style_cycle = [
_styles.get("marker_accent", {"size": 6}),
_styles.get("marker_brightArea", {"size": 6}),
_styles.get("marker_depthArea", {"size": 6}),
_styles.get("marker_primaryLine", {"size": 6}),
_styles.get("marker_secondaryLine", {"size": 6}),
_styles.get("marker_positiveHighlight", {"size": 6}),
_styles.get("marker_negativeHighlight", {"size": 6}),
]
traces = []
for cid, group in feats.groupby("cluster"):
mk = style_cycle[cid % len(style_cycle)].copy()
mk.setdefault("size", 6)
hover = (
"Thermometer: " + group["Thermometer_ID"].astype(str) +
"<br>Stichwort: " + group["Stichwort"].astype(str) +
"<br>Kapitel: " + group["Kapitelname"].astype(str) +
"<br>Subkapitel: " + group["Subkapitel"].astype(str) +
"<br>System: " + group["Systemebene"].astype(str) +
"<br>d: " + group["Effektstärke"].map(lambda v: f"{float(v):.2f}")
)
traces.append(go.Scatter3d(
x=group["x"], y=group["y"], z=group["z"],
mode="markers",
marker=mk,
hovertext=hover,
hovertemplate="%{hovertext}<extra></extra>",
name=f"Cluster {cid} (n={len(group)})"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_scenes(
xaxis=dict(title="Kapitel (minmax normiert)", showgrid=False, showticklabels=False),
yaxis=dict(title="Systemebene (0=Psychisch, 1=Sozial)", showgrid=False, showticklabels=True, tickvals=[0,1], ticktext=["Psych","Sozial"]),
zaxis=dict(title="Effektstärke (normiert, Vorzeichen beibehalten)", showgrid=False, showticklabels=False),
)
fig.show()
export_figure(fig, "vl-triangulation-3d")
# -----------------------------------------
# Simple 3-axis projection without clustering
# -----------------------------------------
def plot_triangulation_3d_simple(feats: pd.DataFrame, title: str = "Triangulation (3D): Kapitel × System × Effekt (ohne Cluster)"):
"""
Einfache 3-Achsen-Projektion ohne Clustering.
Achsen:
x = Kapitel (minmax normiert)
y = Systemebene (0=Psychisch, 1=Sozial)
z = Effektstärke (normiert, Vorzeichen beibehalten)
Farbgebung: grün = d ≥ 0, rot = d &lt; 0 (CI-konform).
"""
if feats.empty:
print("Hinweis: Keine Daten für Triangulation (simple).")
return
pos = feats[feats["Effektstärke"] >= 0]
neg = feats[feats["Effektstärke"] < 0]
pos_marker = _styles.get("marker_positiveHighlight", {"size": 6})
neg_marker = _styles.get("marker_negativeHighlight", {"size": 6})
def _hover(group: pd.DataFrame) -> pd.Series:
return (
"Thermometer: " + group["Thermometer_ID"].astype(str) +
"<br>Stichwort: " + group["Stichwort"].astype(str) +
"<br>Kapitel: " + group["Kapitelname"].astype(str) +
"<br>Subkapitel: " + group["Subkapitel"].astype(str) +
"<br>System: " + group["Systemebene"].astype(str) +
"<br>d: " + group["Effektstärke"].map(lambda v: f"{float(v):.2f}")
)
traces = []
if len(pos):
traces.append(go.Scatter3d(
x=pos["x"], y=pos["y"], z=pos["z"],
mode="markers",
marker=pos_marker,
hovertext=_hover(pos),
hovertemplate="%{hovertext}<extra></extra>",
name="d ≥ 0"
))
if len(neg):
traces.append(go.Scatter3d(
x=neg["x"], y=neg["y"], z=neg["z"],
mode="markers",
marker=neg_marker,
hovertext=_hover(neg),
hovertemplate="%{hovertext}<extra></extra>",
name="d &lt; 0"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_scenes(
xaxis=dict(title="Kapitel (minmax normiert)", showgrid=False, showticklabels=False),
yaxis=dict(title="Systemebene (0=Psychisch, 1=Sozial)", showgrid=False, showticklabels=True, tickvals=[0,1], ticktext=["Psych","Sozial"]),
zaxis=dict(title="Effektstärke (normiert, Vorzeichen beibehalten)", showgrid=False, showticklabels=False),
)
fig.show()
export_figure(fig, "vl-triangulation-3d-simple")
def summarize_triangulation(feats: pd.DataFrame, top_n: int = 5) -> dict:
out = {}
for cid, g in feats.groupby("cluster"):
g = g.copy()
g["abs_d"] = g["Effektstärke"].abs()
top_pos = g.sort_values("Effektstärke", ascending=False).head(top_n)
top_neg = g.sort_values("Effektstärke", ascending=True).head(top_n)
out[int(cid)] = {
"n": int(len(g)),
"mean_d": float(g["Effektstärke"].mean()),
"top_positive": top_pos[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Systemebene","Effektstärke"]].to_dict(orient="records"),
"top_negative": top_neg[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Systemebene","Effektstärke"]].to_dict(orient="records"),
}
return out
# -----------------------------------------
# Simple metrics & export
# -----------------------------------------
def summarize_network(G: nx.Graph) -> dict:
# weighted degree per node
wdeg = {}
for n in G.nodes():
s = 0.0
for nbr in G[n]:
s += abs(G[n][nbr].get("weight", 0.0))
wdeg[n] = float(s)
# top items by weighted degree
items = [(n, wdeg[n]) for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
items_sorted = sorted(items, key=lambda t: t[1], reverse=True)[:15]
top_items = []
for n, val in items_sorted:
nd = G.nodes[n]
top_items.append({
"Thermometer_ID": nd.get("id"),
"Stichwort": nd.get("label"),
"Kapitelname": nd.get("kapitelname"),
"Subkapitel": nd.get("subkapitel"),
"Effektstärke": nd.get("d"),
"weighted_degree_abs": val
})
# weight sums on the system side
systems = [(n, wdeg[n]) for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
system_summary = {G.nodes[n].get("label", n): float(val) for n, val in systems}
return {"top_items_by_weighted_degree": top_items, "system_weight_sums": system_summary}
# -----------------------------------------
# Thermo dashboard: energy/entropy/order/coupling
# -----------------------------------------
def _normalized_entropy(weights: list[float]) -> float:
"""Normalisierte Shannon-Entropie S in [0,1] über eine Gewichtsliste."""
arr = np.asarray(weights, dtype=float)
arr = arr[np.isfinite(arr) & (arr >= 0)]
if arr.size == 0:
return 0.0
total = arr.sum()
if total <= 0:
return 0.0
p = arr / total
# numerically stable
p = p[p > 0]
S = -np.sum(p * np.log(p))
Smax = np.log(len(p)) if len(p) > 0 else 1.0
return float(S / Smax) if Smax > 0 else 0.0
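# Worked example (illustrative weights): a uniform distribution maximizes the
# normalized entropy, a single mass point minimizes it.
def _demo_normalized_entropy() -> tuple[float, float]:
    uniform = _normalized_entropy([1.0, 1.0, 1.0, 1.0])  # -> 1.0
    peaked = _normalized_entropy([1.0, 0.0, 0.0, 0.0])   # -> 0.0
    return uniform, peaked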
def compute_thermo_dashboard(df: pd.DataFrame, G: nx.Graph) -> dict:
"""
Operationalisiert thermodynamische Analogien auf Informations-/Wirksamkeitsmaßen:
- Budget: E_total, E_psych, E_soz
- Nutzanteile: E_pos, E_neg, η_pos, η_net
- Entropie/Ordnung: S/O über Verteilung der |d| auf Subkapitel und Kapitel
- Modularity (Kopplung/Abgrenzung) der Partition {psychisch, sozial} im bipartiten Netz
Liefert ein dict und druckt eine kompakte Textzusammenfassung.
"""
# --- Budget ---
d = df["Effektstärke"].astype(float)
E_total = float(np.abs(d).sum())
E_pos = float(np.clip(d, 0, None).sum())
E_neg = float(np.clip(-d, 0, None).sum()) # absolute values of the negative d
eta_pos = float(E_pos / (E_pos + E_neg)) if (E_pos + E_neg) > 0 else 0.0
eta_net = float((E_pos - E_neg) / (E_pos + E_neg)) if (E_pos + E_neg) > 0 else 0.0
# budget by system
df_sys = df[df["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])].copy()
E_psych = float(np.abs(df_sys.loc[df_sys["Systemebene"].str.lower()=="psychisch","Effektstärke"]).sum())
E_soz = float(np.abs(df_sys.loc[df_sys["Systemebene"].str.lower()=="sozial","Effektstärke"]).sum())
# --- Entropy/order (Subkapitel, Kapitelname) ---
def _weights_by(col: str) -> list[float]:
grp = df_sys.groupby(col)["Effektstärke"].apply(lambda s: float(np.abs(s).sum()))
return [v for v in grp.values if np.isfinite(v) and v >= 0]
S_sub = _normalized_entropy(_weights_by("Subkapitel"))
S_kap = _normalized_entropy(_weights_by("Kapitelname"))
O_sub = float(1.0 - S_sub)
O_kap = float(1.0 - S_kap)
# --- Modularity (community separation psychisch/sozial) ---
# partition: each item joins the community of its system level; system nodes likewise
parts: dict[int, set[str]] = {0: set(), 1: set()}
for n, data in G.nodes(data=True):
if data.get("bipartite") == "system":
lbl = str(data.get("label","")).strip().lower()
parts[0 if "psych" in lbl else 1].add(n)
elif data.get("bipartite") == "item":
# find the system neighbors (bipartite: exactly one)
sys_lbls = [G.nodes[nbr].get("label","").strip().lower() for nbr in G[n]]
if any("psych" in s for s in sys_lbls):
parts[0].add(n)
else:
parts[1].add(n)
partition = [parts[0], parts[1]]
# modularity with edge weight = |d|
H = G.copy()
for u, v, dd in H.edges(data=True):
dd["weight"] = abs(float(dd.get("weight", 0.0)))
try:
Q_mod = float(nx_comm.modularity(H, partition, weight="weight"))
except Exception:
Q_mod = float("nan")
# --- Output ---
print("\nTHERMO-DASHBOARD")
print(f" Budget: E_total={E_total:.2f} | E_psych={E_psych:.2f} | E_sozial={E_soz:.2f}")
print(f" Nutzanteile: E_pos={E_pos:.2f} | E_neg={E_neg:.2f} | η_pos={eta_pos:.2f} | η_net={eta_net:.2f}")
print(f" Entropie/Ordnung (Subkapitel): S={S_sub:.2f} | O={O_sub:.2f}")
print(f" Entropie/Ordnung (Kapitel): S={S_kap:.2f} | O={O_kap:.2f}")
print(f" Modularity (psychisch/sozial): Q={Q_mod:.3f}")
return {
"budget": {
"E_total": E_total, "E_psych": E_psych, "E_sozial": E_soz,
"E_pos": E_pos, "E_neg": E_neg, "eta_pos": eta_pos, "eta_net": eta_net
},
"entropy": {
"S_subkapitel": S_sub, "O_subkapitel": O_sub,
"S_kapitel": S_kap, "O_kapitel": O_kap
},
"modularity": {"Q_psych_sozial": Q_mod}
}
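# Worked example for the efficiency ratios (illustrative numbers): with
# E_pos = 6 and E_neg = 2, eta_pos = 6/8 = 0.75 and eta_net = (6-2)/8 = 0.5.
def _demo_eta(E_pos: float = 6.0, E_neg: float = 2.0) -> tuple[float, float]:
    total = E_pos + E_neg
    return E_pos / total, (E_pos - E_neg) / total  # -> (0.75, 0.5)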
# -----------------------------------------
# Helper: HEX to RGBA (Plotly)
# -----------------------------------------
def hex_to_rgba(hex_color: str, alpha: float = 1.0) -> str:
"""
Wandelt eine HEX-Farbe (#RRGGBB oder #RGB) in einen Plotly-kompatiblen
rgba()-String um. alpha ∈ [0,1].
"""
if not isinstance(hex_color, str):
raise ValueError("hex_to_rgba: hex_color must be a string like '#RRGGBB'")
h = hex_color.lstrip("#")
if len(h) == 6:
r, g, b = int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16)
elif len(h) == 3:
r, g, b = int(h[0]*2, 16), int(h[1]*2, 16), int(h[2]*2, 16)
else:
# fallback: pass through what may already be a valid CSS color
return hex_color
# clamp alpha defensively
a = max(0.0, min(1.0, float(alpha)))
return f"rgba({r},{g},{b},{a})"
# -----------------------------------------
# Thermo-Dashboard Visualization (CI-styled)
# -----------------------------------------
def plot_thermo_dashboard(thermo: dict, title: str = "Thermo-Dashboard: Energie • Entropie • Modularität") -> None:
"""
Verfeinerte, CI-konforme Visualisierung in 2×2 Kacheln:
(A) Energie nach System (100%-Stack + absolute Werte)
(B) Positiv vs. Negativ (100%-Stack)
(C) Entropie/Ordnung (gruppiert, [0..1] mit Referenzlinien)
(D) Modularität Q als Gauge (0..1) mit Schwellenbändern)
Fokus: sehr gute Lesbarkeit (größere Fonts, keine Überlagerungen),
ausgewogene Panel-Größen, und eine klar abgegrenzte Gauge-Kachel.
"""
if not isinstance(thermo, dict) or not thermo:
print("Hinweis: Kein Thermo-Objekt für Dashboard übergeben.")
return
# colors/styles
colors = _colors if isinstance(_colors, dict) else {}
styles = _styles if isinstance(_styles, dict) else {}
# fallback colors
c_psych = colors.get("positiveHighlight", "#2ca02c")
c_soz = colors.get("secondaryLine", "#ff7f0e")
c_pos = colors.get("positiveHighlight", "#2ca02c")
c_neg = colors.get("negativeHighlight", "#d62728")
c_S = colors.get("brightArea", "#66CCCC")
c_O = colors.get("depthArea", "#006666")
c_text = colors.get("text", "#333333")
c_bg = colors.get("background", "#ffffff")
# key metrics
budget = thermo.get("budget", {})
entropy = thermo.get("entropy", {})
modular = thermo.get("modularity", {})
E_total = float(budget.get("E_total", 0.0))
E_psych = float(budget.get("E_psych", 0.0))
E_soz = float(budget.get("E_sozial", 0.0))
E_pos = float(budget.get("E_pos", 0.0))
E_neg = float(budget.get("E_neg", 0.0))
eta_pos = float(budget.get("eta_pos", 0.0))
eta_net = float(budget.get("eta_net", 0.0))
S_kap = float(entropy.get("S_kapitel", 0.0))
O_kap = float(entropy.get("O_kapitel", 0.0))
S_sub = float(entropy.get("S_subkapitel", 0.0))
O_sub = float(entropy.get("O_subkapitel", 0.0))
Q_mod = float(modular.get("Q_psych_sozial", float("nan")))
q_val = 0.0 if not (isinstance(Q_mod, float) and math.isfinite(Q_mod)) else Q_mod
# subplot layout (the Indicator trace requires a domain-type cell)
fig = make_subplots(
rows=2, cols=2,
specs=[[{"type": "xy"}, {"type": "xy"}],
[{"type": "xy"}, {"type": "domain"}]],
column_widths=[0.58, 0.42],
row_heights=[0.55, 0.45],
vertical_spacing=0.12,
horizontal_spacing=0.08,
subplot_titles=(
"Energiebilanz: psychisch vs. sozial",
"Positiv vs. negativ (Effektmasse)",
"Entropie / Ordnung",
"Modularität Q (psychisch/sozial)"
)
)
# ---------- (A) Energy by system: 100% stack + text labels ----------
share_psych = (E_psych / E_total) if E_total > 0 else 0
share_soz = (E_soz / E_total) if E_total > 0 else 0
fig.add_trace(
go.Bar(
x=[share_psych*100], y=["Anteil (%)"], orientation="h",
marker=dict(color=c_psych, line=dict(width=0)),
text=[f"Psychisch {share_psych*100:.1f}%<br>(Σ={E_psych:.2f})"],
textposition="inside",
insidetextanchor="middle",
insidetextfont=dict(color="#ffffff", size=12),
texttemplate="%{text}",
name="Psychisch",
hovertemplate="Psychisch: %{x:.1f}% (Σ=%{customdata:.2f})<extra></extra>",
customdata=[E_psych],
showlegend=False,
cliponaxis=False
), row=1, col=1
)
fig.add_trace(
go.Bar(
x=[share_soz*100], y=["Anteil (%)"], orientation="h",
marker=dict(color=c_soz, line=dict(width=0)),
text=[f"Sozial {share_soz*100:.1f}%<br>(Σ={E_soz:.2f})"],
textposition="inside",
insidetextanchor="middle",
insidetextfont=dict(color="#ffffff", size=12),
texttemplate="%{text}",
name="Sozial",
hovertemplate="Sozial: %{x:.1f}% (Σ=%{customdata:.2f})<extra></extra>",
customdata=[E_soz],
showlegend=False,
cliponaxis=False
), row=1, col=1
)
fig.update_xaxes(range=[0,100], title_text="Energieanteil [% von Σ|d|]", row=1, col=1, showgrid=True, gridwidth=1)
fig.update_yaxes(title_text="System", row=1, col=1)
# KPI badge: Σ|d|
fig.add_annotation(
row=1, col=1,
xref="x1", yref="y1",
x=100, y=1,
text=f"Σ|d| = {E_total:.2f}",
showarrow=False, font=dict(color=c_text, size=12),
xanchor="right", yanchor="bottom",
bgcolor=hex_to_rgba(c_bg, 0.25)
)
# ---------- (B) Positive vs. negative: 100% stack ----------
share_pos = (E_pos / (E_pos + E_neg)) if (E_pos + E_neg) > 0 else 0
share_neg = 1 - share_pos
fig.add_trace(
go.Bar(
x=[share_pos*100], y=["Anteil (%)"], orientation="h",
marker=dict(color=c_pos, line=dict(width=0)),
text=[f"Positiv {share_pos*100:.1f}%<br>(Σ={E_pos:.2f})"],
textposition="inside",
insidetextanchor="middle",
insidetextfont=dict(color="#ffffff", size=12),
texttemplate="%{text}",
name="Positiv",
hovertemplate="Positiv: %{x:.1f}% (Σ=%{customdata:.2f})<extra></extra>",
customdata=[E_pos],
showlegend=False,
cliponaxis=False
), row=1, col=2
)
fig.add_trace(
go.Bar(
x=[share_neg*100], y=["Anteil (%)"], orientation="h",
marker=dict(color=c_neg, line=dict(width=0)),
text=[f"Negativ {share_neg*100:.1f}%<br>(Σ={E_neg:.2f})"],
textposition="inside",
insidetextanchor="middle",
insidetextfont=dict(color="#ffffff", size=12),
texttemplate="%{text}",
name="Negativ",
hovertemplate="Negativ: %{x:.1f}% (Σ=%{customdata:.2f})<extra></extra>",
customdata=[E_neg],
showlegend=False,
cliponaxis=False
), row=1, col=2
)
fig.update_xaxes(range=[0,100], title_text="Effektmasse [%]", row=1, col=2, showgrid=True, gridwidth=1)
fig.update_yaxes(title_text="Vorzeichen", row=1, col=2)
    # η_net as a text badge (outside the bars, so nothing gets covered)
fig.add_annotation(
row=1, col=2,
xref="x2 domain", yref="y2 domain",
x=0.98, y=0.02,
text=f"η_net = {eta_net:.2f}",
showarrow=False, font=dict(color=c_text, size=12),
xanchor="right", yanchor="bottom",
bgcolor=hex_to_rgba(c_bg, 0.25)
)
    # ---------- (C) Entropy / order ----------
fig.add_trace(
go.Bar(x=["Kapitel S","Subkapitel S"], y=[S_kap, S_sub], marker=dict(color=c_S, line=dict(width=0)), name="Entropie S", showlegend=False),
row=2, col=1
)
fig.add_trace(
go.Bar(x=["Kapitel O","Subkapitel O"], y=[O_kap, O_sub], marker=dict(color=c_O, line=dict(width=0)), name="Ordnung O", showlegend=False),
row=2, col=1
)
fig.update_yaxes(range=[0,1], tick0=0, dtick=0.2, title_text="Wert [0..1]", row=2, col=1)
fig.update_xaxes(title_text="Maß", row=2, col=1)
    # Reference lines (rule-of-thumb values for low/medium/high)
fig.add_hline(y=0.33, line_width=1, line_dash="dot", row=2, col=1)
fig.add_hline(y=0.66, line_width=1, line_dash="dot", row=2, col=1)
    # ---------- (D) Modularity as a gauge ----------
fig.add_trace(
go.Indicator(
mode="gauge+number+delta",
value=q_val,
delta={'reference': 0.5, 'position': "top", 'increasing': {'color': c_pos}, 'decreasing': {'color': c_neg}},
gauge=dict(
shape="angular",
axis=dict(range=[0,1], tick0=0, dtick=0.2, ticks="outside",
tickfont=dict(size=11), tickwidth=1, ticklen=4),
bar=dict(color=c_psych, thickness=0.25),
steps=[
dict(range=[0,0.3], color=hex_to_rgba(c_O, 0.25)),
dict(range=[0.3,0.5], color=hex_to_rgba(c_S, 0.35)),
dict(range=[0.5,1.0], color=hex_to_rgba(c_pos, 0.25)),
],
threshold=dict(line=dict(color=c_text, width=2), thickness=0.75, value=q_val)
),
number=dict(suffix=" Q", font=dict(size=28, color=c_text)),
title={"text": "Q (psychisch/sozial)", "font": {"size": 12, "color": c_text}}
), row=2, col=2
)
    # Layout (CI): larger titles, consistent fonts, no overlaps
layout_base = _ci_layout(title)
fig.update_layout(
layout_base,
barmode="stack",
bargap=0.18,
autosize=True,
height=None,
width=None,
margin=dict(l=60, r=40, t=70, b=55),
uniformtext=dict(minsize=10, mode="hide"),
legend=dict(font=dict(size=11)),
font=dict(size=12, color=c_text),
plot_bgcolor=colors.get("background", layout_base.get("plot_bgcolor")),
paper_bgcolor=colors.get("background", layout_base.get("paper_bgcolor")),
)
    # Make the subplot titles slightly larger/bolder (robust for Plotly annotation objects)
target_titles = {
"Energiebilanz: psychisch vs. sozial",
"Positiv vs. negativ (Effektmasse)",
"Entropie / Ordnung",
"Modularität Q (psychisch/sozial)"
}
if getattr(fig.layout, "annotations", None):
for i, ann in enumerate(fig.layout.annotations):
            # Plotly returns annotation objects; read .text instead of dict.get
            txt = getattr(ann, "text", None)
            if txt in target_titles:
                # set the font directly on the object
                fig.layout.annotations[i].font = dict(size=13, color=c_text)
fig.show()
export_figure(fig, "vl-thermo-dashboard")
# ---------------------------------------------------
# Single tiles: thermo dashboard (CI-styled)
# ---------------------------------------------------
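# All tile functions below expect the dict produced by compute_thermo_dashboard.
# Roughly this shape (only the keys actually read here; all values are floats):
#   thermo = {
#       "budget":     {"E_total": ..., "E_psych": ..., "E_sozial": ...,
#                      "E_pos": ..., "E_neg": ..., "eta_pos": ..., "eta_net": ...},
#       "entropy":    {"S_kapitel": ..., "O_kapitel": ...,
#                      "S_subkapitel": ..., "O_subkapitel": ...},
#       "modularity": {"Q_psych_sozial": ...},
#   }
# Missing keys fall back to 0.0 via .get(), so partial dicts degrade gracefully.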
def plot_thermo_energy_by_system(thermo: dict, title: str = "Energie: psychisch vs. sozial (Σ|d|)") -> None:
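    """
    Single tile: effect mass Σ|d| split by system level (psychisch vs. sozial).
    Reads E_total, E_psych and E_sozial from thermo["budget"]; the percentage
    share of each level appears in the trace names and in the hover.
    """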
colors = _colors if isinstance(_colors, dict) else {}
c_psych = colors.get("positiveHighlight", "#2ca02c")
c_soz = colors.get("secondaryLine", "#ff7f0e")
budget = thermo.get("budget", {})
E_total = float(budget.get("E_total", 0.0))
E_psych = float(budget.get("E_psych", 0.0))
E_soz = float(budget.get("E_sozial", 0.0))
share_psych = (E_psych / E_total) * 100 if E_total > 0 else 0.0
share_soz = (E_soz / E_total) * 100 if E_total > 0 else 0.0
fig = go.Figure()
fig.add_trace(go.Bar(
x=["Psychisch"], y=[E_psych], name=f"Psychisch ({share_psych:.1f}%)",
marker=dict(color=c_psych, line=dict(width=0)),
hovertemplate="Psychisch: Σ|d|=%{y:.2f} (%{customdata:.1f}%)<extra></extra>",
customdata=[share_psych]
))
fig.add_trace(go.Bar(
x=["Sozial"], y=[E_soz], name=f"Sozial ({share_soz:.1f}%)",
marker=dict(color=c_soz, line=dict(width=0)),
hovertemplate="Sozial: Σ|d|=%{y:.2f} (%{customdata:.1f}%)<extra></extra>",
customdata=[share_soz]
))
fig.update_layout(_ci_layout(title), barmode="group", autosize=True, height=None, width=None)
fig.update_yaxes(title_text="Σ|d| (Effektmasse)")
fig.update_xaxes(title_text="Systemebene")
fig.show()
export_figure(fig, "vl-thermo-energy-by-system")
def plot_thermo_pos_neg(thermo: dict, title: str = "Effektmasse: positiv vs. negativ") -> None:
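    """
    Single tile: effect mass of positive vs. negative effects.
    Reads E_pos, E_neg and eta_net from thermo["budget"]; the shares appear in
    the trace names, and η_net is shown as a badge in the lower-right corner.
    """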
colors = _colors if isinstance(_colors, dict) else {}
c_pos = colors.get("positiveHighlight", "#2ca02c")
c_neg = colors.get("negativeHighlight", "#d62728")
budget = thermo.get("budget", {})
E_pos = float(budget.get("E_pos", 0.0))
E_neg = float(budget.get("E_neg", 0.0))
tot = E_pos + E_neg
share_pos = (E_pos / tot) * 100 if tot > 0 else 0.0
share_neg = (E_neg / tot) * 100 if tot > 0 else 0.0
eta_net = float(budget.get("eta_net", 0.0))
fig = go.Figure()
fig.add_trace(go.Bar(
x=["Positiv"], y=[E_pos], name=f"Positiv ({share_pos:.1f}%)",
marker=dict(color=c_pos, line=dict(width=0)),
hovertemplate="Positiv: Σ|d|=%{y:.2f} (%{customdata:.1f}%)<extra></extra>",
customdata=[share_pos]
))
fig.add_trace(go.Bar(
x=["Negativ"], y=[E_neg], name=f"Negativ ({share_neg:.1f}%)",
marker=dict(color=c_neg, line=dict(width=0)),
hovertemplate="Negativ: Σ|d|=%{y:.2f} (%{customdata:.1f}%)<extra></extra>",
customdata=[share_neg]
))
fig.update_layout(_ci_layout(title), barmode="group", autosize=True, height=None, width=None)
fig.update_yaxes(title_text="Σ|d| (Effektmasse)")
fig.update_xaxes(title_text="Vorzeichen")
    # Badge for η_net
fig.add_annotation(
xref="paper", yref="paper", x=0.98, y=0.02, showarrow=False,
text=f"η_net = {eta_net:.2f}"
)
fig.show()
export_figure(fig, "vl-thermo-pos-neg")
def plot_thermo_entropy_order(thermo: dict, title: str = "Entropie / Ordnung nach (Sub)Kapitel") -> None:
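    """
    Single tile: entropy S and order O at chapter and subchapter level.
    Reads S_kapitel, O_kapitel, S_subkapitel and O_subkapitel from
    thermo["entropy"]; dotted lines at 0.33/0.66 mark the low/medium/high bands.
    """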
colors = _colors if isinstance(_colors, dict) else {}
c_S = colors.get("brightArea", "#66CCCC")
c_O = colors.get("depthArea", "#006666")
ent = thermo.get("entropy", {})
S_kap = float(ent.get("S_kapitel", 0.0))
O_kap = float(ent.get("O_kapitel", 0.0))
S_sub = float(ent.get("S_subkapitel", 0.0))
O_sub = float(ent.get("O_subkapitel", 0.0))
fig = go.Figure()
fig.add_trace(go.Bar(x=["Kapitel S","Subkapitel S"], y=[S_kap, S_sub],
marker=dict(color=c_S, line=dict(width=0)),
name="Entropie S",
hovertemplate="%{x}: %{y:.2f}<extra></extra>"))
fig.add_trace(go.Bar(x=["Kapitel O","Subkapitel O"], y=[O_kap, O_sub],
marker=dict(color=c_O, line=dict(width=0)),
name="Ordnung O",
hovertemplate="%{x}: %{y:.2f}<extra></extra>"))
fig.update_layout(_ci_layout(title), barmode="group", autosize=True, height=None, width=None)
fig.update_yaxes(title_text="Wert [0..1]", range=[0,1], dtick=0.2)
fig.update_xaxes(title_text="Maß")
    # Reference lines
fig.add_hline(y=0.33, line_width=1, line_dash="dot")
fig.add_hline(y=0.66, line_width=1, line_dash="dot")
fig.show()
export_figure(fig, "vl-thermo-entropy-order")
def plot_thermo_modularity_gauge(thermo: dict, title: str = "Modularität Q (psychisch/sozial)") -> None:
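    """
    Single tile: modularity Q of the psychisch/sozial partition as a gauge.
    Reads thermo["modularity"]["Q_psych_sozial"], clamps it to [0, 1] and colors
    the gauge bands to match the annotated ranges (< 0.3 / 0.3–0.5 / > 0.5).
    """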
colors = _colors if isinstance(_colors, dict) else {}
c_psych = colors.get("positiveHighlight", "#2ca02c")
c_pos = colors.get("positiveHighlight", "#2ca02c")
c_neg = colors.get("negativeHighlight", "#d62728")
c_S = colors.get("brightArea", "#66CCCC")
c_O = colors.get("depthArea", "#006666")
c_text = colors.get("text", "#333333")
    q = float(thermo.get("modularity", {}).get("Q_psych_sozial", 0.0))
    # Clamp robustly to [0, 1]; guard against NaN first, since min/max would
    # otherwise silently map NaN to 1.0 here
    if not math.isfinite(q):
        q = 0.0
    q = max(0.0, min(1.0, q))
    # Single gauge without a duplicated heading:
    # -> layout title comes from _ci_layout(title)
    # -> leave the Indicator title empty
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=q,
gauge=dict(
shape="angular",
axis=dict(
range=[0, 1],
tick0=0,
dtick=0.1,
ticks="outside",
tickfont=dict(size=14),
tickwidth=1,
ticklen=6
),
bar=dict(color=c_psych, thickness=0.33),
steps=[
dict(range=[0.0, 0.3], color=hex_to_rgba(c_O, 0.30)),
dict(range=[0.3, 0.5], color=hex_to_rgba(c_S, 0.35)),
dict(range=[0.5, 1.0], color=hex_to_rgba(c_pos, 0.28)),
],
threshold=dict(
line=dict(color=c_text, width=2),
thickness=0.75,
value=q
)
),
number=dict(suffix=" Q", font=dict(size=36, color=c_text)),
title={"text": ""} # keine zweite Überschrift im Plotkörper
))
    # Band labels as explicit annotations (avoids overlaps)
fig.add_annotation(x=0.18, y=0.72, xref="paper", yref="paper",
text="niedrig &lt; 0.3", showarrow=False,
font=dict(size=12, color=c_text))
fig.add_annotation(x=0.46, y=0.72, xref="paper", yref="paper",
text="mittel 0.30.5", showarrow=False,
font=dict(size=12, color=c_text))
fig.add_annotation(x=0.80, y=0.72, xref="paper", yref="paper",
text="hoch &gt; 0.5", showarrow=False,
font=dict(size=12, color=c_text))
fig.update_layout(
_ci_layout(title),
autosize=True,
height=None,
width=None,
margin=dict(l=60, r=60, t=70, b=40),
paper_bgcolor=colors.get("background", None),
plot_bgcolor=colors.get("background", None),
showlegend=False
)
fig.show()
export_figure(fig, "vl-thermo-modularity-gauge")
# ---------------------------------------------------
# Standalone visualization: effect-size distribution (histogram + violin)
# ---------------------------------------------------
def plot_thermo_effect_sizes(df: pd.DataFrame, title: str = "Effektstärken-Verteilung (Histogramm + Violin)") -> None:
"""
Einzelvisualisierung der Effektmasse als Verteilung.
Links: Histogramm der Effektstärken (d), gestapelt nach Vorzeichen, normiert auf Prozent.
Rechts: Violin-Plot (geteilte Seiten) für d≥0 vs. d<0 mit eingebetteter Box,
optional getrennt nach Systemebene im Hover.
Referenzlinien bei 0.0, ±0.2 (klein), ±0.5 (mittel), ±0.8 (groß).
"""
if df is None or df.empty:
print("Hinweis: Keine Daten für Effektmassen-Plot.")
return
colors = _colors if isinstance(_colors, dict) else {}
c_pos = colors.get("positiveHighlight", "#2ca02c")
c_neg = colors.get("negativeHighlight", "#d62728")
c_text = colors.get("text", "#333333")
data = df.copy()
data["d"] = pd.to_numeric(data["Effektstärke"], errors="coerce")
data = data.dropna(subset=["d"])
    # Split by sign
pos = data.loc[data["d"] >= 0, "d"]
neg = data.loc[data["d"] < 0, "d"]
    # Robust axis limits
d_min = float(np.nanmin(data["d"])) if len(data) else -1.0
d_max = float(np.nanmax(data["d"])) if len(data) else 1.0
pad = 0.05 * (d_max - d_min) if (d_max - d_min) > 0 else 0.1
x_range = [d_min - pad, d_max + pad]
fig = make_subplots(
rows=1, cols=2,
shared_yaxes=False,
specs=[[{"type": "xy"}, {"type": "xy"}]],
column_widths=[0.62, 0.38],
horizontal_spacing=0.10,
subplot_titles=("Histogramm der Effektstärken (in %)", "Violin: d ≥ 0 vs. d < 0")
)
    # ---------- (A) Histogram ----------
    # Automatic bins; normalized to percent
if len(pos):
fig.add_trace(
go.Histogram(
x=pos, name="d ≥ 0",
marker=dict(color=c_pos, line=dict(width=0)),
opacity=0.9,
histnorm="percent",
hovertemplate="d (pos): %{x:.2f}<br>Anteil: %{y:.2f}%<extra></extra>"
),
row=1, col=1
)
if len(neg):
fig.add_trace(
go.Histogram(
x=neg, name="d < 0",
marker=dict(color=c_neg, line=dict(width=0)),
opacity=0.9,
histnorm="percent",
hovertemplate="d (neg): %{x:.2f}<br>Anteil: %{y:.2f}%<extra></extra>"
),
row=1, col=1
)
fig.update_xaxes(title_text="Cohen d", range=x_range, row=1, col=1)
fig.update_yaxes(title_text="Anteil [%]", row=1, col=1)
    fig.update_layout(barmode="overlay")  # overlaid so both distributions remain visible
    # Reference lines
for v in [0.0, -0.2, 0.2, -0.5, 0.5, -0.8, 0.8]:
fig.add_vline(x=v, line_width=1, line_dash="dot", row=1, col=1)
    # ---------- (B) Violin ----------
    # Violins with split sides; show individual points, embed a box plot
if len(pos):
fig.add_trace(
go.Violin(
y=pos,
name="d ≥ 0",
side="positive",
box=dict(visible=True),
meanline=dict(visible=True),
points="all",
pointpos=0.0,
marker=dict(color=c_pos, opacity=0.6),
hovertemplate="d (pos): %{y:.2f}<extra></extra>"
),
row=1, col=2
)
if len(neg):
fig.add_trace(
go.Violin(
y=neg,
name="d < 0",
side="negative",
box=dict(visible=True),
meanline=dict(visible=True),
points="all",
pointpos=0.0,
marker=dict(color=c_neg, opacity=0.6),
hovertemplate="d (neg): %{y:.2f}<extra></extra>"
),
row=1, col=2
)
fig.update_yaxes(title_text="Cohen d", row=1, col=2)
    # Shared layout
fig.update_layout(
_ci_layout(title),
autosize=True,
height=None,
width=None,
margin=dict(l=60, r=40, t=70, b=55),
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1.0),
font=dict(size=12, color=c_text)
)
fig.show()
export_figure(fig, "vl-thermo-effect-sizes")
# -----------------------------------------
# Pipeline
# -----------------------------------------
def run_network_analysis(
csv_path: str,
min_abs_d: float = 0.00,
kapitel_filter: list[int] | None = None,
subkapitel_filter: list[str] | None = None,
seed: int = 42,
z_mode: str = "effekt"
):
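    """
    Full pipeline: load the CSV, build the bipartite graph (Systemebene ×
    Thermometer), render the 2D/3D network views, compute the thermo metrics
    (energy, entropy, modularity) plus their single-tile plots, derive the
    coupling indices and the coupling potential per need, run the
    triangulations, and export all results (CSV/JSON/plots) to export/.

    Args:
        csv_path: path to the input CSV with the effect-size items.
        min_abs_d: keep only edges with |d| >= this threshold (0.0 keeps all).
        kapitel_filter / subkapitel_filter: optional restrictions of the item set.
        seed: RNG seed for layouts and clustering.
        z_mode: semantic z-axis mode for the 3D network view.
    """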
df = load_data(csv_path)
    # Briefly log data quality
print(f"Rows: {len(df)} | min d = {df['Effektstärke'].min():.2f} | max d = {df['Effektstärke'].max():.2f}")
print("Systemebenen:", df["Systemebene"].dropna().unique().tolist())
if kapitel_filter:
print("Kapitel-Filter:", kapitel_filter)
if subkapitel_filter:
print("Subkapitel-Filter:", subkapitel_filter)
if min_abs_d > 0:
print(f"Filter |d| ≥ {min_abs_d:.2f}")
G = build_bipartite_graph(df, min_abs_d=min_abs_d,
kapitel_filter=kapitel_filter,
subkapitel_filter=subkapitel_filter)
if G.number_of_nodes() == 0 or G.number_of_edges() == 0:
print("Hinweis: Nach Filtern keine Knoten/Kanten bitte Filter anpassen.")
return
plot_network(G, title="Netzwerk: Systemebenen × Thermometer (Kanten: Effektstärke)", seed=seed)
    # 3D view with a semantic z-axis
plot_network_3d(G, z_mode=z_mode, title="Netzwerk (3D): semantische z-Achse", seed=seed)
summary = summarize_network(G)
print("\nSystemgewicht-Summen:", summary["system_weight_sums"])
print("\nTop-Items (weighted degree):")
for r in summary["top_items_by_weighted_degree"]:
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={r['Effektstärke']:.2f} | wd={r['weighted_degree_abs']:.2f}")
    # --- Thermo dashboard: energy/entropy/coupling ---
thermo = compute_thermo_dashboard(df, G)
    # Individual thermo visualizations
plot_thermo_energy_by_system(thermo, title="Energie: psychisch vs. sozial (Σ|d|)")
plot_thermo_pos_neg(thermo, title="Effektmasse: positiv vs. negativ")
plot_thermo_entropy_order(thermo, title="Entropie / Ordnung nach (Sub)Kapitel")
plot_thermo_modularity_gauge(thermo, title="Modularität Q (psychisch/sozial)")
    # Effect mass as a distribution (histogram + violin)
plot_thermo_effect_sizes(df, title="Effektstärken-Verteilung (Histogramm + Violin)")
    # Compact visual thermo dashboard
plot_thermo_dashboard(thermo, title="Thermo-Dashboard: Energie • Entropie • Modularität")
export_json(thermo, "thermo_dashboard.json")
    # Export top lists
extremes = top_extremes(df, n=_TOP_N_EXTREMES)
export_json(extremes, "network_top_extremes.json")
    # Item projection + communities (optional)
item_proj_summary = {}
if _SHOW_ITEM_PROJECTION:
Gi, node2com, coms = build_item_projection(G)
plot_item_projection(Gi, node2com, title="Item-Projektion (Communities)")
item_proj_summary = {
"n_nodes": Gi.number_of_nodes(),
"n_edges": Gi.number_of_edges(),
"n_communities": len(coms),
}
    # --- Compute & export coupling indices (Young/Roediger) ---
coupling = compute_and_export_coupling_indices(df, G)
    # --- Compute & export coupling potential per need ---
coupling_potential_summary = compute_and_export_coupling_potential(df)
    # Log and visualize
if coupling_potential_summary:
plot_coupling_potential(coupling_potential_summary)
    # --- NEW: 3D triangulation effect × need × semantics ---
feats_needs = build_triangulation_features_needs(df)
# Plot & Export (CSV wird in der Funktion geschrieben)
plot_triangulation_needs_3d(feats_needs, title="Triangulation (3D): Effekt × Bedürfnis × Semantik")
    # --- Triangulation: chapter × system × effect ---
feats, labels = triangulate_clusters(
df,
algo=_CLUSTER_ALGO,
n_clusters=_N_CLUSTERS,
seed=seed,
dbscan_eps=_DBSCAN_EPS,
dbscan_min_samples=_DBSCAN_MIN_SAMPLES,
spectral_k=_SPECTRAL_K
)
print(f"\nTriangulation (Algo={_CLUSTER_ALGO}): Clustergrößen")
for cid, cnt in pd.Series(labels).value_counts().sort_index().items():
print(f" Cluster {cid}: n={cnt}")
if _CLUSTER_ALGO == "dbscan":
print(f" DBSCAN: eps={_DBSCAN_EPS}, min_samples={_DBSCAN_MIN_SAMPLES}")
elif _CLUSTER_ALGO == "spectral":
print(f" Spectral: k={_SPECTRAL_K}")
else:
print(f" n_clusters={_N_CLUSTERS}")
if _CLUSTER_ALGO == "none":
        # no cluster reports, just the plain projection
plot_triangulation_3d_simple(feats, title="Triangulation (3D): Kapitel × System × Effekt (ohne Cluster)")
tri_summary = {}
else:
tri_summary = summarize_triangulation(feats, top_n=5)
for cid in sorted(tri_summary.keys()):
info = tri_summary[cid]
print(f"\nCluster {cid} n={info['n']} | mean d={info['mean_d']:.2f}")
print(" Top +: ")
for r in info["top_positive"]:
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
print(" Top -: ")
for r in info["top_negative"]:
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
# Plot
plot_triangulation_3d(feats, title="Triangulation (3D): Kapitel × System × Effekt")
# Export JSON
payload = {
"extremes": extremes,
"item_projection": item_proj_summary,
"triangulation": {
"clusters": tri_summary,
},
"coupling": coupling,
"coupling_potential": coupling_potential_summary,
"meta": {
"theme": theme,
"min_abs_d": float(min_abs_d),
"kapitel_filter": kapitel_filter,
"subkapitel_filter": subkapitel_filter
},
"nodes": [
{
"id": n,
"label": G.nodes[n].get("label", ""),
"type": G.nodes[n].get("bipartite", ""),
"Thermometer_ID": G.nodes[n].get("id"),
"Kapitelname": G.nodes[n].get("kapitelname"),
"Subkapitel": G.nodes[n].get("subkapitel"),
"Effektstärke": G.nodes[n].get("d")
}
for n in G.nodes()
],
"edges": [
{
"source": u,
"target": v,
"weight": float(d.get("weight", 0.0)),
"sign": d.get("sign", "")
}
for u, v, d in G.edges(data=True)
],
"summary": summary
}
print("\nExports: coupling_per_item.csv, coupling_per_need.csv im export/ Ordner abgelegt.")
export_json(payload, "network_systemebenen.json")
# -----------------------------------------
# Main
# -----------------------------------------
if __name__ == "__main__":
    # Example: no filters, but you can simply tweak the knobs below
    # (a commented filtered variant follows after the call):
    # - min_abs_d=0.10 (calms the network down)
    # - kapitel_filter=[5,6,7] or subkapitel_filter=["Fähigkeiten", ...]
run_network_analysis(
csv_path=os.path.join(os.path.dirname(__file__), csv_file),
min_abs_d=0.00,
kapitel_filter=None,
subkapitel_filter=None,
seed=42,
z_mode=_Z_MODE
)
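    # A quieter, filtered run could look like this (values are illustrative):
    # run_network_analysis(
    #     csv_path=os.path.join(os.path.dirname(__file__), csv_file),
    #     min_abs_d=0.10,
    #     kapitel_filter=[5, 6, 7],
    #     subkapitel_filter=None,
    #     seed=42,
    #     z_mode=_Z_MODE
    # )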