429 lines
19 KiB
Python
429 lines
19 KiB
Python
# robustheit_visible_learning.py
|
||
# Vollständige Robustheitsprüfung für Visible-Learning-Analyse
|
||
# Liest generierte Exporte aus ./export, erzeugt neue CSV/JSON-Resultate für Bootstraps,
|
||
# Permutationstests, Netzwerk-Nullmodelle und Sensitivitätstests.
|
||
|
||
from __future__ import annotations
|
||
import os, json, math, random, itertools
|
||
from pathlib import Path
|
||
import numpy as np
|
||
import pandas as pd
|
||
import networkx as nx
|
||
from networkx.algorithms import community as nx_comm
|
||
|
||
# ------------------------------------------------------------
|
||
# Pfade & Konstanten
|
||
# ------------------------------------------------------------
|
||
# Directory containing this script; every other path is derived from it.
HERE = Path(__file__).resolve().parent
EXPORT = HERE.joinpath("export")
EXPORT.mkdir(exist_ok=True)

# Input files
THERMO_CSV = HERE.joinpath("Thermometer.csv")  # main data source
COUPLING_ITEM_CSV = EXPORT.joinpath("coupling_per_item.csv")
COUPLING_NEED_CSV = EXPORT.joinpath("coupling_per_need.csv")
COUPLING_POT_NEED_CSV = EXPORT.joinpath("coupling_potential_per_need.csv")
TRIANG_NEEDS_CSV = EXPORT.joinpath("triangulation_needs_3d.csv")
NEEDS_MAPPING_CSV = EXPORT.joinpath("needs_mapping_codes.csv")
WERTE_MAPPING_CSV = HERE.joinpath("werte_mapping.csv")  # optional (used if present)

# Result files
OUT_BOOTSTRAP_Q = EXPORT.joinpath("robust_bootstrap_Q.csv")
OUT_PERM_NEEDS = EXPORT.joinpath("robust_permutation_needs.csv")
OUT_NULLMODEL_Q = EXPORT.joinpath("robust_nullmodel_Q.csv")
OUT_SENS_ITEM = EXPORT.joinpath("robust_sensitivity_items.csv")
OUT_SENS_TOPK = EXPORT.joinpath("robust_sensitivity_topk.csv")
OUT_SENS_NEEDSWAP = EXPORT.joinpath("robust_sensitivity_needswap.csv")
OUT_SUMMARY_JSON = EXPORT.joinpath("robust_summary.json")

# Default parameters
SEED = 42
B_BOOT = 1000  # number of bootstrap replicates
P_PERM = 2000  # number of label permutations
M_NULL = 500  # number of null-model rewirings
K_TOP = 10  # top-k items removed in the sensitivity test
ALT_NEED_SWAP_FRAC = 0.1  # ~10% of items are randomly mapped to another need

# Both random sources are seeded so that repeated runs draw the same numbers.
random.seed(SEED)
rng = np.random.default_rng(SEED)
|
||
|
||
# ------------------------------------------------------------
|
||
# Utilities
|
||
# ------------------------------------------------------------
|
||
def _ensure_float(s: pd.Series) -> pd.Series:
    """Parse a column of number-like text into floats.

    Every value is stringified, decimal commas are turned into decimal
    points, surrounding whitespace is trimmed, and whatever still fails to
    parse becomes NaN.
    """
    as_text = s.astype(str)
    with_points = as_text.str.replace(",", ".", regex=False)
    return pd.to_numeric(with_points.str.strip(), errors="coerce")
