"""
Visible Learning exploratory cluster analysis
---------------------------------------------
CI: styled after simulation_bildungswirkgefuege
Purpose: load a CSV of effect sizes, build manual bins and k-means clusters,
compute silhouette scores, and produce visualizations.
"""
from __future__ import annotations
# -----------------------------------------
# Imports
# -----------------------------------------
import json
import math
import os

import numpy as np
import pandas as pd
from scipy import stats
from scipy.stats import anderson, kruskal, levene, spearmanr
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import silhouette_samples, silhouette_score
from sklearn.preprocessing import OneHotEncoder

import plotly.graph_objs as go
import plotly.io as pio
# -----------------------------------------
# Load configuration
# -----------------------------------------
from config_visible_learning import (
    csv_file,
    k_clusters,
    export_fig_visual,
    export_fig_png,
    theme,
    selected_kapitel,
    analyse_all,
    export_werte_all,
)
# -----------------------------------------
# Template/CI
# -----------------------------------------
try:
    from ci_template import plotly_template

    plotly_template.set_theme(theme)
    _ci_layout = lambda title, x, y: plotly_template.get_standard_layout(
        title=title, x_title=x, y_title=y
    )
except Exception:
    # Fallback: neutral Plotly defaults. Note that the plotting helpers below
    # still call plotly_template directly; this fallback only covers _ci_layout.
    _ci_layout = lambda title, x, y: dict(
        title=title, xaxis_title=x, yaxis_title=y
    )
# -----------------------------------------
# Export helpers (HTML/PNG), CI-compatible
# -----------------------------------------
EXPORT_DIR = os.path.join(os.path.dirname(__file__), "export")
try:
    os.makedirs(EXPORT_DIR, exist_ok=True)
except Exception:
    pass


def export_figure(fig, name: str, do_html: bool, do_png: bool):
    """Export a Plotly figure according to the CI flags."""
    base = os.path.join(EXPORT_DIR, name)
    if do_html:
        p = f"{base}.html"
        pio.write_html(fig, file=p, auto_open=False, include_plotlyjs="cdn")
    if do_png:
        try:
            p = f"{base}.png"
            pio.write_image(fig, p, scale=2)
        except Exception:
            # PNG export requires Kaleido; ignore quietly if it is not installed
            pass
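# Illustrative usage sketch (the figure name here is hypothetical, not one of
# the exports produced below; the flags come from config_visible_learning):
#   fig = go.Figure()
#   export_figure(fig, "example-figure", do_html=True, do_png=False)
# This writes export/example-figure.html next to this script.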
# -----------------------------------------
# Data | loading & preparation
# -----------------------------------------
REQUIRED_COLS = ["Thermometer_ID", "Stichwort", "Effektstärke"]


def load_data(csv_path: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)
    missing = [c for c in REQUIRED_COLS if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns in CSV: {missing}")
    # Clean up types
    df["Thermometer_ID"] = df["Thermometer_ID"].astype(str)
    # Robustly convert effect sizes to float (decimal comma -> decimal point)
    df["Effektstärke"] = (
        df["Effektstärke"].astype(str).str.replace(",", ".", regex=False).str.strip()
    )
    df["Effektstärke"] = pd.to_numeric(df["Effektstärke"], errors="coerce")
    # Derive the chapter from Thermometer_ID and map the chapter name
    df["Kapitel"] = df["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
    kapitel_map = {
        5: "Lernende",
        6: "Elternhaus und Familie",
        7: "Schule und Gesellschaft",
        8: "Klassenzimmer",
        9: "Lehrperson",
        10: "Curriculum",
        11: "Zielorientiertes Unterrichten",
        12: "Lernstrategien",
        13: "Lehrstrategien",
        14: "Nutzung von Technologien",
        15: "Schulische und außerschulische Einflüsse",
    }
    df["Kapitelname"] = df["Kapitel"].map(kapitel_map).fillna(
        df["Kapitel"].map(lambda k: f"Kapitel {k}")
    )
    return df.dropna(subset=["Effektstärke"])  # keep valid numbers only
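# A minimal sketch of the expected CSV layout; the rows are hypothetical
# examples, not values from the real Thermometer.csv:
#
#   Thermometer_ID,Stichwort,Effektstärke
#   5.1,Beispiel-Stichwort A,"0,52"
#   6.3,Beispiel-Stichwort B,"-0,18"
#
# "5.1" yields Kapitel 5 ("Lernende"); the decimal comma is normalized to a
# decimal point before the conversion to float.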
def validate_data(df: pd.DataFrame) -> dict:
    """Simple data validation and quality report.

    Note: rows with missing effect sizes were already dropped in load_data.
    """
    report = {}
    report["n_rows"] = int(len(df))
    # Duplicates
    dup_counts = df["Thermometer_ID"].value_counts()
    duplicates = dup_counts[dup_counts > 1]
    report["duplicate_ids"] = duplicates.to_dict()
    report["n_duplicates"] = int(duplicates.sum()) if not duplicates.empty else 0
    # Valid chapters (5..15)
    valid_kap = set(range(5, 16))
    invalid_kapitel = df.loc[~df["Kapitel"].isin(valid_kap), "Kapitel"].tolist()
    report["invalid_kapitel_entries"] = invalid_kapitel
    # Value range of d
    report["effekt_min"] = float(df["Effektstärke"].min()) if not df.empty else None
    report["effekt_max"] = float(df["Effektstärke"].max()) if not df.empty else None
    # Empty keywords
    empty_keywords = df["Stichwort"].astype(str).str.strip().eq("").sum()
    report["empty_stichwort"] = int(empty_keywords)
    return report
# -----------------------------------------
# Manual bins (heuristic classification)
# -----------------------------------------
BIN_LABELS = ["negativ", "gering", "mittel", "hoch"]


def manual_bin(d: float) -> str:
    if d < 0:
        return "negativ"
    if 0 <= d < 0.40:
        return "gering"
    if 0.40 <= d < 0.70:
        return "mittel"
    return "hoch"
def add_manual_bins(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["Bin"] = df["Effektstärke"].apply(manual_bin)
    return df
# -----------------------------------------
# K-means clustering (effect size + chapter)
# -----------------------------------------
def encode_features(df: pd.DataFrame) -> tuple[np.ndarray, list[str]]:
    """One-hot encoding of the chapter plus the (metric) effect size."""
    try:
        enc = OneHotEncoder(sparse_output=False, handle_unknown="ignore")  # newer sklearn versions
    except TypeError:
        enc = OneHotEncoder(sparse=False, handle_unknown="ignore")  # older sklearn versions
    cat = df[["Kapitel"]].fillna(-1)
    cat_ohe = enc.fit_transform(cat)
    eff = df[["Effektstärke"]].values
    X = np.hstack([eff, cat_ohe])
    feature_names = ["Effektstärke"] + [f"kap::{c}" for c in enc.get_feature_names_out(["Kapitel"])]
    return X, feature_names
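# Shape sketch (illustrative): with n rows and m distinct chapters,
#   X, names = encode_features(df)
# gives X.shape == (n, 1 + m) -- one metric column for d, followed by the
# one-hot chapter indicators named "kap::Kapitel_<nr>".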
def encode_features_3d(df: pd.DataFrame) -> tuple[np.ndarray, list[str]]:
    """Effect size + chapter + text dimension (TF-IDF + PCA) for 3D clustering."""
    # Chapter
    try:
        enc = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
    except TypeError:
        enc = OneHotEncoder(sparse=False, handle_unknown="ignore")
    cat = df[["Kapitel"]].fillna(-1)
    cat_ohe = enc.fit_transform(cat)
    # Effect size
    eff = df[["Effektstärke"]].values
    # Text dimension via TF-IDF + PCA
    vectorizer = TfidfVectorizer(max_features=100)
    X_text = vectorizer.fit_transform(df["Stichwort"].astype(str))
    pca = PCA(n_components=1, random_state=42)
    text_dim = pca.fit_transform(X_text.toarray())
    # Store the text dimension on the DataFrame (intentional side effect,
    # used later by text_vs_effect and the 3D scatter plot)
    df["Text_Dimension"] = text_dim.flatten()
    # Combine
    X = np.hstack([eff, cat_ohe, text_dim])
    feature_names = ["Effektstärke"] + list(enc.get_feature_names_out(["Kapitel"])) + ["Text_Dimension"]
    return X, feature_names
def run_kmeans(df: pd.DataFrame, k: int = 4, random_state: int = 42):
    X, feature_names = encode_features(df)
    model = KMeans(n_clusters=k, n_init=20, random_state=random_state)
    labels = model.fit_predict(X)
    sil = silhouette_score(X, labels) if k > 1 and len(df) > k else np.nan
    return labels, sil, model
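# Minimal usage sketch (assumes a DataFrame prepared by load_data):
#   labels, sil, model = run_kmeans(df, k=4)
#   df["Cluster"] = labels
# sil is NaN when k <= 1 or when there are too few rows for a silhouette.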
# -----------------------------------------
# Statistical analyses
# -----------------------------------------
def describe_effects(df: pd.DataFrame) -> pd.DataFrame:
    """Descriptive statistics (overall and per chapter)."""
    # Per-chapter aggregation with unambiguous column names
    by_kap = df.groupby("Kapitel")["Effektstärke"].agg(
        n="count",
        mean="mean",
        std=lambda s: s.std(ddof=1),
        min="min",
        q1=lambda s: s.quantile(0.25),
        median="median",
        q3=lambda s: s.quantile(0.75),
        max="max",
        skew="skew",
        kurtosis=lambda s: s.kurt(),
    )
    # Overall row
    overall = pd.DataFrame({
        "n": [df["Effektstärke"].count()],
        "mean": [df["Effektstärke"].mean()],
        "std": [df["Effektstärke"].std(ddof=1)],
        "min": [df["Effektstärke"].min()],
        "q1": [df["Effektstärke"].quantile(0.25)],
        "median": [df["Effektstärke"].median()],
        "q3": [df["Effektstärke"].quantile(0.75)],
        "max": [df["Effektstärke"].max()],
        "skew": [df["Effektstärke"].skew()],
        "kurtosis": [df["Effektstärke"].kurt()],
    }, index=["Gesamt"])
    # Nicer labels for the chapter index
    by_kap.index = [f"Kapitel {int(k)}" for k in by_kap.index]
    # Combine
    out = pd.concat([overall, by_kap])
    return out
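# Illustrative access pattern for the combined table:
#   stats_df = describe_effects(df)
#   stats_df.loc["Gesamt", "mean"]    # overall mean of d
#   stats_df.loc["Kapitel 5", "n"]    # sample size of chapter 5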
def plot_table_stats(stats_df: pd.DataFrame, title: str):
    from plotly.graph_objs import Table, Figure

    colors = plotly_template.get_colors()
    fig = Figure(data=[Table(
        header=dict(values=[""] + list(stats_df.columns),
                    fill_color=colors["brightArea"], font=dict(color=colors["white"])),
        cells=dict(values=[stats_df.index.astype(str)] + [stats_df[c].round(3).tolist() for c in stats_df.columns],
                   fill_color=colors["depthArea"], font=dict(color=colors["white"]))
    )])
    fig.update_layout(plotly_template.get_standard_layout(title, "", ""))
    fig.show()
def normality_and_qq(df: pd.DataFrame, kapitel: int | None = None):
    x = df["Effektstärke"].dropna().values
    ad = anderson(x, dist='norm')
    print(f"Anderson-Darling: A2={ad.statistic:.3f} | critical values {ad.critical_values} | significance levels {ad.significance_level}")
    # QQ plot
    if len(x) < 3:
        print("QQ plot: too few data points (<3); plot is skipped.")
        return
    styles = plotly_template.get_plot_styles()
    osm, osr = stats.probplot(x, dist="norm", rvalue=False)
    # Compatible with different SciPy versions:
    # Variant A: osm = array of theoretical quantiles, osr = array of ordered responses
    # Variant B: osm = (theoretical quantiles, ordered responses), osr = fit parameters (unused)
    if isinstance(osm, (tuple, list)) and len(osm) == 2 and np.ndim(osm[0]) == 1:
        th = np.asarray(osm[0])
        ord_data = np.asarray(osm[1])
    else:
        th = np.asarray(osm)
        ord_data = np.asarray(osr)
    # Filter NaNs/Inf and guard the minimum length
    mask = np.isfinite(th) & np.isfinite(ord_data)
    th = th[mask]
    ord_data = ord_data[mask]
    if th.size < 2:
        print("QQ plot: too few valid points after filtering; fit is skipped.")
        # plot points only
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=th, y=ord_data, mode="markers", marker=styles["marker_accent"], name="Data"))
        lab = f"QQ plot of effect sizes ({'Kapitel ' + str(kapitel) if kapitel else 'Gesamt'})"
        fig.update_layout(plotly_template.get_standard_layout(lab, "Theoretical quantiles (normal)", "Observed quantiles"))
        fig.show()
        return
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=th, y=ord_data, mode="markers", marker=styles["marker_accent"], name="Data"))
    # Diagonal (least-squares fit)
    m, b = np.polyfit(th, ord_data, 1)
    fig.add_trace(go.Scatter(x=th, y=m * th + b, mode="lines", line=styles["linie_primaryLine"], name="Fit"))
    lab = f"QQ plot of effect sizes ({'Kapitel ' + str(kapitel) if kapitel else 'Gesamt'})"
    fig.update_layout(plotly_template.get_standard_layout(lab, "Theoretical quantiles (normal)", "Observed quantiles"))
    fig.show()
def mark_outliers_iqr(df: pd.DataFrame) -> pd.DataFrame:
    q1, q3 = df["Effektstärke"].quantile([0.25, 0.75])
    iqr = q3 - q1
    lo, hi = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    out = df.copy()
    out["Outlier_IQR"] = ~out["Effektstärke"].between(lo, hi)
    print(f"IQR bounds: [{lo:.2f}, {hi:.2f}] | outliers: {int(out['Outlier_IQR'].sum())}")
    return out
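# Worked example of the fences above (hypothetical quartiles): with q1 = 0.20
# and q3 = 0.60, iqr = 0.40, so lo = 0.20 - 0.60 = -0.40 and
# hi = 0.60 + 0.60 = 1.20; any d outside [-0.40, 1.20] is flagged.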
def group_tests_by_kapitel(df: pd.DataFrame):
    groups = [g.dropna().values for _, g in df.groupby("Kapitel")["Effektstärke"]]
    if len(groups) >= 2:
        lev = levene(*groups, center='median')
        print(f"Levene (homogeneity): W={lev.statistic:.3f}, p={lev.pvalue:.4f}")
    # robust alternative: Kruskal-Wallis
    if len(groups) >= 2:
        kw = kruskal(*groups)
        n_total = sum(len(g) for g in groups)
        h = kw.statistic
        eps2 = (h - (len(groups) - 1)) / (n_total - 1)
        print(f"Kruskal-Wallis: H={h:.3f}, p={kw.pvalue:.6f} | ε²={eps2:.3f}")
def text_vs_effect(df: pd.DataFrame):
    if "Text_Dimension" not in df.columns:
        # called for its side effect of adding the Text_Dimension column
        encode_features_3d(df)
    rho, p = spearmanr(df["Text_Dimension"], df["Effektstärke"], nan_policy='omit')
    print(f"Spearman ρ(text, d) = {rho:.3f}, p={p:.6f}")
    styles = plotly_template.get_plot_styles()
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=df["Text_Dimension"], y=df["Effektstärke"],
                             mode="markers", marker=styles["marker_brightArea"], name="Thermometer",
                             text=df["Stichwort"],
                             hovertemplate="Textdim: %{x:.3f}<br>d: %{y:.2f}<br>%{text}<extra></extra>"))
    x = df["Text_Dimension"].values
    y = df["Effektstärke"].values
    if len(x) >= 2 and np.isfinite(x).all() and np.isfinite(y).all():
        m, b = np.polyfit(x, y, 1)
        xx = np.linspace(x.min(), x.max(), 100)
        fig.add_trace(go.Scatter(x=xx, y=m * xx + b, mode="lines", line=styles["linie_secondaryLine"], name="Trend"))
    fig.update_layout(plotly_template.get_standard_layout("Text dimension × effect size (Spearman)", "Text dimension (PCA1)", "Cohen d"))
    fig.show()
def chi2_bins_kapitel(df: pd.DataFrame):
    ct = pd.crosstab(df["Kapitel"], df["Bin"]).reindex(sorted(df["Kapitel"].unique()))
    chi2 = stats.chi2_contingency(ct)
    print("Contingency table (Kapitel × Bin):")
    print(ct)
    print(f"Chi²={chi2[0]:.3f}, p={chi2[1]:.6f}, df={chi2[2]} (test of independence)")
    return ct
def cluster_diagnostics(df: pd.DataFrame, k_min: int = 2, k_max: int = 8):
    X, _ = encode_features(df)
    inertias, sils, ks = [], [], []
    for k in range(k_min, k_max + 1):
        km = KMeans(n_clusters=k, n_init=20, random_state=42).fit(X)
        inertias.append(km.inertia_)
        ks.append(k)
        sils.append(silhouette_score(X, km.labels_) if k > 1 else np.nan)
    colors = plotly_template.get_colors()
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=ks, y=inertias, mode="lines+markers",
                             line=dict(color=colors["primaryLine"], width=2), name="Inertia (elbow)"))
    fig.add_trace(go.Scatter(x=ks, y=sils, mode="lines+markers",
                             line=dict(color=colors["secondaryLine"], width=2), name="Silhouette"))
    fig.update_layout(plotly_template.get_standard_layout("Cluster diagnostics (k)", "k", "Value"))
    fig.show()
def cluster_profiles(df: pd.DataFrame, labels: np.ndarray, top_terms: int = 3):
    res = []
    tmp = df.copy()
    tmp["Cluster"] = labels
    vect = TfidfVectorizer(max_features=300)
    Xtxt = vect.fit_transform(tmp["Stichwort"].astype(str))
    vocab = np.array(vect.get_feature_names_out())
    for c in sorted(tmp["Cluster"].unique()):
        sub = tmp[tmp["Cluster"] == c]
        mean_d = sub["Effektstärke"].mean()
        n = len(sub)
        by_kap = sub["Kapitel"].value_counts().sort_index().to_dict()
        # position-based row indices for the sparse matrix
        pos_idx = tmp.index.get_indexer(sub.index)
        mean_tfidf = np.asarray(Xtxt[pos_idx].mean(axis=0)).ravel()
        top_idx = mean_tfidf.argsort()[::-1][:top_terms]
        terms = vocab[top_idx].tolist()
        res.append({"Cluster": c, "n": n, "Ø d": round(mean_d, 3), "Kapitelverteilung": by_kap, "Top_Terme": terms})
    prof = pd.DataFrame(res).set_index("Cluster")
    print("\nCluster profiles:")
    print(prof)
    return prof
# -----------------------------------------
# Significance-led view (chapter-independent)
# -----------------------------------------
def _minmax_norm(a: np.ndarray) -> np.ndarray:
    a = np.asarray(a, dtype=float)
    if a.size == 0:
        return a
    lo, hi = np.nanmin(a), np.nanmax(a)
    if not np.isfinite(lo) or not np.isfinite(hi) or hi - lo <= 1e-12:
        return np.zeros_like(a)
    return (a - lo) / (hi - lo)
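# Behaviour of _minmax_norm in the edge cases handled above (illustrative):
#   _minmax_norm(np.array([1.0, 2.0, 3.0])) -> array([0.0, 0.5, 1.0])
#   _minmax_norm(np.array([2.0, 2.0]))      -> array([0.0, 0.0])  (constant input)
#   _minmax_norm(np.array([]))              -> array([])          (empty input)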
def build_significance_view(df: pd.DataFrame) -> pd.DataFrame:
    """
    Builds a chapter-independent view with a 'SignifikanzScore'.
    Idea: combine the effect-size magnitude with the per-point silhouette
    separation (where available).
    - score_basis = |d| (larger = stronger)
    - score_cluster = Silhouette_point (values below 0 set to 0), then min-max normalized
    - total score = 0.6 * norm(|d|) + 0.4 * norm(max(Silhouette_point, 0))
    The score carries the sign of d so that negative effects end up at the bottom.
    """
    tmp = df.copy()
    # Base quantities
    tmp["abs_d"] = tmp["Effektstärke"].abs()
    if "Silhouette_point" not in tmp.columns:
        tmp["Silhouette_point"] = np.nan
    sil_nonneg = tmp["Silhouette_point"].fillna(0.0).clip(lower=0.0)
    score_basis = _minmax_norm(tmp["abs_d"].values)
    score_sil = _minmax_norm(sil_nonneg.values)
    score = 0.6 * score_basis + 0.4 * score_sil
    tmp["SignifikanzScore"] = score * np.sign(tmp["Effektstärke"].values)
    # Rankings (strongest absolute values first)
    tmp["Rank_abs"] = (-tmp["abs_d"]).rank(method="min").astype(int)
    tmp["Rank_score"] = (-tmp["SignifikanzScore"].abs()).rank(method="min").astype(int)

    # Categories for quick filtering
    def impact_label(d):
        if d >= 0.70:
            return "hoch+"
        if d >= 0.40:
            return "mittel+"
        if d >= 0.00:
            return "gering+"
        if d > -0.40:
            return "gering"
        if d > -0.70:
            return "mittel"
        return "hoch"

    tmp["Impact_Label"] = tmp["Effektstärke"].apply(impact_label)
    return tmp
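# Worked example of the score above (hypothetical values): a row with
# norm(|d|) = 1.0 and norm(silhouette) = 0.5 scores
#   0.6 * 1.0 + 0.4 * 0.5 = 0.8,
# and the sign of d decides whether it appears as +0.8 or -0.8.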
def plot_significance_space(df_sig: pd.DataFrame):
    """
    2D significance space:
    x = effect size (Cohen d)
    y = SignifikanzScore (signed)
    Marker size ~ |score|, color by sign (CI colors).
    """
    colors = plotly_template.get_colors()
    # Marker sizes (scaled)
    s = (df_sig["SignifikanzScore"].abs() * 20.0) + 6.0
    # Colors by sign
    color_pos = colors.get("positiveHighlight", "#2ca02c")
    color_neg = colors.get("negativeHighlight", "#d62728")
    point_colors = np.where(df_sig["SignifikanzScore"] >= 0, color_pos, color_neg)
    hovertemplate = (
        "Thermometer: %{customdata[0]}<br>"
        "Stichwort: %{text}<br>"
        "d: %{x:.2f}<br>"
        "Score: %{y:.3f}<br>"
        "Kapitel: %{customdata[1]}<br>"
        "Impact: %{customdata[2]}<br>"
        "Rank(|d|): %{customdata[3]} | Rank(|Score|): %{customdata[4]}<extra></extra>"
    )
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df_sig["Effektstärke"],
        y=df_sig["SignifikanzScore"],
        mode="markers",
        marker=dict(color=point_colors, size=s),
        text=df_sig["Stichwort"],
        customdata=np.stack([
            df_sig["Thermometer_ID"],
            df_sig["Kapitelname"],
            df_sig["Impact_Label"],
            df_sig["Rank_abs"],
            df_sig["Rank_score"],
        ], axis=-1),
        name="Thermometer",
        hovertemplate=hovertemplate
    ))
    # Reference lines
    fig.add_hline(y=0, line=dict(color=colors["border"], width=1))
    for x0 in [0.0, 0.40, 0.70, -0.40, -0.70]:
        fig.add_vline(x=x0, line=dict(color=colors["border"], width=1, dash="dot"))
    fig.update_layout(plotly_template.get_standard_layout(
        "Significance-led space: effect size × score (chapter-independent)",
        "Cohen d", "SignifikanzScore"
    ))
    fig.show()
# -----------------------------------------
# Visualizations
# -----------------------------------------
def plot_heatmap_kapitel_vs_d(df: pd.DataFrame, kapitel: int | None = None, bins_d: int = 30):
    """2D heatmap (Histogram2d) of chapter (x) against effect size (y).

    - Shows the density/count per cell (chapter × d range)
    - CI color scale built from the template colors (depthArea -> brightArea)
    """
    colors = plotly_template.get_colors()
    kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"

    # CI-conformant color scale between depthArea and brightArea
    def _two_color_scale(c1, c2):
        def _hex_to_rgb(h):
            h = h.lstrip('#')
            return tuple(int(h[i:i + 2], 16) for i in (0, 2, 4))
        r1, g1, b1 = _hex_to_rgb(c1)
        r2, g2, b2 = _hex_to_rgb(c2)
        scale = []
        for t in np.linspace(0, 1, 6):
            r = int(r1 * (1 - t) + r2 * t)
            g = int(g1 * (1 - t) + g2 * t)
            b = int(b1 * (1 - t) + b2 * t)
            scale.append([float(t), f"rgb({r},{g},{b})"])
        return scale

    colorscale = _two_color_scale(colors["depthArea"], colors["brightArea"]) if "depthArea" in colors else "Viridis"
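    # Illustrative output of _two_color_scale for hypothetical inputs
    # black -> white: six evenly spaced stops,
    #   [[0.0, "rgb(0,0,0)"], [0.2, "rgb(51,51,51)"], ..., [1.0, "rgb(255,255,255)"]]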
    # Histogram2d
    fig = go.Figure(data=go.Histogram2d(
        x=df["Kapitel"].astype(int),
        y=df["Effektstärke"],
        nbinsx=max(1, df["Kapitel"].nunique()),
        nbinsy=bins_d,
        colorscale=colorscale,
        colorbar=dict(title="Count"),
        hovertemplate="Kapitel: %{x}<br>d bin: %{y}<br>count: %{z}<extra></extra>",
    ))
    fig.update_layout(plotly_template.get_standard_layout(
        f"Heatmap: Kapitel × effect size ({kapitel_label})", "Kapitel", "Cohen d"
    ))
    # integer chapter ticks
    fig.update_layout(xaxis=dict(tickmode="linear", dtick=1))
    fig.show()
    export_figure(fig, "vl-heatmap-kapitel-vs-d", export_fig_visual, export_fig_png)
def export_json(obj: dict, name: str):
    try:
        p = os.path.join(EXPORT_DIR, name)
        with open(p, "w", encoding="utf-8") as f:
            json.dump(obj, f, ensure_ascii=False, indent=2)
    except Exception:
        pass


# -----------------------------------------
# Helper: DataFrame to records (with None for NaN)
# -----------------------------------------
def _df_records(df: pd.DataFrame, cols: list[str]) -> list[dict]:
    try:
        return df[cols].replace({np.nan: None}).to_dict(orient="records")
    except Exception:
        return df.replace({np.nan: None}).to_dict(orient="records")
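# Illustrative behaviour (hypothetical frame): a row {"a": 1.0, "b": np.nan}
# becomes {"a": 1.0, "b": None}, which serializes as JSON null; json.dump
# would otherwise emit the non-standard token NaN.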
def plot_boxplots(df: pd.DataFrame, kapitel: int | None = None):
    """Boxplots of the effect sizes: by chapter (in the overall view) and by bin.

    - Shows outliers (IQR-based) and notches for the median confidence.
    - CI styles from plotly_template.
    """
    styles = plotly_template.get_plot_styles()
    kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
    # 1) Boxplot by chapter (only meaningful when several chapters are present)
    if kapitel is None and df["Kapitel"].nunique() > 1:
        fig_kap = go.Figure()
        fig_kap.add_trace(go.Box(
            x=df["Kapitel"].astype(int),
            y=df["Effektstärke"],
            boxpoints="outliers",
            notched=True,
            marker=styles["marker_brightArea"],
            line=styles["linie_primaryLine"],
            name="Kapitel",
            hovertemplate="Kapitel: %{x}<br>d: %{y:.2f}<extra></extra>",
        ))
        fig_kap.update_layout(plotly_template.get_standard_layout(
            f"Boxplot of effect sizes by chapter ({kapitel_label})", "Kapitel", "Cohen d"
        ))
        # integer chapter ticks
        fig_kap.update_layout(xaxis=dict(tickmode="linear", dtick=1))
        fig_kap.show()
        export_figure(fig_kap, "vl-box-kapitel", export_fig_visual, export_fig_png)
    # 2) Boxplot by heuristic bin (always possible)
    order = ["negativ", "gering", "mittel", "hoch"]
    fig_bin = go.Figure()
    fig_bin.add_trace(go.Box(
        x=pd.Categorical(df["Bin"], categories=order, ordered=True),
        y=df["Effektstärke"],
        boxpoints="outliers",
        notched=True,
        marker=styles["marker_accent"],
        line=styles["linie_secondaryLine"],
        name="Bin",
        hovertemplate="Bin: %{x}<br>d: %{y:.2f}<extra></extra>",
    ))
    fig_bin.update_layout(plotly_template.get_standard_layout(
        f"Boxplot of effect sizes by bin ({kapitel_label})", "Bin", "Cohen d"
    ))
    fig_bin.show()
    export_figure(fig_bin, "vl-box-bins", export_fig_visual, export_fig_png)
    # 3) Optional: one overall boxplot for the current selection
    fig_all = go.Figure()
    fig_all.add_trace(go.Box(
        y=df["Effektstärke"],
        boxpoints="outliers",
        notched=True,
        marker=styles["marker_positiveHighlight"],
        line=styles["linie_primaryLine"],
        name=kapitel_label,
        hovertemplate="d: %{y:.2f}<extra></extra>",
    ))
    fig_all.update_layout(plotly_template.get_standard_layout(
        f"Boxplot of effect sizes ({kapitel_label})", "", "Cohen d"
    ))
    fig_all.show()
    export_figure(fig_all, "vl-box-overall", export_fig_visual, export_fig_png)
def plot_hist(df: pd.DataFrame, kapitel: int | None = None):
    # Use CI styles from plotly_template
    styles = plotly_template.get_plot_styles()
    kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
    fig = go.Figure()
    fig.add_trace(go.Histogram(
        x=df["Effektstärke"],
        marker=styles["balken_accent"],
        hovertemplate="effect size: %{x:.2f}<br>frequency: %{y}<extra></extra>"
    ))
    fig.update_layout(plotly_template.get_standard_layout(
        f"Distribution of effect sizes ({kapitel_label})", "Cohen d", "Frequency"
    ))
    fig.show()
    export_figure(fig, "vl-hist-effekte", export_fig_visual, export_fig_png)
def plot_bins(df: pd.DataFrame, kapitel: int | None = None):
    styles = plotly_template.get_plot_styles()
    kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
    order = ["negativ", "gering", "mittel", "hoch"]
    counts = df["Bin"].value_counts().reindex(order).fillna(0).astype(int)
    fig = go.Figure()
    fig.add_trace(go.Bar(
        x=counts.index,
        y=counts.values,
        marker=styles["balken_accent"],
        hovertemplate="category: %{x}<br>count: %{y}<extra></extra>"
    ))
    fig.update_layout(plotly_template.get_standard_layout(
        f"Heuristic classification by effect size (bins) ({kapitel_label})", "Bin", "Count"
    ))
    fig.show()
    export_figure(fig, "vl-bins", export_fig_visual, export_fig_png)
def plot_scatter(df: pd.DataFrame, cluster_labels: np.ndarray, model: KMeans, sil: float, title_suffix: str, kapitel: int | None = None):
    styles = plotly_template.get_plot_styles()
    kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
    tmp = df.copy()
    tmp["Cluster"] = cluster_labels.astype(int)
    # Plot x: chapters as integers, with a small jitter so points do not sit exactly on top of each other
    rng = np.random.default_rng(42)
    tmp["_kapitel_x"] = tmp["Kapitel"].astype(int) + (rng.random(len(tmp)) - 0.5) * 0.12
    # Cluster strengths (effect-size component of each cluster center)
    cluster_strengths = {i: float(model.cluster_centers_[i][0]) for i in range(len(model.cluster_centers_))}
    tmp["Clusterstärke"] = tmp["Cluster"].map(cluster_strengths)
    hovertemplate = (
        "Thermometer: %{customdata[2]}<br>"
        "Stichwort: %{text}<br>"
        "effect size: %{y:.2f}<br>"
        "Kapitel: %{customdata[0]}<br>"
        "cluster strength: %{customdata[1]:.3f}<extra></extra>"
    )
    fig = go.Figure()
    clusters = sorted(tmp["Cluster"].unique())
    palette_keys = ["positiveHighlight", "negativeHighlight", "accent", "brightArea"]
    for idx, cluster in enumerate(clusters):
        cluster_df = tmp[tmp["Cluster"] == cluster]
        color_key = palette_keys[idx % len(palette_keys)]
        fig.add_trace(go.Scatter(
            x=cluster_df["_kapitel_x"],
            y=cluster_df["Effektstärke"],
            mode="markers",
            marker={**styles[f"marker_{color_key}"], "size": 10},
            name=f"Cluster: {cluster_strengths[cluster]:.2f}",
            text=cluster_df["Stichwort"],
            customdata=np.stack([cluster_df["Kapitelname"], cluster_df["Clusterstärke"], cluster_df["Thermometer_ID"]], axis=-1),
            hovertemplate=hovertemplate
        ))
    fig.update_layout(plotly_template.get_standard_layout(
        f"Effect size × cluster ({title_suffix}) ({kapitel_label}), silhouette: {sil:.3f}", "Kapitel", "Cohen d"
    ))
    # integer ticks on the x axis (chapters)
    fig.update_layout(xaxis=dict(tickmode="linear", dtick=1))
    fig.show()
    export_figure(fig, f"vl-scatter-{title_suffix}", export_fig_visual, export_fig_png)
def plot_scatter_3d(df: pd.DataFrame, cluster_labels: np.ndarray, sil: float, title_suffix: str, kapitel: int | None = None):
    styles = plotly_template.get_plot_styles()
    kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
    tmp = df.copy()
    tmp["Cluster"] = cluster_labels.astype(int)
    # Mean effect size per cluster
    cluster_strengths = {i: float(tmp[tmp["Cluster"] == i]["Effektstärke"].mean()) for i in sorted(set(cluster_labels))}
    hovertemplate = (
        "Thermometer: %{text}<br>"
        "Kapitel: %{customdata[0]}<br>"
        "effect size: %{x:.2f}<br>"
        "text dim: %{z:.2f}<br>"
        "cluster: %{customdata[1]}<extra></extra>"
    )
    fig = go.Figure()
    clusters = sorted(tmp["Cluster"].unique())
    palette_keys = ["positiveHighlight", "negativeHighlight", "accent", "brightArea"]
    for idx, cluster in enumerate(clusters):
        cluster_df = tmp[tmp["Cluster"] == cluster]
        color_key = palette_keys[idx % len(palette_keys)]
        fig.add_trace(go.Scatter3d(
            x=cluster_df["Effektstärke"],
            y=cluster_df["Kapitel"],
            z=cluster_df["Text_Dimension"],
            mode="markers",
            marker={**styles[f"marker_{color_key}"], "size": 6},
            name=f"Cluster {cluster} (Ø d = {cluster_strengths[cluster]:.2f})",
            text=cluster_df["Stichwort"],
            customdata=np.stack([cluster_df["Kapitelname"], cluster_df["Cluster"]], axis=-1),
            hovertemplate=hovertemplate
        ))
    fig.update_layout(plotly_template.get_standard_layout(
        f"3D clustering (effect size × chapter × text) {title_suffix} ({kapitel_label}), silhouette: {sil:.3f}",
        "Effektstärke", "Kapitel", "Textdimension"
    ))
    fig.update_layout(scene=dict(
        yaxis=dict(
            title="Kapitel",
            tickmode="linear",
            dtick=1
        )
    ))
    fig.show()
    export_figure(fig, f"vl-scatter3d-{title_suffix}", export_fig_visual, export_fig_png)
# -----------------------------------------
# Pipeline
# -----------------------------------------
def analyse(csv_path: str = "Thermometer.csv", k: int = 4, kapitel: int | None = None):
    # Load
    df = load_data(csv_path)
    # Data validation
    dq = validate_data(df)
    print("\nDATA QUALITY REPORT:")
    for key, val in dq.items():
        print(f" {key}: {val}")
    export_json(dq, "data_quality_report.json")
    if kapitel is not None:
        df = df[df["Kapitel"] == kapitel]
        if df.empty:
            print(f"No data for Kapitel {kapitel}.")
            return None
    # Bins
    df = add_manual_bins(df)
    # K-means
    labels, sil, model = run_kmeans(df, k=k)
    # Attach the per-point silhouette
    try:
        X_for_sil, _ = encode_features(df)
        if k > 1 and len(df) > k:
            df["Silhouette_point"] = silhouette_samples(X_for_sil, labels)
        else:
            df["Silhouette_point"] = np.nan
    except Exception:
        df["Silhouette_point"] = np.nan
    # Reports
    print("-" * 60)
    print("ANALYSIS view 1 | heuristic bins (boundaries: <0 | <0.40 | <0.70 | ≥0.70)")
    print(df["Bin"].value_counts().reindex(["negativ", "gering", "mittel", "hoch"]).fillna(0).astype(int))
    print("\nANALYSIS view 2 | k-means clustering")
    if not math.isnan(sil):
        print(f"Silhouette score (k={k}): {sil:.3f}")
    else:
        print(f"Silhouette score (k={k}): n/a (too little data or k too large)")
    if not math.isnan(sil):
        # Cluster centers (effect size plus chapter one-hot components)
        centers = model.cluster_centers_
        print("\nCluster centers (first column = effect size, rest = chapter OHE):")
        for idx, center in enumerate(centers):
            eff = center[0]
            print(f" Cluster {idx}: mean effect size {eff:.3f}")
    # --- Statistics block ---
    stats_df = describe_effects(df)
    plot_table_stats(stats_df, f"Descriptive statistics ({'Kapitel ' + str(kapitel) if kapitel else 'Gesamt'})")
    # Export the descriptive statistics
    try:
        stats_df.to_csv(os.path.join(EXPORT_DIR, "deskriptiv.csv"))
    except Exception:
        pass
    normality_and_qq(df, kapitel=kapitel)
    df = mark_outliers_iqr(df)
    if kapitel is None:
        group_tests_by_kapitel(df)
    text_vs_effect(df)
    if kapitel is None:
        chi2_bins_kapitel(df)
    cluster_diagnostics(df)
    profiles_df = cluster_profiles(df, labels)
    try:
        export_json(json.loads(profiles_df.to_json(orient="table")), "cluster_profile.json")
    except Exception:
        pass
    # --- Significance-led view ---
    df_sig = build_significance_view(df)
    try:
        df_sig.sort_values("Rank_score").to_csv(os.path.join(EXPORT_DIR, "signifikanz_ranking.csv"), index=False)
    except Exception:
        pass
    plot_significance_space(df_sig)
    # Summarize the tests (collected before export)
    tests_summary = {"silhouette_global": float(sil) if not math.isnan(sil) else None}
    if kapitel is None and df["Kapitel"].nunique() > 1:
        groups = [g.dropna().values for _, g in df.groupby("Kapitel")["Effektstärke"]]
        try:
            lev = levene(*groups, center='median')
            tests_summary["levene_W"] = float(lev.statistic)
            tests_summary["levene_p"] = float(lev.pvalue)
        except Exception:
            pass
        try:
            kw = kruskal(*groups)
            n_total = sum(len(g) for g in groups)
            h = float(kw.statistic)
            eps2 = (h - (len(groups) - 1)) / (n_total - 1) if n_total > 1 else None
            tests_summary["kruskal_H"] = h
            tests_summary["kruskal_p"] = float(kw.pvalue)
            tests_summary["kruskal_eps2"] = float(eps2) if eps2 is not None else None
        except Exception:
            pass
    try:
        if "Text_Dimension" not in df.columns:
            encode_features_3d(df)
        rho, p = spearmanr(df["Text_Dimension"], df["Effektstärke"], nan_policy='omit')
        tests_summary["spearman_rho_text_d"] = float(rho)
        tests_summary["spearman_p_text_d"] = float(p)
    except Exception:
        pass
    if kapitel is None and df["Kapitel"].nunique() > 1:
        try:
            ct = pd.crosstab(df["Kapitel"], df["Bin"]).reindex(sorted(df["Kapitel"].unique()))
            chi2 = stats.chi2_contingency(ct)
            tests_summary["chi2"] = float(chi2[0])
            tests_summary["chi2_p"] = float(chi2[1])
            tests_summary["chi2_df"] = int(chi2[2])
        except Exception:
            pass
    export_json(tests_summary, "tests_summary.json")
    # --- VALUES FILE: bundle everything into one JSON ---
    if export_werte_all:
        try:
            # Core data rows
            base_cols = [
                "Thermometer_ID", "Stichwort", "Effektstärke", "Kapitel", "Kapitelname",
                "Bin", "Text_Dimension", "Outlier_IQR", "Silhouette_point"
            ]
            rows = _df_records(df, [c for c in base_cols if c in df.columns])
            # Descriptive statistics as records
            desc_records = _df_records(
                stats_df.reset_index().rename(columns={"index": "Gruppe"}),
                list(stats_df.reset_index().columns)
            )
            # Cluster profiles as records
            prof_records = _df_records(
                profiles_df.reset_index().rename(columns={"index": "Cluster"}),
                list(profiles_df.reset_index().columns)
            )
            # Cluster centers (full, and the d component only)
            centers_full = model.cluster_centers_.tolist() if hasattr(model, "cluster_centers_") else None
            centers_d = [float(c[0]) for c in model.cluster_centers_] if hasattr(model, "cluster_centers_") else None
            payload = {
                "meta": {
                    "k": int(k),
                    "kapitel": int(kapitel) if kapitel is not None else None,
                    "theme": theme,
                },
                "data": rows,
                "deskriptiv": desc_records,
                "cluster": {
                    "silhouette_global": float(sil) if not math.isnan(sil) else None,
                    "centers_full": centers_full,
                    "centers_effekt_only": centers_d,
                },
                "profiles": prof_records,
                "tests_summary": tests_summary if isinstance(tests_summary, dict) else {},
                "data_quality": dq,
            }
            export_json(payload, "werte_all.json")
        except Exception:
            pass
    # Plots
    plot_heatmap_kapitel_vs_d(df, kapitel=kapitel)
    # Boxplots
    plot_boxplots(df, kapitel=kapitel)
    plot_hist(df, kapitel=kapitel)
    plot_bins(df, kapitel=kapitel)
    plot_scatter(df, labels, model, sil, title_suffix=f"k{k}", kapitel=kapitel)
    # 3D clustering
    X3d, _ = encode_features_3d(df)
    model3d = KMeans(n_clusters=k, n_init=20, random_state=42)
    labels3d = model3d.fit_predict(X3d)
    sil3d = silhouette_score(X3d, labels3d) if k > 1 and len(df) > k else np.nan
    plot_scatter_3d(df, labels3d, sil3d, title_suffix=f"k{k}-3d", kapitel=kapitel)
    # Export the cluster assignment
    try:
        df_export = df.copy()
        df_export["Cluster"] = labels
        df_export.to_csv(os.path.join(EXPORT_DIR, "clusterzuordnung.csv"), index=False)
    except Exception:
        pass
    return {
        "df": df,
        "kmeans_labels": labels,
        "silhouette": sil,
        "model": model,
    }
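# Illustrative call of the pipeline (path and k are examples, not the shipped
# configuration):
#   res = analyse(csv_path="Thermometer.csv", k=4, kapitel=None)
#   res["df"]            # enriched DataFrame (bins, silhouettes, outlier flags)
#   res["silhouette"]    # global silhouette score of the 2D feature space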
# -----------------------------------------
# Main
# -----------------------------------------
if __name__ == "__main__":
    # Adjust the CSV path if the file lives somewhere else
    if analyse_all:
        df_all = load_data(os.path.join(os.path.dirname(__file__), csv_file))
        for kap in sorted(df_all["Kapitel"].unique()):
            analyse(csv_path=os.path.join(os.path.dirname(__file__), csv_file), k=k_clusters, kapitel=kap)
    else:
        analyse(csv_path=os.path.join(os.path.dirname(__file__), csv_file), k=k_clusters, kapitel=selected_kapitel)