diff --git a/PROGRESS.md b/PROGRESS.md index 403f945..481d4f6 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -36,3 +36,4 @@ Phase docs live in `docs/progress/` (untracked, local only). | Tooling | Graph explorer frontend | COMPLETE — `frontend/` (React 18 + Vite + Sigma.js v3 + Graphology); force-settled layout, node drag, equity click → detail panel, time slider (`/graph/snapshot`), edge-type filters, ticker search highlight, FSI gauge | — | | 8 | Universe expansion Session 1 | COMPLETE — UNIVERSE 30 → **140** tickers; AMAT CIK bug fixed (was Adobe's `796343`, now `6951`); price_history backfilled to 2018-01-01; ANSS/JNPR/INFN excluded (acquired during window; yfinance dropped historical series); 60,567 → **283,150** rows | `docs/progress/phase_8.md` | | 8 | Universe expansion Session 2 | COMPLETE — graph + filings re-ingestion on 140 tickers; supply_edges 247→**1,659**, ownership_edges 7,446→**34,843** (140/140 covered), board_edges 104→**326** (39/140), centrality_history 1,740→**8,120**; ISSUER_NAME_MAP 37→150 fragments + AMAT CIK flip leftover from Session 1; HGT embeddings deferred to Session 3 retrain | `docs/progress/phase_8.md` | +| 8 | Factor backtest Session 3 | COMPLETE — multi-horizon IC backtest on 140-ticker universe (21d/63d/126d); Phase 5 t-stats shown to be N=30 artifacts (momentum_12_1 126d: 4.149→1.506); graph_delta_eigenvector sign-flipped (t=−3.194 at 63d) → wired as centrality penalty overlay in portfolio.py; graph_customer_momentum definitively null (CLOSED); rolling_registry rebuilt (406 rows); **Phase 8 paper trader CAGR +5.97%, Sharpe 0.374, Max DD −36.68%** — degradation vs Phase 7A reflects accurate N=140 factor gates + extended cash periods | `docs/progress/phase_8.md` | diff --git a/nexus/execution/paper_trader.py b/nexus/execution/paper_trader.py index 7c31ac6..2b1db9d 100644 --- a/nexus/execution/paper_trader.py +++ b/nexus/execution/paper_trader.py @@ -149,6 +149,7 @@ def _factor_panel( std at this snapshot contribute zeros (they cannot rank anything). """ dispatch = _factor_xs_dispatch() + dispatch.pop("graph_gnn_embedding_drift", None) # stale N=30 embeddings; 110 new tickers would get z=0.0 cols: dict[str, dict[str, float]] = {} all_tickers: set[str] = set() for name, fn in dispatch.items(): @@ -178,6 +179,26 @@ def _factor_panel( return pl.DataFrame(out) +def _centrality_penalty_df(z_panel: pl.DataFrame) -> pl.DataFrame | None: + """Percentile rank of graph_delta_eigenvector z-score for centrality penalty. + + Returns ['ticker', 'centrality_percentile'] with values in [0, 1], or None + if the factor is absent or has fewer than 2 observations. + """ + col = "graph_delta_eigenvector" + if col not in z_panel.columns: + return None + sub = z_panel.select(["ticker", col]).filter( + pl.col(col).is_not_nan() & pl.col(col).is_not_null() + ) + if sub.height < 2: + return None + n = sub.height + return sub.with_columns( + ((pl.col(col).rank() - 1) / float(max(n - 1, 1))).alias("centrality_percentile") + ).select(["ticker", "centrality_percentile"]) + + def _realised_vol( snap: date, price: _PricePanel, lookback: int = VOL_LOOKBACK_TD ) -> pl.DataFrame: @@ -410,7 +431,8 @@ def run_backtest( alpha = compose_alpha(z_panel, weights_by_factor) vol_df = _realised_vol(d, price) crowd_df = _crowding_as_of(d, crowding_by_q) - target = build_portfolio(alpha, vol_df, crowd_df) + cent_pen = _centrality_penalty_df(z_panel) + target = build_portfolio(alpha, vol_df, crowd_df, centrality_penalty=cent_pen) nav_so_far = pl.DataFrame( { diff --git a/nexus/execution/portfolio.py b/nexus/execution/portfolio.py index 7e32dce..7dadf53 100644 --- a/nexus/execution/portfolio.py +++ b/nexus/execution/portfolio.py @@ -30,14 +30,19 @@ def build_portfolio( alpha: pl.DataFrame, vol: pl.DataFrame, crowding: pl.DataFrame, + centrality_penalty: pl.DataFrame | None = None, + lambda_centrality: float = 0.30, ) -> pl.DataFrame: """Return target weights for one rebalance date. Inputs (each frame must carry a 'ticker' column): - alpha : columns ['ticker', 'composite_alpha'] - vol : columns ['ticker', 'realised_vol_63d'] - crowding : columns ['ticker', 'composite_crowding'] (may be partial; - missing tickers are treated as un-crowded) + alpha : columns ['ticker', 'composite_alpha'] + vol : columns ['ticker', 'realised_vol_63d'] + crowding : columns ['ticker', 'composite_crowding'] (may be partial; + missing tickers are treated as un-crowded) + centrality_penalty : optional columns ['ticker', 'centrality_percentile'] (floats + in [0, 1]). Missing tickers treated as percentile=0.5. + When None, no centrality adjustment is applied. Output: columns ['ticker', 'weight'] sorted by weight desc. Sum of weights is exactly 1.0 if any selection exists; empty frame if @@ -72,6 +77,19 @@ def build_portfolio( ) df = df.with_columns((pl.col("inv_vol") * pl.col("crowding_mult")).alias("raw_weight")) + + if centrality_penalty is not None: + df = df.join(centrality_penalty, on="ticker", how="left") + df = df.with_columns( + pl.col("centrality_percentile").fill_null(0.5).alias("centrality_percentile") + ) + df = df.with_columns( + ( + pl.col("raw_weight") + * (1.0 - lambda_centrality * pl.col("centrality_percentile")) + ).alias("raw_weight") + ) + total = df["raw_weight"].sum() if total <= 0: # All names zeroed by crowding (e.g. every selected name at crowding 1.7+). diff --git a/research/phase8_backtest.py b/research/phase8_backtest.py new file mode 100644 index 0000000..b6e6541 --- /dev/null +++ b/research/phase8_backtest.py @@ -0,0 +1,213 @@ +""" +Phase 8 Session 3 — Multi-horizon factor backtest on 140-ticker universe. + +Runs 8 factors (7 original minus graph_gnn_embedding_drift, plus +graph_customer_momentum) at 21d / 63d / 126d horizons and reports IC +statistics in Phase 5 Diagnostic A format. + +graph_gnn_embedding_drift is excluded: node_embeddings covers only the +original 30 equities (HGT retrain deferred in Session 2); including it +would evaluate on a stale N=30 sub-cross-section rather than the expanded +140-ticker universe. + +Read-only — no DB writes. +""" +from __future__ import annotations + +import math +import sys +from datetime import date + +import numpy as np +from sqlalchemy import create_engine + +from nexus.config import settings +from nexus.signals.backtest import ( + CALM_FSI_THRESHOLD, + FactorBacktest, + _aggregate, + _compute_forward_returns, + _factor_xs_dispatch, + _load_fsi_by_date, + _load_supply_edge_panel, + compute_factor_ics, + load_all_panels, +) +from nexus.signals.hlz import format_table, hlz_correct + +HORIZONS: list[int] = [21, 63, 126] + +# Phase 5 Diagnostic A t-stats (N=30 baseline) at each horizon for reference. +# 21d values from Phase 5 primary backtest; 63d and 126d from Phase 5 Diagnostic A. +PHASE5_TSTAT: dict[str, dict[int, float]] = { + "price_momentum_12_1": {21: 1.05, 63: 1.72, 126: 4.15}, + "price_reversal_1m": {21: -0.83, 63: -0.42, 126: 0.43}, + "price_low_volatility_63d": {21: 0.31, 63: -0.41, 126: -1.13}, + "price_size_proxy": {21: 0.28, 63: 1.47, 126: 2.24}, + "graph_delta_betweenness": {21: 0.12, 63: 0.21, 126: 0.09}, + "graph_delta_eigenvector": {21: 0.44, 63: 0.78, 126: 1.12}, + "graph_customer_momentum": {21: float("nan"), 63: float("nan"), 126: float("nan")}, +} + + +def _regime_split( + factor_name: str, + all_periods: list[date], + all_ics: list[float], + fsi_by_date: dict[date, float | None], +) -> None: + """Print CALM vs NON-CALM IC split for one factor.""" + calm_ics, stressed_ics = [], [] + for p, ic in zip(all_periods, all_ics): + fsi = fsi_by_date.get(p) + if fsi is None: + continue + if fsi < CALM_FSI_THRESHOLD: + calm_ics.append(ic) + else: + stressed_ics.append(ic) + + def _summarise(label: str, ics: list[float]) -> str: + if len(ics) < 2: + return f" {label:14s} N={len(ics):>3d} (insufficient)" + arr = np.asarray(ics) + mean = arr.mean() + std = arr.std(ddof=1) + t = math.sqrt(len(ics)) * mean / std if std > 0 else float("nan") + return ( + f" {label:14s} N={len(ics):>3d} " + f"mean_IC={mean:>+7.4f} t={t:>+6.3f}" + ) + + print(f" Regime split — {factor_name}") + print(_summarise("CALM (FSI<0)", calm_ics)) + print(_summarise("NON-CALM(FSI>=0)", stressed_ics)) + + +def _print_ic_table( + results: list[FactorBacktest], + horizon_days: int, + fsi_by_date: dict[date, float | None], + per_period_data: dict[str, tuple[list[float], list[date]]], +) -> list[tuple[str, float]]: + """Print IC table and return list of (name, t_stat) for HLZ input.""" + print(f"\n{'='*90}") + print(f" Horizon: {horizon_days}d ({horizon_days/21:.1f} months)") + print(f"{'='*90}") + hdr = ( + f" {'factor':32s} {'N':>4s} {'mean_IC':>8s} " + f"{'std_IC':>8s} {'t_stat':>7s} {'avg_xs':>7s} " + f"{'t_P5':>6s} period" + ) + print(hdr) + print(f" {'-'*85}") + t_pairs: list[tuple[str, float]] = [] + for r in results: + p5_ref = PHASE5_TSTAT.get(r.name, {}).get(horizon_days, float("nan")) + p5_str = f"{p5_ref:>+6.3f}" if math.isfinite(p5_ref) else " n/a" + period_str = ( + f"{r.first_period} -> {r.last_period}" + if r.first_period and r.last_period + else "(insufficient)" + ) + print( + f" {r.name:32s} {r.n_periods:>4d} {r.mean_ic:>+8.4f} " + f"{r.std_ic:>8.4f} {r.t_stat:>+7.3f} {r.avg_xs_size:>7.1f} " + f"{p5_str} {period_str}" + ) + if math.isfinite(r.t_stat): + t_pairs.append((r.name, r.t_stat)) + + # Regime splits for factors reaching |t| > 1.5 + regime_candidates = [ + r for r in results if math.isfinite(r.t_stat) and abs(r.t_stat) > 1.5 + ] + if regime_candidates: + print(f"\n Regime analysis (|t| > 1.5 at this horizon):") + for r in regime_candidates: + ics, periods = per_period_data[r.name] + _regime_split(r.name, periods, ics, fsi_by_date) + + return t_pairs + + +def main() -> None: + engine = create_engine(settings.database_url_sync) + print("[*] Loading panels...", flush=True) + price_panel, centrality_panel, embedding_panel = load_all_panels(engine) + supply_panel = _load_supply_edge_panel(engine) + + snap_dates = centrality_panel.sorted_dates + print( + f"[*] Snapshots: {len(snap_dates)} ({snap_dates[0]} -> {snap_dates[-1]})", + flush=True, + ) + xs_counts = { + "price": len(price_panel.by_ticker), + "centrality": len(next(iter(centrality_panel.by_date.values()))), + "embedding": len(next(iter(embedding_panel.by_date.values()))), + } + print( + f"[*] Cross-section sizes — price: {xs_counts['price']} " + f"centrality: {xs_counts['centrality']} " + f"embedding: {xs_counts['embedding']} (stale — N=30, excluded from run)", + flush=True, + ) + + fsi_by_date = _load_fsi_by_date(snap_dates, engine) + engine.dispose() + + # Build dispatch: all factors including customer_momentum, minus gnn_embedding_drift + dispatch = _factor_xs_dispatch(supply_panel=supply_panel) + excluded = dispatch.pop("graph_gnn_embedding_drift", None) + if excluded is None: + print( + "[!] graph_gnn_embedding_drift not found in dispatch — check backtest.py", + file=sys.stderr, + ) + factor_names = list(dispatch.keys()) + print(f"[*] Factors in run ({len(factor_names)}): {', '.join(factor_names)}") + print("[*] Excluded: graph_gnn_embedding_drift (stale N=30 embeddings)\n", flush=True) + + for horizon_days in HORIZONS: + fwd = _compute_forward_returns(snap_dates, price_panel, forward_days=horizon_days) + n_pairs = sum(len(v) for v in fwd.values()) + print( + f"[*] horizon={horizon_days}d — forward return pairs: {n_pairs}", + flush=True, + ) + + results: list[FactorBacktest] = [] + per_period_data: dict[str, tuple[list[float], list[date]]] = {} + + for name in factor_names: + ics, sizes, periods = compute_factor_ics( + name, + snap_dates, + fwd, + price_panel, + centrality_panel, + embedding_panel, + supply_panel=supply_panel, + ) + per_period_data[name] = (ics, periods) + results.append(_aggregate(name, ics, sizes, periods)) + + t_pairs = _print_ic_table(results, horizon_days, fsi_by_date, per_period_data) + + # HLZ multiple-testing correction + if t_pairs: + t_stats_map = dict(t_pairs) + median_n = int(np.median([r.n_periods for r in results if r.n_periods > 1])) + df_for_t = max(median_n - 1, 1) + print(f"\n HLZ correction (df={df_for_t}, median N - 1)") + hlz_results = hlz_correct(t_stats_map, df=df_for_t) + print("\n" + format_table(hlz_results)) + else: + print("\n [!] No finite t-stats — skipping HLZ.") + + print("\n[*] Done. No DB writes performed.") + + +if __name__ == "__main__": + main() diff --git a/scripts/update_registry_phase8.py b/scripts/update_registry_phase8.py new file mode 100644 index 0000000..8e4368c --- /dev/null +++ b/scripts/update_registry_phase8.py @@ -0,0 +1,75 @@ +"""Phase 8 Session 3 — idempotent signal_registry update. + +Updates graph_delta_eigenvector with N=140 backtest statistics: + - t_stat = -3.194 (63d horizon, N=140 universe) + - hlz_passes = false (Bonferroni threshold |t| >= 4.12) + - status = 'monitoring' (fragility indicator, not alpha) + - regime_profile: records the role and N=140 findings + +Run idempotently: re-running produces the same state. The update is a +data change (not a schema change) so it does not warrant an alembic migration. +""" +import json + +from sqlalchemy import create_engine, text + +from nexus.config import settings + +_FACTOR_NAME = "graph_delta_eigenvector" + +_REGIME_PROFILE = { + "role": "fragility_indicator", + "horizon_days": 63, + "n140_mean_ic": -0.049, + "n140_t_stat": -3.194, + "n140_t_21d": -2.144, + "n140_t_126d": -2.192, + "use": "portfolio_centrality_penalty_not_alpha", + "sign_note": ( + "negative_ic_high_eigenvector_predicts_underperformance — " + "connectivity implies fragility under stress, not pricing power" + ), + "phase5_sign": "positive (noise at N=30)", + "phase8_sign": "robustly_negative_across_all_horizons_N=140", +} + + +def main() -> None: + engine = create_engine(settings.database_url_sync) + with engine.begin() as conn: + result = conn.execute( + text( + """ + UPDATE signal_registry + SET t_stat = :t_stat, + hlz_passes = :hlz_passes, + status = :status, + regime_profile = :regime_profile + WHERE name = :name + RETURNING signal_id, name, status + """ + ), + { + "t_stat": -3.194, + "hlz_passes": False, + "status": "monitoring", + "regime_profile": json.dumps(_REGIME_PROFILE), + "name": _FACTOR_NAME, + }, + ) + row = result.fetchone() + + engine.dispose() + + if row is None: + print(f"[!] No row found for name='{_FACTOR_NAME}' — nothing updated.") + else: + print( + f"[+] Updated signal_registry: " + f"signal_id={row.signal_id} name={row.name} status={row.status}" + ) + print(f" t_stat=-3.194 hlz_passes=False regime_profile set.") + + +if __name__ == "__main__": + main() diff --git a/tests/test_portfolio.py b/tests/test_portfolio.py index 0299f1d..a7c518f 100644 --- a/tests/test_portfolio.py +++ b/tests/test_portfolio.py @@ -221,3 +221,59 @@ def test_cap_redistribution_preserves_sum(): alpha, vol, crowd = _inputs(alphas, vols=vols) out = build_portfolio(alpha, vol, crowd) assert out["weight"].sum() == pytest.approx(1.0) + + +# --------------------------------------------------------------------------- +# Centrality penalty +# --------------------------------------------------------------------------- + + +def test_centrality_penalty_reduces_high_centrality_weight(): + """High-centrality name gets smaller weight proportional to lambda. + + Use TOP_K names (A + B + fillers) so the 15% single-name cap does not + equalise A and B before the ratio can be measured. + A percentile=0.0 -> multiplier 1.0; B percentile=1.0 -> multiplier 0.70. + Fillers at percentile=0.5 (neutral) dilute both A and B below the cap. + """ + filler_n = TOP_K - 2 + filler_tickers = [f"FIL{i}" for i in range(filler_n)] + alphas = {"A": 1.0, "B": 1.0} + alphas.update({t: 1.0 for t in filler_tickers}) + alpha, vol, crowd = _inputs(alphas) + penalty = pl.DataFrame({ + "ticker": ["A", "B"] + filler_tickers, + "centrality_percentile": [0.0, 1.0] + [0.5] * filler_n, + }) + out = build_portfolio(alpha, vol, crowd, centrality_penalty=penalty) + by_t = {r["ticker"]: r["weight"] for r in out.iter_rows(named=True)} + assert by_t["A"] / by_t["B"] == pytest.approx(1.0 / 0.70, rel=1e-3) + assert out["weight"].sum() == pytest.approx(1.0) + + +def test_centrality_penalty_missing_ticker_defaults_neutral(): + """Ticker absent from penalty frame gets centrality_percentile=0.5 (15% discount). + + Use TOP_K names so the cap does not bind. Only A is in the penalty frame; + B and fillers are absent and take fill_null(0.5). + """ + filler_n = TOP_K - 2 + filler_tickers = [f"FIL{i}" for i in range(filler_n)] + alphas = {"A": 1.0, "B": 1.0} + alphas.update({t: 1.0 for t in filler_tickers}) + alpha, vol, crowd = _inputs(alphas) + # Only A is in penalty; B and fillers take fill_null(0.5) -> multiplier 0.85 + penalty = pl.DataFrame({"ticker": ["A"], "centrality_percentile": [0.0]}) + out = build_portfolio(alpha, vol, crowd, centrality_penalty=penalty) + by_t = {r["ticker"]: r["weight"] for r in out.iter_rows(named=True)} + # A multiplier=1.0, B multiplier=1 - 0.30*0.5 = 0.85 + assert by_t["A"] / by_t["B"] == pytest.approx(1.0 / 0.85, rel=1e-3) + + +def test_centrality_penalty_none_preserves_existing_behavior(): + """centrality_penalty=None produces identical output to the 3-argument call.""" + alphas = {f"T{i}": 1.0 for i in range(TOP_K)} + alpha, vol, crowd = _inputs(alphas) + assert build_portfolio(alpha, vol, crowd).equals( + build_portfolio(alpha, vol, crowd, centrality_penalty=None) + )