|
||
|
||
def load_base() -> pd.DataFrame:
    """Load Thermometer.csv and return the rows usable for the analysis.

    Steps:
      * verify that the file and the required columns exist,
      * parse ``Effektstärke`` into floats (decimal commas allowed),
      * attach ``Young_Beduerfnis`` from the optional mapping file when the
        main file does not carry that column,
      * keep only rows whose ``Systemebene`` is "psychisch" or "sozial"
        (case-insensitive) and whose effect size could be parsed,
      * derive ``Kapitel`` from the part of ``Thermometer_ID`` before the
        first dot.

    Returns:
        The filtered frame. It always contains ``Young_Beduerfnis`` (NaN
        where unknown) and ``Kapitel`` (NaN where the ID has no numeric
        prefix).

    Raises:
        FileNotFoundError: Thermometer.csv is missing.
        ValueError: a required column is missing.
    """
    if not THERMO_CSV.exists():
        raise FileNotFoundError(f"Thermometer.csv nicht gefunden: {THERMO_CSV}")
    df = pd.read_csv(THERMO_CSV)
    req = ["Thermometer_ID", "Stichwort", "Effektstärke", "Subkapitel", "Kapitelname", "Systemebene"]
    missing = [c for c in req if c not in df.columns]
    if missing:
        raise ValueError(f"Fehlende Spalten in Thermometer.csv: {missing}")
    df["Effektstärke"] = _ensure_float(df["Effektstärke"])

    # Optionally merge the need column from the mapping file.
    mapping_cols = ["Thermometer_ID", "Young_Beduerfnis"]
    if "Young_Beduerfnis" not in df.columns and WERTE_MAPPING_CSV.exists():
        try:
            m = pd.read_csv(WERTE_MAPPING_CSV)
            if set(mapping_cols).issubset(m.columns):
                # One mapping row per item: duplicate IDs would otherwise
                # multiply rows in the left merge.
                m = m[mapping_cols].drop_duplicates(subset="Thermometer_ID")
                df = df.merge(m, on="Thermometer_ID", how="left")
        except Exception as exc:
            # The mapping is optional, so a broken file must not abort the
            # run - but it should not go unnoticed either.
            print(f"Warnung: werte_mapping.csv konnte nicht verwendet werden ({exc})")
    # Guarantee the column on every path, including "mapping file present
    # but without the expected columns".
    if "Young_Beduerfnis" not in df.columns:
        df["Young_Beduerfnis"] = np.nan

    # Keep valid system levels only.
    mask = df["Systemebene"].astype(str).str.lower().isin(["psychisch", "sozial"])
    df = df[mask].dropna(subset=["Effektstärke"]).copy()

    # Numeric chapter (optional): unparsable IDs become NaN individually
    # instead of discarding the whole column.
    chapter_text = df["Thermometer_ID"].astype(str).str.split(".").str[0]
    df["Kapitel"] = pd.to_numeric(chapter_text, errors="coerce")
    return df
|
||
|
||
def build_bipartite(df: pd.DataFrame) -> nx.Graph:
    """Build the system-item graph.

    The graph holds one node per system level ("system::psychisch",
    "system::sozial") and one node per row of *df* ("item::<Thermometer_ID>").
    Each item is linked to the node of its lower-cased ``Systemebene`` by an
    edge weighted with the row's effect size.
    """
    graph = nx.Graph()
    graph.add_nodes_from(
        (f"system::{level}", {"bipartite": "system", "label": level.capitalize()})
        for level in ("psychisch", "sozial")
    )
    for _, row in df.iterrows():
        system_node = f"system::{str(row['Systemebene']).lower()}"
        item_node = f"item::{row['Thermometer_ID']}"
        item_attrs = {
            "bipartite": "item",
            "id": str(row["Thermometer_ID"]),
            "label": str(row["Stichwort"]),
            "kapitelname": str(row["Kapitelname"]),
            "subkapitel": str(row["Subkapitel"]),
            "d": float(row["Effektstärke"]),
        }
        graph.add_node(item_node, **item_attrs)
        graph.add_edge(system_node, item_node, weight=item_attrs["d"])
    return graph
