This repository has been archived on 2025-10-27. You can view files and clone it, but cannot push or open issues or pull requests.
Files
visible-learning/visible-learning statistisch.py

1602 lines
63 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
"""
Visible Learning Explorative Clusteranalyse
---------------------------------------------
CI: angelehnt an simulation_bildungswirkgefuege
Funktion: CSV mit Effektstärken laden, manuelle Bins + K-Means-Cluster bilden,
Silhouette-Score berechnen und Visualisierungen erzeugen.
"""
# -----------------------------------------
# Imports
# -----------------------------------------
import os
import math
import pandas as pd
import numpy as np
import json
from math import sqrt
from sklearn.preprocessing import OneHotEncoder
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score, silhouette_samples
from scipy import stats
from scipy.stats import anderson, kruskal, levene, spearmanr
import plotly.graph_objs as go
import plotly.io as pio
# -----------------------------------------
# Konfiguration laden
# -----------------------------------------
from config_visible_learning import (
csv_file,
k_clusters,
export_fig_visual,
export_fig_png,
theme,
selected_kapitel,
analyse_all,
export_werte_all,
)
# -----------------------------------------
# Template/CI
# -----------------------------------------
try:
from ci_template import plotly_template
plotly_template.set_theme(theme)
_ci_layout = lambda title, x, y: plotly_template.get_standard_layout(
title=title, x_title=x, y_title=y
)
except Exception:
# Fallback: neutrale Plotly-Defaults
_ci_layout = lambda title, x, y: dict(
title=title, xaxis_title=x, yaxis_title=y
)
# -----------------------------------------
# Export-Helfer (HTML/PNG) CI-kompatibel
# -----------------------------------------
EXPORT_DIR = os.path.join(os.path.dirname(__file__), "export")
if not os.path.exists(EXPORT_DIR):
try:
os.makedirs(EXPORT_DIR, exist_ok=True)
except Exception:
pass
def export_figure(fig, name: str, do_html: bool, do_png: bool):
"""Exportiert eine Plotly-Figur gemäß CI-Flags."""
base = os.path.join(EXPORT_DIR, name)
if do_html:
p = f"{base}.html"
pio.write_html(fig, file=p, auto_open=False, include_plotlyjs="cdn")
if do_png:
try:
p = f"{base}.png"
pio.write_image(fig, p, scale=2)
except Exception:
# PNG erfordert Kaleido; leise ignorieren, wenn nicht installiert
pass
# -----------------------------------------
# Daten | Laden & Vorbereiten
# -----------------------------------------
REQUIRED_COLS = ["Thermometer_ID", "Stichwort", "Effektstärke"]
def load_data(csv_path: str) -> pd.DataFrame:
df = pd.read_csv(csv_path)
missing = [c for c in REQUIRED_COLS if c not in df.columns]
if missing:
raise ValueError(f"Fehlende Spalten in CSV: {missing}")
# Typen bereinigen
df["Thermometer_ID"] = df["Thermometer_ID"].astype(str)
# Effektstärke robust in float wandeln
df["Effektstärke"] = (
df["Effektstärke"].astype(str).str.replace(",", ".", regex=False).str.strip()
)
df["Effektstärke"] = pd.to_numeric(df["Effektstärke"], errors="coerce")
# explizit ±inf auf NaN setzen, um sie zu entfernen
df["Effektstärke"] = df["Effektstärke"].replace([np.inf, -np.inf], np.nan)
# Kapitel aus Thermometer_ID ableiten und Kapitelname mappen
df["Kapitel"] = df["Thermometer_ID"].astype(str).str.split(".").str[0].astype(int)
kapitel_map = {
5: "Lernende",
6: "Elternhaus und Familie",
7: "Schule und Gesellschaft",
8: "Klassenzimmer",
9: "Lehrperson",
10: "Curriculum",
11: "Zielorientiertes Unterrichten",
12: "Lernstrategien",
13: "Lehrstrategien",
14: "Nutzung von Technologien",
15: "Schulische und außerschulische Einflüsse",
}
df["Kapitelname"] = df["Kapitel"].map(kapitel_map).fillna(df["Kapitel"].map(lambda k: f"Kapitel {k}"))
return df.dropna(subset=["Effektstärke"]) # nur gültige Zahlen
def validate_data(df: pd.DataFrame) -> dict:
"""Einfache Datenvalidierung und Qualitätsreport.
Hinweis: Fehlende Effektstärken wurden in load_data bereits entfernt.
"""
report = {}
report["n_rows"] = int(len(df))
# Duplikate
dup_counts = df["Thermometer_ID"].value_counts()
duplicates = dup_counts[dup_counts > 1]
report["duplicate_ids"] = duplicates.to_dict()
report["n_duplicates"] = int(duplicates.sum()) if not duplicates.empty else 0
# Gültige Kapitel (5..15)
valid_kap = set(range(5, 16))
invalid_kapitel = df.loc[~df["Kapitel"].isin(valid_kap), "Kapitel"].tolist()
report["invalid_kapitel_entries"] = invalid_kapitel
# Wertebereich d
report["effekt_min"] = float(df["Effektstärke"].min()) if not df.empty else None
report["effekt_max"] = float(df["Effektstärke"].max()) if not df.empty else None
# Leere Stichworte
empty_keywords = df["Stichwort"].astype(str).str.strip().eq("").sum()
report["empty_stichwort"] = int(empty_keywords)
return report
# -----------------------------------------
# Manuelle Bins (heuristische Einteilung)
# -----------------------------------------
BIN_LABELS = ["negativ", "gering", "mittel", "hoch"]
def manual_bin(d: float) -> str:
if d < 0:
return "negativ"
if 0 <= d < 0.40:
return "gering"
if 0.40 <= d < 0.70:
return "mittel"
return "hoch"
def add_manual_bins(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df["Bin"] = df["Effektstärke"].apply(manual_bin)
return df
# -----------------------------------------
# K-Means-Clustering (Effektstärke + Kapitel)
# -----------------------------------------
def encode_features(df: pd.DataFrame, kapitel_weight: float = 1.0) -> tuple[np.ndarray, list[str]]:
"""One-Hot-Encoding des Kapitels + Effektstärke (metrisch)."""
try:
enc = OneHotEncoder(sparse_output=False, handle_unknown="ignore") # neuere sklearn-Versionen
except TypeError:
enc = OneHotEncoder(sparse=False, handle_unknown="ignore") # ältere sklearn-Versionen
cat = df[["Kapitel"]].fillna(-1)
cat_ohe = enc.fit_transform(cat)
cat_ohe = cat_ohe * float(kapitel_weight)
eff = df[["Effektstärke"]].values
X = np.hstack([eff, cat_ohe])
feature_names = ["Effektstärke"] + [f"kap::{c}" for c in enc.get_feature_names_out(["Kapitel"])]
return X, feature_names
def encode_features_3d(df: pd.DataFrame, kapitel_weight: float = 1.0) -> tuple[np.ndarray, list[str]]:
"""Effektstärke + Kapitel + Textdimension (TF-IDF + PCA) für 3D-Clustering."""
# Kapitel
try:
enc = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
except TypeError:
enc = OneHotEncoder(sparse=False, handle_unknown="ignore")
cat = df[["Kapitel"]].fillna(-1)
cat_ohe = enc.fit_transform(cat)
cat_ohe = cat_ohe * float(kapitel_weight)
# Effektstärke
eff = df[["Effektstärke"]].values
# Textdimension über TF-IDF + PCA
vectorizer = TfidfVectorizer(max_features=100)
X_text = vectorizer.fit_transform(df["Stichwort"].astype(str))
# Sicherstellen, dass TF-IDF keine Inf/NaN enthält (sollte nicht vorkommen)
X_text = X_text.tocsr()
pca = PCA(n_components=1, random_state=42)
text_dim = pca.fit_transform(X_text.toarray())
# Textdimension im DataFrame speichern
df["Text_Dimension"] = text_dim.flatten()
# Zusammenführen
X = np.hstack([eff, cat_ohe, text_dim])
feature_names = ["Effektstärke"] + list(enc.get_feature_names_out(["Kapitel"])) + ["Text_Dimension"]
return X, feature_names
# -----------------------------------------
# 3Achsige System-Clusteranalyse (x=Psychisch, y=Sozial, z=Effektstärke)
# -----------------------------------------
def load_system_axes(df: pd.DataFrame, mapping_csv: str | None = None) -> pd.DataFrame:
"""
Merged eine optionale Mapping-Datei (z. B. systems_mapping.csv) auf den DataFrame.
Erwartete Spalten-Varianten (mindestens eine der Varianten):
Variante A) getrennte Indikatoren:
- System_psychisch (0/1 oder float)
- System_sozial (0/1 oder float)
Variante B) gemeinsame Spalte:
- System in {'psychisch','sozial','beides','neutral'} (Groß/Kleinschreibung egal)
Falls keine Spalten gefunden werden, werden beide Achsen auf 0.0 gesetzt.
"""
out = df.copy()
# default: Spalten evtl. schon vorhanden -> normalisieren
def _coerce_float(col):
return pd.to_numeric(out[col], errors="coerce").fillna(0.0).astype(float)
has_psy = "System_psychisch" in out.columns
has_soz = "System_sozial" in out.columns
has_cat = "System" in out.columns
# optional: Mapping-Datei einlesen und per Thermometer_ID mergen
if mapping_csv is None:
mapping_csv = os.path.join(os.path.dirname(__file__), "systems_mapping.csv")
try:
if os.path.exists(mapping_csv):
mp = pd.read_csv(mapping_csv, dtype={"Thermometer_ID": str})
out = out.merge(mp, on="Thermometer_ID", how="left", suffixes=("", "_map"))
# falls es Dubletten gibt, map-Varianten bevorzugen
for col in ["System_psychisch", "System_sozial", "System"]:
col_map = col + "_map"
if col_map in out.columns:
out[col] = out[col].where(out[col].notna(), out[col_map])
# Aufräumen
drop_cols = [c for c in out.columns if c.endswith("_map")]
if drop_cols:
out = out.drop(columns=drop_cols)
# Flags neu prüfen (können durch Merge entstanden sein)
has_psy = "System_psychisch" in out.columns
has_soz = "System_sozial" in out.columns
has_cat = "System" in out.columns
except Exception:
# still silently continue; Achsen ggf. später auf 0 setzen
pass
# Variante A: numerische Spalten
if has_psy and has_soz:
out["System_psychisch"] = _coerce_float("System_psychisch")
out["System_sozial"] = _coerce_float("System_sozial")
# Variante B: kategoriale Spalte -> in numerische Achsen mappen
elif has_cat:
s = out["System"].astype(str).str.lower().str.strip()
out["System_psychisch"] = np.where(s.isin(["psychisch", "beides"]), 1.0, 0.0)
out["System_sozial"] = np.where(s.isin(["sozial", "beides"]), 1.0, 0.0)
out.loc[s.eq("neutral"), ["System_psychisch","System_sozial"]] = 0.0
else:
# Fallback: keine Angaben -> beide Achsen = 0.0
out["System_psychisch"] = 0.0
out["System_sozial"] = 0.0
print("Hinweis: Keine System-Spalten gefunden. Achsen 'psychisch'/'sozial' wurden auf 0.0 gesetzt. "
"Lege optional eine 'systems_mapping.csv' mit Spalten "
"[Thermometer_ID, System_psychisch, System_sozial] oder [Thermometer_ID, System] an.")
return out
def encode_features_system3(df: pd.DataFrame,
psych_weight: float = 1.0,
sozial_weight: float = 1.0,
effekt_weight: float = 1.0) -> tuple[np.ndarray, list[str]]:
"""
Baut die 3D-Featurematrix für die Systemanalyse:
X[:,0] = psych_weight * System_psychisch
X[:,1] = sozial_weight * System_sozial
X[:,2] = effekt_weight * Effektstärke
"""
X = np.column_stack([
df["System_psychisch"].astype(float).values * float(psych_weight),
df["System_sozial"].astype(float).values * float(sozial_weight),
df["Effektstärke"].astype(float).values * float(effekt_weight),
])
return _sanitize_X(X), ["System_psychisch", "System_sozial", "Effektstärke"]
def run_kmeans_system3(df: pd.DataFrame, k: int = 4, random_state: int = 42,
psych_weight: float = 1.0, sozial_weight: float = 1.0, effekt_weight: float = 1.0):
"""
K-Means auf den 3 System-Achsen (x=psychisch, y=sozial, z=Effektstärke).
Gibt (labels, silhouette, model, X) zurück.
"""
X, _ = encode_features_system3(df, psych_weight, sozial_weight, effekt_weight)
model = KMeans(n_clusters=k, n_init=20, random_state=random_state)
labels = model.fit_predict(X)
sil = silhouette_score(X, labels) if k > 1 and len(df) > k else np.nan
return labels, sil, model, X
def plot_cluster_system3(df: pd.DataFrame, labels: np.ndarray, sil: float, title_suffix: str = "system3d"):
"""
3D-Scatter mit Achsen:
x = System_psychisch, y = System_sozial, z = Effektstärke
Farben nach Cluster (CI-Template). Hover zeigt Thermometer, Kapitel, Stichwort und Koordinaten.
"""
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
tmp = df.copy()
tmp["Cluster_sys3"] = labels.astype(int)
# Markerpalette robust aus CI-Styles ableiten
def _get_marker(*candidates):
for key in candidates:
if key in styles:
return styles[key]
return styles.get("marker_accent", {})
palette_markers = [
_get_marker("marker_positiveHighlight", "marker_brightArea", "marker_accent"),
_get_marker("marker_primaryLine", "marker_brightArea", "marker_accent"),
_get_marker("marker_secondaryLine", "marker_accent", "marker_brightArea"),
_get_marker("marker_negativeHighlight", "marker_accent", "marker_brightArea"),
]
hovertemplate = (
"Thermometer: %{customdata[0]}<br>"
"Kapitel: %{customdata[1]}<br>"
"Stichwort: %{text}<br>"
"Psychisch (x): %{x:.2f}<br>"
"Sozial (y): %{y:.2f}<br>"
"d (z): %{z:.2f}<extra></extra>"
)
fig = go.Figure()
for idx, c in enumerate(sorted(tmp["Cluster_sys3"].unique())):
sub = tmp[tmp["Cluster_sys3"] == c]
marker_style = palette_markers[idx % len(palette_markers)]
fig.add_trace(go.Scatter3d(
x=sub["System_psychisch"],
y=sub["System_sozial"],
z=sub["Effektstärke"],
mode="markers",
marker={**marker_style, "size": 7},
name=f"Cluster {c}",
text=sub["Stichwort"],
customdata=np.stack([sub["Thermometer_ID"], sub["Kapitelname"]], axis=-1),
hovertemplate=hovertemplate
))
fig.update_layout(plotly_template.get_standard_layout(
f"System-3D-Cluster (x=Psychisch, y=Sozial, z=d) Silhouette: {sil:.3f} {title_suffix}",
"Psychisches System", "Soziales System", "Cohen d"
))
# Achsen-Beschriftungen & Skalen
fig.update_layout(scene=dict(
xaxis=dict(title="Psychisches System", range=[-0.05, 1.05]),
yaxis=dict(title="Soziales System", range=[-0.05, 1.05]),
zaxis=dict(title="Cohen d")
))
fig.show()
# -----------------------------------------
# Hilfsfunktion zur Sanitisierung von Feature-Matrizen
# -----------------------------------------
def _sanitize_X(X: np.ndarray, clip: float | None = None) -> np.ndarray:
"""Ersetzt NaN/Inf in Feature-Matrizen und optionales Clipping gegen numerische Ausreißer.
Gibt eine *neue* Matrix zurück.
"""
X = np.asarray(X, dtype=float).copy()
# NaN/Inf -> 0
X[~np.isfinite(X)] = 0.0
if clip is not None and clip > 0:
X = np.clip(X, -float(clip), float(clip))
return X
def run_kmeans(df: pd.DataFrame, k: int = 4, random_state: int = 42, kapitel_weight: float = 1.0):
X, feature_names = encode_features(df, kapitel_weight=kapitel_weight)
X = _sanitize_X(X, clip=1e6)
if not np.isfinite(X).all():
print("Warnung: Nicht-endliche Werte in X nach Sanitisierung werden als 0 behandelt.")
model = KMeans(n_clusters=k, n_init=20, random_state=random_state)
labels = model.fit_predict(X)
sil = silhouette_score(X, labels) if k > 1 and len(df) > k else np.nan
return labels, sil, model
# -----------------------------------------
# Statistische Auswertungen
# -----------------------------------------
def describe_effects(df: pd.DataFrame) -> pd.DataFrame:
"""Deskriptive Statistik (gesamt & je Kapitel)."""
# Aggregation für Kapitel mit eindeutigen Spaltennamen
by_kap = df.groupby("Kapitel")["Effektstärke"].agg(
n="count",
mean="mean",
std=lambda s: s.std(ddof=1),
min="min",
q1=lambda s: s.quantile(0.25),
median="median",
q3=lambda s: s.quantile(0.75),
max="max",
skew="skew",
kurtosis=lambda s: s.kurt(),
)
# Gesamtzeile
overall = pd.DataFrame({
"n": [df["Effektstärke"].count()],
"mean": [df["Effektstärke"].mean()],
"std": [df["Effektstärke"].std(ddof=1)],
"min": [df["Effektstärke"].min()],
"q1": [df["Effektstärke"].quantile(0.25)],
"median": [df["Effektstärke"].median()],
"q3": [df["Effektstärke"].quantile(0.75)],
"max": [df["Effektstärke"].max()],
"skew": [df["Effektstärke"].skew()],
"kurtosis": [df["Effektstärke"].kurt()],
}, index=["Gesamt"])
# Kapitel-Index schöner beschriften
by_kap.index = [f"Kapitel {int(k)}" for k in by_kap.index]
# Zusammenführen
out = pd.concat([overall, by_kap])
return out
def plot_table_stats(stats_df: pd.DataFrame, title: str):
from plotly.graph_objs import Table, Figure
colors = plotly_template.get_colors()
fig = Figure(data=[Table(
header=dict(values=[""] + list(stats_df.columns),
fill_color=colors["brightArea"], font=dict(color=colors["white"])),
cells=dict(values=[stats_df.index.astype(str)] + [stats_df[c].round(3).tolist() for c in stats_df.columns],
fill_color=colors["depthArea"], font=dict(color=colors["white"]))
)])
fig.update_layout(plotly_template.get_standard_layout(title, "", ""))
fig.show()
def normality_and_qq(df: pd.DataFrame, kapitel: int | None = None):
x = df["Effektstärke"].dropna().values
ad = anderson(x, dist='norm')
print(f"AndersonDarling: A2={ad.statistic:.3f} | kritische Werte {ad.critical_values} | Sig-Level {ad.significance_level}")
# QQ-Plot
if len(x) < 3:
print("QQ-Plot: Zu wenige Datenpunkte (<3) Plot wird übersprungen.")
return
styles = plotly_template.get_plot_styles()
osm, osr = stats.probplot(x, dist="norm", rvalue=False)
# Kompatibel zu unterschiedlichen SciPy-Versionen:
# Variante A: osm=array(theoretische Quantile), osr=array(ordered responses)
# Variante B: osm=(array(theoretische Quantile), array(ordered responses)), osr=... (ungleich genutzt)
if isinstance(osm, (tuple, list)) and len(osm) == 2 and np.ndim(osm[0]) == 1:
th = np.asarray(osm[0])
ord_data = np.asarray(osm[1])
else:
th = np.asarray(osm)
ord_data = np.asarray(osr)
# NaNs/Inf filtern und Mindestlänge absichern
mask = np.isfinite(th) & np.isfinite(ord_data)
th = th[mask]
ord_data = ord_data[mask]
if th.size < 2:
print("QQ-Plot: Zu wenige gültige Punkte nach Filter Fit wird übersprungen.")
# nur Punkte plotten
fig = go.Figure()
fig.add_trace(go.Scatter(x=th, y=ord_data, mode="markers", marker=styles["marker_accent"], name="Daten"))
lab = f"QQ-Plot Effektstärken ({'Kapitel '+str(kapitel) if kapitel else 'Gesamt'})"
fig.update_layout(plotly_template.get_standard_layout(lab, "Theoretische Quantile (Normal)", "Beobachtete Quantile"))
fig.show()
return
fig = go.Figure()
fig.add_trace(go.Scatter(x=th, y=ord_data, mode="markers", marker=styles["marker_accent"], name="Daten"))
# Diagonale (Least Squares Fit)
m, b = np.polyfit(th, ord_data, 1)
fig.add_trace(go.Scatter(x=th, y=m*th + b, mode="lines", line=styles["linie_primaryLine"], name="Fit"))
lab = f"QQ-Plot Effektstärken ({'Kapitel '+str(kapitel) if kapitel else 'Gesamt'})"
fig.update_layout(plotly_template.get_standard_layout(lab, "Theoretische Quantile (Normal)", "Beobachtete Quantile"))
fig.show()
def mark_outliers_iqr(df: pd.DataFrame) -> pd.DataFrame:
q1, q3 = df["Effektstärke"].quantile([0.25, 0.75])
iqr = q3 - q1
lo, hi = q1 - 1.5*iqr, q3 + 1.5*iqr
out = df.copy()
out["Outlier_IQR"] = ~out["Effektstärke"].between(lo, hi)
print(f"IQR-Grenzen: [{lo:.2f}, {hi:.2f}] | Ausreißer: {int(out['Outlier_IQR'].sum())}")
return out
def group_tests_by_kapitel(df: pd.DataFrame):
groups = [g.dropna().values for _, g in df.groupby("Kapitel")["Effektstärke"]]
if len(groups) >= 2:
lev = levene(*groups, center='median')
print(f"Levene (Homogenität): W={lev.statistic:.3f}, p={lev.pvalue:.4f}")
# robust: KruskalWallis
if len(groups) >= 2:
kw = kruskal(*groups)
n_total = sum(len(g) for g in groups)
h = kw.statistic
eps2 = (h - (len(groups)-1)) / (n_total - 1)
print(f"KruskalWallis: H={h:.3f}, p={kw.pvalue:.6f} | ε²={eps2:.3f}")
def text_vs_effect(df: pd.DataFrame):
if "Text_Dimension" not in df.columns:
encode_features_3d(df)
rho, p = spearmanr(df["Text_Dimension"], df["Effektstärke"], nan_policy='omit')
print(f"Spearman ρ(Text, d) = {rho:.3f}, p={p:.6f}")
styles = plotly_template.get_plot_styles()
fig = go.Figure()
fig.add_trace(go.Scatter(x=df["Text_Dimension"], y=df["Effektstärke"],
mode="markers", marker=styles["marker_brightArea"], name="Thermometer",
text=df["Stichwort"],
hovertemplate="Textdim: %{x:.3f}<br>d: %{y:.2f}<br>%{text}<extra></extra>"))
x = df["Text_Dimension"].values; y = df["Effektstärke"].values
if len(x) >= 2 and np.isfinite(x).all() and np.isfinite(y).all():
m, b = np.polyfit(x, y, 1)
xx = np.linspace(x.min(), x.max(), 100)
fig.add_trace(go.Scatter(x=xx, y=m*xx+b, mode="lines", line=styles["linie_secondaryLine"], name="Trend"))
fig.update_layout(plotly_template.get_standard_layout("Textdimension × Effektstärke (Spearman)", "Textdimension (PCA1)", "Cohen d"))
fig.show()
def chi2_bins_kapitel(df: pd.DataFrame):
ct = pd.crosstab(df["Kapitel"], df["Bin"]).reindex(sorted(df["Kapitel"].unique()))
chi2 = stats.chi2_contingency(ct)
print("Kontingenztafel (Kapitel × Bin):")
print(ct)
print(f"Chi²={chi2[0]:.3f}, p={chi2[1]:.6f}, df={chi2[2]} (Unabhängigkeitstest)")
return ct
def cluster_diagnostics(df: pd.DataFrame, k_min: int = 2, k_max: int = 8, kapitel_weight: float = 0.0):
X, _ = encode_features(df, kapitel_weight=kapitel_weight)
inertias, sils, ks = [], [], []
for k in range(k_min, k_max + 1):
km = KMeans(n_clusters=k, n_init=20, random_state=42).fit(X)
inertias.append(km.inertia_)
ks.append(k)
sils.append(silhouette_score(X, km.labels_) if k > 1 else np.nan)
colors = plotly_template.get_colors()
fig = go.Figure()
fig.add_trace(go.Scatter(x=ks, y=inertias, mode="lines+markers",
line=dict(color=colors["primaryLine"], width=2), name="Inertia (Elbow)"))
fig.add_trace(go.Scatter(x=ks, y=sils, mode="lines+markers",
line=dict(color=colors["secondaryLine"], width=2), name="Silhouette"))
fig.update_layout(plotly_template.get_standard_layout("Cluster-Diagnostik (k)", "k", "Wert"))
fig.show()
def cluster_profiles(df: pd.DataFrame, labels: np.ndarray, top_terms: int = 3):
res = []
tmp = df.copy()
tmp["Cluster"] = labels
vect = TfidfVectorizer(max_features=300)
Xtxt = vect.fit_transform(tmp["Stichwort"].astype(str))
vocab = np.array(vect.get_feature_names_out())
for c in sorted(tmp["Cluster"].unique()):
sub = tmp[tmp["Cluster"] == c]
mean_d = sub["Effektstärke"].mean()
n = len(sub)
by_kap = sub["Kapitel"].value_counts().sort_index().to_dict()
# positionsbasierte Zeilenindizes für die Sparse-Matrix
pos_idx = tmp.index.get_indexer(sub.index)
mean_tfidf = np.asarray(Xtxt[pos_idx].mean(axis=0)).ravel()
top_idx = mean_tfidf.argsort()[::-1][:top_terms]
terms = vocab[top_idx].tolist()
res.append({"Cluster": c, "n": n, "Ø d": round(mean_d,3), "Kapitelverteilung": by_kap, "Top_Terme": terms})
prof = pd.DataFrame(res).set_index("Cluster")
print("\nCluster-Profile:")
print(prof)
return prof
# -----------------------------------------
# Signifikanz-geführte Sicht (kapitelunabhängig)
# -----------------------------------------
def _minmax_norm(a: np.ndarray) -> np.ndarray:
a = np.asarray(a, dtype=float)
if a.size == 0:
return a
lo, hi = np.nanmin(a), np.nanmax(a)
if not np.isfinite(lo) or not np.isfinite(hi) or hi - lo <= 1e-12:
return np.zeros_like(a)
return (a - lo) / (hi - lo)
def build_significance_view(df: pd.DataFrame) -> pd.DataFrame:
"""
Erzeugt eine kapitelunabhängige Sicht mit einem 'SignifikanzScore'.
Idee: Kombination aus Effektstärke-Magnitude und (falls vorhanden) individueller Silhouette-Trennschärfe.
- score_basis = |d| (größer = stärker)
- score_cluster = Silhouette_point (kleiner 0 -> auf 0 gesetzt), anschließend min-max-normalisiert
- Gesamt-Score = 0.6*norm(|d|) + 0.4*norm(max(Silhouette_point, 0))
Vorzeichen des Scores folgt dem Vorzeichen von d, damit negative Effekte unten landen.
Hinweis: Clustering/Score in dieser Ansicht wird kapitelunabhängig berechnet, indem Kapitel-OHE mit Gewicht 0.0 skaliert wird.
"""
tmp = df.copy()
# Basisgrößen
tmp["abs_d"] = tmp["Effektstärke"].abs()
if "Silhouette_point" not in tmp.columns:
tmp["Silhouette_point"] = np.nan
sil_nonneg = tmp["Silhouette_point"].fillna(0.0).clip(lower=0.0)
score_basis = _minmax_norm(tmp["abs_d"].values)
score_sil = _minmax_norm(sil_nonneg.values)
score = 0.6 * score_basis + 0.4 * score_sil
tmp["SignifikanzScore"] = score * np.sign(tmp["Effektstärke"].values)
# Ranglisten (absolut stärkste zuerst)
tmp["Rank_abs"] = (-tmp["abs_d"]).rank(method="min").astype(int)
tmp["Rank_score"] = (-tmp["SignifikanzScore"].abs()).rank(method="min").astype(int)
# Kategorien für schnelle Filterung
def impact_label(d):
if d >= 0.70:
return "hoch+"
if d >= 0.40:
return "mittel+"
if d >= 0.00:
return "gering+"
if d > -0.40:
return "gering"
if d > -0.70:
return "mittel"
return "hoch"
tmp["Impact_Label"] = tmp["Effektstärke"].apply(impact_label)
return tmp
def plot_significance_space(df_sig: pd.DataFrame):
"""
2D-Signifikanzraum:
x = Effektstärke (Cohen d)
y = SignifikanzScore (vorzeichenbehaftet)
Punktgröße ~ |Score|, Farbe nach Vorzeichen (CI-Farben).
"""
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
# Markergrößen (skaliert)
s = (df_sig["SignifikanzScore"].abs() * 20.0) + 6.0
# Farben nach Vorzeichen
color_pos = colors.get("positiveHighlight", "#2ca02c")
color_neg = colors.get("negativeHighlight", "#d62728")
point_colors = np.where(df_sig["SignifikanzScore"] >= 0, color_pos, color_neg)
hovertemplate = (
"Thermometer: %{customdata[0]}<br>"
"Stichwort: %{text}<br>"
"d: %{x:.2f}<br>"
"Score: %{y:.3f}<br>"
"Kapitel: %{customdata[1]}<br>"
"Impact: %{customdata[2]}<br>"
"Rank(|d|): %{customdata[3]} | Rank(|Score|): %{customdata[4]}<extra></extra>"
)
fig = go.Figure()
fig.add_trace(go.Scatter(
x=df_sig["Effektstärke"],
y=df_sig["SignifikanzScore"],
mode="markers",
marker=dict(color=point_colors, size=s),
text=df_sig["Stichwort"],
customdata=np.stack([
df_sig["Thermometer_ID"],
df_sig["Kapitelname"],
df_sig["Impact_Label"],
df_sig["Rank_abs"],
df_sig["Rank_score"],
], axis=-1),
name="Thermometer",
hovertemplate=hovertemplate
))
# Referenzlinien
fig.add_hline(y=0, line=dict(color=colors.get("depthArea"), width=1))
for x0 in [0.0, 0.40, 0.70, -0.40, -0.70]:
fig.add_vline(x=x0, line=dict(color=colors.get("depthArea"), width=1, dash="dot"))
fig.update_layout(plotly_template.get_standard_layout(
"Signifikanz-geführter Raum: Effektstärke × Score (kapitelunabhängig)",
"Cohen d", "SignifikanzScore"
))
fig.show()
# -----------------------------------------
# Fibonacci-Ähnlichkeitsanalyse
# -----------------------------------------
def _fibonacci_n(n: int) -> np.ndarray:
"""Erzeugt die ersten n Fibonacci-Zahlen (startend 1,1,2,3,5,...) als float64."""
if n <= 0:
return np.array([], dtype=float)
fib = np.zeros(n, dtype=float)
if n >= 1:
fib[0] = 1.0
if n >= 2:
fib[1] = 1.0
for i in range(2, n):
fib[i] = fib[i - 1] + fib[i - 2]
return fib
def _ols_fit(x: np.ndarray, y: np.ndarray) -> tuple[float, float, float, float]:
"""
Linearer Fit y ≈ a·x + b; gibt (a, b, r, r2) zurück.
r = Pearson, r2 = R². x,y werden als float Arrays erwartet.
"""
x = np.asarray(x, float)
y = np.asarray(y, float)
xm, ym = x.mean(), y.mean()
num = ((x - xm) * (y - ym)).sum()
den = ((x - xm) ** 2).sum()
a = 0.0 if den == 0 else num / den
b = ym - a * xm
# Pearson
sx = x.std(ddof=1)
sy = y.std(ddof=1)
r = 0.0 if sx == 0 or sy == 0 else num / ((len(x) - 1) * sx * sy)
r2 = r * r
return a, b, r, r2
def _golden_ratio():
return (1 + sqrt(5)) / 2
def fib_like_analysis(df: pd.DataFrame, export_prefix: str = "fib"):
"""
Prüft Fibonacci-Ähnlichkeit der Effektstärken.
zwei Sichten: signed (mit Vorzeichen), abs (Betrag)
skaliert Fibonacci per OLS (a,b), berechnet Kennzahlen und Residuen
Permutationstest für R² (2000 Shuffles)
Visuals im CI-Look
"""
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
results = {}
phi = _golden_ratio()
for view in ("signed", "abs"):
if view == "signed":
series = np.sort(df["Effektstärke"].values)[::-1] # absteigend (mit Vorzeichen)
label = "Effektstärken (absteigend, mit Vorzeichen)"
else:
series = np.sort(np.abs(df["Effektstärke"].values))[::-1]
label = "Effektstärken |d| (absteigend)"
n = len(series)
if n < 5:
print("Fibonacci-Analyse: zu wenige Punkte.")
return
fib = _fibonacci_n(n)
a, b, r, r2 = _ols_fit(fib, series)
fitted = a * fib + b
resid = series - fitted
# Fehlermaße
rmse = float(np.sqrt(np.mean((series - fitted) ** 2)))
mape = float(np.mean(np.abs((series - fitted) / np.where(series == 0, 1e-9, series)))) * 100.0
# Verhältnis-Folge gegen φ
with np.errstate(divide="ignore", invalid="ignore"):
ratios = series[:-1] / series[1:]
ratios = ratios[np.isfinite(ratios) & (series[1:] != 0)]
ratio_med = float(np.median(ratios)) if ratios.size else np.nan
ratio_mae = float(np.mean(np.abs(ratios - phi))) if ratios.size else np.nan
# Permutationstest (R²)
rng = np.random.default_rng(42)
n_perm = 2000
r2_null = []
for _ in range(n_perm):
shuf = np.array(series, copy=True)
rng.shuffle(shuf)
_, _, _, r2p = _ols_fit(fib, shuf)
r2_null.append(r2p)
r2_null = np.array(r2_null)
p_value = float((r2_null >= r2).mean())
# ---------------- Plots ----------------
# Overlay
fig1 = go.Figure()
fig1.add_trace(go.Scatter(
x=np.arange(n), y=series, mode="lines+markers",
line=styles.get("linie_primaryLine", {}), marker=styles.get("marker_brightArea", {}),
name=label, hovertemplate="i=%{x}<br>Effekt: %{y:.3f}<extra></extra>"
))
fig1.add_trace(go.Scatter(
x=np.arange(n), y=fitted, mode="lines",
line=styles.get("linie_secondaryLine", {}),
name=f"Skalierte Fibonacci (a={a:.3f}, b={b:.3f})",
hovertemplate="i=%{x}<br>Fit: %{y:.3f}<extra></extra>"
))
fig1.update_layout(plotly_template.get_standard_layout(
f"Fibonacci-Overlay ({'signed' if view=='signed' else '|d|'}) r={r:.3f}, R²={r2:.3f}, p≈{p_value:.3f}",
"rangsortierter Index", "Wert"
))
fig1.show()
# Residuen
fig2 = go.Figure()
fig2.add_trace(go.Bar(
x=np.arange(n), y=resid, marker=dict(color=colors.get("brightArea", "#BBBBBB")),
name="Residuen", hovertemplate="i=%{x}<br>Residuum: %{y:.3f}<extra></extra>"
))
fig2.add_hline(y=0, line=dict(width=1, color=colors.get("depthArea")))
fig2.update_layout(plotly_template.get_standard_layout(
f"Residuen zum Fibonacci-Fit ({'signed' if view=='signed' else '|d|'})",
"rangsortierter Index", "Residuum"
))
fig2.show()
# Ratio-Histogramm vs. φ
if ratios.size:
fig3 = go.Figure()
fig3.add_trace(go.Histogram(
x=ratios, nbinsx=min(30, max(10, int(np.sqrt(len(ratios))))),
marker=dict(color=colors.get("brightArea", "#BBBBBB")),
name="d_i / d_{i+1}",
hovertemplate="Ratio: %{x:.3f}<br>n=%{y}<extra></extra>"
))
fig3.add_vline(x=phi, line=dict(width=2, dash="dot", color=colors.get("depthArea")),
annotation_text="φ", annotation_position="top right")
fig3.update_layout(plotly_template.get_standard_layout(
f"Verhältnisverteilung dᵢ/dᵢ₊₁ vs. φ ({'signed' if view=='signed' else '|d|'})",
"dᵢ / dᵢ₊₁", "Häufigkeit"
))
fig3.show()
# ---------------- Exporte ----------------
try:
out = {
"view": view,
"n": int(n),
"a": float(a), "b": float(b),
"pearson_r": float(r), "r2": float(r2),
"rmse": rmse, "mape_percent": mape,
"phi": float(phi),
"ratio_median": ratio_med,
"ratio_mae_to_phi": ratio_mae,
"perm_n": int(n_perm),
"perm_p_r2": p_value,
}
export_json(out, f"{export_prefix}_{view}_metrics.json")
df_export = pd.DataFrame({
"rank": np.arange(n),
"effect": series,
"fib": fib,
"fit": fitted,
"resid": resid
})
df_export.to_csv(os.path.join(EXPORT_DIR, f"{export_prefix}_{view}_series.csv"), index=False)
except Exception:
pass
results[view] = out
print("\nFIBONACCI-CHECK (kurz):")
for view in ("signed", "abs"):
m = results[view]
print(f" {view:6s} | R²={m['r2']:.3f} | p(R²)≈{m['perm_p_r2']:.3f} | "
f"ratio_median≈{m['ratio_median']:.3f} (φ≈{_golden_ratio():.3f})")
return results
# -----------------------------------------
# Zusätzliche Visualisierungen: Silhouette-Balken & Cluster-Embedding
# -----------------------------------------
def plot_silhouette_bars(df: pd.DataFrame, labels: np.ndarray, sil_global: float):
"""
Klassisches Silhouette-Plot als Balkenvisualisierung.
Nutzt df["Silhouette_point"] (falls vorhanden). Farben stammen aus dem CI-Template.
"""
if "Silhouette_point" not in df.columns or df["Silhouette_point"].isna().all():
print("Silhouette-Plot: keine Punkt-Silhouetten verfügbar übersprungen.")
return
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
tmp = df.copy()
tmp["Cluster"] = labels.astype(int)
# zur stabilen Sortierung innerhalb der Cluster absteigend nach Silhouette
tmp = tmp.sort_values(["Cluster", "Silhouette_point"], ascending=[True, False]).reset_index(drop=True)
tmp["_idx"] = np.arange(len(tmp))
# Markerpalette (robust, nur CI-Keys)
def _get_marker(*candidates):
for key in candidates:
if key in styles:
return styles[key]
return styles.get("marker_accent", {})
palette_markers = [
_get_marker("marker_positiveHighlight", "marker_brightArea", "marker_accent"),
_get_marker("marker_primaryLine", "marker_brightArea", "marker_accent"),
_get_marker("marker_secondaryLine", "marker_accent", "marker_brightArea"),
_get_marker("marker_negativeHighlight", "marker_accent", "marker_brightArea"),
]
fig = go.Figure()
clusters_sorted = sorted(tmp["Cluster"].unique())
for idx, c in enumerate(clusters_sorted):
sub = tmp[tmp["Cluster"] == c]
# Balkenfarbe aus Marker-Stil ableiten (falls vorhanden)
marker_style = palette_markers[idx % len(palette_markers)].copy()
bar_color = marker_style.get("color", colors.get("brightArea"))
fig.add_trace(go.Bar(
x=sub["_idx"],
y=sub["Silhouette_point"],
marker=dict(color=bar_color),
name=f"Cluster {c}",
hovertemplate=(
"Cluster: " + str(c) + "<br>"
"Silhouette: %{y:.3f}<br>"
"Thermometer: %{customdata[0]}<br>"
"Kapitel: %{customdata[1]}<br>"
"Stichwort: %{text}<extra></extra>"
),
text=sub["Stichwort"],
customdata=np.stack([sub["Thermometer_ID"], sub["Kapitelname"]], axis=-1),
))
# Referenzlinie: globaler Silhouette-Score
ref_line_color = colors.get("depthArea")
fig.add_hline(y=float(sil_global) if sil_global is not None else 0.0,
line=dict(width=2, color=ref_line_color, dash="dot"))
fig.update_layout(plotly_template.get_standard_layout(
f"Silhouette je Punkt (globaler Silhouette-Score = {sil_global:.3f})",
"Index (gruppiert nach Cluster)", "Silhouette"
))
fig.update_xaxes(showticklabels=False)
fig.show()
def plot_cluster_embedding(df: pd.DataFrame, labels: np.ndarray, title_suffix: str, kapitel_weight: float = 0.0):
"""
2D-Embedding der Merkmale (Effektstärke + ggf. Kapitel-OHE (gewichtet) + Text-TFIDF via PCA).
Schritt 1: Feature-Matrix konstruieren (wie encode_features_3d, aber mit 2 PCA-Komponenten).
Schritt 2: PCA auf (Effekt + Text + Kapitel (gewichtet)) -> PC1/PC2.
Farben nach Cluster (CI-Styles), Kapitel/Stichwort in Hover.
"""
# Text-Vektoren
vectorizer = TfidfVectorizer(max_features=300)
X_text = vectorizer.fit_transform(df["Stichwort"].astype(str)).toarray()
# Kapitel-OHE (gewichtet)
try:
enc = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
except TypeError:
enc = OneHotEncoder(sparse=False, handle_unknown="ignore")
cat = df[["Kapitel"]].fillna(-1)
X_kap = enc.fit_transform(cat) * float(kapitel_weight)
# Effekt
X_d = df[["Effektstärke"]].values
# Feature-Matrix
X = np.hstack([X_d, X_kap, X_text])
X = _sanitize_X(X, clip=1e6)
# PCA 2D
pca = PCA(n_components=2, random_state=42)
Z = pca.fit_transform(X)
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
# Markerpalette (nur CI-Keys, robust)
def _get_marker(*candidates):
for key in candidates:
if key in styles:
return styles[key]
return styles.get("marker_accent", {})
palette_markers = [
_get_marker("marker_positiveHighlight", "marker_brightArea", "marker_accent"),
_get_marker("marker_primaryLine", "marker_brightArea", "marker_accent"),
_get_marker("marker_secondaryLine", "marker_accent", "marker_brightArea"),
_get_marker("marker_negativeHighlight", "marker_accent", "marker_brightArea"),
]
tmp = df.copy()
tmp["Cluster"] = labels.astype(int)
tmp["PC1"] = Z[:, 0]
tmp["PC2"] = Z[:, 1]
fig = go.Figure()
clusters_sorted = sorted(tmp["Cluster"].unique())
for idx, c in enumerate(clusters_sorted):
sub = tmp[tmp["Cluster"] == c]
marker_style = palette_markers[idx % len(palette_markers)]
fig.add_trace(go.Scatter(
x=sub["PC1"], y=sub["PC2"],
mode="markers",
marker={**marker_style, "size": 10},
name=f"Cluster {c}",
text=sub["Stichwort"],
customdata=np.stack([sub["Thermometer_ID"], sub["Kapitelname"], sub["Effektstärke"]], axis=-1),
hovertemplate="Cluster: " + str(c) + "<br>PC1: %{x:.3f}<br>PC2: %{y:.3f}<br>d: %{customdata[2]:.2f}<br>Thermometer: %{customdata[0]}<br>Kapitel: %{customdata[1]}<extra></extra>"
))
exp_var = pca.explained_variance_ratio_
fig.update_layout(plotly_template.get_standard_layout(
f"PCA-Embedding der Cluster (PC1 {exp_var[0]*100:.1f}%, PC2 {exp_var[1]*100:.1f}%) {title_suffix}",
"PC1", "PC2"
))
fig.show()
# -----------------------------------------
# Visualisierungen
# -----------------------------------------
def plot_heatmap_kapitel_vs_d(df: pd.DataFrame, kapitel: int | None = None, bins_d: int = 30):
"""2D-Heatmap (Histogram2d) von Kapitel (x) gegen Effektstärke (y).
- Zeigt die Dichte/Anzahl pro Zelle (Kapitel × d-Bereich)
- CI-Farbskala anhand Template-Farben (depthArea → brightArea)
"""
colors = plotly_template.get_colors()
styles = plotly_template.get_plot_styles()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
# CI-konforme Farbskala zwischen depthArea und brightArea
def _two_color_scale(c1, c2):
def _hex_to_rgb(h):
h = h.lstrip('#')
return tuple(int(h[i:i+2], 16) for i in (0, 2, 4))
r1, g1, b1 = _hex_to_rgb(c1)
r2, g2, b2 = _hex_to_rgb(c2)
scale = []
for t in np.linspace(0, 1, 6):
r = int(r1*(1-t) + r2*t)
g = int(g1*(1-t) + g2*t)
b = int(b1*(1-t) + b2*t)
scale.append([float(t), f"rgb({r},{g},{b})"])
return scale
colorscale = _two_color_scale(colors.get("depthArea", "#444"), colors.get("brightArea", "#fff")) if "depthArea" in colors and "brightArea" in colors else colors.get("continuous", "Viridis")
# Histogram2d
fig = go.Figure(data=go.Histogram2d(
x=df["Kapitel"].astype(int),
y=df["Effektstärke"],
nbinsx=max(1, df["Kapitel"].nunique()),
nbinsy=bins_d,
colorscale=colorscale,
colorbar=dict(title="Anzahl"),
hovertemplate="Kapitel: %{x}<br>d-Bin: %{y}<br>Anzahl: %{z}<extra></extra>",
))
fig.update_layout(plotly_template.get_standard_layout(
f"Heatmap: Kapitel × Effektstärke ({kapitel_label})", "Kapitel", "Cohen d"
))
# ganze Zahlen als Kapitel-Ticks
fig.update_layout(xaxis=dict(tickmode="linear", dtick=1))
fig.show()
export_figure(fig, "vl-heatmap-kapitel-vs-d", export_fig_visual, export_fig_png)
def export_json(obj: dict, name: str):
try:
p = os.path.join(EXPORT_DIR, name)
with open(p, "w", encoding="utf-8") as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
except Exception:
pass
# -----------------------------------------
# Hilfsfunktion: DataFrame zu Records (mit None für NaN)
# -----------------------------------------
def _df_records(df: pd.DataFrame, cols: list[str]) -> list[dict]:
try:
return df[cols].replace({np.nan: None}).to_dict(orient="records")
except Exception:
return df.replace({np.nan: None}).to_dict(orient="records")
def plot_boxplots(df: pd.DataFrame, kapitel: int | None = None):
"""Boxplots der Effektstärken: nach Kapitel (falls Gesamt) und nach Bins.
- Zeigt Ausreißer (IQR-basiert), Notches für Median-Konfidenz.
- CI-Stile aus plotly_template.
"""
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
# 1) Boxplot nach Kapitel (nur sinnvoll, wenn mehrere Kapitel vorhanden)
if kapitel is None and df["Kapitel"].nunique() > 1:
fig_kap = go.Figure()
fig_kap.add_trace(go.Box(
x=df["Kapitel"].astype(int),
y=df["Effektstärke"],
boxpoints="outliers",
notched=True,
marker=styles["marker_brightArea"],
line=styles["linie_primaryLine"],
name="Kapitel",
hovertemplate="Kapitel: %{x}<br>d: %{y:.2f}<extra></extra>",
))
fig_kap.update_layout(plotly_template.get_standard_layout(
f"Boxplot Effektstärken nach Kapitel ({kapitel_label})", "Kapitel", "Cohen d"
))
# ganze Zahlen als Kapitel-Ticks
fig_kap.update_layout(xaxis=dict(tickmode="linear", dtick=1))
fig_kap.show()
export_figure(fig_kap, "vl-box-kapitel", export_fig_visual, export_fig_png)
# 2) Boxplot nach heuristischen Bins (immer möglich)
order = ["negativ", "gering", "mittel", "hoch"]
fig_bin = go.Figure()
fig_bin.add_trace(go.Box(
x=pd.Categorical(df["Bin"], categories=order, ordered=True),
y=df["Effektstärke"],
boxpoints="outliers",
notched=True,
marker=styles["marker_accent"],
line=styles["linie_secondaryLine"],
name="Bin",
hovertemplate="Bin: %{x}<br>d: %{y:.2f}<extra></extra>",
))
fig_bin.update_layout(plotly_template.get_standard_layout(
f"Boxplot Effektstärken nach Bins ({kapitel_label})", "Bin", "Cohen d"
))
fig_bin.show()
export_figure(fig_bin, "vl-box-bins", export_fig_visual, export_fig_png)
# 3) Optional: Ein Gesamt-Boxplot für die aktuelle Auswahl
fig_all = go.Figure()
fig_all.add_trace(go.Box(
y=df["Effektstärke"],
boxpoints="outliers",
notched=True,
marker=styles["marker_positiveHighlight"],
line=styles["linie_primaryLine"],
name=kapitel_label,
hovertemplate="d: %{y:.2f}<extra></extra>",
))
fig_all.update_layout(plotly_template.get_standard_layout(
f"Boxplot Effektstärken ({kapitel_label})", "", "Cohen d"
))
fig_all.show()
export_figure(fig_all, "vl-box-overall", export_fig_visual, export_fig_png)
def plot_hist(df: pd.DataFrame, kapitel: int | None = None):
# Use CI styles from plotly_template
styles = plotly_template.get_plot_styles()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
fig = go.Figure()
fig.add_trace(go.Histogram(
x=df["Effektstärke"],
marker=styles["balken_accent"],
hovertemplate="Effektstärke: %{x:.2f}<br>Häufigkeit: %{y}<extra></extra>"
))
fig.update_layout(plotly_template.get_standard_layout(
f"Verteilung der Effektstärken ({kapitel_label})", "Cohen d", "Häufigkeit"
))
fig.show()
export_figure(fig, "vl-hist-effekte", export_fig_visual, export_fig_png)
def plot_bins(df: pd.DataFrame, kapitel: int | None = None):
styles = plotly_template.get_plot_styles()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
order = ["negativ", "gering", "mittel", "hoch"]
counts = df["Bin"].value_counts().reindex(order).fillna(0).astype(int)
fig = go.Figure()
fig.add_trace(go.Bar(
x=counts.index,
y=counts.values,
marker=styles["balken_accent"],
hovertemplate="Kategorie: %{x}<br>Anzahl: %{y}<extra></extra>"
))
fig.update_layout(plotly_template.get_standard_layout(
f"Heuristische Einteilung nach Effektstärke (Bins) ({kapitel_label})", "Bin", "Anzahl"
))
fig.show()
export_figure(fig, "vl-bins", export_fig_visual, export_fig_png)
def plot_scatter(df: pd.DataFrame, cluster_labels: np.ndarray, model: KMeans, sil: float, title_suffix: str, kapitel: int | None = None, top_n: int = 5):
"""
Kapitelunabhängiger 2D-Scatter:
- x: künstlicher Index, aber so angeordnet, dass Punkte je Cluster zusammenstehen
- y: Effektstärke (Cohen d)
- Farben: Cluster
Zusätzlich:
• horizontale Linien bei den Cluster-Mitteln (Ø d)
• Labels für die Top-N nach |d|
"""
styles = plotly_template.get_plot_styles()
colors = plotly_template.get_colors()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
tmp = df.copy()
tmp["Cluster"] = cluster_labels.astype(int)
# Clusterstärken (Mittelwert der Effektstärke im jeweiligen Clusterzentrum)
cluster_strengths = {i: float(model.cluster_centers_[i][0]) for i in range(len(model.cluster_centers_))}
tmp["Clusterstärke"] = tmp["Cluster"].map(cluster_strengths)
# Cluster-Reihenfolge: absteigend nach Ø d
clusters_sorted = sorted(tmp["Cluster"].unique(), key=lambda c: cluster_strengths[c], reverse=True)
# Gewünschte Markerpalette (robust mit Fallbacks)
def _get_marker(*candidates):
for key in candidates:
if key in styles:
return styles[key]
return styles.get("marker_accent", {})
palette_markers = [
_get_marker("marker_positiveHighlight", "marker_brightArea", "marker_accent"),
_get_marker("marker_primaryLine", "marker_brightArea", "marker_accent"),
_get_marker("marker_secondaryLine", "marker_accent", "marker_brightArea"),
_get_marker("marker_negativeHighlight", "marker_accent", "marker_brightArea"),
]
# x-Positionen so vergeben, dass Cluster-Blöcke entstehen
tmp = tmp.reset_index(drop=True)
tmp["_x"] = np.nan
x_cursor = 0
block_bounds = {} # für Centroid-Linien (x-Min/x-Max je Cluster)
for c in clusters_sorted:
sub_idx = tmp.index[tmp["Cluster"] == c].tolist()
n = len(sub_idx)
xs = np.arange(x_cursor, x_cursor + n)
tmp.loc[sub_idx, "_x"] = xs
block_bounds[c] = (xs.min(), xs.max())
x_cursor += n + 2 # +2 als optischer Abstand zwischen Blöcken
hovertemplate = (
"Thermometer: %{customdata[2]}<br>"
"Stichwort: %{text}<br>"
"Effektstärke: %{y:.2f}<br>"
"Kapitel: %{customdata[0]}<br>"
"Clusterstärke: %{customdata[1]:.2f}<extra></extra>"
)
fig = go.Figure()
# Punkte je Cluster zeichnen
for idx, c in enumerate(clusters_sorted):
cdf = tmp[tmp["Cluster"] == c]
fig.add_trace(go.Scatter(
x=cdf["_x"],
y=cdf["Effektstärke"],
mode="markers",
marker={**palette_markers[idx % len(palette_markers)], "size": 10},
name=f"Cluster: {cluster_strengths[c]:.2f}",
text=cdf["Stichwort"],
customdata=np.stack([cdf["Kapitelname"], cdf["Clusterstärke"], cdf["Thermometer_ID"]], axis=-1),
hovertemplate=hovertemplate
))
# Centroid-Linien (horizontale Ø d pro Cluster)
for c in clusters_sorted:
x0, x1 = block_bounds[c]
yd = cluster_strengths[c]
centroid_color = colors.get("depthArea", "#444")
line_style = dict(styles.get("linie_secondaryLine", {"width": 2}))
line_style["color"] = centroid_color
fig.add_trace(go.Scatter(
x=[x0, x1],
y=[yd, yd],
mode="lines",
line=line_style,
name=None,
showlegend=False,
hovertemplate=f"Cluster-Mittel: {yd:.2f}<extra></extra>"
))
# Vertikale Trennlinien zwischen Cluster-Blöcken (zur Orientierung)
# (nur als dezente Linien, keine Legende)
block_edges = sorted({bounds[1] + 1 for bounds in block_bounds.values()})
for edge in block_edges[:-1]: # letzte Kante führt bereits zum Abstand
fig.add_vline(x=edge - 1, line=dict(color=colors.get("depthArea"), width=1, dash="dot"))
fig.update_layout(plotly_template.get_standard_layout(
f"Effektstärke × Cluster ({title_suffix}) ({kapitel_label}) Silhouette: {sil:.3f}",
"Thermometer (gruppiert nach Cluster)", "Cohen d"
))
fig.update_xaxes(showticklabels=False)
fig.show()
export_figure(fig, f"vl-scatter-{title_suffix}", export_fig_visual, export_fig_png)
def plot_scatter_3d(df: pd.DataFrame, cluster_labels: np.ndarray, sil: float, title_suffix: str, kapitel: int | None = None):
styles = plotly_template.get_plot_styles()
kapitel_label = f"Kapitel {kapitel}" if kapitel else "Gesamt"
tmp = df.copy()
tmp["Cluster"] = cluster_labels.astype(int)
# Clusterzentren für durchschnittliche Effektstärke
cluster_strengths = {i: float(tmp[tmp["Cluster"] == i]["Effektstärke"].mean()) for i in sorted(set(cluster_labels))}
hovertemplate = (
"Thermometer: %{text}<br>"
"Kapitel: %{customdata[0]}<br>"
"Effektstärke: %{x:.2f}<br>"
"Textdim: %{z:.2f}<br>"
"Cluster: %{customdata[1]}<extra></extra>"
)
fig = go.Figure()
clusters = sorted(tmp["Cluster"].unique())
palette_keys = ["positiveHighlight", "negativeHighlight", "accent", "brightArea"]
for idx, cluster in enumerate(clusters):
cluster_df = tmp[tmp["Cluster"] == cluster]
color_key = palette_keys[idx % len(palette_keys)]
marker_style = styles.get(f"marker_{color_key}", {})
fig.add_trace(go.Scatter3d(
x=cluster_df["Effektstärke"],
y=cluster_df["Kapitel"],
z=cluster_df["Text_Dimension"],
mode="markers",
marker={**marker_style, "size": 6},
name=f"Cluster {cluster} (Ø d = {cluster_strengths[cluster]:.2f})",
text=cluster_df["Stichwort"],
customdata=np.stack([cluster_df["Kapitelname"], cluster_df["Cluster"]], axis=-1),
hovertemplate=hovertemplate
))
fig.update_layout(plotly_template.get_standard_layout(
f"3D-Clustering (Effektstärke × Kapitel × Text) {title_suffix} ({kapitel_label}) Silhouette: {sil:.3f}",
"Effektstärke", "Kapitel", "Textdimension"
))
fig.update_layout(scene=dict(
yaxis=dict(
title="Kapitel",
tickmode="linear",
dtick=1
)
))
fig.show()
export_figure(fig, f"vl-scatter3d-{title_suffix}", export_fig_visual, export_fig_png)
# -----------------------------------------
# Pipeline
# -----------------------------------------
def analyse(csv_path: str = "Thermometer.csv", k: int = 4, kapitel: int | None = None):
# Laden
df = load_data(csv_path)
# Datenvalidierung
dq = validate_data(df)
print("\nDATA QUALITY REPORT:")
for key, val in dq.items():
print(f" {key}: {val}")
export_json(dq, "data_quality_report.json")
if kapitel is not None:
df = df[df["Kapitel"] == kapitel]
if df.empty:
print(f"Keine Daten für Kapitel {kapitel}.")
return None
# Bins
df = add_manual_bins(df)
# Systeme (psychisch/sozial) mappen (optional CSV, sonst 0/0)
df = load_system_axes(df, mapping_csv=os.path.join(os.path.dirname(__file__), "systems_mapping.csv"))
# K-Means
# Kapitelgewicht = 0.0 => Kapitel-OHE trägt nicht zur Distanz bei (kapitelübergreifendes Clustering)
labels, sil, model = run_kmeans(df, k=k, kapitel_weight=0.0)
# Silhouette je Punkt anhängen
try:
X_for_sil, _ = encode_features(df, kapitel_weight=0.0)
X_for_sil = _sanitize_X(X_for_sil, clip=1e6)
if k > 1 and len(df) > k:
df["Silhouette_point"] = silhouette_samples(X_for_sil, labels)
else:
df["Silhouette_point"] = np.nan
except Exception:
df["Silhouette_point"] = np.nan
# Zusätzliche Cluster-Visualisierungen
plot_silhouette_bars(df, labels, sil)
plot_cluster_embedding(df, labels, title_suffix=f"k{k}-embedding", kapitel_weight=0.0)
# Reports
print("" * 60)
print("ANALYSE Sicht 1 | Heuristische Bins (Grenzen: <0 | <0.40 | <0.70 | ≥0.70)")
print(df["Bin"].value_counts().reindex(["negativ", "gering", "mittel", "hoch"]).fillna(0).astype(int))
print("\nANALYSE Sicht 2 | K-Means-Clustering")
if not math.isnan(sil):
print(f"Silhouette-Score (k={k}): {sil:.3f}")
else:
print(f"Silhouette-Score (k={k}): n/a (zu wenige Daten oder k zu groß)")
if not math.isnan(sil):
# Clusterzentren (nur Effektstärke + Kapitelmittelwerte)
centers = model.cluster_centers_
print("\nClusterzentren (erste Spalte = Effektstärke, Rest = Kapitel-OHE):")
for idx, center in enumerate(centers):
eff = center[0]
print(f" Cluster {idx}: Effektstärke-Mittel {eff:.3f}")
# --- Statistik-Block ---
stats_df = describe_effects(df)
plot_table_stats(stats_df, f"Deskriptive Statistik ({'Kapitel '+str(kapitel) if kapitel else 'Gesamt'})")
# Deskriptive Statistik exportieren
try:
stats_df.to_csv(os.path.join(EXPORT_DIR, "deskriptiv.csv"))
except Exception:
pass
normality_and_qq(df, kapitel=kapitel)
df = mark_outliers_iqr(df)
if kapitel is None:
group_tests_by_kapitel(df)
text_vs_effect(df)
if kapitel is None:
chi2_bins_kapitel(df)
cluster_diagnostics(df, kapitel_weight=0.0)
profiles_df = cluster_profiles(df, labels)
try:
export_json(json.loads(profiles_df.to_json(orient="table")), "cluster_profile.json")
except Exception:
pass
# --- Signifikanz-geführte Sicht ---
df_sig = build_significance_view(df)
try:
df_sig.sort_values("Rank_score").to_csv(os.path.join(EXPORT_DIR, "signifikanz_ranking.csv"), index=False)
except Exception:
pass
plot_significance_space(df_sig)
# --- Fibonacci-Ähnlichkeit prüfen ---
try:
fib_like_analysis(df, export_prefix="fib")
except Exception as _e:
print(f"Fibonacci-Analyse übersprungen: {_e}")
# Tests zusammenfassen (vor Export sammeln)
tests_summary = {"silhouette_global": float(sil) if not math.isnan(sil) else None}
if kapitel is None and df["Kapitel"].nunique() > 1:
groups = [g.dropna().values for _, g in df.groupby("Kapitel")["Effektstärke"]]
try:
lev = levene(*groups, center='median')
tests_summary["levene_W"] = float(lev.statistic)
tests_summary["levene_p"] = float(lev.pvalue)
except Exception:
pass
try:
kw = kruskal(*groups)
n_total = sum(len(g) for g in groups)
h = float(kw.statistic)
eps2 = (h - (len(groups)-1)) / (n_total - 1) if n_total > 1 else None
tests_summary["kruskal_H"] = h
tests_summary["kruskal_p"] = float(kw.pvalue)
tests_summary["kruskal_eps2"] = float(eps2) if eps2 is not None else None
except Exception:
pass
try:
if "Text_Dimension" not in df.columns:
encode_features_3d(df)
rho, p = spearmanr(df["Text_Dimension"], df["Effektstärke"], nan_policy='omit')
tests_summary["spearman_rho_text_d"] = float(rho)
tests_summary["spearman_p_text_d"] = float(p)
except Exception:
pass
if kapitel is None and df["Kapitel"].nunique() > 1:
try:
ct = pd.crosstab(df["Kapitel"], df["Bin"]).reindex(sorted(df["Kapitel"].unique()))
chi2 = stats.chi2_contingency(ct)
tests_summary["chi2"] = float(chi2[0])
tests_summary["chi2_p"] = float(chi2[1])
tests_summary["chi2_df"] = int(chi2[2])
except Exception:
pass
export_json(tests_summary, "tests_summary.json")
# --- WERTEDATEI: Alles in einer JSON bündeln ---
if export_werte_all:
try:
# Kern-Datenzeilen
base_cols = [
"Thermometer_ID", "Stichwort", "Effektstärke", "Kapitel", "Kapitelname",
"Bin", "Text_Dimension", "Outlier_IQR", "Silhouette_point"
]
rows = _df_records(df, [c for c in base_cols if c in df.columns])
# Deskriptive Statistik als Records
desc_records = _df_records(
(stats_df.reset_index().rename(columns={"index": "Gruppe"})),
list(stats_df.reset_index().columns)
)
# Cluster-Profile als Records
prof_records = _df_records(
(profiles_df.reset_index().rename(columns={"index": "Cluster"})),
list(profiles_df.reset_index().columns)
)
# Cluster-Zentren (voll) und nur d-Komponente
centers_full = model.cluster_centers_.tolist() if hasattr(model, "cluster_centers_") else None
centers_d = [float(c[0]) for c in model.cluster_centers_] if hasattr(model, "cluster_centers_") else None
payload = {
"meta": {
"k": int(k),
"kapitel": int(kapitel) if kapitel is not None else None,
"theme": theme,
},
"data": rows,
"deskriptiv": desc_records,
"cluster": {
"silhouette_global": float(sil) if not math.isnan(sil) else None,
"centers_full": centers_full,
"centers_effekt_only": centers_d,
},
"profiles": prof_records,
"tests_summary": tests_summary if isinstance(tests_summary, dict) else {},
"data_quality": dq,
}
export_json(payload, "werte_all.json")
except Exception as _e:
pass
# Plots
plot_heatmap_kapitel_vs_d(df, kapitel=kapitel)
# Boxplots
plot_boxplots(df, kapitel=kapitel)
plot_hist(df, kapitel=kapitel)
plot_bins(df, kapitel=kapitel)
plot_scatter(df, labels, model, sil, title_suffix=f"k{k}", kapitel=kapitel)
# --- 3Achsige System-Clusteranalyse (x=Psychisch, y=Sozial, z=d) ---
try:
labels_sys3, sil_sys3, model_sys3, X_sys3 = run_kmeans_system3(df, k=k, psych_weight=1.0, sozial_weight=1.0, effekt_weight=1.0)
plot_cluster_system3(df, labels_sys3, sil_sys3, title_suffix=f"k{k}-system3d")
except Exception as _e:
print(f"System-3D-Analyse übersprungen: {_e}")
# 3D-Clustering
X3d, _ = encode_features_3d(df, kapitel_weight=0.0)
X3d = _sanitize_X(X3d, clip=1e6)
model3d = KMeans(n_clusters=k, n_init=20, random_state=42)
labels3d = model3d.fit_predict(X3d)
sil3d = silhouette_score(X3d, labels3d) if k > 1 and len(df) > k else np.nan
plot_scatter_3d(df, labels3d, sil3d, title_suffix=f"k{k}-3d", kapitel=kapitel)
# Clusterzuordnung exportieren
try:
df_export = df.copy()
df_export["Cluster"] = labels
df_export.to_csv(os.path.join(EXPORT_DIR, "clusterzuordnung.csv"), index=False)
except Exception:
pass
return {
"df": df,
"kmeans_labels": labels,
"silhouette": sil,
"model": model,
}
# -----------------------------------------
# Main
# -----------------------------------------
if __name__ == "__main__":
# Passe den CSV-Pfad an, falls die Datei woanders liegt
if analyse_all:
df_all = load_data(os.path.join(os.path.dirname(__file__), csv_file))
for kap in sorted(df_all["Kapitel"].unique()):
analyse(csv_path=os.path.join(os.path.dirname(__file__), csv_file), k=k_clusters, kapitel=kap)
else:
analyse(csv_path=os.path.join(os.path.dirname(__file__), csv_file), k=k_clusters, kapitel=selected_kapitel)