from __future__ import annotations
def compute_and_export_coupling_potential(df: pd.DataFrame) -> dict:
"""
Berechnet das Kopplungspotenzial je Young/Roediger-Bedürfnis:
- n_items: Anzahl Items pro Bedürfnis
- E_sum: Σ|d| (alle Items)
- E_psych: Σ|d| für psychisch
- E_sozial: Σ|d| für sozial
- balance: 1 - |(E_psych/E_sum) - (E_sozial/E_sum)|, 0 = einseitig, 1 = balanciert
- coupling_potential = E_sum × balance
Exportiert als CSV und gibt dict zurück. Loggt eine Zusammenfassung.
"""
    data = df.copy()
    # work on the copy so the caller's frame is not mutated when the optional need column is missing
    if OPTIONAL_NEED_COL not in data.columns:
        data[OPTIONAL_NEED_COL] = np.nan
    need_col = OPTIONAL_NEED_COL
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
data = data.dropna(subset=["Effektstärke"])
    # normalize the need labels; keep missing values empty instead of the literal string "nan"
    data[need_col] = data[need_col].astype(str).str.strip().replace({"nan": "", "None": ""})
    # column with the absolute effect size
    data["abs_d"] = data["Effektstärke"].abs()
groups = []
for need, g in data.groupby(need_col, dropna=False):
n_items = len(g)
E_sum = g["abs_d"].sum()
E_psych = g.loc[g["Systemebene"].str.lower() == "psychisch", "abs_d"].sum()
E_sozial = g.loc[g["Systemebene"].str.lower() == "sozial", "abs_d"].sum()
        # guard against division by zero
if E_sum > 0:
share_psych = E_psych / E_sum
share_sozial = E_sozial / E_sum
balance = 1.0 - abs(share_psych - share_sozial)
else:
share_psych = 0.0
share_sozial = 0.0
balance = 0.0
coupling_potential = E_sum * balance
bridge_energy = min(E_psych, E_sozial)
groups.append({
"Beduerfnis": need if (isinstance(need, str) and need.strip()) else "—",
"n_items": n_items,
"E_sum": E_sum,
"E_psych": E_psych,
"E_sozial": E_sozial,
"balance": balance,
"coupling_potential": coupling_potential,
"bridge_energy": bridge_energy
})
# Export
df_out = pd.DataFrame(groups)
out_path = os.path.join(EXPORT_DIR, "coupling_potential_per_need.csv")
try:
        # if the file already exists, merge carefully (preserve older extra columns)
if os.path.exists(out_path):
try:
old = pd.read_csv(out_path)
except Exception:
old = pd.DataFrame(columns=["Beduerfnis"])
            # outer join on Beduerfnis
merged = old.merge(df_out, on="Beduerfnis", how="outer", suffixes=("_old", ""))
            # prefer new values for the core columns; old ones only as fallback
core_cols = ["n_items", "E_sum", "E_psych", "E_sozial", "balance", "coupling_potential", "bridge_energy"]
for c in core_cols:
if c in merged.columns and f"{c}_old" in merged.columns:
merged[c] = np.where(merged[c].notna(), merged[c], merged[f"{c}_old"])
merged.drop(columns=[f"{c}_old"], inplace=True, errors=True)
            # clean up any remaining duplicated legacy columns
for col in list(merged.columns):
if col.endswith("_old"):
base = col[:-4]
if base not in merged.columns:
merged.rename(columns={col: base}, inplace=True)
else:
                    merged.drop(columns=[col], inplace=True, errors="ignore")
merged.to_csv(out_path, index=False, encoding="utf-8")
else:
            # write fresh
df_out.to_csv(out_path, index=False, encoding="utf-8")
except Exception:
        # fail silently so the pipeline is not interrupted
pass
# Log
print("\nKOPPLUNGSPOTENZIAL – je Bedürfnis (Young/Roediger):")
for row in groups:
print(
f" {row['Beduerfnis']}: n={row['n_items']}, E_sum={row['E_sum']:.2f}, "
f"E_psych={row['E_psych']:.2f}, E_sozial={row['E_sozial']:.2f}, "
f"balance={row['balance']:.3f}, coupling_potential={row['coupling_potential']:.2f}, "
f"bridge_energy={row['bridge_energy']:.2f}"
)
return {r["Beduerfnis"]: r for r in groups}
# Optional: visualization of the coupling potential
def plot_coupling_potential(summary: dict, title: str = "Kopplungspotenzial je Bedürfnis"):
"""
Balkendiagramm: X = Bedürfnis, Y = coupling_potential, Farbe = Balance (rot→grün).
"""
if not summary:
print("Keine Kopplungspotenzialdaten für Plot.")
return
import plotly.graph_objs as go
    # build a DataFrame for the plot
    df = pd.DataFrame(list(summary.values()))
    # sort by coupling_potential
    df = df.sort_values("coupling_potential", ascending=False)
    # colors: balance from red (0) to green (1)
    import matplotlib
    import matplotlib.colors
    # matplotlib.cm.get_cmap was deprecated in 3.7 and removed in 3.9; use the registry
    cmap = matplotlib.colormaps["RdYlGn"]
    balances = df["balance"].clip(0, 1).fillna(0.0).to_numpy()
    colors = [matplotlib.colors.rgb2hex(cmap(b)[:3]) for b in balances]
fig = go.Figure(go.Bar(
x=df["Beduerfnis"],
y=df["coupling_potential"],
marker=dict(color=colors),
text=[f"Balance={bal:.2f}" for bal in df["balance"]],
        hovertemplate=(
            "Bedürfnis: %{x}<br>"
            "Kopplungspotenzial: %{y:.2f}<br>"
            "Balance: %{text}"
        ),
))
fig.update_layout(
_ci_layout(title),
xaxis_title="Bedürfnis (Young/Roediger)",
yaxis_title="Kopplungspotenzial (Σ|d| × Balance)",
autosize=True,
height=440,
margin=dict(l=60, r=40, t=70, b=100),
)
fig.show()
export_figure(fig, "vl-coupling-potential")
"""
Visible Learning – Netzwerkanalyse (Systemebenen × Thermometer)
---------------------------------------------------------------
CI: wie in den bestehenden Skripten (plotly_template)
Daten: Thermometer.csv (Pflichtspalten: Thermometer_ID, Stichwort, Effektstärke, Subkapitel, Kapitelname, Systemebene)
Modell:
- Bipartites Netzwerk: Systemebene (psychisch/sozial) ↔ Item (Thermometer)
- Kantengewicht = Effektstärke (Vorzeichen beibehalten), Breite ~ |d|
- Knoten-Infos im Hover: ID, Stichwort, Kapitel/Subkapitel, d
- Optional: Filter nach |d| (min_abs_d) und Kapiteln/Subkapiteln
Exports:
- PNG/HTML (gemäß config)
- JSON: nodes/edges + einfache Zentralitäten (weighted degree)
"""
# -----------------------------------------
# Imports
# -----------------------------------------
import os
import json
import math
import pandas as pd
import numpy as np
import plotly.graph_objs as go
import plotly.io as pio
from plotly.subplots import make_subplots
import networkx as nx
# Helper import for file existence checks
from pathlib import Path
# --- Optional: clustering (k-means); falls back to a pure NumPy implementation ---
try:
from sklearn.cluster import KMeans # type: ignore
_HAS_SKLEARN = True
except Exception:
_HAS_SKLEARN = False
# additional clustering algorithms (if available)
try:
from sklearn.cluster import AgglomerativeClustering, SpectralClustering, DBSCAN
from sklearn.mixture import GaussianMixture
_HAS_SKLEARN_EXTRA = True
except Exception:
_HAS_SKLEARN_EXTRA = False
# -----------------------------------------
# Load configuration
# -----------------------------------------
from config_visible_learning import (
csv_file,
export_fig_visual,
export_fig_png,
theme,
z_mode,
z_axis_labels,
show_item_projection,
show_community_labels,
top_n_extremes,
cluster_algo, n_clusters, dbscan_eps, dbscan_min_samples, spectral_k
)
# -----------------------------------------
# Template/CI
# -----------------------------------------
try:
from ci_template import plotly_template
plotly_template.set_theme(theme)
_ci_layout = lambda title: plotly_template.get_standard_layout(title=title, x_title="", y_title="")
_styles = plotly_template.get_plot_styles()
_colors = plotly_template.get_colors()
except Exception:
    # minimal fallback if the CI template is unavailable
_ci_layout = lambda title: dict(title=title)
_styles = {}
_colors = {}
# -----------------------------------------
# Config fallbacks (if keys are missing)
# -----------------------------------------
try:
_Z_MODE = z_mode
except Exception:
_Z_MODE = "effekt"
try:
_Z_AXIS_LABELS = z_axis_labels
except Exception:
_Z_AXIS_LABELS = {"effekt": "Effektstärke (Cohen d)", "kapitel": "Kapitel (numerischer Index)", "system": "Systemebene (0 = Psychisch, 1 = Sozial)"}
try:
_SHOW_ITEM_PROJECTION = show_item_projection
except Exception:
_SHOW_ITEM_PROJECTION = True
try:
_SHOW_COMMUNITY_LABELS = show_community_labels
except Exception:
_SHOW_COMMUNITY_LABELS = True
try:
_TOP_N_EXTREMES = int(top_n_extremes)
except Exception:
_TOP_N_EXTREMES = 15
try:
_CLUSTER_ALGO = str(cluster_algo)
except Exception:
_CLUSTER_ALGO = "kmeans" # Optionen: kmeans, gmm, agglomerative, spectral, dbscan
try:
_N_CLUSTERS = int(n_clusters)
except Exception:
_N_CLUSTERS = 5
try:
_DBSCAN_EPS = float(dbscan_eps)
except Exception:
_DBSCAN_EPS = 0.15
try:
_DBSCAN_MIN_SAMPLES = int(dbscan_min_samples)
except Exception:
_DBSCAN_MIN_SAMPLES = 5
try:
_SPECTRAL_K = int(spectral_k)
except Exception:
_SPECTRAL_K = _N_CLUSTERS
# -----------------------------------------
# Export helpers
# -----------------------------------------
EXPORT_DIR = os.path.join(os.path.dirname(__file__), "export")
os.makedirs(EXPORT_DIR, exist_ok=True)
def export_figure(fig, name: str):
base = os.path.join(EXPORT_DIR, name)
if export_fig_visual:
pio.write_html(fig, file=f"{base}.html", auto_open=False, include_plotlyjs="cdn")
if export_fig_png:
try:
pio.write_image(fig, f"{base}.png", scale=2)
except Exception:
pass
def export_json(obj: dict, name: str):
try:
with open(os.path.join(EXPORT_DIR, name), "w", encoding="utf-8") as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
except Exception:
pass
# -----------------------------------------
# Load data
# -----------------------------------------
REQUIRED_COLS = ["Thermometer_ID", "Stichwort", "Effektstärke", "Subkapitel", "Kapitelname", "Systemebene"]
# Optional needs column for Young/Roediger
OPTIONAL_NEED_COL = "Young_Beduerfnis"
def load_data(path: str) -> pd.DataFrame:
df = pd.read_csv(path)
missing = [c for c in REQUIRED_COLS if c not in df.columns]
if missing:
raise ValueError(f"Fehlende Spalten in CSV: {missing}")
    # robustly coerce Effektstärke to float (tolerate decimal commas)
df["Effektstärke"] = (
df["Effektstärke"].astype(str).str.replace(",", ".", regex=False).str.strip()
)
df["Effektstärke"] = pd.to_numeric(df["Effektstärke"], errors="coerce")
df = df.dropna(subset=["Effektstärke"])
    # Young/Roediger need (optional column). If absent, try merging from werte_mapping.csv.
if OPTIONAL_NEED_COL not in df.columns:
map_path = Path(os.path.dirname(__file__)) / "werte_mapping.csv"
if map_path.exists():
try:
m = pd.read_csv(map_path)
if "Thermometer_ID" in m.columns and OPTIONAL_NEED_COL in m.columns:
df = df.merge(m[["Thermometer_ID", OPTIONAL_NEED_COL]], on="Thermometer_ID", how="left")
except Exception:
                # if the mapping is unreadable, create an empty column
df[OPTIONAL_NEED_COL] = np.nan
else:
df[OPTIONAL_NEED_COL] = np.nan
else:
        # normalize the column (trim strings)
df[OPTIONAL_NEED_COL] = df[OPTIONAL_NEED_COL].astype(str).str.strip()
    # check for unspecific system levels
invalid_systems = df[~df["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
if not invalid_systems.empty:
print("WARNUNG: Unspezifische Systemebenen gefunden:")
print(invalid_systems[["Thermometer_ID", "Stichwort", "Systemebene"]].to_string(index=False))
    # chapter number from the ID (optionally useful)
try:
df["Kapitel"] = df["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
except Exception:
df["Kapitel"] = None
return df
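# Minimal input sketch for load_data (hypothetical values, illustration only): the CSV
# must provide REQUIRED_COLS; decimal commas in Effektstärke are tolerated by the
# coercion above, and Young_Beduerfnis is merged from werte_mapping.csv if absent.
def _demo_effect_size_coercion() -> None:
    import io
    sample = io.StringIO(
        "Thermometer_ID,Stichwort,Effektstärke,Subkapitel,Kapitelname,Systemebene\n"
        '1.1,Feedback,"0,70",Unterricht,Lernen,psychisch\n'
        '2.3,Klassenklima,"0,52",Kontext,Schule,sozial\n'
    )
    df = pd.read_csv(sample)
    # same decimal-comma handling as in load_data
    df["Effektstärke"] = pd.to_numeric(
        df["Effektstärke"].astype(str).str.replace(",", ".", regex=False), errors="coerce"
    )
    print(df[["Thermometer_ID", "Effektstärke", "Systemebene"]])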
# -----------------------------------------
# Triangulation: feature construction & k-means
# -----------------------------------------
def _encode_system(val: str) -> float:
v = str(val).strip().lower()
if v.startswith("psych"):
return 0.0
if v.startswith("sozi"):
return 1.0
return np.nan
def _minmax(series: pd.Series) -> np.ndarray:
arr = series.to_numpy(dtype=float)
if np.all(~np.isfinite(arr)):
return np.zeros_like(arr)
mn = np.nanmin(arr)
mx = np.nanmax(arr)
if not np.isfinite(mn) or not np.isfinite(mx) or mx == mn:
return np.zeros_like(arr)
return (arr - mn) / (mx - mn)
def build_triangulation_features(df: pd.DataFrame) -> pd.DataFrame:
"""Erzeugt den 3D-Feature-Raum pro Item:
- x: Kapitel (numerisch, min-max skaliert)
- y: Systemebene (0=psychisch, 1=sozial)
- z: Effektstärke (Cohen d, min-max skaliert mit Vorzeichen beibehalten über separate Skalierung)
Rückgabe: DataFrame mit [key,item_id,stichwort,kapitel,system,d,x,y,z].
"""
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch","sozial"])]
    # ensure a chapter column
if "Kapitel" not in data.columns or data["Kapitel"].isna().all():
try:
data["Kapitel"] = data["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
except Exception:
data["Kapitel"] = np.nan
    # numeric axes
data["sys_bin"] = data["Systemebene"].map(_encode_system)
    # scale effects min-max separately for positive and negative values so the sign structure is preserved
eff = data["Effektstärke"].astype(float)
pos = eff.clip(lower=0)
neg = (-eff.clip(upper=0))
pos_s = _minmax(pos)
neg_s = _minmax(neg)
eff_scaled = np.where(eff >= 0, pos_s, -neg_s)
data["x"] = _minmax(data["Kapitel"].astype(float))
data["y"] = data["sys_bin"].astype(float)
data["z"] = eff_scaled
    # key
data["key"] = "item::" + data["Thermometer_ID"].astype(str)
return data[["key","Thermometer_ID","Stichwort","Kapitel","Kapitelname","Subkapitel","Systemebene","Effektstärke","x","y","z"]]
# ---------------------------------------------------
# NEW: Triangulation (3D) – Effekt × Bedürfnis × Semantik
# ---------------------------------------------------
# Fixed ordering of the needs (Young/Roediger)
NEED_ORDER = [
"Bindung",
"Orientierung",
"Stabilität",
"Emotion/SC",
"Struktur/Kompetenz",
]
def _encode_need_series(series: pd.Series) -> tuple[pd.Series, dict]:
"""Kodiert die Bedürfnis-Spalte deterministisch in numerische Codes.
Reihenfolge: NEED_ORDER, unbekannte Labels danach alphabetisch.
Rückgabe: (codes_series 0..k-1, mapping_dict {label:code}).
"""
lab = series.fillna("").astype(str).str.strip()
    # known labels in fixed order, unknown ones appended alphabetically
known = [n for n in NEED_ORDER if n in set(lab)]
unknown = sorted([n for n in sorted(set(lab)) if n and n not in NEED_ORDER])
order = known + unknown
mapping = {name: idx for idx, name in enumerate(order)}
codes = lab.map(lambda v: mapping.get(v, np.nan))
return codes, mapping
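# Determinism check for the need encoding (toy labels; "Selbstwert" is a made-up label
# outside NEED_ORDER and is therefore appended alphabetically after the known needs):
def _demo_need_encoding() -> None:
    s = pd.Series(["Bindung", "Selbstwert", "Orientierung", None])
    codes, mapping = _encode_need_series(s)
    print(mapping)         # {'Bindung': 0, 'Orientierung': 1, 'Selbstwert': 2}
    print(codes.tolist())  # [0.0, 2.0, 1.0, nan]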
def build_triangulation_features_needs(df: pd.DataFrame) -> pd.DataFrame:
"""Erzeugt 3D-Features mit Achsen:
X = Effektstärke (min–max, Vorzeichen beibehalten),
Y = Bedürfnis-Code (Young/Roediger, deterministisch kodiert),
Z = Semantik (Kapitel/Subkapitel → numerische Indizes; zuerst Kapitel, dann Subkapitel fein)
Rückgabe: DataFrame mit Spalten [Thermometer_ID, Stichwort, Systemebene, Effektstärke, Young_Beduerfnis,
Kapitelname, Subkapitel, x, y, z].
"""
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch","sozial"])]
data = data.dropna(subset=["Effektstärke"]).copy()
    # X: effect – min-max separately for pos/neg, sign preserved
d = pd.to_numeric(data["Effektstärke"], errors="coerce").astype(float)
pos = d.clip(lower=0)
neg = (-d.clip(upper=0))
def _mm(a: pd.Series) -> np.ndarray:
arr = a.to_numpy(dtype=float)
mn, mx = np.nanmin(arr), np.nanmax(arr)
if not np.isfinite(mn) or not np.isfinite(mx) or mx <= mn:
return np.zeros_like(arr)
return (arr - mn) / (mx - mn)
x = np.where(d >= 0, _mm(pos), -_mm(neg))
    # Y: need codes (deterministic)
if OPTIONAL_NEED_COL not in data.columns:
data[OPTIONAL_NEED_COL] = np.nan
y, need_map = _encode_need_series(data[OPTIONAL_NEED_COL])
    # Z: semantics – map chapter and subchapter onto one robust numeric axis
    # chapter index (more stable when available)
if "Kapitel" not in data.columns or data["Kapitel"].isna().all():
try:
data["Kapitel"] = data["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
except Exception:
data["Kapitel"] = np.nan
    # subchapter as a fine-grained rank within the chapter (deterministic sort by label)
data["_sub_key"] = (
data[["Kapitel","Subkapitel"]]
.astype(str)
.fillna("")
.agg("::".join, axis=1)
)
    # rank in [0..1] per chapter
def _rank_within_kap(group: pd.Series) -> pd.Series:
vals = group.sort_values().unique().tolist()
idx = {v: i for i, v in enumerate(vals)}
if len(vals) <= 1:
return pd.Series(np.zeros(len(group)), index=group.index)
return group.map(lambda v: idx.get(v, 0) / (len(vals)-1))
sub_rank = data.groupby("Kapitel")["_sub_key"].transform(_rank_within_kap)
    # Z = chapter (min-max) + 0.01 × sub-rank (fine offset)
kap_scaled = pd.Series(_minmax(data["Kapitel"].astype(float)), index=data.index)
z = kap_scaled + 0.01 * sub_rank.fillna(0.0)
out = pd.DataFrame({
"Thermometer_ID": data["Thermometer_ID"].astype(str),
"Stichwort": data["Stichwort"].astype(str),
"Kapitelname": data["Kapitelname"].astype(str),
"Subkapitel": data["Subkapitel"].astype(str),
"Systemebene": data["Systemebene"].astype(str),
"Effektstärke": d,
OPTIONAL_NEED_COL: data[OPTIONAL_NEED_COL].astype(str),
"x": x,
"y": y,
"z": z,
})
    # export the mapping (for traceability)
try:
mp_df = pd.DataFrame({"Beduerfnis": list(need_map.keys()), "code": list(need_map.values())})
mp_df.to_csv(os.path.join(EXPORT_DIR, "needs_mapping_codes.csv"), index=False, encoding="utf-8")
except Exception:
pass
    # export the features
try:
out.to_csv(os.path.join(EXPORT_DIR, "triangulation_needs_3d.csv"), index=False, encoding="utf-8")
except Exception:
pass
return out
def plot_triangulation_needs_3d(feats: pd.DataFrame, title: str = "Triangulation (3D): Effekt × Bedürfnis × Semantik"):
"""3D-Streudiagramm:
X = Effekt (normiert, Vorzeichen erhalten),
Y = Bedürfnis-Code (beschriftet nach Mapping),
Z = Semantik (Kapitel + Subkapitel-Rang).
Farbkanal: Vorzeichen von d (CI-konform: pos/neg)."""
if feats is None or feats.empty:
print("Hinweis: Keine Daten für Triangulation (Needs).")
return
    # colors from CI
pos_marker = _styles.get("marker_positiveHighlight", {"size": 6})
neg_marker = _styles.get("marker_negativeHighlight", {"size": 6})
feats = feats.copy()
feats["d"] = pd.to_numeric(feats["Effektstärke"], errors="coerce")
pos = feats[feats["d"] >= 0]
neg = feats[feats["d"] < 0]
    # y-axis ticks: reconstruct the ordering consistently with the encoding used in
    # build_triangulation_features_needs (known labels first, unknown ones appended)
labels_present = [lab for lab in NEED_ORDER if lab in feats[OPTIONAL_NEED_COL].unique().tolist()]
    # append unknown labels, if any (alphabetically)
others = sorted([lab for lab in feats[OPTIONAL_NEED_COL].unique().tolist() if lab and lab not in NEED_ORDER])
y_labels = labels_present + others
y_codes = list(range(len(y_labels)))
    def _hover(group: pd.DataFrame) -> pd.Series:
        return (
            "Thermometer: " + group["Thermometer_ID"].astype(str) +
            "<br>Stichwort: " + group["Stichwort"].astype(str) +
            "<br>Bedürfnis: " + group[OPTIONAL_NEED_COL].astype(str) +
            "<br>Kapitel: " + group["Kapitelname"].astype(str) +
            "<br>Subkapitel: " + group["Subkapitel"].astype(str) +
            "<br>d: " + group["d"].map(lambda v: f"{float(v):.2f}")
        )
traces = []
if len(pos):
traces.append(go.Scatter3d(
x=pos["x"], y=pos["y"], z=pos["z"],
mode="markers",
marker=pos_marker,
hovertext=_hover(pos),
hovertemplate="%{hovertext}",
name="d ≥ 0"
))
if len(neg):
traces.append(go.Scatter3d(
x=neg["x"], y=neg["y"], z=neg["z"],
mode="markers",
marker=neg_marker,
hovertext=_hover(neg),
hovertemplate="%{hovertext}",
name="d < 0"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_scenes(
xaxis=dict(title="Effekt (normiert, Vorzeichen erhalten)", showgrid=False, showticklabels=False),
yaxis=dict(title="Bedürfnis (Code)", showgrid=False, showticklabels=True, tickvals=y_codes, ticktext=y_labels),
zaxis=dict(title="Semantik (Kapitel + Subrang)", showgrid=False, showticklabels=False),
)
fig.show()
export_figure(fig, "vl-triangulation-needs-3d")
def _kmeans_numpy(X: np.ndarray, k: int = 5, iters: int = 100, seed: int = 42) -> tuple[np.ndarray, np.ndarray]:
rng = np.random.default_rng(seed)
    # random initial centers drawn from the data
idx = rng.choice(len(X), size=k, replace=False)
C = X[idx].copy()
for _ in range(iters):
        # assignment step
dists = ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2)
labels = dists.argmin(axis=1)
        # update centers
C_new = np.vstack([X[labels==j].mean(axis=0) if np.any(labels==j) else C[j] for j in range(k)])
if np.allclose(C_new, C, atol=1e-6):
C = C_new
break
C = C_new
    # final labels
dists = ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2)
labels = dists.argmin(axis=1)
return labels, C
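# Smoke test for the NumPy k-means fallback on two synthetic blobs (hypothetical
# helper; the real pipeline calls _kmeans_numpy only when sklearn is unavailable):
def _demo_kmeans_numpy() -> None:
    rng = np.random.default_rng(0)
    X = np.vstack([
        rng.normal(loc=0.0, scale=0.05, size=(20, 3)),
        rng.normal(loc=1.0, scale=0.05, size=(20, 3)),
    ])
    labels, centers = _kmeans_numpy(X, k=2, seed=0)
    print(sorted(set(labels.tolist())), centers.round(2))  # two centers near 0 and 1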
def triangulate_clusters(
df: pd.DataFrame,
algo: str = "kmeans",
n_clusters: int = 5,
seed: int = 42,
dbscan_eps: float = 0.15,
dbscan_min_samples: int = 5,
spectral_k: int | None = None
) -> tuple[pd.DataFrame, np.ndarray]:
"""
Clustert den 3D-Feature-Raum (Kapitel × System × Effekt).
Unterstützte Algorithmen:
- "kmeans" (sphärische Cluster, schnell, baseline)
- "gmm" (Gaussian Mixture; ellipsoide Cluster)
- "agglomerative" (hier: Ward-Linkage, euklidisch)
- "spectral" (graphbasierte Struktur, nicht-konvex)
- "dbscan" (dichtebasiert; entdeckt Bänder, Noise möglich)
Falls sklearn nicht verfügbar, wird auf eine reine Numpy-KMeans-Variante zurückgefallen.
"""
feats = build_triangulation_features(df)
X = feats[["x","y","z"]].to_numpy(dtype=float)
algo = str(algo).lower()
# NEW: raw mode without clustering
if algo == "none":
feats["cluster"] = 0
labels = np.zeros(len(feats), dtype=int)
return feats, labels
labels: np.ndarray
if not _HAS_SKLEARN and algo != "kmeans":
        # hard fallback strategy
labels, _ = _kmeans_numpy(X, k=n_clusters, iters=150, seed=seed)
else:
if algo == "kmeans" or not _HAS_SKLEARN:
if _HAS_SKLEARN:
km = KMeans(n_clusters=n_clusters, random_state=seed, n_init=10)
labels = km.fit_predict(X)
else:
labels, _ = _kmeans_numpy(X, k=n_clusters, iters=150, seed=seed)
elif algo == "gmm" and _HAS_SKLEARN_EXTRA:
gmm = GaussianMixture(n_components=n_clusters, random_state=seed)
labels = gmm.fit_predict(X)
elif algo == "agglomerative" and _HAS_SKLEARN_EXTRA:
agg = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
labels = agg.fit_predict(X)
elif algo == "spectral" and _HAS_SKLEARN_EXTRA:
k = spectral_k if spectral_k is not None else n_clusters
spec = SpectralClustering(n_clusters=k, affinity="rbf", random_state=seed, assign_labels="kmeans")
labels = spec.fit_predict(X)
elif algo == "dbscan" and _HAS_SKLEARN_EXTRA:
db = DBSCAN(eps=dbscan_eps, min_samples=dbscan_min_samples)
labels = db.fit_predict(X)
            # DBSCAN may return -1 for noise → remap to consecutive IDs
uniq = np.unique(labels)
mapping = {v: i for i, v in enumerate(uniq)}
labels = np.vectorize(mapping.get)(labels)
else:
            # safety net
if _HAS_SKLEARN:
km = KMeans(n_clusters=n_clusters, random_state=seed, n_init=10)
labels = km.fit_predict(X)
else:
labels, _ = _kmeans_numpy(X, k=n_clusters, iters=150, seed=seed)
feats["cluster"] = labels.astype(int)
return feats, labels
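# Typical call, wired to the config fallbacks defined above (sketch; df is the frame
# returned by load_data):
#   feats, labels = triangulate_clusters(
#       df,
#       algo=_CLUSTER_ALGO, n_clusters=_N_CLUSTERS,
#       dbscan_eps=_DBSCAN_EPS, dbscan_min_samples=_DBSCAN_MIN_SAMPLES,
#       spectral_k=_SPECTRAL_K,
#   )
#   plot_triangulation_3d(feats)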
# -----------------------------------------
# Top lists (positive/negative)
# -----------------------------------------
def top_extremes(df: pd.DataFrame, n: int = 15) -> dict:
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
data = data.dropna(subset=["Effektstärke"]) # Sicherheit
pos = data.sort_values("Effektstärke", ascending=False).head(n)
neg = data.sort_values("Effektstärke", ascending=True).head(n)
    # console output
print(f"\nTop +{n} (positiv):")
for _, r in pos.iterrows():
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
print(f"\nTop -{n} (negativ):")
for _, r in neg.iterrows():
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
return {
"top_positive": pos[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Effektstärke","Systemebene"]].to_dict(orient="records"),
"top_negative": neg[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Effektstärke","Systemebene"]].to_dict(orient="records"),
}
# -----------------------------------------
# Build the network
# -----------------------------------------
def build_bipartite_graph(
df: pd.DataFrame,
min_abs_d: float = 0.00,
kapitel_filter: list[int] | None = None,
subkapitel_filter: list[str] | None = None,
) -> nx.Graph:
data = df.copy()
# Filter
if kapitel_filter:
data = data[data["Kapitel"].isin(kapitel_filter)]
if subkapitel_filter:
data = data[data["Subkapitel"].isin(subkapitel_filter)]
if min_abs_d > 0:
data = data[data["Effektstärke"].abs() >= float(min_abs_d)]
    # only valid system levels
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
G = nx.Graph()
    # system nodes (part A)
systems = sorted(data["Systemebene"].str.lower().unique().tolist())
for s in systems:
G.add_node(
f"system::{s}",
bipartite="system",
label=s.capitalize(),
typ="System",
)
    # item nodes + edges (part B)
for _, r in data.iterrows():
sys_key = f"system::{str(r['Systemebene']).lower()}"
item_key = f"item::{r['Thermometer_ID']}"
        # item node (the numeric chapter is stored so the 3D view can use it as a z coordinate)
        G.add_node(
            item_key,
            bipartite="item",
            label=str(r["Stichwort"]),
            id=str(r["Thermometer_ID"]),
            d=float(r["Effektstärke"]),
            kapitel=float(r["Kapitel"]) if pd.notna(r.get("Kapitel")) else 0.0,
            kapitelname=str(r["Kapitelname"]),
            subkapitel=str(r["Subkapitel"]),
        )
        # edge: weight = effect size (sign preserved)
G.add_edge(
sys_key, item_key,
weight=float(r["Effektstärke"]),
sign="pos" if r["Effektstärke"] >= 0 else "neg"
)
return G
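# Structure check on a two-row toy frame (hypothetical values): the bipartite graph
# gets one system node per level plus one item node and one signed edge per item.
def _demo_bipartite_graph() -> None:
    toy = pd.DataFrame({
        "Thermometer_ID": ["1.1", "2.3"],
        "Stichwort": ["Feedback", "Klassenklima"],
        "Effektstärke": [0.7, -0.2],
        "Subkapitel": ["Unterricht", "Kontext"],
        "Kapitelname": ["Lernen", "Schule"],
        "Systemebene": ["psychisch", "sozial"],
        "Kapitel": [1, 2],
    })
    G = build_bipartite_graph(toy)
    print(G.number_of_nodes(), G.number_of_edges())  # 4 nodes, 2 edges
    print(list(G.edges(data=True)))                  # signs: pos for 0.7, neg for -0.2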
# -----------------------------------------
# Item projection (bipartite -> item-item) + communities
# -----------------------------------------
from networkx.algorithms import community as nx_comm
def build_item_projection(G: nx.Graph) -> tuple[nx.Graph, dict[str,int], list[set]]:
"""Projiziert das bipartite Netz auf die Item-Seite. Zwei Items werden verbunden,
wenn sie dasselbe System teilen. Kanten-Gewicht = min(|w_i|, |w_j|).
Liefert das Item-Graph, ein Mapping node->community_id und die Community-Mengen.
"""
    # identify item and system nodes
items = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
systems = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
    # mapping: system -> list of (item, |weight|)
sys_to_items: dict[str, list[tuple[str,float]]] = {}
for s in systems:
sys_to_items[s] = []
for u, v, d in G.edges(data=True):
if u in systems and v in items:
sys_to_items[u].append((v, abs(float(d.get("weight",0.0)))))
elif v in systems and u in items:
sys_to_items[v].append((u, abs(float(d.get("weight",0.0)))))
    # build the item graph
Gi = nx.Graph()
for it in items:
nd = G.nodes[it]
Gi.add_node(it, **nd)
for s, lst in sys_to_items.items():
        # connect all pairs within the same system
for i in range(len(lst)):
for j in range(i+1, len(lst)):
a, wa = lst[i]
b, wb = lst[j]
w = min(wa, wb)
if Gi.has_edge(a,b):
Gi[a][b]["weight"] += w
else:
Gi.add_edge(a, b, weight=w)
if Gi.number_of_edges() == 0:
return Gi, {}, []
    # communities (weighted modularity, greedy)
coms = nx_comm.greedy_modularity_communities(Gi, weight="weight")
node2com: dict[str,int] = {}
for cid, members in enumerate(coms):
for n in members:
node2com[n] = cid
return Gi, node2com, [set(c) for c in coms]
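# Reading of the projection rule on a minimal example (hypothetical helper): two items
# tied to the same system node with |d| = 0.7 and 0.3 end up connected with weight
# min(0.7, 0.3) = 0.3, i.e. the weaker system tie caps the implied item-item coupling.
def _demo_item_projection_weight() -> None:
    G = nx.Graph()
    G.add_node("system::psychisch", bipartite="system", label="Psychisch")
    G.add_node("item::a", bipartite="item", label="A")
    G.add_node("item::b", bipartite="item", label="B")
    G.add_edge("system::psychisch", "item::a", weight=0.7)
    G.add_edge("system::psychisch", "item::b", weight=0.3)
    Gi, node2com, coms = build_item_projection(G)
    print(Gi["item::a"]["item::b"]["weight"])  # 0.3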
# -----------------------------------------
# Coupling indices (needs × bridges)
# -----------------------------------------
def _norm01(series: pd.Series) -> pd.Series:
arr = pd.to_numeric(series, errors="coerce").astype(float)
mn, mx = np.nanmin(arr), np.nanmax(arr)
if not np.isfinite(mn) or not np.isfinite(mx) or mx <= mn:
return pd.Series(np.zeros(len(arr)), index=series.index)
return (arr - mn) / (mx - mn)
def compute_and_export_coupling_indices(df: pd.DataFrame, G: nx.Graph) -> dict:
"""
Berechnet Kopplungsindizes je Item und aggregiert nach Young/Roediger‑Bedürfnissen.
Definition:
- |d|_norm: min–max normierte Effektmasse pro Item
- bc_norm: betweenness centrality im Item‑Projektnetz (Gewicht = 1/edge_weight)
- coupling_index = |d|_norm * bc_norm (betont hohe Wirkung + Brückenlage)
Export:
- CSV "coupling_per_item.csv"
- CSV "coupling_per_need.csv"
Rückgabe: dict mit Need‑Summaries (für JSON/Log).
"""
    # only valid items
data = df.copy()
data = data[data["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])]
data = data.dropna(subset=["Effektstärke"]).copy()
    # item projection (for bridge metrics)
Gi, node2com, _ = build_item_projection(G)
    # betweenness centrality (weighted, normalized to [0,1])
bc = {n: 0.0 for n in Gi.nodes()}
if Gi.number_of_edges() > 0:
H = Gi.copy()
        # networkx interprets 'weight' as a distance -> we want high weights ~ short distances: length = 1/(w+eps)
eps = 1e-9
for u, v, d in H.edges(data=True):
w = float(d.get("weight", 0.0))
d["length"] = 1.0 / max(eps, w)
bc = nx.betweenness_centrality(H, weight="length", normalized=True)
    # map back onto the DataFrame
item_keys = [f"item::{tid}" for tid in data["Thermometer_ID"].astype(str)]
betw_vals = [bc.get(k, 0.0) for k in item_keys]
    # |d| normalized
data["abs_d"] = data["Effektstärke"].abs()
data["abs_d_norm"] = _norm01(data["abs_d"]) # 0..1
data["bc_norm"] = pd.Series(betw_vals, index=data.index)
data["coupling_index"] = data["abs_d_norm"] * data["bc_norm"]
    # aggregate by need
need_col = OPTIONAL_NEED_COL if OPTIONAL_NEED_COL in data.columns else None
if need_col is None:
data[OPTIONAL_NEED_COL] = np.nan
need_col = OPTIONAL_NEED_COL
grp = data.groupby(need_col, dropna=False)
summary = grp.agg(
n_items=("Thermometer_ID", "count"),
energy_sum=("abs_d", "sum"),
energy_mean=("abs_d", "mean"),
coupling_sum=("coupling_index", "sum"),
coupling_mean=("coupling_index", "mean"),
bc_mean=("bc_norm", "mean")
).reset_index().rename(columns={need_col: "Beduerfnis"})
    # exports
per_item_cols = [
"Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Systemebene",
"Effektstärke", OPTIONAL_NEED_COL, "abs_d","abs_d_norm","bc_norm","coupling_index"
]
per_item = data[per_item_cols].copy()
try:
per_item.to_csv(os.path.join(EXPORT_DIR, "coupling_per_item.csv"), index=False, encoding="utf-8")
summary.to_csv(os.path.join(EXPORT_DIR, "coupling_per_need.csv"), index=False, encoding="utf-8")
except Exception:
pass
    # dict for JSON/console
need_dict = summary.to_dict(orient="records")
    # brief log output
print("\nKOPPLUNGSINDEX – Aggregat je Bedürfnis (Young/Roediger):")
for row in need_dict:
print(
f" {row.get('Beduerfnis', '—')}: n={row['n_items']}, E_sum={row['energy_sum']:.2f}, "
f"CI_mean={row['coupling_mean']:.3f}, BC_mean={row['bc_mean']:.3f}"
)
return {
"per_need": need_dict,
"per_item_csv": "coupling_per_item.csv",
"per_need_csv": "coupling_per_need.csv"
}
def plot_item_projection(Gi: nx.Graph, node2com: dict[str,int], title: str = "Item-Projektion (Communities)"):
if Gi.number_of_nodes() == 0:
print("Hinweis: Item-Projektion leer (zu wenig Überlappung).")
return
pos = nx.spring_layout(Gi, seed=42, weight="weight")
    # group communities into traces
com_to_nodes: dict[int, list[str]] = {}
for n in Gi.nodes():
cid = node2com.get(n, -1)
com_to_nodes.setdefault(cid, []).append(n)
traces = []
    # color/marker styles from CI (cyclic)
style_keys = [
"marker_accent", "marker_brightArea", "marker_depthArea",
"marker_positiveHighlight", "marker_negativeHighlight",
"marker_primaryLine", "marker_secondaryLine"
]
keys_cycle = style_keys * 10
for idx, (cid, nodes) in enumerate(sorted(com_to_nodes.items(), key=lambda t: t[0])):
xs = [pos[n][0] for n in nodes]
ys = [pos[n][1] for n in nodes]
htxt = []
for n in nodes:
nd = Gi.nodes[n]
            htxt.append(
                "Thermometer: " + str(nd.get("id", "")) +
                "<br>Stichwort: " + str(nd.get("label", "")) +
                "<br>Kapitel: " + str(nd.get("kapitelname", "")) +
                "<br>Subkapitel: " + str(nd.get("subkapitel", "")) +
                "<br>d: " + f"{nd.get('d', np.nan):.2f}"
            )
mk = _styles.get(keys_cycle[idx], dict(size=8))
traces.append(go.Scatter(
x=xs, y=ys, mode="markers+text" if _SHOW_COMMUNITY_LABELS else "markers",
marker={**mk, "size": 9},
text=[str(node2com.get(n, -1)) if _SHOW_COMMUNITY_LABELS else None for n in nodes],
textposition="top center",
hovertext=htxt,
hovertemplate="%{hovertext}",
name=f"Community {cid}"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_xaxes(title_text="Semantische Position X (Projektion)", showticklabels=False, showgrid=False, zeroline=False)
fig.update_yaxes(title_text="Semantische Position Y (Projektion)", showticklabels=False, showgrid=False, zeroline=False)
fig.show()
export_figure(fig, "vl-network-item-projection")
# -----------------------------------------
# Layout & visualization (Plotly)
# -----------------------------------------
def _edge_segments(G: nx.Graph, pos: dict[str, tuple[float, float]], sign: str | None = None):
"""Erzeugt x,y-Koordinaten-Listen für Liniensegmente (mit None-Trennern). Optional nach Vorzeichen filtern."""
xs, ys = [], []
for u, v, d in G.edges(data=True):
if sign and d.get("sign") != sign:
continue
x0, y0 = pos[u]
x1, y1 = pos[v]
xs += [x0, x1, None]
ys += [y0, y1, None]
return xs, ys
def plot_network(G: nx.Graph, title: str = "Netzwerk: Systemebenen × Thermometer", seed: int = 42):
    # spring layout (reproducible via seed)
pos = nx.spring_layout(G, seed=seed, k=None, weight="weight")
    # split nodes by type
system_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
item_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
    # edges (pos/neg) as separate traces (line styles from CI)
x_pos, y_pos = _edge_segments(G, pos, sign="pos")
x_neg, y_neg = _edge_segments(G, pos, sign="neg")
line_positive = _styles.get("linie_positiveHighlight", dict(width=1))
line_negative = _styles.get("linie_negativeHighlight", dict(width=1))
edge_pos = go.Scatter(
x=x_pos, y=y_pos,
mode="lines",
line=line_positive,
hoverinfo="skip",
showlegend=True,
name="Kanten (d ≥ 0)"
)
edge_neg = go.Scatter(
x=x_neg, y=y_neg,
mode="lines",
line=line_negative,
hoverinfo="skip",
showlegend=True,
name="Kanten (d < 0)"
)
    # system nodes: marker from CI (e.g. accent)
sys_marker = _styles.get("marker_primaryLine", dict(size=18))
sys_x = [pos[n][0] for n in system_nodes]
sys_y = [pos[n][1] for n in system_nodes]
sys_text = [G.nodes[n].get("label", n) for n in system_nodes]
sys_hover = [f"Systemebene: {G.nodes[n].get('label','')}" for n in system_nodes]
systems_trace = go.Scatter(
x=sys_x, y=sys_y, mode="markers",
marker={**sys_marker, "size": 18},
text=sys_text,
hovertext=sys_hover,
hovertemplate="%{hovertext}",
name="System"
)
    # item nodes: marker from CI (e.g. brightArea); size ~ |degree weight|
item_marker = _styles.get("marker_secondaryLine", dict(size=10))
it_x = [pos[n][0] for n in item_nodes]
it_y = [pos[n][1] for n in item_nodes]
    # weighted degree as size
wdeg = []
htxt = []
for n in item_nodes:
dsum = 0.0
for nbr in G[n]:
dsum += abs(G[n][nbr].get("weight", 0.0))
wdeg.append(dsum)
nd = G.nodes[n]
        htxt.append(
            "Thermometer: " + str(nd.get("id", "")) +
            "<br>Stichwort: " + str(nd.get("label", "")) +
            "<br>Kapitel: " + str(nd.get("kapitelname", "")) +
            "<br>Subkapitel: " + str(nd.get("subkapitel", "")) +
            "<br>d: " + f"{nd.get('d', np.nan):.2f}"
        )
    # scale sizes
wdeg = np.asarray(wdeg, dtype=float)
if wdeg.size and np.nanmax(wdeg) > 0:
sizes = 8 + 12 * (wdeg / np.nanmax(wdeg))
else:
sizes = np.full_like(wdeg, 10)
items_trace = go.Scatter(
x=it_x, y=it_y, mode="markers",
marker={**item_marker, "size": sizes},
hovertext=htxt,
hovertemplate="%{hovertext}",
name="Thermometer"
)
fig = go.Figure(data=[edge_pos, edge_neg, systems_trace, items_trace])
    # CI layout and semantic axis titles (2D: semantic position from the layout)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_xaxes(
title_text="Semantische Position X (Layout)",
showticklabels=False, showgrid=False, zeroline=False
)
fig.update_yaxes(
title_text="Semantische Position Y (Layout)",
showticklabels=False, showgrid=False, zeroline=False
)
fig.show()
export_figure(fig, "vl-network")
def _edge_segments_3d(G: nx.Graph, pos_xy: dict[str, tuple[float, float]], z_map: dict[str, float], sign: str | None = None):
"""Erzeugt x,y,z-Koordinaten-Listen für 3D-Liniensegmente (mit None-Trennern). Optional nach Vorzeichen filtern."""
xs, ys, zs = [], [], []
for u, v, d in G.edges(data=True):
if sign and d.get("sign") != sign:
continue
x0, y0 = pos_xy[u]
x1, y1 = pos_xy[v]
z0 = float(z_map.get(u, 0.0))
z1 = float(z_map.get(v, 0.0))
xs += [x0, x1, None]
ys += [y0, y1, None]
zs += [z0, z1, None]
return xs, ys, zs
def plot_network_3d(G: nx.Graph, z_mode: str = "effekt", title: str = "3D: Systemebenen × Thermometer", seed: int = 42):
"""
Semantische 3D-Ansicht:
- z_mode = "effekt": z = Effektstärke (Items), Systeme z=0
- z_mode = "kapitel": z = Kapitelnummer (Items), Systeme unterhalb der Items (min_z - 0.5)
- z_mode = "system": z = 0 (psychisch), 1 (sozial), Items = Mittelwert ihrer Systemnachbarn
x/y stammen aus einem 2D-Spring-Layout (stabile, gut lesbare Projektion), z ist semantisch belegt.
"""
styles = _styles
colors = _colors
    # 2D layout for x/y (stable projection)
pos_xy = nx.spring_layout(G, seed=seed, k=None, weight="weight", dim=2)
    # determine the z coordinate per node
z_map: dict[str, float] = {}
if z_mode == "effekt":
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
z_map[n] = float(d.get("d", 0.0))
else:
z_map[n] = 0.0
elif z_mode == "kapitel":
item_z_vals = []
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
try:
# Kapitelnummer aus Kapitelname kann alphanumerisch sein; wir nutzen, wenn vorhanden, numerische "Kapitel"
# Falls keine numerische Kapitelspalte existiert, wird 0 gesetzt.
kap = d.get("kapitelname", "")
# Fallback: im Nodes-Attribut existiert keine numerische Kapitelnummer; daher 0
z_map[n] = float(d.get("kapitel", 0.0)) if "kapitel" in d else 0.0
except Exception:
z_map[n] = 0.0
item_z_vals.append(z_map[n])
min_z = min(item_z_vals) if item_z_vals else 0.0
for n, d in G.nodes(data=True):
if d.get("bipartite") == "system":
z_map[n] = float(min_z) - 0.5
elif z_mode == "system":
        # clearly separate the systems
for n, d in G.nodes(data=True):
if d.get("bipartite") == "system":
lbl = str(d.get("label", "")).strip().lower()
z_map[n] = 0.0 if "psych" in lbl else 1.0
        # items: mean of their system neighbors' z values (exactly one in the bipartite graph)
for n, d in G.nodes(data=True):
if d.get("bipartite") == "item":
zs = []
for nbr in G[n]:
zs.append(z_map.get(nbr, 0.0))
z_map[n] = float(np.mean(zs)) if zs else 0.0
else:
        # unknown mode -> all zeros
z_map = {n: 0.0 for n in G.nodes()}
    # node lists
system_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
item_nodes = [n for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
    # prepare edges (pos/neg)
x_pos, y_pos, z_pos = _edge_segments_3d(G, pos_xy, z_map, sign="pos")
x_neg, y_neg, z_neg = _edge_segments_3d(G, pos_xy, z_map, sign="neg")
line_positive = styles.get("linie_positiveHighlight", dict(width=1))
line_negative = styles.get("linie_negativeHighlight", dict(width=1))
edge_pos = go.Scatter3d(
x=x_pos, y=y_pos, z=z_pos,
mode="lines",
line=line_positive,
hoverinfo="skip",
showlegend=True,
name="Kanten (d ≥ 0)"
)
edge_neg = go.Scatter3d(
x=x_neg, y=y_neg, z=z_neg,
mode="lines",
line=line_negative,
hoverinfo="skip",
showlegend=True,
name="Kanten (d < 0)"
)
    # system nodes
sys_marker = styles.get("marker_primaryLine", dict(size=18))
sys_x = [pos_xy[n][0] for n in system_nodes]
sys_y = [pos_xy[n][1] for n in system_nodes]
sys_z = [z_map[n] for n in system_nodes]
sys_text = [G.nodes[n].get("label", n) for n in system_nodes]
sys_hover = [f"Systemebene: {G.nodes[n].get('label','')}" for n in system_nodes]
systems_trace = go.Scatter3d(
x=sys_x, y=sys_y, z=sys_z, mode="markers",
marker={**sys_marker, "size": 10},
text=sys_text,
hovertext=sys_hover,
hovertemplate="%{hovertext}",
name="System"
)
    # item nodes: thermometers in the secondary style (same marker design for +/-); edges keep the sign colors
pos_marker = styles.get("marker_secondaryLine", dict(size=6))
neg_marker = styles.get("marker_secondaryLine", dict(size=6))
pos_x, pos_y, pos_z, pos_hover = [], [], [], []
neg_x, neg_y, neg_z, neg_hover = [], [], [], []
for n in item_nodes:
x, y = pos_xy[n]
z = z_map[n]
nd = G.nodes[n]
        hover = (
            "Thermometer: " + str(nd.get("id", "")) +
            "<br>Stichwort: " + str(nd.get("label", "")) +
            "<br>Kapitel: " + str(nd.get("kapitelname", "")) +
            "<br>Subkapitel: " + str(nd.get("subkapitel", "")) +
            "<br>d: " + f"{nd.get('d', np.nan):.2f}"
        )
if float(nd.get("d", 0.0)) >= 0:
pos_x.append(x); pos_y.append(y); pos_z.append(z); pos_hover.append(hover)
else:
neg_x.append(x); neg_y.append(y); neg_z.append(z); neg_hover.append(hover)
items_pos_trace = go.Scatter3d(
x=pos_x, y=pos_y, z=pos_z, mode="markers",
marker=pos_marker,
hovertext=pos_hover,
hovertemplate="%{hovertext}",
name="Thermometer (d ≥ 0)"
)
items_neg_trace = go.Scatter3d(
x=neg_x, y=neg_y, z=neg_z, mode="markers",
marker=neg_marker,
hovertext=neg_hover,
hovertemplate="%{hovertext}",
name="Thermometer (d < 0)"
)
fig = go.Figure(data=[edge_pos, edge_neg, systems_trace, items_pos_trace, items_neg_trace])
fig.update_layout(_ci_layout(f"{title} – z: {z_mode}"), autosize=True, height=None, width=None)
    # set semantically meaningful axis titles
z_title = _Z_AXIS_LABELS.get(z_mode, "Z")
fig.update_scenes(
xaxis=dict(
title="Semantische Position X (Layout)",
showticklabels=False, showgrid=False, zeroline=False
),
yaxis=dict(
title="Semantische Position Y (Layout)",
showticklabels=False, showgrid=False, zeroline=False
),
zaxis=dict(
title=z_title,
showticklabels=False, showgrid=False, zeroline=False
),
)
fig.show()
export_figure(fig, f"vl-network-3d-{z_mode}")
# -----------------------------------------
# Triangulation: 3D plot (chapter × system × effect)
# -----------------------------------------
def plot_triangulation_3d(feats: pd.DataFrame, title: str = "Triangulation (3D): Kapitel × System × Effekt"):
if feats.empty:
print("Hinweis: Keine Daten für Triangulation.")
return
    # colors/markers per cluster (cycled from the CI styles)
style_cycle = [
_styles.get("marker_accent", {"size": 6}),
_styles.get("marker_brightArea", {"size": 6}),
_styles.get("marker_depthArea", {"size": 6}),
_styles.get("marker_primaryLine", {"size": 6}),
_styles.get("marker_secondaryLine", {"size": 6}),
_styles.get("marker_positiveHighlight", {"size": 6}),
_styles.get("marker_negativeHighlight", {"size": 6}),
]
traces = []
for cid, group in feats.groupby("cluster"):
mk = style_cycle[cid % len(style_cycle)].copy()
mk.setdefault("size", 6)
        hover = (
            "Thermometer: " + group["Thermometer_ID"].astype(str) +
            "<br>Stichwort: " + group["Stichwort"].astype(str) +
            "<br>Kapitel: " + group["Kapitelname"].astype(str) +
            "<br>Subkapitel: " + group["Subkapitel"].astype(str) +
            "<br>System: " + group["Systemebene"].astype(str) +
            "<br>d: " + group["Effektstärke"].map(lambda v: f"{float(v):.2f}")
        )
traces.append(go.Scatter3d(
x=group["x"], y=group["y"], z=group["z"],
mode="markers",
marker=mk,
hovertext=hover,
hovertemplate="%{hovertext}",
name=f"Cluster {cid} (n={len(group)})"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_scenes(
xaxis=dict(title="Kapitel (min–max normiert)", showgrid=False, showticklabels=False),
yaxis=dict(title="Systemebene (0=Psychisch, 1=Sozial)", showgrid=False, showticklabels=True, tickvals=[0,1], ticktext=["Psych","Sozial"]),
zaxis=dict(title="Effektstärke (normiert, Vorzeichen beibehalten)", showgrid=False, showticklabels=False),
)
fig.show()
export_figure(fig, "vl-triangulation-3d")
# -----------------------------------------
# Simple 3-axis projection without clustering
# -----------------------------------------
def plot_triangulation_3d_simple(feats: pd.DataFrame, title: str = "Triangulation (3D): Kapitel × System × Effekt (ohne Cluster)"):
"""
Einfache 3-Achsen-Projektion ohne Clustering.
Achsen:
x = Kapitel (min–max normiert)
y = Systemebene (0=Psychisch, 1=Sozial)
z = Effektstärke (normiert, Vorzeichen beibehalten)
Farbgebung: grün = d ≥ 0, rot = d < 0 (CI-konform).
"""
if feats.empty:
print("Hinweis: Keine Daten für Triangulation (simple).")
return
pos = feats[feats["Effektstärke"] >= 0]
neg = feats[feats["Effektstärke"] < 0]
pos_marker = _styles.get("marker_positiveHighlight", {"size": 6})
neg_marker = _styles.get("marker_negativeHighlight", {"size": 6})
    def _hover(group: pd.DataFrame) -> pd.Series:
        return (
            "Thermometer: " + group["Thermometer_ID"].astype(str) +
            "<br>Stichwort: " + group["Stichwort"].astype(str) +
            "<br>Kapitel: " + group["Kapitelname"].astype(str) +
            "<br>Subkapitel: " + group["Subkapitel"].astype(str) +
            "<br>System: " + group["Systemebene"].astype(str) +
            "<br>d: " + group["Effektstärke"].map(lambda v: f"{float(v):.2f}")
        )
traces = []
if len(pos):
traces.append(go.Scatter3d(
x=pos["x"], y=pos["y"], z=pos["z"],
mode="markers",
marker=pos_marker,
hovertext=_hover(pos),
hovertemplate="%{hovertext}",
name="d ≥ 0"
))
if len(neg):
traces.append(go.Scatter3d(
x=neg["x"], y=neg["y"], z=neg["z"],
mode="markers",
marker=neg_marker,
hovertext=_hover(neg),
hovertemplate="%{hovertext}",
name="d < 0"
))
fig = go.Figure(data=traces)
fig.update_layout(_ci_layout(title), autosize=True, height=None, width=None)
fig.update_scenes(
xaxis=dict(title="Kapitel (min–max normiert)", showgrid=False, showticklabels=False),
yaxis=dict(title="Systemebene (0=Psychisch, 1=Sozial)", showgrid=False, showticklabels=True, tickvals=[0,1], ticktext=["Psych","Sozial"]),
zaxis=dict(title="Effektstärke (normiert, Vorzeichen beibehalten)", showgrid=False, showticklabels=False),
)
fig.show()
export_figure(fig, "vl-triangulation-3d-simple")
def summarize_triangulation(feats: pd.DataFrame, top_n: int = 5) -> dict:
out = {}
for cid, g in feats.groupby("cluster"):
g = g.copy()
g["abs_d"] = g["Effektstärke"].abs()
top_pos = g.sort_values("Effektstärke", ascending=False).head(top_n)
top_neg = g.sort_values("Effektstärke", ascending=True).head(top_n)
out[int(cid)] = {
"n": int(len(g)),
"mean_d": float(g["Effektstärke"].mean()),
"top_positive": top_pos[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Systemebene","Effektstärke"]].to_dict(orient="records"),
"top_negative": top_neg[["Thermometer_ID","Stichwort","Kapitelname","Subkapitel","Systemebene","Effektstärke"]].to_dict(orient="records"),
}
return out
# -----------------------------------------
# Simple metrics & export
# -----------------------------------------
def summarize_network(G: nx.Graph) -> dict:
    # weighted degree per node
wdeg = {}
for n in G.nodes():
s = 0.0
for nbr in G[n]:
s += abs(G[n][nbr].get("weight", 0.0))
wdeg[n] = float(s)
    # top items by weighted degree
items = [(n, wdeg[n]) for n, d in G.nodes(data=True) if d.get("bipartite") == "item"]
items_sorted = sorted(items, key=lambda t: t[1], reverse=True)[:15]
top_items = []
for n, val in items_sorted:
nd = G.nodes[n]
top_items.append({
"Thermometer_ID": nd.get("id"),
"Stichwort": nd.get("label"),
"Kapitelname": nd.get("kapitelname"),
"Subkapitel": nd.get("subkapitel"),
"Effektstärke": nd.get("d"),
"weighted_degree_abs": val
})
    # per-system weight sums
systems = [(n, wdeg[n]) for n, d in G.nodes(data=True) if d.get("bipartite") == "system"]
system_summary = {G.nodes[n].get("label", n): float(val) for n, val in systems}
return {"top_items_by_weighted_degree": top_items, "system_weight_sums": system_summary}
# -----------------------------------------
# Thermo dashboard: energy/entropy/order/coupling
# -----------------------------------------
def _normalized_entropy(weights: list[float]) -> float:
"""Normalisierte Shannon-Entropie S in [0,1] über eine Gewichtsliste."""
arr = np.asarray(weights, dtype=float)
arr = arr[np.isfinite(arr) & (arr >= 0)]
if arr.size == 0:
return 0.0
total = arr.sum()
if total <= 0:
return 0.0
p = arr / total
    # numerically stable
p = p[p > 0]
S = -np.sum(p * np.log(p))
Smax = np.log(len(p)) if len(p) > 0 else 1.0
return float(S / Smax) if Smax > 0 else 0.0
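# Sanity check for the normalized entropy (toy weights): a uniform distribution gives
# S = 1 (maximal disorder), full concentration on a single bin gives S = 0.
def _demo_normalized_entropy() -> None:
    print(_normalized_entropy([1.0, 1.0, 1.0, 1.0]))  # 1.0
    print(_normalized_entropy([4.0, 0.0, 0.0, 0.0]))  # 0.0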
def compute_thermo_dashboard(df: pd.DataFrame, G: nx.Graph) -> dict:
"""
Operationalisiert thermodynamische Analogien auf Informations-/Wirksamkeitsmaßen:
- Budget: E_total, E_psych, E_soz
- Nutzanteile: E_pos, E_neg, η_pos, η_net
- Entropie/Ordnung: S/O über Verteilung der |d| auf Subkapitel und Kapitel
- Modularity (Kopplung/Abgrenzung) der Partition {psychisch, sozial} im bipartiten Netz
Liefert ein dict und druckt eine kompakte Textzusammenfassung.
"""
# --- Budget ---
d = df["Effektstärke"].astype(float)
E_total = float(np.abs(d).sum())
E_pos = float(np.clip(d, 0, None).sum())
    E_neg = float(np.clip(-d, 0, None).sum())  # magnitudes of the negative d values
eta_pos = float(E_pos / (E_pos + E_neg)) if (E_pos + E_neg) > 0 else 0.0
eta_net = float((E_pos - E_neg) / (E_pos + E_neg)) if (E_pos + E_neg) > 0 else 0.0
    # budget by system
df_sys = df[df["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])].copy()
E_psych = float(np.abs(df_sys.loc[df_sys["Systemebene"].str.lower()=="psychisch","Effektstärke"]).sum())
E_soz = float(np.abs(df_sys.loc[df_sys["Systemebene"].str.lower()=="sozial","Effektstärke"]).sum())
    # --- entropy/order (Subkapitel, Kapitelname) ---
def _weights_by(col: str) -> list[float]:
grp = df_sys.groupby(col)["Effektstärke"].apply(lambda s: float(np.abs(s).sum()))
return [v for v in grp.values if np.isfinite(v) and v >= 0]
S_sub = _normalized_entropy(_weights_by("Subkapitel"))
S_kap = _normalized_entropy(_weights_by("Kapitelname"))
O_sub = float(1.0 - S_sub)
O_kap = float(1.0 - S_kap)
    # --- modularity (community separation psychisch/sozial) ---
    # partition: each item joins the community of its system level; system nodes likewise.
parts: dict[int, set[str]] = {0: set(), 1: set()}
for n, data in G.nodes(data=True):
if data.get("bipartite") == "system":
lbl = str(data.get("label","")).strip().lower()
parts[0 if "psych" in lbl else 1].add(n)
elif data.get("bipartite") == "item":
            # find the system neighbors (bipartite: exactly one)
sys_lbls = [G.nodes[nbr].get("label","").strip().lower() for nbr in G[n]]
if any("psych" in s for s in sys_lbls):
parts[0].add(n)
else:
parts[1].add(n)
partition = [parts[0], parts[1]]
    # modularity with edge weight = |d|
H = G.copy()
for u, v, dd in H.edges(data=True):
dd["weight"] = abs(float(dd.get("weight", 0.0)))
try:
Q_mod = float(nx_comm.modularity(H, partition, weight="weight"))
except Exception:
Q_mod = float("nan")
    # --- output ---
print("\nTHERMO-DASHBOARD")
print(f" Budget: E_total={E_total:.2f} | E_psych={E_psych:.2f} | E_sozial={E_soz:.2f}")
print(f" Nutzanteile: E_pos={E_pos:.2f} | E_neg={E_neg:.2f} | η_pos={eta_pos:.2f} | η_net={eta_net:.2f}")
print(f" Entropie/Ordnung (Subkapitel): S={S_sub:.2f} | O={O_sub:.2f}")
print(f" Entropie/Ordnung (Kapitel): S={S_kap:.2f} | O={O_kap:.2f}")
print(f" Modularity (psychisch/sozial): Q={Q_mod:.3f}")
return {
"budget": {
"E_total": E_total, "E_psych": E_psych, "E_sozial": E_soz,
"E_pos": E_pos, "E_neg": E_neg, "eta_pos": eta_pos, "eta_net": eta_net
},
"entropy": {
"S_subkapitel": S_sub, "O_subkapitel": O_sub,
"S_kapitel": S_kap, "O_kapitel": O_kap
},
"modularity": {"Q_psych_sozial": Q_mod}
}
# -----------------------------------------
# Helper: HEX to RGBA (Plotly)
# -----------------------------------------
def hex_to_rgba(hex_color: str, alpha: float = 1.0) -> str:
"""
Wandelt eine HEX-Farbe (#RRGGBB oder #RGB) in einen Plotly-kompatiblen
rgba()-String um. alpha ∈ [0,1].
"""
if not isinstance(hex_color, str):
raise ValueError("hex_to_rgba: hex_color must be a string like '#RRGGBB'")
h = hex_color.lstrip("#")
if len(h) == 6:
r, g, b = int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16)
elif len(h) == 3:
r, g, b = int(h[0]*2, 16), int(h[1]*2, 16), int(h[2]*2, 16)
else:
        # fallback: pass through what may already be a valid CSS color
return hex_color
    # clamp alpha defensively
a = max(0.0, min(1.0, float(alpha)))
return f"rgba({r},{g},{b},{a})"
# -----------------------------------------
# Thermo-Dashboard Visualization (CI-styled)
# -----------------------------------------
def plot_thermo_dashboard(thermo: dict, title: str = "Thermo-Dashboard: Energie • Entropie • Modularität") -> None:
"""
Verfeinerte, CI-konforme Visualisierung in 2×2 Kacheln:
(A) Energie nach System (100%-Stack + absolute Werte)
(B) Positiv vs. Negativ (100%-Stack)
(C) Entropie/Ordnung (gruppiert, [0..1] mit Referenzlinien)
(D) Modularität Q als Gauge (0..1) mit Schwellenbändern)
Fokus: sehr gute Lesbarkeit (größere Fonts, keine Überlagerungen),
ausgewogene Panel-Größen, und eine klar abgegrenzte Gauge-Kachel.
"""
if not isinstance(thermo, dict) or not thermo:
print("Hinweis: Kein Thermo-Objekt für Dashboard übergeben.")
return
    # colors/styles
colors = _colors if isinstance(_colors, dict) else {}
styles = _styles if isinstance(_styles, dict) else {}
    # fallback colors
c_psych = colors.get("positiveHighlight", "#2ca02c")
c_soz = colors.get("secondaryLine", "#ff7f0e")
c_pos = colors.get("positiveHighlight", "#2ca02c")
c_neg = colors.get("negativeHighlight", "#d62728")
c_S = colors.get("brightArea", "#66CCCC")
c_O = colors.get("depthArea", "#006666")
c_text = colors.get("text", "#333333")
c_bg = colors.get("background", "#ffffff")
    # key figures
budget = thermo.get("budget", {})
entropy = thermo.get("entropy", {})
modular = thermo.get("modularity", {})
E_total = float(budget.get("E_total", 0.0))
E_psych = float(budget.get("E_psych", 0.0))
E_soz = float(budget.get("E_sozial", 0.0))
E_pos = float(budget.get("E_pos", 0.0))
E_neg = float(budget.get("E_neg", 0.0))
eta_pos = float(budget.get("eta_pos", 0.0))
eta_net = float(budget.get("eta_net", 0.0))
S_kap = float(entropy.get("S_kapitel", 0.0))
O_kap = float(entropy.get("O_kapitel", 0.0))
S_sub = float(entropy.get("S_subkapitel", 0.0))
O_sub = float(entropy.get("O_subkapitel", 0.0))
Q_mod = float(modular.get("Q_psych_sozial", float("nan")))
q_val = 0.0 if not (isinstance(Q_mod, float) and math.isfinite(Q_mod)) else Q_mod
    # subplot layout (the indicator needs a domain-type cell)
fig = make_subplots(
rows=2, cols=2,
specs=[[{"type": "xy"}, {"type": "xy"}],
[{"type": "xy"}, {"type": "domain"}]],
column_widths=[0.58, 0.42],
row_heights=[0.55, 0.45],
vertical_spacing=0.12,
horizontal_spacing=0.08,
subplot_titles=(
"Energiebilanz: psychisch vs. sozial",
"Positiv vs. negativ (Effektmasse)",
"Entropie / Ordnung",
"Modularität Q (psychisch/sozial)"
)
)
    # ---------- (A) Energy by system: 100% stack + text labels ----------
share_psych = (E_psych / E_total) if E_total > 0 else 0
share_soz = (E_soz / E_total) if E_total > 0 else 0
fig.add_trace(
go.Bar(
x=[share_psych*100], y=["Anteil (%)"], orientation="h",
marker=dict(color=c_psych, line=dict(width=0)),
text=[f"Psychisch {share_psych*100:.1f}%
(Σ={E_psych:.2f})"],
textposition="inside",
insidetextanchor="middle",
insidetextfont=dict(color="#ffffff", size=12),
texttemplate="%{text}",
name="Psychisch",
hovertemplate="Psychisch: %{x:.1f}% (Σ=%{customdata:.2f})",
customdata=[E_psych],
showlegend=False,
cliponaxis=False
), row=1, col=1
)
fig.add_trace(
go.Bar(
x=[share_soz*100], y=["Anteil (%)"], orientation="h",
marker=dict(color=c_soz, line=dict(width=0)),
text=[f"Sozial {share_soz*100:.1f}%
(Σ={E_soz:.2f})"],
textposition="inside",
insidetextanchor="middle",
insidetextfont=dict(color="#ffffff", size=12),
texttemplate="%{text}",
name="Sozial",
hovertemplate="Sozial: %{x:.1f}% (Σ=%{customdata:.2f})",
customdata=[E_soz],
showlegend=False,
cliponaxis=False
), row=1, col=1
)
fig.update_xaxes(range=[0,100], title_text="Energieanteil [% von Σ|d|]", row=1, col=1, showgrid=True, gridwidth=1)
fig.update_yaxes(title_text="System", row=1, col=1)
# KPI-Badge: Σ|d|
fig.add_annotation(
row=1, col=1,
xref="x1", yref="y1",
x=100, y=1,
text=f"Σ|d| = {E_total:.2f}",
showarrow=False, font=dict(color=c_text, size=12),
xanchor="right", yanchor="bottom",
bgcolor=hex_to_rgba(c_bg, 0.25)
)
    # ---------- (B) Positive vs. negative: 100% stack ----------
share_pos = (E_pos / (E_pos + E_neg)) if (E_pos + E_neg) > 0 else 0
share_neg = 1 - share_pos
fig.add_trace(
go.Bar(
x=[share_pos*100], y=["Anteil (%)"], orientation="h",
marker=dict(color=c_pos, line=dict(width=0)),
text=[f"Positiv {share_pos*100:.1f}%
(Σ={E_pos:.2f})"],
textposition="inside",
insidetextanchor="middle",
insidetextfont=dict(color="#ffffff", size=12),
texttemplate="%{text}",
name="Positiv",
hovertemplate="Positiv: %{x:.1f}% (Σ=%{customdata:.2f})",
customdata=[E_pos],
showlegend=False,
cliponaxis=False
), row=1, col=2
)
fig.add_trace(
go.Bar(
x=[share_neg*100], y=["Anteil (%)"], orientation="h",
marker=dict(color=c_neg, line=dict(width=0)),
text=[f"Negativ {share_neg*100:.1f}%
(Σ={E_neg:.2f})"],
textposition="inside",
insidetextanchor="middle",
insidetextfont=dict(color="#ffffff", size=12),
texttemplate="%{text}",
name="Negativ",
hovertemplate="Negativ: %{x:.1f}% (Σ=%{customdata:.2f})",
customdata=[E_neg],
showlegend=False,
cliponaxis=False
), row=1, col=2
)
fig.update_xaxes(range=[0,100], title_text="Effektmasse [%]", row=1, col=2, showgrid=True, gridwidth=1)
fig.update_yaxes(title_text="Vorzeichen", row=1, col=2)
    # η_net as a text badge (outside the bars so nothing is covered)
fig.add_annotation(
row=1, col=2,
xref="x2 domain", yref="y2 domain",
x=0.98, y=0.02,
text=f"η_net = {eta_net:.2f}",
showarrow=False, font=dict(color=c_text, size=12),
xanchor="right", yanchor="bottom",
bgcolor=hex_to_rgba(c_bg, 0.25)
)
    # ---------- (C) Entropy/order ----------
fig.add_trace(
go.Bar(x=["Kapitel S","Subkapitel S"], y=[S_kap, S_sub], marker=dict(color=c_S, line=dict(width=0)), name="Entropie S", showlegend=False),
row=2, col=1
)
fig.add_trace(
go.Bar(x=["Kapitel O","Subkapitel O"], y=[O_kap, O_sub], marker=dict(color=c_O, line=dict(width=0)), name="Ordnung O", showlegend=False),
row=2, col=1
)
fig.update_yaxes(range=[0,1], tick0=0, dtick=0.2, title_text="Wert [0..1]", row=2, col=1)
fig.update_xaxes(title_text="Maß", row=2, col=1)
# Reference lines (rules of thumb for low/medium/high)
fig.add_hline(y=0.33, line_width=1, line_dash="dot", row=2, col=1)
fig.add_hline(y=0.66, line_width=1, line_dash="dot", row=2, col=1)
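# 0.33/0.66 split the [0, 1] scale into rough thirds: low < 0.33 ≤ medium ≤ 0.66 < high.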
# ---------- (D) Modularity as a gauge ----------
fig.add_trace(
go.Indicator(
mode="gauge+number+delta",
value=q_val,
delta={'reference': 0.5, 'position': "top", 'increasing': {'color': c_pos}, 'decreasing': {'color': c_neg}},
gauge=dict(
shape="angular",
axis=dict(range=[0,1], tick0=0, dtick=0.2, ticks="outside",
tickfont=dict(size=11), tickwidth=1, ticklen=4),
bar=dict(color=c_psych, thickness=0.25),
steps=[
dict(range=[0,0.3], color=hex_to_rgba(c_O, 0.25)),
dict(range=[0.3,0.5], color=hex_to_rgba(c_S, 0.35)),
dict(range=[0.5,1.0], color=hex_to_rgba(c_pos, 0.25)),
],
threshold=dict(line=dict(color=c_text, width=2), thickness=0.75, value=q_val)
),
number=dict(suffix=" Q", font=dict(size=28, color=c_text)),
title={"text": "Q (psychisch/sozial)", "font": {"size": 12, "color": c_text}}
), row=2, col=2
)
# Layout (CI): larger titles, consistent fonts, no overlaps
layout_base = _ci_layout(title)
fig.update_layout(
layout_base,
barmode="stack",
bargap=0.18,
autosize=True,
height=None,
width=None,
margin=dict(l=60, r=40, t=70, b=55),
uniformtext=dict(minsize=10, mode="hide"),
legend=dict(font=dict(size=11)),
font=dict(size=12, color=c_text),
plot_bgcolor=colors.get("background", layout_base.get("plot_bgcolor")),
paper_bgcolor=colors.get("background", layout_base.get("paper_bgcolor")),
)
# Make subplot titles slightly larger/bolder (robust for Plotly annotation objects)
target_titles = {
"Energiebilanz: psychisch vs. sozial",
"Positiv vs. negativ (Effektmasse)",
"Entropie / Ordnung",
"Modularität Q (psychisch/sozial)"
}
if getattr(fig.layout, "annotations", None):
for i, ann in enumerate(fig.layout.annotations):
# Plotly yields annotation objects; read .text instead of dict.get
txt = getattr(ann, "text", None)
if txt in target_titles:
# set the font directly on the object
fig.layout.annotations[i].font = dict(size=13, color=c_text)
fig.show()
export_figure(fig, "vl-thermo-dashboard")
# ---------------------------------------------------
# Single tiles: thermo dashboard (CI-styled)
# ---------------------------------------------------
def plot_thermo_energy_by_system(thermo: dict, title: str = "Energie: psychisch vs. sozial (Σ|d|)") -> None:
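"""Single tile: grouped Σ|d| bars for the psychisch vs. sozial system levels, with percentage shares in legend and hover."""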
colors = _colors if isinstance(_colors, dict) else {}
c_psych = colors.get("positiveHighlight", "#2ca02c")
c_soz = colors.get("secondaryLine", "#ff7f0e")
budget = thermo.get("budget", {})
E_total = float(budget.get("E_total", 0.0))
E_psych = float(budget.get("E_psych", 0.0))
E_soz = float(budget.get("E_sozial", 0.0))
share_psych = (E_psych / E_total) * 100 if E_total > 0 else 0.0
share_soz = (E_soz / E_total) * 100 if E_total > 0 else 0.0
fig = go.Figure()
fig.add_trace(go.Bar(
x=["Psychisch"], y=[E_psych], name=f"Psychisch ({share_psych:.1f}%)",
marker=dict(color=c_psych, line=dict(width=0)),
hovertemplate="Psychisch: Σ|d|=%{y:.2f} (%{customdata:.1f}%)",
customdata=[share_psych]
))
fig.add_trace(go.Bar(
x=["Sozial"], y=[E_soz], name=f"Sozial ({share_soz:.1f}%)",
marker=dict(color=c_soz, line=dict(width=0)),
hovertemplate="Sozial: Σ|d|=%{y:.2f} (%{customdata:.1f}%)",
customdata=[share_soz]
))
fig.update_layout(_ci_layout(title), barmode="group", autosize=True, height=None, width=None)
fig.update_yaxes(title_text="Σ|d| (Effektmasse)")
fig.update_xaxes(title_text="Systemebene")
fig.show()
export_figure(fig, "vl-thermo-energy-by-system")
def plot_thermo_pos_neg(thermo: dict, title: str = "Effektmasse: positiv vs. negativ") -> None:
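"""Single tile: grouped Σ|d| bars for positive vs. negative effects, with an η_net badge in the corner."""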
colors = _colors if isinstance(_colors, dict) else {}
c_pos = colors.get("positiveHighlight", "#2ca02c")
c_neg = colors.get("negativeHighlight", "#d62728")
budget = thermo.get("budget", {})
E_pos = float(budget.get("E_pos", 0.0))
E_neg = float(budget.get("E_neg", 0.0))
tot = E_pos + E_neg
share_pos = (E_pos / tot) * 100 if tot > 0 else 0.0
share_neg = (E_neg / tot) * 100 if tot > 0 else 0.0
eta_net = float(budget.get("eta_net", 0.0))
fig = go.Figure()
fig.add_trace(go.Bar(
x=["Positiv"], y=[E_pos], name=f"Positiv ({share_pos:.1f}%)",
marker=dict(color=c_pos, line=dict(width=0)),
hovertemplate="Positiv: Σ|d|=%{y:.2f} (%{customdata:.1f}%)",
customdata=[share_pos]
))
fig.add_trace(go.Bar(
x=["Negativ"], y=[E_neg], name=f"Negativ ({share_neg:.1f}%)",
marker=dict(color=c_neg, line=dict(width=0)),
hovertemplate="Negativ: Σ|d|=%{y:.2f} (%{customdata:.1f}%)",
customdata=[share_neg]
))
fig.update_layout(_ci_layout(title), barmode="group", autosize=True, height=None, width=None)
fig.update_yaxes(title_text="Σ|d| (Effektmasse)")
fig.update_xaxes(title_text="Vorzeichen")
# Badge for η_net
fig.add_annotation(
xref="paper", yref="paper", x=0.98, y=0.02, showarrow=False,
text=f"η_net = {eta_net:.2f}"
)
fig.show()
export_figure(fig, "vl-thermo-pos-neg")
def plot_thermo_entropy_order(thermo: dict, title: str = "Entropie / Ordnung nach (Sub‑)Kapitel") -> None:
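"""Single tile: entropy S and order O, each at chapter and subchapter level, on a common [0, 1] scale."""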
colors = _colors if isinstance(_colors, dict) else {}
c_S = colors.get("brightArea", "#66CCCC")
c_O = colors.get("depthArea", "#006666")
ent = thermo.get("entropy", {})
S_kap = float(ent.get("S_kapitel", 0.0))
O_kap = float(ent.get("O_kapitel", 0.0))
S_sub = float(ent.get("S_subkapitel", 0.0))
O_sub = float(ent.get("O_subkapitel", 0.0))
fig = go.Figure()
fig.add_trace(go.Bar(x=["Kapitel S","Subkapitel S"], y=[S_kap, S_sub],
marker=dict(color=c_S, line=dict(width=0)),
name="Entropie S",
hovertemplate="%{x}: %{y:.2f}"))
fig.add_trace(go.Bar(x=["Kapitel O","Subkapitel O"], y=[O_kap, O_sub],
marker=dict(color=c_O, line=dict(width=0)),
name="Ordnung O",
hovertemplate="%{x}: %{y:.2f}"))
fig.update_layout(_ci_layout(title), barmode="group", autosize=True, height=None, width=None)
fig.update_yaxes(title_text="Wert [0..1]", range=[0,1], dtick=0.2)
fig.update_xaxes(title_text="Maß")
# Reference lines
fig.add_hline(y=0.33, line_width=1, line_dash="dot")
fig.add_hline(y=0.66, line_width=1, line_dash="dot")
fig.show()
export_figure(fig, "vl-thermo-entropy-order")
def plot_thermo_modularity_gauge(thermo: dict, title: str = "Modularität Q (psychisch/sozial)") -> None:
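"""Single tile: angular gauge for the modularity Q of the psychisch/sozial split, with low/medium/high bands."""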
colors = _colors if isinstance(_colors, dict) else {}
c_psych = colors.get("positiveHighlight", "#2ca02c")
c_pos = colors.get("positiveHighlight", "#2ca02c")
c_neg = colors.get("negativeHighlight", "#d62728")
c_S = colors.get("brightArea", "#66CCCC")
c_O = colors.get("depthArea", "#006666")
c_text = colors.get("text", "#333333")
q = float(thermo.get("modularity", {}).get("Q_psych_sozial", 0.0))
# Clamp robustly to [0, 1] so the needle always stays on the gauge axis
q = max(0.0, min(1.0, q))
# Single gauge without a duplicate heading:
# -> layout title via _ci_layout(title)
# -> leave the indicator title empty
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=q,
gauge=dict(
shape="angular",
axis=dict(
range=[0, 1],
tick0=0,
dtick=0.1,
ticks="outside",
tickfont=dict(size=14),
tickwidth=1,
ticklen=6
),
bar=dict(color=c_psych, thickness=0.33),
steps=[
dict(range=[0.0, 0.3], color=hex_to_rgba(c_O, 0.30)),
dict(range=[0.3, 0.5], color=hex_to_rgba(c_S, 0.35)),
dict(range=[0.5, 1.0], color=hex_to_rgba(c_pos, 0.28)),
],
threshold=dict(
line=dict(color=c_text, width=2),
thickness=0.75,
value=q
)
),
number=dict(suffix=" Q", font=dict(size=36, color=c_text)),
title={"text": ""} # keine zweite Überschrift im Plotkörper
))
# Band labels as explicit annotations (avoids overlaps)
fig.add_annotation(x=0.18, y=0.72, xref="paper", yref="paper",
text="niedrig < 0.3", showarrow=False,
font=dict(size=12, color=c_text))
fig.add_annotation(x=0.46, y=0.72, xref="paper", yref="paper",
text="mittel 0.3–0.5", showarrow=False,
font=dict(size=12, color=c_text))
fig.add_annotation(x=0.80, y=0.72, xref="paper", yref="paper",
text="hoch > 0.5", showarrow=False,
font=dict(size=12, color=c_text))
fig.update_layout(
_ci_layout(title),
autosize=True,
height=None,
width=None,
margin=dict(l=60, r=60, t=70, b=40),
paper_bgcolor=colors.get("background", None),
plot_bgcolor=colors.get("background", None),
showlegend=False
)
fig.show()
export_figure(fig, "vl-thermo-modularity-gauge")
# ---------------------------------------------------
# Single visualisation: effect-size distribution (histogram + violin)
# ---------------------------------------------------
def plot_thermo_effect_sizes(df: pd.DataFrame, title: str = "Effektstärken-Verteilung (Histogramm + Violin)") -> None:
"""
Einzelvisualisierung der Effektmasse als Verteilung.
Links: Histogramm der Effektstärken (d), gestapelt nach Vorzeichen, normiert auf Prozent.
Rechts: Violin-Plot (geteilte Seiten) für d≥0 vs. d<0 mit eingebetteter Box,
optional getrennt nach Systemebene im Hover.
Referenzlinien bei 0.0, ±0.2 (klein), ±0.5 (mittel), ±0.8 (groß).
"""
if df is None or df.empty:
print("Hinweis: Keine Daten für Effektmassen-Plot.")
return
colors = _colors if isinstance(_colors, dict) else {}
c_pos = colors.get("positiveHighlight", "#2ca02c")
c_neg = colors.get("negativeHighlight", "#d62728")
c_text = colors.get("text", "#333333")
data = df.copy()
data["d"] = pd.to_numeric(data["Effektstärke"], errors="coerce")
data = data.dropna(subset=["d"])
# Split by sign
pos = data.loc[data["d"] >= 0, "d"]
neg = data.loc[data["d"] < 0, "d"]
# Robust axis limits
d_min = float(np.nanmin(data["d"])) if len(data) else -1.0
d_max = float(np.nanmax(data["d"])) if len(data) else 1.0
pad = 0.05 * (d_max - d_min) if (d_max - d_min) > 0 else 0.1
x_range = [d_min - pad, d_max + pad]
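# 5 % padding keeps the outermost bins/points away from the plot border;
# fall back to ±0.1 when the range is degenerate (all d identical).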
fig = make_subplots(
rows=1, cols=2,
shared_yaxes=False,
specs=[[{"type": "xy"}, {"type": "xy"}]],
column_widths=[0.62, 0.38],
horizontal_spacing=0.10,
subplot_titles=("Histogramm der Effektstärken (in %)", "Violin: d ≥ 0 vs. d < 0")
)
# ---------- (A) Histogram ----------
# Automatic bins; normalised to percent
if len(pos):
fig.add_trace(
go.Histogram(
x=pos, name="d ≥ 0",
marker=dict(color=c_pos, line=dict(width=0)),
opacity=0.9,
histnorm="percent",
hovertemplate="d (pos): %{x:.2f}
Anteil: %{y:.2f}%"
),
row=1, col=1
)
if len(neg):
fig.add_trace(
go.Histogram(
x=neg, name="d < 0",
marker=dict(color=c_neg, line=dict(width=0)),
opacity=0.9,
histnorm="percent",
hovertemplate="d (neg): %{x:.2f}
Anteil: %{y:.2f}%"
),
row=1, col=1
)
fig.update_xaxes(title_text="Cohen d", range=x_range, row=1, col=1)
fig.update_yaxes(title_text="Anteil [%]", row=1, col=1)
fig.update_layout(barmode="overlay") # überlagert, damit beide Verteilungen sichtbar sind
# Referenzlinien
for v in [0.0, -0.2, 0.2, -0.5, 0.5, -0.8, 0.8]:
fig.add_vline(x=v, line_width=1, line_dash="dot", row=1, col=1)
# ---------- (B) Violin ----------
# Violins with half sides; show all points, embed box and mean line
if len(pos):
fig.add_trace(
go.Violin(
y=pos,
name="d ≥ 0",
side="positive",
box=dict(visible=True),
meanline=dict(visible=True),
points="all",
pointpos=0.0,
marker=dict(color=c_pos, opacity=0.6),
hovertemplate="d (pos): %{y:.2f}"
),
row=1, col=2
)
if len(neg):
fig.add_trace(
go.Violin(
y=neg,
name="d < 0",
side="negative",
box=dict(visible=True),
meanline=dict(visible=True),
points="all",
pointpos=0.0,
marker=dict(color=c_neg, opacity=0.6),
hovertemplate="d (neg): %{y:.2f}"
),
row=1, col=2
)
fig.update_yaxes(title_text="Cohen d", row=1, col=2)
# Shared layout
fig.update_layout(
_ci_layout(title),
autosize=True,
height=None,
width=None,
margin=dict(l=60, r=40, t=70, b=55),
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1.0),
font=dict(size=12, color=c_text)
)
fig.show()
export_figure(fig, "vl-thermo-effect-sizes")
# -----------------------------------------
# Pipeline
# -----------------------------------------
def run_network_analysis(
csv_path: str,
min_abs_d: float = 0.00,
kapitel_filter: list[int] | None = None,
subkapitel_filter: list[str] | None = None,
seed: int = 42,
z_mode: str = "effekt"
):
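"""
End-to-end pipeline: load the CSV, build the bipartite graph (with optional
|d|/Kapitel/Subkapitel filters), render the network, thermo and triangulation
views, and export the JSON/CSV artefacts into the export/ folder.
"""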
df = load_data(csv_path)
# Briefly log data quality
print(f"Rows: {len(df)} | min d = {df['Effektstärke'].min():.2f} | max d = {df['Effektstärke'].max():.2f}")
print("Systemebenen:", df["Systemebene"].dropna().unique().tolist())
if kapitel_filter:
print("Kapitel-Filter:", kapitel_filter)
if subkapitel_filter:
print("Subkapitel-Filter:", subkapitel_filter)
if min_abs_d > 0:
print(f"Filter |d| ≥ {min_abs_d:.2f}")
G = build_bipartite_graph(df, min_abs_d=min_abs_d,
kapitel_filter=kapitel_filter,
subkapitel_filter=subkapitel_filter)
if G.number_of_nodes() == 0 or G.number_of_edges() == 0:
print("Hinweis: Nach Filtern keine Knoten/Kanten – bitte Filter anpassen.")
return
plot_network(G, title="Netzwerk: Systemebenen × Thermometer (Kanten: Effektstärke)", seed=seed)
# 3D view with a semantic z-axis
plot_network_3d(G, z_mode=z_mode, title="Netzwerk (3D): semantische z-Achse", seed=seed)
summary = summarize_network(G)
print("\nSystemgewicht-Summen:", summary["system_weight_sums"])
print("\nTop-Items (weighted degree):")
for r in summary["top_items_by_weighted_degree"]:
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={r['Effektstärke']:.2f} | wd={r['weighted_degree_abs']:.2f}")
# --- Thermo dashboard: energy/entropy/coupling ---
thermo = compute_thermo_dashboard(df, G)
# Single visualisations (thermo)
plot_thermo_energy_by_system(thermo, title="Energie: psychisch vs. sozial (Σ|d|)")
plot_thermo_pos_neg(thermo, title="Effektmasse: positiv vs. negativ")
plot_thermo_entropy_order(thermo, title="Entropie / Ordnung nach (Sub‑)Kapitel")
plot_thermo_modularity_gauge(thermo, title="Modularität Q (psychisch/sozial)")
# Effect mass as a distribution (histogram + violin)
plot_thermo_effect_sizes(df, title="Effektstärken-Verteilung (Histogramm + Violin)")
# Visual thermo dashboard (compact)
plot_thermo_dashboard(thermo, title="Thermo-Dashboard: Energie • Entropie • Modularität")
export_json(thermo, "thermo_dashboard.json")
# Export top lists
extremes = top_extremes(df, n=_TOP_N_EXTREMES)
export_json(extremes, "network_top_extremes.json")
# Item projection + communities (optional)
item_proj_summary = {}
if _SHOW_ITEM_PROJECTION:
Gi, node2com, coms = build_item_projection(G)
plot_item_projection(Gi, node2com, title="Item-Projektion (Communities)")
item_proj_summary = {
"n_nodes": Gi.number_of_nodes(),
"n_edges": Gi.number_of_edges(),
"n_communities": len(coms),
}
# --- Compute & export coupling indices (Young/Roediger) ---
coupling = compute_and_export_coupling_indices(df, G)
# --- Compute & export the coupling potential per need ---
coupling_potential_summary = compute_and_export_coupling_potential(df)
# Log and visualisation
if coupling_potential_summary:
plot_coupling_potential(coupling_potential_summary)
# --- NEW: 3D triangulation effect × need × semantics ---
feats_needs = build_triangulation_features_needs(df)
# Plot & Export (CSV wird in der Funktion geschrieben)
plot_triangulation_needs_3d(feats_needs, title="Triangulation (3D): Effekt × Bedürfnis × Semantik")
# --- Triangulation: Kapitel × System × Effekt ---
feats, labels = triangulate_clusters(
df,
algo=_CLUSTER_ALGO,
n_clusters=_N_CLUSTERS,
seed=seed,
dbscan_eps=_DBSCAN_EPS,
dbscan_min_samples=_DBSCAN_MIN_SAMPLES,
spectral_k=_SPECTRAL_K
)
print(f"\nTriangulation (Algo={_CLUSTER_ALGO}): Clustergrößen")
for cid, cnt in pd.Series(labels).value_counts().sort_index().items():
print(f" Cluster {cid}: n={cnt}")
if _CLUSTER_ALGO == "dbscan":
print(f" DBSCAN: eps={_DBSCAN_EPS}, min_samples={_DBSCAN_MIN_SAMPLES}")
elif _CLUSTER_ALGO == "spectral":
print(f" Spectral: k={_SPECTRAL_K}")
else:
print(f" n_clusters={_N_CLUSTERS}")
if _CLUSTER_ALGO == "none":
# no cluster reports; plain projection only
plot_triangulation_3d_simple(feats, title="Triangulation (3D): Kapitel × System × Effekt (ohne Cluster)")
tri_summary = {}
else:
tri_summary = summarize_triangulation(feats, top_n=5)
for cid in sorted(tri_summary.keys()):
info = tri_summary[cid]
print(f"\nCluster {cid} – n={info['n']} | mean d={info['mean_d']:.2f}")
print(" Top +: ")
for r in info["top_positive"]:
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
print(" Top -: ")
for r in info["top_negative"]:
print(f" {r['Thermometer_ID']}: {r['Stichwort']} | d={float(r['Effektstärke']):.2f}")
# Plot
plot_triangulation_3d(feats, title="Triangulation (3D): Kapitel × System × Effekt")
# Export JSON
payload = {
"extremes": extremes,
"item_projection": item_proj_summary,
"triangulation": {
"clusters": tri_summary,
},
"coupling": coupling,
"coupling_potential": coupling_potential_summary,
"meta": {
"theme": theme,
"min_abs_d": float(min_abs_d),
"kapitel_filter": kapitel_filter,
"subkapitel_filter": subkapitel_filter
},
"nodes": [
{
"id": n,
"label": G.nodes[n].get("label", ""),
"type": G.nodes[n].get("bipartite", ""),
"Thermometer_ID": G.nodes[n].get("id"),
"Kapitelname": G.nodes[n].get("kapitelname"),
"Subkapitel": G.nodes[n].get("subkapitel"),
"Effektstärke": G.nodes[n].get("d")
}
for n in G.nodes()
],
"edges": [
{
"source": u,
"target": v,
"weight": float(d.get("weight", 0.0)),
"sign": d.get("sign", "")
}
for u, v, d in G.edges(data=True)
],
"summary": summary
}
print("\nExports: coupling_per_item.csv, coupling_per_need.csv im export/ Ordner abgelegt.")
export_json(payload, "network_systemebenen.json")
# -----------------------------------------
# Main
# -----------------------------------------
if __name__ == "__main__":
# Example: no filters, but you can simply tweak the knobs below:
# - min_abs_d=0.10 (calms the network down)
# - kapitel_filter=[5,6,7] or subkapitel_filter=["Fähigkeiten", ...]
run_network_analysis(
csv_path=os.path.join(os.path.dirname(__file__), csv_file),
min_abs_d=0.00,
kapitel_filter=None,
subkapitel_filter=None,
seed=42,
z_mode=_Z_MODE
)