|
||
|
||
def item_projection(G: nx.Graph) -> nx.Graph:
    """Project the system-item graph onto its item nodes.

    Two items become neighbours when they are attached to the same system
    node. The edge weight is the smaller of their two absolute link weights,
    accumulated over every system the pair shares. Item node attributes are
    copied over.
    """
    item_nodes = [n for n, attrs in G.nodes(data=True) if attrs.get("bipartite") == "item"]
    system_nodes = [n for n, attrs in G.nodes(data=True) if attrs.get("bipartite") == "system"]
    item_set, system_set = set(item_nodes), set(system_nodes)

    projection = nx.Graph()
    projection.add_nodes_from((n, G.nodes[n]) for n in item_nodes)

    # For every system node, collect (item, |weight|) of the attached items.
    attached = {s: [] for s in system_nodes}
    for u, v, attrs in G.edges(data=True):
        if u in system_set and v in item_set:
            hub, leaf = u, v
        elif v in system_set and u in item_set:
            hub, leaf = v, u
        else:
            continue
        attached[hub].append((leaf, abs(float(attrs.get("weight", 0.0)))))

    for members in attached.values():
        for (a, wa), (b, wb) in itertools.combinations(members, 2):
            shared = min(wa, wb)
            if projection.has_edge(a, b):
                projection[a][b]["weight"] += shared
            else:
                projection.add_edge(a, b, weight=shared)
    return projection
|
||
|
||
def modularity_Q_psych_sozial(G: nx.Graph) -> float:
    """Return the weighted modularity of the psych / non-psych split of *G*.

    A system node belongs to the first community when its label contains
    "psych"; an item node belongs to it when any neighbour's label does.
    All remaining system and item nodes form the second community. Edge
    weights are used in absolute value. NaN is returned when networkx
    cannot evaluate the partition.
    """
    psych_side, other_side = set(), set()

    def _place(node, is_psych):
        (psych_side if is_psych else other_side).add(node)

    for node, attrs in G.nodes(data=True):
        role = attrs.get("bipartite")
        if role == "system":
            _place(node, "psych" in str(attrs.get("label", "")).lower())
        elif role == "item":
            neighbour_labels = [G.nodes[nbr].get("label", "").lower() for nbr in G[node]]
            _place(node, any("psych" in lbl for lbl in neighbour_labels))

    weighted = G.copy()
    for _, _, attrs in weighted.edges(data=True):
        attrs["weight"] = abs(float(attrs.get("weight", 0.0)))
    try:
        q_value = nx_comm.modularity(weighted, [psych_side, other_side], weight="weight")
        return float(q_value)
    except Exception:
        return float("nan")
|
||
|
||
def betweenness_on_projection(Gi: nx.Graph) -> dict[str, float]:
    """Return normalized betweenness centrality for the item projection.

    Edge weights are converted into path lengths (1 / weight, with the
    weight floored at 1e-9), so heavier edges count as shorter. An edgeless
    graph yields an empty dict.
    """
    if not Gi.number_of_edges():
        return {}
    floor = 1e-9
    work = Gi.copy()
    for _, _, attrs in work.edges(data=True):
        strength = float(attrs.get("weight", 0.0))
        attrs["length"] = 1.0 / (strength if strength > floor else floor)
    return nx.betweenness_centrality(work, weight="length", normalized=True)
|
||
|
||
def abs_d_norm(series: pd.Series) -> pd.Series:
    """Min-max scale *series* to [0, 1], ignoring NaN for the bounds.

    The result carries the index of *series*, so it can be assigned back to
    the originating frame without label misalignment (filtered or resampled
    frames do not have a 0..n-1 index).

    Returns:
        A float Series. It is all zeros when the input is empty, entirely
        NaN, has non-finite bounds, or has no spread (max <= min).
    """
    x = series.to_numpy(dtype=float)
    zeros = pd.Series(np.zeros_like(x), index=series.index)
    # np.nanmin raises on empty input and warns on all-NaN input.
    if x.size == 0 or np.isnan(x).all():
        return zeros
    mn, mx = np.nanmin(x), np.nanmax(x)
    if not (np.isfinite(mn) and np.isfinite(mx)) or mx <= mn:
        return zeros
    return pd.Series((x - mn) / (mx - mn), index=series.index)
