visible-learning/Robustheitsprüfung.py
# robustheit_visible_learning.py
# Full robustness check for the Visible-Learning analysis.
# Reads Thermometer.csv (plus an optional needs mapping) and writes new CSV/JSON results
# for bootstraps, permutation tests, network null models and sensitivity tests to ./export.
from __future__ import annotations
import json
import random
from pathlib import Path
import numpy as np
import pandas as pd
import networkx as nx
from networkx.algorithms import community as nx_comm
# ------------------------------------------------------------
# Paths & constants
# ------------------------------------------------------------
HERE = Path(__file__).resolve().parent
EXPORT = HERE / "export"
EXPORT.mkdir(exist_ok=True)
THERMO_CSV = HERE / "Thermometer.csv" # main input source
COUPLING_ITEM_CSV = EXPORT / "coupling_per_item.csv"
COUPLING_NEED_CSV = EXPORT / "coupling_per_need.csv"
COUPLING_POT_NEED_CSV = EXPORT / "coupling_potential_per_need.csv"
TRIANG_NEEDS_CSV = EXPORT / "triangulation_needs_3d.csv"
NEEDS_MAPPING_CSV = EXPORT / "needs_mapping_codes.csv"
WERTE_MAPPING_CSV = HERE / "werte_mapping.csv" # optional (if present)
# Result files
OUT_BOOTSTRAP_Q = EXPORT / "robust_bootstrap_Q.csv"
OUT_PERM_NEEDS = EXPORT / "robust_permutation_needs.csv"
OUT_NULLMODEL_Q = EXPORT / "robust_nullmodel_Q.csv"
OUT_SENS_ITEM = EXPORT / "robust_sensitivity_items.csv"
OUT_SENS_TOPK = EXPORT / "robust_sensitivity_topk.csv"
OUT_SENS_NEEDSWAP = EXPORT / "robust_sensitivity_needswap.csv"
OUT_SUMMARY_JSON = EXPORT / "robust_summary.json"
# Default parameters
SEED = 42
B_BOOT = 1000 # number of bootstrap replicates
P_PERM = 2000 # number of label permutations
M_NULL = 500 # number of null-model rewirings
K_TOP = 10 # top-k items removed in the sensitivity test
ALT_NEED_SWAP_FRAC = 0.1 # remap ~10% of items to a different need
rng = np.random.default_rng(SEED)
random.seed(SEED)
# ------------------------------------------------------------
# Utilities
# ------------------------------------------------------------
def _ensure_float(s: pd.Series) -> pd.Series:
x = s.astype(str).str.replace(",", ".", regex=False).str.strip()
return pd.to_numeric(x, errors="coerce")
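# Illustrative example (not part of the pipeline): _ensure_float normalises German
# decimal commas before numeric coercion, so pd.Series(["0,62", "1.3", "k.A."])
# becomes [0.62, 1.3, NaN]; unparseable entries turn into NaN via errors="coerce".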
def load_base() -> pd.DataFrame:
    if not THERMO_CSV.exists():
        raise FileNotFoundError(f"Thermometer.csv not found: {THERMO_CSV}")
    df = pd.read_csv(THERMO_CSV)
    req = ["Thermometer_ID","Stichwort","Effektstärke","Subkapitel","Kapitelname","Systemebene"]
    missing = [c for c in req if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns in Thermometer.csv: {missing}")
df["Effektstärke"] = _ensure_float(df["Effektstärke"])
    # merge the needs column from the mapping file if it is not already present
if "Young_Beduerfnis" not in df.columns and WERTE_MAPPING_CSV.exists():
try:
m = pd.read_csv(WERTE_MAPPING_CSV)
if {"Thermometer_ID","Young_Beduerfnis"}.issubset(m.columns):
df = df.merge(m[["Thermometer_ID","Young_Beduerfnis"]], on="Thermometer_ID", how="left")
except Exception:
df["Young_Beduerfnis"] = np.nan
else:
df["Young_Beduerfnis"] = df.get("Young_Beduerfnis", np.nan)
    # keep only valid system levels
mask = df["Systemebene"].astype(str).str.lower().isin(["psychisch","sozial"])
df = df[mask].dropna(subset=["Effektstärke"]).copy()
    # chapter number (optional)
try:
df["Kapitel"] = df["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
except Exception:
df["Kapitel"] = np.nan
return df
def build_bipartite(df: pd.DataFrame) -> nx.Graph:
G = nx.Graph()
for s in ["psychisch","sozial"]:
G.add_node(f"system::{s}", bipartite="system", label=s.capitalize())
for _, r in df.iterrows():
sys = str(r["Systemebene"]).lower()
u = f"system::{sys}"
v = f"item::{r['Thermometer_ID']}"
G.add_node(v, bipartite="item",
id=str(r["Thermometer_ID"]),
label=str(r["Stichwort"]),
kapitelname=str(r["Kapitelname"]),
subkapitel=str(r["Subkapitel"]),
d=float(r["Effektstärke"]))
G.add_edge(u, v, weight=float(r["Effektstärke"]))
return G
def item_projection(G: nx.Graph) -> nx.Graph:
items = [n for n,d in G.nodes(data=True) if d.get("bipartite")=="item"]
systems = [n for n,d in G.nodes(data=True) if d.get("bipartite")=="system"]
Gi = nx.Graph()
for it in items:
Gi.add_node(it, **G.nodes[it])
sys_to_items = {s:[] for s in systems}
for u,v,d in G.edges(data=True):
if u in systems and v in items:
sys_to_items[u].append((v, abs(float(d.get("weight",0.0)))))
elif v in systems and u in items:
sys_to_items[v].append((u, abs(float(d.get("weight",0.0)))))
for s, lst in sys_to_items.items():
for i in range(len(lst)):
for j in range(i+1, len(lst)):
a, wa = lst[i]; b, wb = lst[j]
w = min(wa, wb)
if Gi.has_edge(a,b):
Gi[a][b]["weight"] += w
else:
Gi.add_edge(a,b,weight=w)
return Gi
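# Note on the projection above: items are linked when they share a system node,
# with edge weight min(|d_a|, |d_b|) per shared system (accumulated via += if a
# pair were to meet through more than one system). Since every item row carries
# exactly one Systemebene, each item node has a single system neighbour, so the
# projection links items within the same system level.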
def modularity_Q_psych_sozial(G: nx.Graph) -> float:
    # Partition: system nodes plus their items
parts = {0:set(), 1:set()}
for n,d in G.nodes(data=True):
if d.get("bipartite")=="system":
lbl = str(d.get("label","")).lower()
parts[0 if "psych" in lbl else 1].add(n)
for n,d in G.nodes(data=True):
if d.get("bipartite")=="item":
sys_lbls = [G.nodes[nbr].get("label","").lower() for nbr in G[n]]
parts[0 if any("psych" in s for s in sys_lbls) else 1].add(n)
H = G.copy()
for u,v,d in H.edges(data=True):
d["weight"] = abs(float(d.get("weight",0.0)))
try:
return float(nx_comm.modularity(H, [parts[0],parts[1]], weight="weight"))
except Exception:
return float("nan")
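# Background: nx_comm.modularity evaluates the weighted modularity
#   Q = sum_c [ L_c / m - (D_c / (2 m))^2 ]
# where m is the total (absolute) edge weight, L_c the edge weight inside
# community c and D_c the summed weighted degree of c. The two communities are
# fixed in advance here (psychisch vs. sozial plus their items) rather than
# detected, so Q measures how strongly the coupling separates the system levels.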
def betweenness_on_projection(Gi: nx.Graph) -> dict[str,float]:
if Gi.number_of_edges()==0:
return {}
H = Gi.copy()
eps = 1e-9
for u,v,d in H.edges(data=True):
w = float(d.get("weight",0.0))
d["length"] = 1.0/max(eps, w)
return nx.betweenness_centrality(H, weight="length", normalized=True)
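# The betweenness step inverts the weights (length = 1/weight), so strongly
# coupled item pairs lie "close" to each other and shortest paths preferentially
# run along high-weight edges; the normalised betweenness of an item then
# captures how often it brokers such paths in the projection.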
def abs_d_norm(series: pd.Series) -> pd.Series:
    # Min-max scaling of |d| onto [0, 1]. The result keeps the original index so
    # that assigning it back to a filtered or resampled DataFrame aligns row by row.
    x = series.to_numpy(dtype=float)
    mn, mx = np.nanmin(x), np.nanmax(x)
    if not np.isfinite(mn) or not np.isfinite(mx) or mx <= mn:
        return pd.Series(np.zeros_like(x), index=series.index)
    return pd.Series((x - mn) / (mx - mn), index=series.index)
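# Illustrative example: for |d| values [0.2, 0.6, 1.0] the min-max scaling above
# returns [0.0, 0.5, 1.0]; if all values coincide (mx <= mn) it falls back to zeros.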
def observed_coupling_index(df: pd.DataFrame) -> tuple[pd.DataFrame, float]:
G = build_bipartite(df)
Gi = item_projection(G)
bc = betweenness_on_projection(Gi)
data = df.copy()
data["abs_d"] = data["Effektstärke"].abs()
data["abs_d_norm"] = abs_d_norm(data["abs_d"])
data["bc_norm"] = [bc.get(f"item::{tid}", 0.0) for tid in data["Thermometer_ID"].astype(str)]
data["coupling_index"] = data["abs_d_norm"] * data["bc_norm"]
    # CI summed over all items
ci_sum = float(data["coupling_index"].sum())
return data, ci_sum
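# The per-item coupling index is coupling_index = abs_d_norm * bc_norm: an item
# scores highly only if it combines a large effect size with a brokering position
# in the item projection. CI_sum, the total over all items, is the scalar that the
# bootstrap and the sensitivity checks below track.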
# ------------------------------------------------------------
# 1) BOOTSTRAP (resample items): metrics Q, CI_sum
# ------------------------------------------------------------
def run_bootstrap(df: pd.DataFrame, B: int = B_BOOT) -> pd.DataFrame:
rows = []
n = len(df)
for b in range(B):
        idx = rng.integers(0, n, size=n)  # with replacement
samp = df.iloc[idx].copy()
G = build_bipartite(samp)
Q = modularity_Q_psych_sozial(G)
per_item, ci_sum = observed_coupling_index(samp)
rows.append({"b": b+1, "Q": Q, "CI_sum": ci_sum})
out = pd.DataFrame(rows)
out.to_csv(OUT_BOOTSTRAP_Q, index=False, encoding="utf-8")
return out
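# Bootstrap logic: each replicate draws n items with replacement and recomputes
# Q and CI_sum; the 2.5% and 97.5% quantiles of the B replicates are reported as
# a percentile 95% interval in main(). Note that duplicate draws collapse into a
# single node/edge in the bipartite graph, while they are counted more than once
# in the CI_sum aggregation.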
# ------------------------------------------------------------
# 2) PERMUTATION of the need labels: benchmark against the observed value
#    Metric: coupling_potential (or CI_sum per need)
# ------------------------------------------------------------
def coupling_potential_by_need(df: pd.DataFrame) -> pd.DataFrame:
d = df.copy()
d["abs_d"] = d["Effektstärke"].abs()
d_ps = d.groupby(["Young_Beduerfnis","Systemebene"])["abs_d"].sum().unstack(fill_value=0.0)
d_ps.columns = [c.lower() for c in d_ps.columns]
d_ps["E_sum"] = d_ps.sum(axis=1)
d_ps["balance"] = 1.0 - (d_ps.apply(lambda r: abs((r.get("psychisch",0.0)/r["E_sum"]) - (r.get("sozial",0.0)/r["E_sum"])) if r["E_sum"]>0 else 1.0, axis=1))
d_ps["coupling_potential"] = d_ps["E_sum"] * d_ps["balance"]
d_ps["bridge_energy"] = np.minimum(d_ps.get("psychisch",0.0), d_ps.get("sozial",0.0))
d_ps = d_ps.reset_index().rename(columns={"Young_Beduerfnis":"Beduerfnis"})
return d_ps[["Beduerfnis","E_sum","psychisch","sozial","balance","coupling_potential","bridge_energy"]]
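# Per need, the quantities above are:
#   E_sum              = sum of |d| over the need's items,
#   balance            = 1 - |p/E_sum - s/E_sum|   (p, s = psychisch/sozial energy),
#   coupling_potential = E_sum * balance,
#   bridge_energy      = min(psychisch, sozial).
# Illustrative example: psychisch = 3, sozial = 1 gives E_sum = 4,
# balance = 1 - |0.75 - 0.25| = 0.5, coupling_potential = 2 and bridge_energy = 1.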
def run_permutation_needs(df: pd.DataFrame, P: int = P_PERM) -> pd.DataFrame:
    # observed
obs = coupling_potential_by_need(df)
obs_total = float(obs["coupling_potential"].sum())
rows = [{"perm": 0, "cp_total": obs_total, "is_observed": 1}]
    # permutation: shuffle the need labels at random
needs = df["Young_Beduerfnis"].astype(str).fillna("").to_numpy()
for p in range(1, P+1):
perm = needs.copy()
rng.shuffle(perm)
dperm = df.copy()
dperm["Young_Beduerfnis"] = perm
cp = coupling_potential_by_need(dperm)
rows.append({"perm": p, "cp_total": float(cp["coupling_potential"].sum()), "is_observed": 0})
out = pd.DataFrame(rows)
out.to_csv(OUT_PERM_NEEDS, index=False, encoding="utf-8")
return out
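# Permutation benchmark: the need labels are shuffled across items while effect
# sizes and system levels stay fixed, and the total coupling potential is
# recomputed for each shuffle. Row perm = 0 stores the observed value; the right-
# and left-tailed p-values are derived from this table in main().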
def export_observed_need_coupling(df: pd.DataFrame) -> None:
obs = coupling_potential_by_need(df)
(EXPORT / "observed_coupling_per_need.csv").write_text(
obs.to_csv(index=False, encoding="utf-8"), encoding="utf-8"
)
# ------------------------------------------------------------
# 3) NETWORK NULL MODELS: bipartite rewiring (degree-preserving)
#    Metric: Q (psych/sozial)
# ------------------------------------------------------------
def rewire_bipartite_preserve_degrees(G: nx.Graph, iters: int = 10_000) -> nx.Graph:
"""Bipartites, grad-erhaltendes Rewiring.
(systemAitem1, systemBitem2) -> (systemAitem2, systemBitem1),
ohne Duplikate/Loops. Bis zu `iters` erfolgreiche Swaps.
"""
H = G.copy()
edges = [(u, v) if str(u).startswith("system::") else (v, u)
for u, v in H.edges()]
systems = [n for n in H.nodes if str(n).startswith("system::")]
items = [n for n in H.nodes if str(n).startswith("item::")]
adj = {n: set(H.neighbors(n)) for n in H.nodes}
tries = 0; swaps = 0; max_tries = iters * 20
m = len(edges)
if m < 2:
return H
    # Use the module-level RNG so that successive calls (one per null-model trial)
    # produce different rewirings; re-seeding inside the function would make every
    # rewired graph identical.
    while swaps < iters and tries < max_tries:
        i, j = rng.integers(0, m, size=2)
if i == j: tries += 1; continue
s1, it1 = edges[i]; s2, it2 = edges[j]
if s1 == s2 or it1 == it2: tries += 1; continue
a1, b1 = s1, it2
a2, b2 = s2, it1
if b1 in adj[a1] or b2 in adj[a2]:
tries += 1; continue
# Capture current edge weights from H before removing (edges may have been rewired already)
w1 = float(H[s1][it1].get("weight", 1.0))
w2 = float(H[s2][it2].get("weight", 1.0))
H.remove_edge(s1, it1)
H.remove_edge(s2, it2)
H.add_edge(a1, b1, weight=w1)
H.add_edge(a2, b2, weight=w2)
adj[a1].add(b1); adj[b1].add(a1)
adj[a2].add(b2); adj[b2].add(a2)
adj[s1].discard(it1); adj[it1].discard(s1)
adj[s2].discard(it2); adj[it2].discard(s2)
edges[i] = (a1, b1); edges[j] = (a2, b2)
swaps += 1; tries += 1
return H
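# Null-model rationale: the double-edge swap (s1,it1),(s2,it2) -> (s1,it2),(s2,it1)
# keeps every node's degree and leaves each system node's multiset of edge weights
# unchanged; in this bipartite graph (one system per item) it amounts to randomly
# reassigning items between the two system levels. Comparing the observed Q with
# the Q distribution over such rewired graphs shows how much of the modular split
# is explained by the degree structure alone.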
def run_nullmodels_Q(df: pd.DataFrame, M: int = M_NULL) -> pd.DataFrame:
G = build_bipartite(df)
Q_obs = modularity_Q_psych_sozial(G)
rows = [{"trial": 0, "Q": Q_obs, "is_observed": 1}]
for m in range(1, M + 1):
H = rewire_bipartite_preserve_degrees(G, iters=2000)
q = modularity_Q_psych_sozial(H)
rows.append({"trial": m, "Q": q, "is_observed": 0})
out = pd.DataFrame(rows)
out_valid = out.dropna(subset=["Q"]).copy()
out_valid.to_csv(OUT_NULLMODEL_Q, index=False, encoding="utf-8")
return out_valid
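# The summary in main() contrasts the observed Q with the mean and the central
# 95% interval of the null Q values; an observed Q well outside this interval
# indicates structure beyond what the degree sequence alone would produce.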
# ------------------------------------------------------------
# 4) SENSITIVITY: (a) leave-one-out, (b) remove top-k,
#    (c) alternative need assignment (~10% of items)
# ------------------------------------------------------------
def run_sensitivity_items(df: pd.DataFrame) -> pd.DataFrame:
rows = []
_, base_ci = observed_coupling_index(df)
base_ci = float(base_ci)
for _, r in df.iterrows():
tid = str(r["Thermometer_ID"])
d2 = df[df["Thermometer_ID"].astype(str) != tid]
_, ci_sum = observed_coupling_index(d2)
ci_sum = float(ci_sum)
rows.append({
"Thermometer_ID": tid,
"CI_sum_after_drop": ci_sum,
"CI_delta": ci_sum - base_ci # < 0 bedeutet: Item trägt stark zum CI bei
})
out = pd.DataFrame(rows).sort_values("CI_delta")
out.to_csv(OUT_SENS_ITEM, index=False, encoding="utf-8")
return out
def run_sensitivity_topk(df: pd.DataFrame, k: int = K_TOP) -> pd.DataFrame:
    # remove the top-k items by |d| and recompute the metrics
d = df.copy()
d["abs_d"] = d["Effektstärke"].abs()
top = d.sort_values("abs_d", ascending=False).head(k)["Thermometer_ID"].astype(str).tolist()
d2 = d[~d["Thermometer_ID"].astype(str).isin(top)].copy()
G2 = build_bipartite(d2)
Q2 = modularity_Q_psych_sozial(G2)
_, CI2 = observed_coupling_index(d2)
out = pd.DataFrame([{"k": k, "removed_ids": ";".join(top), "Q_after": Q2, "CI_sum_after": CI2}])
out.to_csv(OUT_SENS_TOPK, index=False, encoding="utf-8")
return out
def run_sensitivity_needswap(df: pd.DataFrame, frac: float = ALT_NEED_SWAP_FRAC, trials: int = 200) -> pd.DataFrame:
needs = df["Young_Beduerfnis"].astype(str).fillna("").tolist()
uniq = sorted(set([n for n in needs if n]))
rows = []
for t in range(1, trials+1):
d2 = df.copy()
idx = rng.choice(len(d2), size=max(1, int(frac*len(d2))), replace=False)
        # assign a different (random) need to each selected item
for i in idx:
cur = str(d2.iloc[i]["Young_Beduerfnis"])
choices = [u for u in uniq if u and u != cur] or [cur]
d2.iloc[i, d2.columns.get_loc("Young_Beduerfnis")] = random.choice(choices)
cp = coupling_potential_by_need(d2)
rows.append({"trial": t, "cp_total": float(cp["coupling_potential"].sum())})
out = pd.DataFrame(rows)
out.to_csv(OUT_SENS_NEEDSWAP, index=False, encoding="utf-8")
return out
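# The need-swap check perturbs roughly ALT_NEED_SWAP_FRAC of the need labels per
# trial and records the resulting total coupling potential, indicating how
# sensitive the need-level result is to individual (mis)assignments.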
# ------------------------------------------------------------
# MAIN
# ------------------------------------------------------------
def main():
print("Lade Daten …")
df = load_base()
print("Berechne beobachtete Kennzahlen …")
G = build_bipartite(df)
Q_obs = modularity_Q_psych_sozial(G)
per_item_obs, CI_obs = observed_coupling_index(df)
export_observed_need_coupling(df)
    # To be safe: also write the observed per-item metrics to disk
    per_item_obs.to_csv(EXPORT / "observed_per_item_metrics.csv", index=False, encoding="utf-8")
    print("Bootstrap …")
boot = run_bootstrap(df, B_BOOT)
print("Permutation (Needs) …")
perm = run_permutation_needs(df, P_PERM)
print("Nullmodelle (Rewiring) …")
nullm = run_nullmodels_Q(df, M_NULL)
print("Sensitivität: Leave-One-Out …")
sens_items = run_sensitivity_items(df)
print(f"Sensitivität: Top-{K_TOP} entfernen …")
sens_topk = run_sensitivity_topk(df, K_TOP)
print("Sensitivität: Need-Swap (10%) …")
sens_need = run_sensitivity_needswap(df, ALT_NEED_SWAP_FRAC, trials=200)
# Kurze Zusammenfassung
summary = {
"observed": {
"Q_psych_sozial": Q_obs,
"CI_sum": CI_obs,
"n_items": int(len(df))
},
"bootstrap": {
"B": int(len(boot)),
"Q_mean": float(boot["Q"].mean()),
"Q_ci95": [float(boot["Q"].quantile(0.025)), float(boot["Q"].quantile(0.975))],
"CI_sum_mean": float(boot["CI_sum"].mean()),
"CI_sum_ci95": [float(boot["CI_sum"].quantile(0.025)), float(boot["CI_sum"].quantile(0.975))]
},
"permutation_needs": {
"P": int(len(perm) - 1),
"observed_cp_total": float(perm.loc[perm["is_observed"] == 1, "cp_total"].iloc[0]),
"p_value_right": float(((perm["cp_total"] >= perm.loc[0, "cp_total"]) & perm["cp_total"].notna()).sum() + 1) / float(len(perm.dropna(subset=["cp_total"])) + 1),
"p_value_left": float(((perm["cp_total"] <= perm.loc[0, "cp_total"]) & perm["cp_total"].notna()).sum() + 1) / float(len(perm.dropna(subset=["cp_total"])) + 1),
},
"nullmodels": {
"M": int(len(nullm) - 1),
"Q_obs": float(nullm.loc[nullm["is_observed"] == 1, "Q"].iloc[0]),
"Q_null_mean": float(nullm.loc[nullm["is_observed"] == 0, "Q"].dropna().mean()),
"Q_null_ci95": [
float(nullm.loc[nullm["is_observed"] == 0, "Q"].dropna().quantile(0.025)),
float(nullm.loc[nullm["is_observed"] == 0, "Q"].dropna().quantile(0.975)),
],
},
"sensitivity": {
"leave_one_out_min_CI_sum": float(sens_items["CI_sum_after_drop"].min()) if len(sens_items) else None,
"top_k_removed": K_TOP,
"top_k_Q_after": float(sens_topk["Q_after"].iloc[0]),
"top_k_CI_sum_after": float(sens_topk["CI_sum_after"].iloc[0]),
"need_swap_trials": int(len(sens_need)),
"need_swap_cp_mean": float(sens_need["cp_total"].mean()),
"need_swap_cp_ci95": [float(sens_need["cp_total"].quantile(0.025)),
float(sens_need["cp_total"].quantile(0.975))]
}
}
with open(OUT_SUMMARY_JSON, "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
    # Console report
    print("\n=== ROBUSTNESS SUMMARY ===")
    print(f"Q (observed): {summary['observed']['Q_psych_sozial']:.3f}")
    print(f"CI_sum (observed): {summary['observed']['CI_sum']:.3f}")
    print(f"Bootstrap Q 95% CI: {summary['bootstrap']['Q_ci95']}")
    print(f"Permutation needs p_right: {summary['permutation_needs']['p_value_right']:.4f} | p_left: {summary['permutation_needs']['p_value_left']:.4f}")
    print(f"Null models Q_null_mean: {summary['nullmodels']['Q_null_mean']:.3f} | 95% CI: {summary['nullmodels']['Q_null_ci95']}")
    print(f"Removing top {K_TOP} -> Q={summary['sensitivity']['top_k_Q_after']:.3f}, CI_sum={summary['sensitivity']['top_k_CI_sum_after']:.3f}")
    print("Results saved to:", EXPORT)
if __name__ == "__main__":
main()