from __future__ import annotations
"""
Visible Learning – Explorative Clusteranalyse
---------------------------------------------
CI: angelehnt an simulation_bildungswirkgefuege
Funktion: CSV mit Effektstärken laden, manuelle Bins + K-Means-Cluster bilden,
Silhouette-Score berechnen und Visualisierungen erzeugen.
"""
# -----------------------------------------
# Imports
# -----------------------------------------
import os
import math
import pandas as pd
import numpy as np
import json
from sklearn.preprocessing import OneHotEncoder
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score, silhouette_samples
from scipy import stats
from scipy.stats import anderson, kruskal, levene, spearmanr
import plotly.graph_objs as go
import plotly.io as pio
# -----------------------------------------
# Konfiguration laden
# -----------------------------------------
from config_visible_learning import (
csv_file,
k_clusters,
export_fig_visual,
export_fig_png,
theme,
selected_kapitel,
analyse_all,
export_werte_all,
)
# -----------------------------------------
# Template/CI
# -----------------------------------------
try:
from ci_template import plotly_template
plotly_template.set_theme(theme)
_ci_layout = lambda title, x, y: plotly_template.get_standard_layout(
title=title, x_title=x, y_title=y
)
except Exception:
# Fallback: neutrale Plotly-Defaults
_ci_layout = lambda title, x, y: dict(
title=title, xaxis_title=x, yaxis_title=y
)
# -----------------------------------------
# Export-Helfer (HTML/PNG) – CI-kompatibel
# -----------------------------------------
EXPORT_DIR = os.path.join(os.path.dirname(__file__), "export")
if not os.path.exists(EXPORT_DIR):
try:
os.makedirs(EXPORT_DIR, exist_ok=True)
except Exception:
pass
def export_figure(fig, name: str, do_html: bool, do_png: bool):
"""Exportiert eine Plotly-Figur gemäß CI-Flags."""
base = os.path.join(EXPORT_DIR, name)
if do_html:
p = f"{base}.html"
pio.write_html(fig, file=p, auto_open=False, include_plotlyjs="cdn")
if do_png:
try:
p = f"{base}.png"
pio.write_image(fig, p, scale=2)
except Exception:
# PNG erfordert Kaleido; leise ignorieren, wenn nicht installiert
pass
# -----------------------------------------
# Daten | Laden & Vorbereiten
# -----------------------------------------
REQUIRED_COLS = ["Thermometer_ID", "Stichwort", "Effektstärke"]
def load_data(csv_path: str) -> pd.DataFrame:
df = pd.read_csv(csv_path)
missing = [c for c in REQUIRED_COLS if c not in df.columns]
if missing:
raise ValueError(f"Fehlende Spalten in CSV: {missing}")
# Typen bereinigen
df["Thermometer_ID"] = df["Thermometer_ID"].astype(str)
# Effektstärke robust in float wandeln
df["Effektstärke"] = (
df["Effektstärke"].astype(str).str.replace(",", ".", regex=False).str.strip()
)
df["Effektstärke"] = pd.to_numeric(df["Effektstärke"], errors="coerce")
# explizit ±inf auf NaN setzen, um sie zu entfernen
df["Effektstärke"] = df["Effektstärke"].replace([np.inf, -np.inf], np.nan)
# Kapitel aus Thermometer_ID ableiten und Kapitelname mappen
df["Kapitel"] = df["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
kapitel_map = {
5: "Lernende",
6: "Elternhaus und Familie",
7: "Schule und Gesellschaft",
8: "Klassenzimmer",
9: "Lehrperson",
10: "Curriculum",
11: "Zielorientiertes Unterrichten",
12: "Lernstrategien",
13: "Lehrstrategien",
14: "Nutzung von Technologien",
15: "Schulische und außerschulische Einflüsse",
}
df["Kapitelname"] = df["Kapitel"].map(kapitel_map).fillna(df["Kapitel"].map(lambda k: f"Kapitel {k}"))
return df.dropna(subset=["Effektstärke"]) # nur gültige Zahlen
def validate_data(df: pd.DataFrame) -> dict:
"""Einfache Datenvalidierung und Qualitätsreport.
Hinweis: Fehlende Effektstärken wurden in load_data bereits entfernt.
"""
report = {}
report["n_rows"] = int(len(df))
# Duplikate
dup_counts = df["Thermometer_ID"].value_counts()
duplicates = dup_counts[dup_counts > 1]
report["duplicate_ids"] = duplicates.to_dict()
report["n_duplicates"] = int(duplicates.sum()) if not duplicates.empty else 0
# Gültige Kapitel (5..15)
valid_kap = set(range(5, 16))
invalid_kapitel = df.loc[~df["Kapitel"].isin(valid_kap), "Kapitel"].tolist()
report["invalid_kapitel_entries"] = invalid_kapitel
# Wertebereich d
report["effekt_min"] = float(df["Effektstärke"].min()) if not df.empty else None
report["effekt_max"] = float(df["Effektstärke"].max()) if not df.empty else None
# Leere Stichworte
empty_keywords = df["Stichwort"].astype(str).str.strip().eq("").sum()
report["empty_stichwort"] = int(empty_keywords)
return report
# -----------------------------------------
# Manuelle Bins (heuristische Einteilung)
# -----------------------------------------
BIN_LABELS = ["negativ", "gering", "mittel", "hoch"]
def manual_bin(d: float) -> str:
if d < 0:
return "negativ"
if 0 <= d < 0.40:
return "gering"
if 0.40 <= d < 0.70:
return "mittel"
return "hoch"
def add_manual_bins(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df["Bin"] = df["Effektstärke"].apply(manual_bin)
return df
# -----------------------------------------
# K-Means-Clustering (Effektstärke + Kapitel)
# -----------------------------------------
def encode_features(df: pd.DataFrame, kapitel_weight: float = 1.0) -> tuple[np.ndarray, list[str]]:
"""One-Hot-Encoding des Kapitels + Effektstärke (metrisch)."""
try:
enc = OneHotEncoder(sparse_output=False, handle_unknown="ignore") # neuere sklearn-Versionen
except TypeError:
enc = OneHotEncoder(sparse=False, handle_unknown="ignore") # ältere sklearn-Versionen
cat = df[["Kapitel"]].fillna(-1)
cat_ohe = enc.fit_transform(cat)
cat_ohe = cat_ohe * float(kapitel_weight)
eff = df[["Effektstärke"]].values
X = np.hstack([eff, cat_ohe])
feature_names = ["Effektstärke"] + [f"kap::{c}" for c in enc.get_feature_names_out(["Kapitel"])]
return X, feature_names
def encode_features_3d(df: pd.DataFrame, kapitel_weight: float = 1.0) -> tuple[np.ndarray, list[str]]:
"""Effektstärke + Kapitel + Textdimension (TF-IDF + PCA) für 3D-Clustering."""
# Kapitel
try:
enc = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
except TypeError:
enc = OneHotEncoder(sparse=False, handle_unknown="ignore")
cat = df[["Kapitel"]].fillna(-1)
cat_ohe = enc.fit_transform(cat)
cat_ohe = cat_ohe * float(kapitel_weight)
# Effektstärke
eff = df[["Effektstärke"]].values
# Textdimension über TF-IDF + PCA
vectorizer = TfidfVectorizer(max_features=100)
X_text = vectorizer.fit_transform(df["Stichwort"].astype(str))
# Sicherstellen, dass TF-IDF keine Inf/NaN enthält (sollte nicht vorkommen)
X_text = X_text.tocsr()
pca = PCA(n_components=1, random_state=42)
text_dim = pca.fit_transform(X_text.toarray())
# Textdimension im DataFrame speichern
df["Text_Dimension"] = text_dim.flatten()
# Zusammenführen
X = np.hstack([eff, cat_ohe, text_dim])
feature_names = ["Effektstärke"] + list(enc.get_feature_names_out(["Kapitel"])) + ["Text_Dimension"]
return X, feature_names
# -----------------------------------------
# Hilfsfunktion zur Sanitisierung von Feature-Matrizen
# -----------------------------------------
def _sanitize_X(X: np.ndarray, clip: float | None = None) -> np.ndarray:
"""Ersetzt NaN/Inf in Feature-Matrizen und optionales Clipping gegen numerische Ausreißer.
Gibt eine *neue* Matrix zurück.
"""
X = np.asarray(X, dtype=float).copy()
# NaN/Inf -> 0
X[~np.isfinite(X)] = 0.0
if clip is not None and clip > 0:
X = np.clip(X, -float(clip), float(clip))
return X
def run_kmeans(df: pd.DataFrame, k: int = 4, random_state: int = 42, kapitel_weight: float = 1.0):
X, feature_names = encode_features(df, kapitel_weight=kapitel_weight)
X = _sanitize_X(X, clip=1e6)
if not np.isfinite(X).all():
print("Warnung: Nicht-endliche Werte in X nach Sanitisierung – werden als 0 behandelt.")
model = KMeans(n_clusters=k, n_init=20, random_state=random_state)
labels = model.fit_predict(X)
sil = silhouette_score(X, labels) if k > 1 and len(df) > k else np.nan
return labels, sil, model
# -----------------------------------------
# Statistische Auswertungen
# -----------------------------------------
def describe_effects(df: pd.DataFrame) -> pd.DataFrame:
"""Deskriptive Statistik (gesamt & je Kapitel)."""
# Aggregation für Kapitel mit eindeutigen Spaltennamen
by_kap = df.groupby("Kapitel")["Effektstärke"].agg(
n="count",
mean="mean",
std=lambda s: s.std(ddof=1),
min="min",
q1=lambda s: s.quantile(0.25),
median="median",
q3=lambda s: s.quantile(0.75),
max="max",
skew="skew",
kurtosis=lambda s: s.kurt(),
)
# Gesamtzeile
overall = pd.DataFrame({
"n": [df["Effektstärke"].count()],
"mean": [df["Effektstärke"].mean()],
"std": [df["Effektstärke"].std(ddof=1)],
"min": [df["Effektstärke"].min()],
"q1": [df["Effektstärke"].quantile(0.25)],
"median": [df["Effektstärke"].median()],
"q3": [df["Effektstärke"].quantile(0.75)],
"max": [df["Effektstärke"].max()],
"skew": [df["Effektstärke"].skew()],
"kurtosis": [df["Effektstärke"].kurt()],
}, index=["Gesamt"])
# Kapitel-Index schöner beschriften
by_kap.index = [f"Kapitel {int(k)}" for k in by_kap.index]
# Zusammenführen
out = pd.concat([overall, by_kap])
return out
def plot_table_stats(stats_df: pd.DataFrame, title: str):
from plotly.graph_objs import Table, Figure
colors = plotly_template.get_colors()
fig = Figure(data=[Table(
header=dict(values=[""] + list(stats_df.columns),
fill_color=colors["brightArea"], font=dict(color=colors["white"])),
cells=dict(values=[stats_df.index.astype(str)] + [stats_df[c].round(3).tolist() for c in stats_df.columns],
fill_color=colors["depthArea"], font=dict(color=colors["white"]))
)])
fig.update_layout(plotly_template.get_standard_layout(title, "", ""))
fig.show()
def normality_and_qq(df: pd.DataFrame, kapitel: int | None = None):
x = df["Effektstärke"].dropna().values
ad = anderson(x, dist='norm')
print(f"Anderson–Darling: A2={ad.statistic:.3f} | kritische Werte {ad.critical_values} | Sig-Level {ad.significance_level}")
# QQ-Plot
if len(x) < 3:
print("QQ-Plot: Zu wenige Datenpunkte (<3) – Plot wird übersprungen.")
return
styles = plotly_template.get_plot_styles()
osm, osr = stats.probplot(x, dist="norm", rvalue=False)
# Kompatibel zu unterschiedlichen SciPy-Versionen:
# Variante A: osm=array(theoretische Quantile), osr=array(ordered responses)
# Variante B: osm=(array(theoretische Quantile), array(ordered responses)), osr=... (ungleich genutzt)
if isinstance(osm, (tuple, list)) and len(osm) == 2 and np.ndim(osm[0]) == 1:
th = np.asarray(osm[0])
ord_data = np.asarray(osm[1])
else:
th = np.asarray(osm)
ord_data = np.asarray(osr)
# NaNs/Inf filtern und Mindestlänge absichern
mask = np.isfinite(th) & np.isfinite(ord_data)
th = th[mask]
ord_data = ord_data[mask]
if th.size < 2:
print("QQ-Plot: Zu wenige gültige Punkte nach Filter – Fit wird übersprungen.")
# nur Punkte plotten
fig = go.Figure()
fig.add_trace(go.Scatter(x=th, y=ord_data, mode="markers", marker=styles["marker_accent"], name="Daten"))
lab = f"QQ-Plot Effektstärken ({'Kapitel '+str(kapitel) if kapitel else 'Gesamt'})"
fig.update_layout(plotly_template.get_standard_layout(lab, "Theoretische Quantile (Normal)", "Beobachtete Quantile"))
fig.show()
return
fig = go.Figure()
fig.add_trace(go.Scatter(x=th, y=ord_data, mode="markers", marker=styles["marker_accent"], name="Daten"))
# Diagonale (Least Squares Fit)
m, b = np.polyfit(th, ord_data, 1)
fig.add_trace(go.Scatter(x=th, y=m*th + b, mode="lines", line=styles["linie_primaryLine"], name="Fit"))
lab = f"QQ-Plot Effektstärken ({'Kapitel '+str(kapitel) if kapitel else 'Gesamt'})"
fig.update_layout(plotly_template.get_standard_layout(lab, "Theoretische Quantile (Normal)", "Beobachtete Quantile"))
fig.show()
def mark_outliers_iqr(df: pd.DataFrame) -> pd.DataFrame:
q1, q3 = df["Effektstärke"].quantile([0.25, 0.75])
iqr = q3 - q1
lo, hi = q1 - 1.5*iqr, q3 + 1.5*iqr
out = df.copy()
out["Outlier_IQR"] = ~out["Effektstärke"].between(lo, hi)
print(f"IQR-Grenzen: [{lo:.2f}, {hi:.2f}] | Ausreißer: {int(out['Outlier_IQR'].sum())}")
return out
def group_tests_by_kapitel(df: pd.DataFrame):
groups = [g.dropna().values for _, g in df.groupby("Kapitel")["Effektstärke"]]
if len(groups) >= 2:
lev = levene(*groups, center='median')
print(f"Levene (Homogenität): W={lev.statistic:.3f}, p={lev.pvalue:.4f}")
# robust: Kruskal–Wallis
if len(groups) >= 2:
kw = kruskal(*groups)
n_total = sum(len(g) for g in groups)
h = kw.statistic
eps2 = (h - (len(groups)-1)) / (n_total - 1)
print(f"Kruskal–Wallis: H={h:.3f}, p={kw.pvalue:.6f} | ε²={eps2:.3f}")
def text_vs_effect(df: pd.DataFrame):
if "Text_Dimension" not in df.columns:
encode_features_3d(df)
rho, p = spearmanr(df["Text_Dimension"], df["Effektstärke"], nan_policy='omit')
print(f"Spearman ρ(Text, d) = {rho:.3f}, p={p:.6f}")
styles = plotly_template.get_plot_styles()
fig = go.Figure()
fig.add_trace(go.Scatter(x=df["Text_Dimension"], y=df["Effektstärke"],
mode="markers", marker=styles["marker_brightArea"], name="Thermometer",
text=df["Stichwort"],
hovertemplate="Textdim: %{x:.3f}
d: %{y:.2f}
%{text}"))
x = df["Text_Dimension"].values; y = df["Effektstärke"].values
if len(x) >= 2 and np.isfinite(x).all() and np.isfinite(y).all():
m, b = np.polyfit(x, y, 1)
xx = np.linspace(x.min(), x.max(), 100)
fig.add_trace(go.Scatter(x=xx, y=m*xx+b, mode="lines", line=styles["linie_secondaryLine"], name="Trend"))
fig.update_layout(plotly_template.get_standard_layout("Textdimension × Effektstärke (Spearman)", "Textdimension (PCA1)", "Cohen d"))
fig.show()
def chi2_bins_kapitel(df: pd.DataFrame):
ct = pd.crosstab(df["Kapitel"], df["Bin"]).reindex(sorted(df["Kapitel"].unique()))
chi2 = stats.chi2_contingency(ct)
print("Kontingenztafel (Kapitel × Bin):")
print(ct)
print(f"Chi²={chi2[0]:.3f}, p={chi2[1]:.6f}, df={chi2[2]} (Unabhängigkeitstest)")
return ct
def cluster_diagnostics(df: pd.DataFrame, k_min: int = 2, k_max: int = 8, kapitel_weight: float = 0.0):
X, _ = encode_features(df, kapitel_weight=kapitel_weight)
inertias, sils, ks = [], [], []
for k in range(k_min, k_max + 1):
km = KMeans(n_clusters=k, n_init=20, random_state=42).fit(X)
inertias.append(km.inertia_)
ks.append(k)
sils.append(silhouette_score(X, km.labels_) if k > 1 else np.nan)
colors = plotly_template.get_colors()
fig = go.Figure()
fig.add_trace(go.Scatter(x=ks, y=inertias, mode="lines+markers",
line=dict(color=colors["primaryLine"], width=2), name="Inertia (Elbow)"))
fig.add_trace(go.Scatter(x=ks, y=sils, mode="lines+markers",
line=dict(color=colors["secondaryLine"], width=2), name="Silhouette"))
fig.update_layout(plotly_template.get_standard_layout("Cluster-Diagnostik (k)", "k", "Wert"))
fig.show()
def cluster_profiles(df: pd.DataFrame, labels: np.ndarray, top_terms: int = 3):
res = []
tmp = df.copy()
tmp["Cluster"] = labels
vect = TfidfVectorizer(max_features=300)
Xtxt = vect.fit_transform(tmp["Stichwort"].astype(str))
vocab = np.array(vect.get_feature_names_out())
for c in sorted(tmp["Cluster"].unique()):
sub = tmp[tmp["Cluster"] == c]
mean_d = sub["Effektstärke"].mean()
n = len(sub)
by_kap = sub["Kapitel"].value_counts().sort_index().to_dict()
# positionsbasierte Zeilenindizes für die Sparse-Matrix
pos_idx = tmp.index.get_indexer(sub.index)
mean_tfidf = np.asarray(Xtxt[pos_idx].mean(axis=0)).ravel()
top_idx = mean_tfidf.argsort()[::-1][:top_terms]
terms = vocab[top_idx].tolist()
res.append({"Cluster": c, "n": n, "Ø d": round(mean_d,3), "Kapitelverteilung": by_kap, "Top_Terme": terms})
prof = pd.DataFrame(res).set_index("Cluster")
print("\nCluster-Profile:")
print(prof)
return prof
# -----------------------------------------
# Signifikanz-geführte Sicht (kapitelunabhängig)
# -----------------------------------------
def _minmax_norm(a: np.ndarray) -> np.ndarray:
a = np.asarray(a, dtype=float)
if a.size == 0:
return a
lo, hi = np.nanmin(a), np.nanmax(a)
if not np.isfinite(lo) or not np.isfinite(hi) or hi - lo <= 1e-12:
return np.zeros_like(a)
return (a - lo) / (hi - lo)
def build_significance_view(df: pd.DataFrame) -> pd.DataFrame:
"""
Erzeugt eine kapitelunabhängige Sicht mit einem 'SignifikanzScore'.
Idee: Kombination aus Effektstärke-Magnitude und (falls vorhanden) individueller Silhouette-Trennschärfe.
- score_basis = |d| (größer = stärker)
- score_cluster = Silhouette_point (kleiner 0 -> auf 0 gesetzt), anschließend min-max-normalisiert
- Gesamt-Score = 0.6*norm(|d|) + 0.4*norm(max(Silhouette_point, 0))
Vorzeichen des Scores folgt dem Vorzeichen von d, damit negative Effekte unten landen.
Hinweis: Clustering/Score in dieser Ansicht wird kapitelunabhängig berechnet, indem Kapitel-OHE mit Gewicht 0.0 skaliert wird.
"""
tmp = df.copy()
# Basisgrößen
tmp["abs_d"] = tmp["Effektstärke"].abs()
if "Silhouette_point" not in tmp.columns:
tmp["Silhouette_point"] = np.nan
sil_nonneg = tmp["Silhouette_point"].fillna(0.0).clip(lower=0.0)
score_basis = _minmax_norm(tmp["abs_d"].values)
score_sil = _minmax_norm(sil_nonneg.values)
score = 0.6 * score_basis + 0.4 * score_sil
tmp["SignifikanzScore"] = score * np.sign(tmp["Effektstärke"].values)
# Ranglisten (absolut stärkste zuerst)
tmp["Rank_abs"] = (-tmp["abs_d"]).rank(method="min").astype(int)
tmp["Rank_score"] = (-tmp["SignifikanzScore"].abs()).rank(method="min").astype(int)
# Kategorien für schnelle Filterung
def impact_label(d):
if d >= 0.70:
return "hoch+"
if d >= 0.40:
return "mittel+"
if d >= 0.00:
return "gering+"
if d > -0.40:
return "gering−"
if d > -0.70:
return "mittel−"
return "hoch−"
tmp["Impact_Label"] = tmp["Effektstärke"].apply(impact_label)
return tmp
def plot_significance_space(df_sig: pd.DataFrame):
"""
2D-Signifikanzraum:
x = Effektstärke (Cohen d)
y = SignifikanzScore (vorzeichenbehaftet)
Punktgröße ~ |Score|, Farbe nach Vorzeichen (CI-Farben).
"""
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
# Markergrößen (skaliert)
s = (df_sig["SignifikanzScore"].abs() * 20.0) + 6.0
# Farben nach Vorzeichen
color_pos = colors.get("positiveHighlight", "#2ca02c")
color_neg = colors.get("negativeHighlight", "#d62728")
point_colors = np.where(df_sig["SignifikanzScore"] >= 0, color_pos, color_neg)
hovertemplate = (
"Thermometer: %{customdata[0]}
"
"Stichwort: %{text}
"
"d: %{x:.2f}
"
"Score: %{y:.3f}
"
"Kapitel: %{customdata[1]}
"
"Impact: %{customdata[2]}
"
"Rank(|d|): %{customdata[3]} | Rank(|Score|): %{customdata[4]}"
)
fig = go.Figure()
fig.add_trace(go.Scatter(
x=df_sig["Effektstärke"],
y=df_sig["SignifikanzScore"],
mode="markers",
marker=dict(color=point_colors, size=s),
text=df_sig["Stichwort"],
customdata=np.stack([
df_sig["Thermometer_ID"],
df_sig["Kapitelname"],
df_sig["Impact_Label"],
df_sig["Rank_abs"],
df_sig["Rank_score"],
], axis=-1),
name="Thermometer",
hovertemplate=hovertemplate
))
# Referenzlinien
fig.add_hline(y=0, line=dict(color=colors.get("depthArea"), width=1))
for x0 in [0.0, 0.40, 0.70, -0.40, -0.70]:
fig.add_vline(x=x0, line=dict(color=colors.get("depthArea"), width=1, dash="dot"))
fig.update_layout(plotly_template.get_standard_layout(
"Signifikanz-geführter Raum: Effektstärke × Score (kapitelunabhängig)",
"Cohen d", "SignifikanzScore"
))
fig.show()
# -----------------------------------------
# Visualisierungen
# -----------------------------------------
def plot_heatmap_kapitel_vs_d(df: pd.DataFrame, kapitel: int | None = None, bins_d: int = 30):
"""2D-Heatmap (Histogram2d) von Kapitel (x) gegen Effektstärke (y).
- Zeigt die Dichte/Anzahl pro Zelle (Kapitel × d-Bereich)
- CI-Farbskala anhand Template-Farben (depthArea → brightArea)
"""
colors = plotly_template.get_colors()
styles = plotly_template.get_plot_styles()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
# CI-konforme Farbskala zwischen depthArea und brightArea
def _two_color_scale(c1, c2):
def _hex_to_rgb(h):
h = h.lstrip('#')
return tuple(int(h[i:i+2], 16) for i in (0, 2, 4))
r1, g1, b1 = _hex_to_rgb(c1)
r2, g2, b2 = _hex_to_rgb(c2)
scale = []
for t in np.linspace(0, 1, 6):
r = int(r1*(1-t) + r2*t)
g = int(g1*(1-t) + g2*t)
b = int(b1*(1-t) + b2*t)
scale.append([float(t), f"rgb({r},{g},{b})"])
return scale
colorscale = _two_color_scale(colors.get("depthArea", "#444"), colors.get("brightArea", "#fff")) if "depthArea" in colors and "brightArea" in colors else colors.get("continuous", "Viridis")
# Histogram2d
fig = go.Figure(data=go.Histogram2d(
x=df["Kapitel"].astype(int),
y=df["Effektstärke"],
nbinsx=max(1, df["Kapitel"].nunique()),
nbinsy=bins_d,
colorscale=colorscale,
colorbar=dict(title="Anzahl"),
hovertemplate="Kapitel: %{x}
d-Bin: %{y}
Anzahl: %{z}",
))
fig.update_layout(plotly_template.get_standard_layout(
f"Heatmap: Kapitel × Effektstärke ({kapitel_label})", "Kapitel", "Cohen d"
))
# ganze Zahlen als Kapitel-Ticks
fig.update_layout(xaxis=dict(tickmode="linear", dtick=1))
fig.show()
export_figure(fig, "vl-heatmap-kapitel-vs-d", export_fig_visual, export_fig_png)
def export_json(obj: dict, name: str):
try:
p = os.path.join(EXPORT_DIR, name)
with open(p, "w", encoding="utf-8") as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
except Exception:
pass
# -----------------------------------------
# Hilfsfunktion: DataFrame zu Records (mit None für NaN)
# -----------------------------------------
def _df_records(df: pd.DataFrame, cols: list[str]) -> list[dict]:
try:
return df[cols].replace({np.nan: None}).to_dict(orient="records")
except Exception:
return df.replace({np.nan: None}).to_dict(orient="records")
def plot_boxplots(df: pd.DataFrame, kapitel: int | None = None):
"""Boxplots der Effektstärken: nach Kapitel (falls Gesamt) und nach Bins.
- Zeigt Ausreißer (IQR-basiert), Notches für Median-Konfidenz.
- CI-Stile aus plotly_template.
"""
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
# 1) Boxplot nach Kapitel (nur sinnvoll, wenn mehrere Kapitel vorhanden)
if kapitel is None and df["Kapitel"].nunique() > 1:
fig_kap = go.Figure()
fig_kap.add_trace(go.Box(
x=df["Kapitel"].astype(int),
y=df["Effektstärke"],
boxpoints="outliers",
notched=True,
marker=styles["marker_brightArea"],
line=styles["linie_primaryLine"],
name="Kapitel",
hovertemplate="Kapitel: %{x}
d: %{y:.2f}",
))
fig_kap.update_layout(plotly_template.get_standard_layout(
f"Boxplot Effektstärken nach Kapitel ({kapitel_label})", "Kapitel", "Cohen d"
))
# ganze Zahlen als Kapitel-Ticks
fig_kap.update_layout(xaxis=dict(tickmode="linear", dtick=1))
fig_kap.show()
export_figure(fig_kap, "vl-box-kapitel", export_fig_visual, export_fig_png)
# 2) Boxplot nach heuristischen Bins (immer möglich)
order = ["negativ", "gering", "mittel", "hoch"]
fig_bin = go.Figure()
fig_bin.add_trace(go.Box(
x=pd.Categorical(df["Bin"], categories=order, ordered=True),
y=df["Effektstärke"],
boxpoints="outliers",
notched=True,
marker=styles["marker_accent"],
line=styles["linie_secondaryLine"],
name="Bin",
hovertemplate="Bin: %{x}
d: %{y:.2f}",
))
fig_bin.update_layout(plotly_template.get_standard_layout(
f"Boxplot Effektstärken nach Bins ({kapitel_label})", "Bin", "Cohen d"
))
fig_bin.show()
export_figure(fig_bin, "vl-box-bins", export_fig_visual, export_fig_png)
# 3) Optional: Ein Gesamt-Boxplot für die aktuelle Auswahl
fig_all = go.Figure()
fig_all.add_trace(go.Box(
y=df["Effektstärke"],
boxpoints="outliers",
notched=True,
marker=styles["marker_positiveHighlight"],
line=styles["linie_primaryLine"],
name=kapitel_label,
hovertemplate="d: %{y:.2f}",
))
fig_all.update_layout(plotly_template.get_standard_layout(
f"Boxplot Effektstärken ({kapitel_label})", "", "Cohen d"
))
fig_all.show()
export_figure(fig_all, "vl-box-overall", export_fig_visual, export_fig_png)
def plot_hist(df: pd.DataFrame, kapitel: int | None = None):
# Use CI styles from plotly_template
styles = plotly_template.get_plot_styles()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
fig = go.Figure()
fig.add_trace(go.Histogram(
x=df["Effektstärke"],
marker=styles["balken_accent"],
hovertemplate="Effektstärke: %{x:.2f}
Häufigkeit: %{y}"
))
fig.update_layout(plotly_template.get_standard_layout(
f"Verteilung der Effektstärken ({kapitel_label})", "Cohen d", "Häufigkeit"
))
fig.show()
export_figure(fig, "vl-hist-effekte", export_fig_visual, export_fig_png)
def plot_bins(df: pd.DataFrame, kapitel: int | None = None):
styles = plotly_template.get_plot_styles()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
order = ["negativ", "gering", "mittel", "hoch"]
counts = df["Bin"].value_counts().reindex(order).fillna(0).astype(int)
fig = go.Figure()
fig.add_trace(go.Bar(
x=counts.index,
y=counts.values,
marker=styles["balken_accent"],
hovertemplate="Kategorie: %{x}
Anzahl: %{y}"
))
fig.update_layout(plotly_template.get_standard_layout(
f"Heuristische Einteilung nach Effektstärke (Bins) ({kapitel_label})", "Bin", "Anzahl"
))
fig.show()
export_figure(fig, "vl-bins", export_fig_visual, export_fig_png)
def plot_scatter(df: pd.DataFrame, cluster_labels: np.ndarray, model: KMeans, sil: float, title_suffix: str, kapitel: int | None = None, top_n: int = 5):
"""
Kapitelunabhängiger 2D-Scatter:
- x: künstlicher Index, aber so angeordnet, dass Punkte je Cluster zusammenstehen
- y: Effektstärke (Cohen d)
- Farben: Cluster
Zusätzlich:
• horizontale Linien bei den Cluster-Mitteln (Ø d)
• Labels für die Top-N nach |d|
"""
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
tmp = df.copy()
tmp["Cluster"] = cluster_labels.astype(int)
# Clusterstärken (Mittelwert der Effektstärke im jeweiligen Clusterzentrum)
cluster_strengths = {i: float(model.cluster_centers_[i][0]) for i in range(len(model.cluster_centers_))}
tmp["Clusterstärke"] = tmp["Cluster"].map(cluster_strengths)
# Cluster-Reihenfolge: absteigend nach Ø d
clusters_sorted = sorted(tmp["Cluster"].unique(), key=lambda c: cluster_strengths[c], reverse=True)
# Gewünschte Markerpalette (robust mit Fallbacks)
def _get_marker(*candidates):
for key in candidates:
if key in styles:
return styles[key]
return styles.get("marker_accent", {})
palette_markers = [
_get_marker("marker_positiveHighlight", "marker_brightArea", "marker_accent"),
_get_marker("marker_primaryLine", "marker_brightArea", "marker_accent"),
_get_marker("marker_secondaryLine", "marker_accent", "marker_brightArea"),
_get_marker("marker_negativeHighlight", "marker_accent", "marker_brightArea"),
]
# x-Positionen so vergeben, dass Cluster-Blöcke entstehen
tmp = tmp.reset_index(drop=True)
tmp["_x"] = np.nan
x_cursor = 0
block_bounds = {} # für Centroid-Linien (x-Min/x-Max je Cluster)
for c in clusters_sorted:
sub_idx = tmp.index[tmp["Cluster"] == c].tolist()
n = len(sub_idx)
xs = np.arange(x_cursor, x_cursor + n)
tmp.loc[sub_idx, "_x"] = xs
block_bounds[c] = (xs.min(), xs.max())
x_cursor += n + 2 # +2 als optischer Abstand zwischen Blöcken
hovertemplate = (
"Thermometer: %{customdata[2]}
"
"Stichwort: %{text}
"
"Effektstärke: %{y:.2f}
"
"Kapitel: %{customdata[0]}
"
"Clusterstärke: %{customdata[1]:.2f}"
)
fig = go.Figure()
# Punkte je Cluster zeichnen
for idx, c in enumerate(clusters_sorted):
cdf = tmp[tmp["Cluster"] == c]
fig.add_trace(go.Scatter(
x=cdf["_x"],
y=cdf["Effektstärke"],
mode="markers",
marker={**palette_markers[idx % len(palette_markers)], "size": 10},
name=f"Cluster: {cluster_strengths[c]:.2f}",
text=cdf["Stichwort"],
customdata=np.stack([cdf["Kapitelname"], cdf["Clusterstärke"], cdf["Thermometer_ID"]], axis=-1),
hovertemplate=hovertemplate
))
# Centroid-Linien (horizontale Ø d pro Cluster)
for c in clusters_sorted:
x0, x1 = block_bounds[c]
yd = cluster_strengths[c]
centroid_color = colors.get("depthArea", "#444")
line_style = dict(styles.get("linie_secondaryLine", {"width": 2}))
line_style["color"] = centroid_color
fig.add_trace(go.Scatter(
x=[x0, x1],
y=[yd, yd],
mode="lines",
line=line_style,
name=None,
showlegend=False,
hovertemplate=f"Cluster-Mittel: {yd:.2f}"
))
# Vertikale Trennlinien zwischen Cluster-Blöcken (zur Orientierung)
# (nur als dezente Linien, keine Legende)
block_edges = sorted({bounds[1] + 1 for bounds in block_bounds.values()})
for edge in block_edges[:-1]: # letzte Kante führt bereits zum Abstand
fig.add_vline(x=edge - 1, line=dict(color=colors.get("depthArea"), width=1, dash="dot"))
fig.update_layout(plotly_template.get_standard_layout(
f"Effektstärke × Cluster ({title_suffix}) ({kapitel_label}) – Silhouette: {sil:.3f}",
"Thermometer (gruppiert nach Cluster)", "Cohen d"
))
fig.update_xaxes(showticklabels=False)
fig.show()
export_figure(fig, f"vl-scatter-{title_suffix}", export_fig_visual, export_fig_png)
def plot_scatter_3d(df: pd.DataFrame, cluster_labels: np.ndarray, sil: float, title_suffix: str, kapitel: int | None = None):
styles = plotly_template.get_plot_styles()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
tmp = df.copy()
tmp["Cluster"] = cluster_labels.astype(int)
# Clusterzentren für durchschnittliche Effektstärke
cluster_strengths = {i: float(tmp[tmp["Cluster"] == i]["Effektstärke"].mean()) for i in sorted(set(cluster_labels))}
hovertemplate = (
"Thermometer: %{text}
"
"Kapitel: %{customdata[0]}
"
"Effektstärke: %{x:.2f}
"
"Textdim: %{z:.2f}
"
"Cluster: %{customdata[1]}"
)
fig = go.Figure()
clusters = sorted(tmp["Cluster"].unique())
palette_keys = ["positiveHighlight", "negativeHighlight", "accent", "brightArea"]
for idx, cluster in enumerate(clusters):
cluster_df = tmp[tmp["Cluster"] == cluster]
color_key = palette_keys[idx % len(palette_keys)]
marker_style = styles.get(f"marker_{color_key}", {})
fig.add_trace(go.Scatter3d(
x=cluster_df["Effektstärke"],
y=cluster_df["Kapitel"],
z=cluster_df["Text_Dimension"],
mode="markers",
marker={**marker_style, "size": 6},
name=f"Cluster {cluster} (Ø d = {cluster_strengths[cluster]:.2f})",
text=cluster_df["Stichwort"],
customdata=np.stack([cluster_df["Kapitelname"], cluster_df["Cluster"]], axis=-1),
hovertemplate=hovertemplate
))
fig.update_layout(plotly_template.get_standard_layout(
f"3D-Clustering (Effektstärke × Kapitel × Text) – {title_suffix} ({kapitel_label}) – Silhouette: {sil:.3f}",
"Effektstärke", "Kapitel", "Textdimension"
))
fig.update_layout(scene=dict(
yaxis=dict(
title="Kapitel",
tickmode="linear",
dtick=1
)
))
fig.show()
export_figure(fig, f"vl-scatter3d-{title_suffix}", export_fig_visual, export_fig_png)
# -----------------------------------------
# Pipeline
# -----------------------------------------
def analyse(csv_path: str = "Thermometer.csv", k: int = 4, kapitel: int | None = None):
# Laden
df = load_data(csv_path)
# Datenvalidierung
dq = validate_data(df)
print("\nDATA QUALITY REPORT:")
for key, val in dq.items():
print(f" {key}: {val}")
export_json(dq, "data_quality_report.json")
if kapitel is not None:
df = df[df["Kapitel"] == kapitel]
if df.empty:
print(f"Keine Daten für Kapitel {kapitel}.")
return None
# Bins
df = add_manual_bins(df)
# K-Means
# Kapitelgewicht = 0.0 => Kapitel-OHE trägt nicht zur Distanz bei (kapitelübergreifendes Clustering)
labels, sil, model = run_kmeans(df, k=k, kapitel_weight=0.0)
# Silhouette je Punkt anhängen
try:
X_for_sil, _ = encode_features(df, kapitel_weight=0.0)
X_for_sil = _sanitize_X(X_for_sil, clip=1e6)
if k > 1 and len(df) > k:
df["Silhouette_point"] = silhouette_samples(X_for_sil, labels)
else:
df["Silhouette_point"] = np.nan
except Exception:
df["Silhouette_point"] = np.nan
# Reports
print("—" * 60)
print("ANALYSE Sicht 1 | Heuristische Bins (Grenzen: <0 | <0.40 | <0.70 | ≥0.70)")
print(df["Bin"].value_counts().reindex(["negativ", "gering", "mittel", "hoch"]).fillna(0).astype(int))
print("\nANALYSE Sicht 2 | K-Means-Clustering")
if not math.isnan(sil):
print(f"Silhouette-Score (k={k}): {sil:.3f}")
else:
print(f"Silhouette-Score (k={k}): n/a (zu wenige Daten oder k zu groß)")
if not math.isnan(sil):
# Clusterzentren (nur Effektstärke + Kapitelmittelwerte)
centers = model.cluster_centers_
print("\nClusterzentren (erste Spalte = Effektstärke, Rest = Kapitel-OHE):")
for idx, center in enumerate(centers):
eff = center[0]
print(f" Cluster {idx}: Effektstärke-Mittel {eff:.3f}")
# --- Statistik-Block ---
stats_df = describe_effects(df)
plot_table_stats(stats_df, f"Deskriptive Statistik ({'Kapitel '+str(kapitel) if kapitel else 'Gesamt'})")
# Deskriptive Statistik exportieren
try:
stats_df.to_csv(os.path.join(EXPORT_DIR, "deskriptiv.csv"))
except Exception:
pass
normality_and_qq(df, kapitel=kapitel)
df = mark_outliers_iqr(df)
if kapitel is None:
group_tests_by_kapitel(df)
text_vs_effect(df)
if kapitel is None:
chi2_bins_kapitel(df)
cluster_diagnostics(df, kapitel_weight=0.0)
profiles_df = cluster_profiles(df, labels)
try:
export_json(json.loads(profiles_df.to_json(orient="table")), "cluster_profile.json")
except Exception:
pass
# --- Signifikanz-geführte Sicht ---
df_sig = build_significance_view(df)
try:
df_sig.sort_values("Rank_score").to_csv(os.path.join(EXPORT_DIR, "signifikanz_ranking.csv"), index=False)
except Exception:
pass
plot_significance_space(df_sig)
# Tests zusammenfassen (vor Export sammeln)
tests_summary = {"silhouette_global": float(sil) if not math.isnan(sil) else None}
if kapitel is None and df["Kapitel"].nunique() > 1:
groups = [g.dropna().values for _, g in df.groupby("Kapitel")["Effektstärke"]]
try:
lev = levene(*groups, center='median')
tests_summary["levene_W"] = float(lev.statistic)
tests_summary["levene_p"] = float(lev.pvalue)
except Exception:
pass
try:
kw = kruskal(*groups)
n_total = sum(len(g) for g in groups)
h = float(kw.statistic)
eps2 = (h - (len(groups)-1)) / (n_total - 1) if n_total > 1 else None
tests_summary["kruskal_H"] = h
tests_summary["kruskal_p"] = float(kw.pvalue)
tests_summary["kruskal_eps2"] = float(eps2) if eps2 is not None else None
except Exception:
pass
try:
if "Text_Dimension" not in df.columns:
encode_features_3d(df)
rho, p = spearmanr(df["Text_Dimension"], df["Effektstärke"], nan_policy='omit')
tests_summary["spearman_rho_text_d"] = float(rho)
tests_summary["spearman_p_text_d"] = float(p)
except Exception:
pass
if kapitel is None and df["Kapitel"].nunique() > 1:
try:
ct = pd.crosstab(df["Kapitel"], df["Bin"]).reindex(sorted(df["Kapitel"].unique()))
chi2 = stats.chi2_contingency(ct)
tests_summary["chi2"] = float(chi2[0])
tests_summary["chi2_p"] = float(chi2[1])
tests_summary["chi2_df"] = int(chi2[2])
except Exception:
pass
export_json(tests_summary, "tests_summary.json")
# --- WERTEDATEI: Alles in einer JSON bündeln ---
if export_werte_all:
try:
# Kern-Datenzeilen
base_cols = [
"Thermometer_ID", "Stichwort", "Effektstärke", "Kapitel", "Kapitelname",
"Bin", "Text_Dimension", "Outlier_IQR", "Silhouette_point"
]
rows = _df_records(df, [c for c in base_cols if c in df.columns])
# Deskriptive Statistik als Records
desc_records = _df_records(
(stats_df.reset_index().rename(columns={"index": "Gruppe"})),
list(stats_df.reset_index().columns)
)
# Cluster-Profile als Records
prof_records = _df_records(
(profiles_df.reset_index().rename(columns={"index": "Cluster"})),
list(profiles_df.reset_index().columns)
)
# Cluster-Zentren (voll) und nur d-Komponente
centers_full = model.cluster_centers_.tolist() if hasattr(model, "cluster_centers_") else None
centers_d = [float(c[0]) for c in model.cluster_centers_] if hasattr(model, "cluster_centers_") else None
payload = {
"meta": {
"k": int(k),
"kapitel": int(kapitel) if kapitel is not None else None,
"theme": theme,
},
"data": rows,
"deskriptiv": desc_records,
"cluster": {
"silhouette_global": float(sil) if not math.isnan(sil) else None,
"centers_full": centers_full,
"centers_effekt_only": centers_d,
},
"profiles": prof_records,
"tests_summary": tests_summary if isinstance(tests_summary, dict) else {},
"data_quality": dq,
}
export_json(payload, "werte_all.json")
except Exception as _e:
pass
# Plots
plot_heatmap_kapitel_vs_d(df, kapitel=kapitel)
# Boxplots
plot_boxplots(df, kapitel=kapitel)
plot_hist(df, kapitel=kapitel)
plot_bins(df, kapitel=kapitel)
plot_scatter(df, labels, model, sil, title_suffix=f"k{k}", kapitel=kapitel)
# 3D-Clustering
X3d, _ = encode_features_3d(df, kapitel_weight=0.0)
X3d = _sanitize_X(X3d, clip=1e6)
model3d = KMeans(n_clusters=k, n_init=20, random_state=42)
labels3d = model3d.fit_predict(X3d)
sil3d = silhouette_score(X3d, labels3d) if k > 1 and len(df) > k else np.nan
plot_scatter_3d(df, labels3d, sil3d, title_suffix=f"k{k}-3d", kapitel=kapitel)
# Clusterzuordnung exportieren
try:
df_export = df.copy()
df_export["Cluster"] = labels
df_export.to_csv(os.path.join(EXPORT_DIR, "clusterzuordnung.csv"), index=False)
except Exception:
pass
return {
"df": df,
"kmeans_labels": labels,
"silhouette": sil,
"model": model,
}
# -----------------------------------------
# Main
# -----------------------------------------
if __name__ == "__main__":
# Passe den CSV-Pfad an, falls die Datei woanders liegt
if analyse_all:
df_all = load_data(os.path.join(os.path.dirname(__file__), csv_file))
for kap in sorted(df_all["Kapitel"].unique()):
analyse(csv_path=os.path.join(os.path.dirname(__file__), csv_file), k=k_clusters, kapitel=kap)
else:
analyse(csv_path=os.path.join(os.path.dirname(__file__), csv_file), k=k_clusters, kapitel=selected_kapitel)