|
||
|
||
def observed_coupling_index(df: pd.DataFrame) -> tuple[pd.DataFrame, float]:
    """Compute the per-item coupling index and its sum.

    For every row the coupling index is the min-max normalized absolute
    effect size multiplied by the item's betweenness centrality in the item
    projection (0.0 for items without a centrality value).

    Returns:
        ``(data, ci_sum)``: a copy of *df* with the added columns ``abs_d``,
        ``abs_d_norm``, ``bc_norm`` and ``coupling_index``, and the sum of
        ``coupling_index`` over all rows.
    """
    G = build_bipartite(df)
    Gi = item_projection(G)
    bc = betweenness_on_projection(Gi)

    data = df.copy()
    data["abs_d"] = data["Effektstärke"].abs()
    # Assign by position: the frame usually has a non-contiguous index
    # (filtered rows) or a duplicated one (bootstrap draws), so label-based
    # alignment against a freshly indexed Series would scramble the values
    # or fill them with NaN.
    data["abs_d_norm"] = abs_d_norm(data["abs_d"]).to_numpy()
    item_ids = data["Thermometer_ID"].astype(str)
    data["bc_norm"] = [bc.get(f"item::{tid}", 0.0) for tid in item_ids]
    data["coupling_index"] = data["abs_d_norm"] * data["bc_norm"]

    # Coupling index summed over all items
    ci_sum = float(data["coupling_index"].sum())
    return data, ci_sum
|
||
|
||
# ------------------------------------------------------------
|
||
# 1) BOOTSTRAP (Items resamplen) – Kennzahlen: Q, CI_sum
|
||
# ------------------------------------------------------------
|
||
def run_bootstrap(df: pd.DataFrame, B: int = B_BOOT) -> pd.DataFrame:
    """Bootstrap the items and record Q and CI_sum for each replicate.

    Each replicate draws ``len(df)`` rows with replacement using the
    module-level generator. The table (columns ``b``, ``Q``, ``CI_sum``) is
    written to OUT_BOOTSTRAP_Q and returned.

    NOTE(review): build_bipartite keys item nodes by Thermometer_ID, so rows
    drawn more than once end up as a single node in the replicate graph -
    confirm that this is the intended resampling scheme.
    """
    n_items = len(df)
    records = []
    for replicate in range(1, B + 1):
        draw = rng.integers(0, n_items, size=n_items)  # with replacement
        resampled = df.iloc[draw].copy()
        q_value = modularity_Q_psych_sozial(build_bipartite(resampled))
        _, ci_total = observed_coupling_index(resampled)
        records.append({"b": replicate, "Q": q_value, "CI_sum": ci_total})
    result = pd.DataFrame(records)
    result.to_csv(OUT_BOOTSTRAP_Q, index=False, encoding="utf-8")
    return result
|
||
|
||
# ------------------------------------------------------------
|
||
# 2) PERMUTATION der Bedürfnis-Labels – Benchmark ggü. beobachtet
|
||
# Kennzahl: coupling_potential (oder CI_sum nach Need)
|
||
# ------------------------------------------------------------
|
||
def coupling_potential_by_need(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate absolute effect sizes per need and system level.

    For every value of ``Young_Beduerfnis`` (rows with a missing need are
    dropped by the group-by) the result contains:

      * ``psychisch`` / ``sozial``: summed |d| per system level,
      * ``E_sum``: summed |d| over all system levels,
      * ``balance``: 1 - |psychisch/E_sum - sozial/E_sum| (0 when E_sum is 0),
      * ``coupling_potential``: E_sum * balance,
      * ``bridge_energy``: min(psychisch, sozial).

    Returns:
        One row per need with the columns ``Beduerfnis``, ``E_sum``,
        ``psychisch``, ``sozial``, ``balance``, ``coupling_potential`` and
        ``bridge_energy``.
    """
    d = df.copy()
    d["abs_d"] = d["Effektstärke"].abs()
    # Normalize the case before grouping so that "Psychisch" and "psychisch"
    # land in one column instead of two identically named ones.
    d["Systemebene"] = d["Systemebene"].map(lambda v: v.lower() if isinstance(v, str) else v)
    d_ps = d.groupby(["Young_Beduerfnis", "Systemebene"])["abs_d"].sum().unstack(fill_value=0.0)
    # A level that does not occur in the data still needs its (zero) column;
    # the final column selection would otherwise raise KeyError.
    for level in ("psychisch", "sozial"):
        if level not in d_ps.columns:
            d_ps[level] = 0.0
    d_ps["E_sum"] = d_ps.sum(axis=1)

    psych, sozial, total = d_ps["psychisch"], d_ps["sozial"], d_ps["E_sum"]
    # Needs without any energy get the maximal imbalance of 1 (balance 0).
    safe_total = total.where(total > 0)
    imbalance = (psych / safe_total - sozial / safe_total).abs().fillna(1.0)
    d_ps["balance"] = 1.0 - imbalance
    d_ps["coupling_potential"] = total * d_ps["balance"]
    d_ps["bridge_energy"] = np.minimum(psych, sozial)

    d_ps = d_ps.reset_index().rename(columns={"Young_Beduerfnis": "Beduerfnis"})
    return d_ps[["Beduerfnis", "E_sum", "psychisch", "sozial", "balance", "coupling_potential", "bridge_energy"]]
|
||
|
||
def run_permutation_needs(df: pd.DataFrame, P: int = P_PERM) -> pd.DataFrame:
    """Permutation benchmark for the total coupling potential.

    Row 0 of the result holds the observed total (``is_observed`` = 1).
    Rows 1..P hold the totals obtained after randomly reassigning the
    ``Young_Beduerfnis`` labels across items. The table (columns ``perm``,
    ``cp_total``, ``is_observed``) is written to OUT_PERM_NEEDS and returned.
    """
    # Observed statistic
    obs = coupling_potential_by_need(df)
    obs_total = float(obs["coupling_potential"].sum())
    rows = [{"perm": 0, "cp_total": obs_total, "is_observed": 1}]

    # Shuffle the raw labels. Missing needs must stay missing: converting
    # them to the string "nan" would add a need group to the permuted data
    # that the observed statistic never contains.
    needs = df["Young_Beduerfnis"].to_numpy(dtype=object, copy=True)
    for p in range(1, P + 1):
        dperm = df.assign(Young_Beduerfnis=rng.permutation(needs))
        cp = coupling_potential_by_need(dperm)
        rows.append({"perm": p, "cp_total": float(cp["coupling_potential"].sum()), "is_observed": 0})

    out = pd.DataFrame(rows)
    out.to_csv(OUT_PERM_NEEDS, index=False, encoding="utf-8")
    return out
|
||
|
||
def export_observed_need_coupling(df: pd.DataFrame) -> None:
    """Write the observed per-need coupling table to the export directory.

    The file is written by pandas directly, like every other export in this
    module. Rendering the CSV to a string and writing it in text mode
    translates line endings a second time on Windows.
    """
    obs = coupling_potential_by_need(df)
    obs.to_csv(EXPORT / "observed_coupling_per_need.csv", index=False, encoding="utf-8")
|
||
|
||
# ------------------------------------------------------------
|
||
# 3) NETZWERK-NULLMODELLE – bipartites Rewiring (Erhalte Knotengrade)
|
||
# Kennzahl: Q (psych/sozial)
|
||
# ------------------------------------------------------------
|
||
def rewire_bipartite_preserve_degrees(
    G: nx.Graph,
    iters: int = 10_000,
    generator: np.random.Generator | None = None,
) -> nx.Graph:
    """Degree-preserving rewiring of the system-item graph.

    Repeatedly picks two edges and swaps their item endpoints:
    (systemA-item1, systemB-item2) -> (systemA-item2, systemB-item1).
    Swaps that would create a duplicate edge, or that involve the same
    system or the same item twice, are rejected. The loop stops after
    ``iters`` successful swaps or ``20 * iters`` attempts.

    Args:
        G: Graph whose system nodes are named "system::..." (that prefix
            decides which endpoint of an edge is the system side).
        iters: Target number of successful swaps.
        generator: Random generator to draw from. Defaults to the
            module-level ``rng``, so consecutive calls yield different
            rewirings while the whole run stays reproducible. Creating a
            generator from the fixed seed on every call would make every
            null-model trial identical.

    Returns:
        A rewired copy of *G*; *G* itself is not modified.

    NOTE(review): each new edge keeps the weight of the edge it replaces on
    the *system* side (the weight read from systemA-item1 is written to
    systemA-item2) - confirm that weights are not meant to follow the item.
    """
    H = G.copy()
    # Orient every edge as (system, item).
    edges = [(u, v) if str(u).startswith("system::") else (v, u)
             for u, v in H.edges()]
    m = len(edges)
    if m < 2:
        return H

    draw = rng if generator is None else generator
    adj = {n: set(H.neighbors(n)) for n in H.nodes}
    max_tries = iters * 20
    tries = 0
    swaps = 0
    while swaps < iters and tries < max_tries:
        tries += 1
        i, j = draw.integers(0, m, size=2)
        if i == j:
            continue
        s1, it1 = edges[i]
        s2, it2 = edges[j]
        if s1 == s2 or it1 == it2:
            continue
        # Reject swaps that would duplicate an existing edge.
        if it2 in adj[s1] or it1 in adj[s2]:
            continue

        # Read the weights from H: these edges may already have been rewired.
        w1 = float(H[s1][it1].get("weight", 1.0))
        w2 = float(H[s2][it2].get("weight", 1.0))

        H.remove_edge(s1, it1)
        H.remove_edge(s2, it2)
        H.add_edge(s1, it2, weight=w1)
        H.add_edge(s2, it1, weight=w2)

        # Keep the adjacency cache and the edge list in sync with H.
        adj[s1].add(it2)
        adj[it2].add(s1)
        adj[s2].add(it1)
        adj[it1].add(s2)
        adj[s1].discard(it1)
        adj[it1].discard(s1)
        adj[s2].discard(it2)
        adj[it2].discard(s2)
        edges[i] = (s1, it2)
        edges[j] = (s2, it1)
        swaps += 1
    return H
|
||
def run_nullmodels_Q(df: pd.DataFrame, M: int = M_NULL) -> pd.DataFrame:
    """Compare the observed Q with Q on rewired null-model graphs.

    Trial 0 is the observed graph (``is_observed`` = 1); trials 1..M are
    independent rewirings of it with up to 2000 swaps each. Rows whose Q is
    NaN are dropped before the table (columns ``trial``, ``Q``,
    ``is_observed``) is written to OUT_NULLMODEL_Q and returned.
    """
    observed_graph = build_bipartite(df)
    records = [{"trial": 0, "Q": modularity_Q_psych_sozial(observed_graph), "is_observed": 1}]
    for trial in range(1, M + 1):
        rewired = rewire_bipartite_preserve_degrees(observed_graph, iters=2000)
        records.append({"trial": trial, "Q": modularity_Q_psych_sozial(rewired), "is_observed": 0})
    table = pd.DataFrame(records)
    valid = table.dropna(subset=["Q"]).copy()
    valid.to_csv(OUT_NULLMODEL_Q, index=False, encoding="utf-8")
    return valid
|
||
|
||
# ------------------------------------------------------------
|
||
# 4) SENSITIVITÄT – (a) Leave-One-Out, (b) Top-k entfernen,
|
||
# (c) alternative Bedarfszuordnung (10%)
|
||
# ------------------------------------------------------------
|
||
def run_sensitivity_items(df: pd.DataFrame) -> pd.DataFrame:
    """Leave-one-out sensitivity of the summed coupling index.

    For every row, all rows sharing its Thermometer_ID are removed and
    CI_sum is recomputed. The table (columns ``Thermometer_ID``,
    ``CI_sum_after_drop``, ``CI_delta``) is sorted by ``CI_delta``, written
    to OUT_SENS_ITEM and returned.
    """
    _, baseline = observed_coupling_index(df)
    baseline = float(baseline)
    id_text = df["Thermometer_ID"].astype(str)  # loop-invariant
    records = []
    for _, row in df.iterrows():
        tid = str(row["Thermometer_ID"])
        _, ci_after = observed_coupling_index(df[id_text != tid])
        ci_after = float(ci_after)
        records.append({
            "Thermometer_ID": tid,
            "CI_sum_after_drop": ci_after,
            # a negative delta means the item contributes strongly to the CI
            "CI_delta": ci_after - baseline,
        })
    result = pd.DataFrame(records).sort_values("CI_delta")
    result.to_csv(OUT_SENS_ITEM, index=False, encoding="utf-8")
    return result
|
||
|
||
def run_sensitivity_topk(df: pd.DataFrame, k: int = K_TOP) -> pd.DataFrame:
    """Remove the k items with the largest |d| and recompute Q and CI_sum.

    Returns a one-row table (columns ``k``, ``removed_ids``, ``Q_after``,
    ``CI_sum_after``) that is also written to OUT_SENS_TOPK. The removed IDs
    are joined with ";".
    """
    ranked = df.assign(abs_d=df["Effektstärke"].abs())
    strongest = ranked.sort_values("abs_d", ascending=False).head(k)
    removed = strongest["Thermometer_ID"].astype(str).tolist()
    keep = ~ranked["Thermometer_ID"].astype(str).isin(removed)
    remaining = ranked[keep].copy()

    q_after = modularity_Q_psych_sozial(build_bipartite(remaining))
    _, ci_after = observed_coupling_index(remaining)

    summary_row = {
        "k": k,
        "removed_ids": ";".join(removed),
        "Q_after": q_after,
        "CI_sum_after": ci_after,
    }
    result = pd.DataFrame([summary_row])
    result.to_csv(OUT_SENS_TOPK, index=False, encoding="utf-8")
    return result
|
||
|
||
def run_sensitivity_needswap(df: pd.DataFrame, frac: float = ALT_NEED_SWAP_FRAC, trials: int = 200) -> pd.DataFrame:
    """Sensitivity of the total coupling potential to re-labelled needs.

    In every trial a random ``frac`` share of the items (at least one)
    receives a randomly chosen *different* need, and the total coupling
    potential is recomputed.

    Only needs that actually occur in the data are candidates. A missing
    need (NaN) is never handed out as the replacement label "nan", and an
    item keeps its label when no alternative exists.

    Returns:
        A table with the columns ``trial`` and ``cp_total``, also written to
        OUT_SENS_NEEDSWAP.
    """
    need_col = df["Young_Beduerfnis"]
    uniq = sorted({str(v) for v in need_col.dropna() if str(v)})
    # Work on an object array so string labels can be written regardless of
    # the column's dtype (an all-NaN column is float).
    base_labels = need_col.to_numpy(dtype=object, copy=True)
    n_items = len(df)
    n_swap = min(n_items, max(1, int(frac * n_items))) if n_items else 0

    rows = []
    for t in range(1, trials + 1):
        labels = base_labels.copy()
        if n_swap and uniq:
            idx = rng.choice(n_items, size=n_swap, replace=False)
            # assign a different (random) need to each selected item
            for i in idx:
                cur = labels[i]
                cur_text = None if pd.isna(cur) else str(cur)
                choices = [u for u in uniq if u != cur_text]
                if choices:
                    labels[i] = random.choice(choices)
        d2 = df.assign(Young_Beduerfnis=labels)
        cp = coupling_potential_by_need(d2)
        rows.append({"trial": t, "cp_total": float(cp["coupling_potential"].sum())})

    out = pd.DataFrame(rows)
    out.to_csv(OUT_SENS_NEEDSWAP, index=False, encoding="utf-8")
    return out
|
||
|
||
# ------------------------------------------------------------
|
||
# MAIN
|
||
# ------------------------------------------------------------
|
||
def _ci95(values: pd.Series) -> list[float]:
    """Return the 2.5% and 97.5% quantiles of *values* as plain floats."""
    return [float(values.quantile(0.025)), float(values.quantile(0.975))]


def _permutation_p_values(perm: pd.DataFrame) -> tuple[float, float, float]:
    """Return (observed, p_right, p_left) for a permutation table.

    The tail counts use the permuted rows only; the observed value enters
    once through the usual "+1" in numerator and denominator. Counting the
    observed row in the tail as well would include it twice.
    """
    observed = float(perm.loc[perm["is_observed"] == 1, "cp_total"].iloc[0])
    null = perm.loc[perm["is_observed"] == 0, "cp_total"].dropna()
    denom = float(len(null) + 1)
    p_right = float((null >= observed).sum() + 1) / denom
    p_left = float((null <= observed).sum() + 1) / denom
    return observed, p_right, p_left


def main():
    """Run every robustness check, write the exports and print a summary.

    Order: observed metrics, bootstrap, need permutation, rewiring null
    models, then the three sensitivity analyses. Each step writes its own
    CSV; the combined summary goes to OUT_SUMMARY_JSON.
    """
    print("Lade Daten …")
    df = load_base()

    print("Berechne beobachtete Kennzahlen …")
    G = build_bipartite(df)
    Q_obs = modularity_Q_psych_sozial(G)
    per_item_obs, CI_obs = observed_coupling_index(df)
    export_observed_need_coupling(df)

    # Persist the observed per-item metrics before the long-running steps.
    per_item_obs.to_csv(EXPORT / "observed_per_item_metrics.csv", index=False, encoding="utf-8")

    print("Bootstrap …")
    boot = run_bootstrap(df, B_BOOT)

    print("Permutation (Needs) …")
    perm = run_permutation_needs(df, P_PERM)

    print("Nullmodelle (Rewiring) …")
    nullm = run_nullmodels_Q(df, M_NULL)

    print("Sensitivität: Leave-One-Out …")
    sens_items = run_sensitivity_items(df)

    print(f"Sensitivität: Top-{K_TOP} entfernen …")
    sens_topk = run_sensitivity_topk(df, K_TOP)

    print("Sensitivität: Need-Swap (10%) …")
    sens_need = run_sensitivity_needswap(df, ALT_NEED_SWAP_FRAC, trials=200)

    # Short summary
    cp_observed, p_right, p_left = _permutation_p_values(perm)
    null_q = nullm.loc[nullm["is_observed"] == 0, "Q"].dropna()
    # The observed row is missing when its Q was NaN and got dropped.
    observed_q = nullm.loc[nullm["is_observed"] == 1, "Q"]
    summary = {
        "observed": {
            "Q_psych_sozial": Q_obs,
            "CI_sum": CI_obs,
            "n_items": int(len(df)),
        },
        "bootstrap": {
            "B": int(len(boot)),
            "Q_mean": float(boot["Q"].mean()),
            "Q_ci95": _ci95(boot["Q"]),
            "CI_sum_mean": float(boot["CI_sum"].mean()),
            "CI_sum_ci95": _ci95(boot["CI_sum"]),
        },
        "permutation_needs": {
            "P": int(len(perm) - 1),
            "observed_cp_total": cp_observed,
            "p_value_right": p_right,
            "p_value_left": p_left,
        },
        "nullmodels": {
            "M": int(len(null_q)),
            "Q_obs": float(observed_q.iloc[0]) if len(observed_q) else float("nan"),
            "Q_null_mean": float(null_q.mean()),
            "Q_null_ci95": _ci95(null_q),
        },
        "sensitivity": {
            "leave_one_out_min_CI_sum": float(sens_items["CI_sum_after_drop"].min()) if len(sens_items) else None,
            "top_k_removed": K_TOP,
            "top_k_Q_after": float(sens_topk["Q_after"].iloc[0]),
            "top_k_CI_sum_after": float(sens_topk["CI_sum_after"].iloc[0]),
            "need_swap_trials": int(len(sens_need)),
            "need_swap_cp_mean": float(sens_need["cp_total"].mean()),
            "need_swap_cp_ci95": _ci95(sens_need["cp_total"]),
        },
    }
    with open(OUT_SUMMARY_JSON, "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    # Console report
    print("\n=== ROBUSTHEITS-ZUSAMMENFASSUNG ===")
    print(f"Q (beobachtet): {summary['observed']['Q_psych_sozial']:.3f}")
    print(f"CI_sum (beobachtet): {summary['observed']['CI_sum']:.3f}")
    print(f"Bootstrap Q 95%-CI: {summary['bootstrap']['Q_ci95']}")
    print(f"Permutation Needs p_right: {summary['permutation_needs']['p_value_right']:.4f} | p_left: {summary['permutation_needs']['p_value_left']:.4f}")
    print(f"Nullmodelle Q_null_mean: {summary['nullmodels']['Q_null_mean']:.3f} | 95%-CI: {summary['nullmodels']['Q_null_ci95']}")
    print(f"Top-{K_TOP} Entfernen -> Q={summary['sensitivity']['top_k_Q_after']:.3f}, CI_sum={summary['sensitivity']['top_k_CI_sum_after']:.3f}")
    print("Ergebnisse gespeichert in:", EXPORT)


if __name__ == "__main__":
    main()
|