From 5ce33d35e5fec2674e73b8b21f19bd3a2ba58514 Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Sun, 17 May 2026 22:34:12 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(phase10):=20regime-aware=20aggregator?= =?UTF-8?q?=20infrastructure=20=E2=80=94=20=5Fapply=5Fregime=5Fgate=20help?= =?UTF-8?q?er=20+=20fsi=5Fvalue=20kwarg=20+=20FSI=20wiring=20in=20paper=20?= =?UTF-8?q?trader=20(155=20tests=20passing)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nexus/execution/paper_trader.py | 22 +++++++- nexus/signals/aggregator.py | 96 ++++++++++++++++++++++++--------- tests/test_aggregator.py | 50 +++++++++++++++++ 3 files changed, 142 insertions(+), 26 deletions(-) diff --git a/nexus/execution/paper_trader.py b/nexus/execution/paper_trader.py index 65e4332..5a0ff94 100644 --- a/nexus/execution/paper_trader.py +++ b/nexus/execution/paper_trader.py @@ -50,9 +50,11 @@ log = logging.getLogger(__name__) from nexus.signals.backtest import ( + CALM_FSI_THRESHOLD, _factor_xs_dispatch, _load_centrality_panel, _load_embedding_panel, + _load_fsi_by_date, _load_fundamentals_panel, _load_price_panel, _CentralityPanel, @@ -389,6 +391,7 @@ class BacktestResult: max_drawdown: float monthly_nav: pl.DataFrame vol_scalar_log: list[tuple[date, float]] = field(default_factory=list) + regime_log: list[tuple[date, float | None, str]] = field(default_factory=list) def run_backtest( @@ -404,12 +407,14 @@ def run_backtest( emb = _load_embedding_panel(engine) fund = _load_fundamentals_panel(engine) crowding_by_q = _load_crowding(engine) + fsi_by_date: dict[date, float | None] = {} rebalance_grid = _rebalance_dates(cent, start, end) if not rebalance_grid: raise RuntimeError( "No rebalance dates in window — check centrality_history coverage" ) + fsi_by_date = _load_fsi_by_date(rebalance_grid, engine) trading_dates = _all_trading_dates(price, start, end) if not trading_dates: raise RuntimeError("No trading dates in window") @@ -425,12 +430,22 @@ def run_backtest( rebalance_set = set(rebalance_grid) vol_scalar_log: list[tuple[date, float]] = [] + regime_log: list[tuple[date, float | None, str]] = [] for d in trading_dates: if d in rebalance_set: + fsi_at_d = fsi_by_date.get(d) + regime = "NON-CALM" if (fsi_at_d is not None and fsi_at_d >= CALM_FSI_THRESHOLD) else "CALM" + regime_log.append((d, fsi_at_d, regime)) + log.info( + "[rebalance] %s fsi=%s regime=%s", + d, + f"{fsi_at_d:+.3f}" if fsi_at_d is not None else "NA", + regime, + ) z_panel = _factor_panel(d, price, cent, emb, fund=fund) if z_panel.height > 0: - records = load_factor_records(as_of=d, engine=engine) + records = load_factor_records(as_of=d, engine=engine, fsi_value=fsi_at_d) weights_by_factor = compute_factor_weights(records) alpha = compose_alpha(z_panel, weights_by_factor) vol_df = _realised_vol(d, price) @@ -479,6 +494,7 @@ def run_backtest( max_drawdown=max_dd, monthly_nav=monthly, vol_scalar_log=vol_scalar_log, + regime_log=regime_log, ) @@ -625,6 +641,10 @@ def _print_results(result: BacktestResult) -> None: print(f" Vol scalar < 1.0 : {scalars_below_one:>4d} / {len(result.vol_scalar_log)} rebalances") print(f" Avg vol scalar : {avg_scalar:>8.3f}") print(f" NAV observations : {len(result.nav_series):>8d}") + non_calm = [(d, f) for (d, f, r) in result.regime_log if r == "NON-CALM"] + print(f" NON-CALM rebals : {len(non_calm):>4d} / {len(result.regime_log)} (regime gate eligible)") + for d, f in non_calm: + print(f" {d!s:>12} fsi={f:+.4f}") print("\n=== Monthly NAV ===") hdr = f"{'month_end':>12} {'nav':>14} {'cash':>14} {'gross':>14} {'drawdown':>10}" print(hdr) diff --git a/nexus/signals/aggregator.py b/nexus/signals/aggregator.py index f9d1d58..0d58265 100644 --- a/nexus/signals/aggregator.py +++ b/nexus/signals/aggregator.py @@ -23,13 +23,14 @@ import json import logging -from dataclasses import dataclass +from dataclasses import dataclass, replace from datetime import date import polars as pl from sqlalchemy import Engine, create_engine, text from nexus.config import settings +from nexus.signals.backtest import CALM_FSI_THRESHOLD log = logging.getLogger(__name__) @@ -119,7 +120,7 @@ def compose_alpha(z_scores: pl.DataFrame, weights: dict[str, float]) -> pl.DataF _FULL_SAMPLE_QUERY = text( """ - SELECT name, t_stat, decay_profile + SELECT name, t_stat, decay_profile, regime_profile FROM signal_registry WHERE status IN ('testing', 'approved', 'monitoring') """ @@ -127,7 +128,7 @@ def compose_alpha(z_scores: pl.DataFrame, weights: dict[str, float]) -> pl.DataF _ROLLING_QUERY = text( """ - SELECT sr.name, + SELECT sr.name, sr.regime_profile, srh.mean_ic, srh.std_ic, srh.t_stat, srh.n_periods, srh.declining, srh.trailing_12_mean_ic FROM signal_registry sr @@ -143,10 +144,35 @@ def compose_alpha(z_scores: pl.DataFrame, weights: dict[str, float]) -> pl.DataF ) +def _apply_regime_gate( + record: FactorRecord, + regime_profile: dict | None, + fsi_value: float | None, +) -> FactorRecord: + """Zero a factor's mean_ic when its regime profile flags it for the current regime. + + Activates when ``regime_profile['non_calm_action'] == 'zero'`` and + ``fsi_value >= CALM_FSI_THRESHOLD``. The downstream ``compute_factor_weights`` + rule (``mean_ic <= 0 -> weight = 0``) then naturally excludes the factor at + this rebalance without any change to the weighting logic itself. + + Backward compatible: when ``fsi_value`` is None or the profile lacks the + flag, the record is returned unchanged. + """ + if fsi_value is None or not regime_profile: + return record + if regime_profile.get("non_calm_action") != "zero": + return record + if fsi_value < CALM_FSI_THRESHOLD: + return record + return replace(record, mean_ic=0.0) + + def load_factor_records( as_of: date | None = None, *, engine: Engine | None = None, + fsi_value: float | None = None, ) -> list[FactorRecord]: """Load FactorRecords for the aggregator. @@ -168,7 +194,7 @@ def load_factor_records( engine = create_engine(settings.database_url_sync) try: if as_of is not None: - records = _load_from_history(engine, as_of) + records = _load_from_history(engine, as_of, fsi_value=fsi_value) if records is not None: # History table populated — return whatever rows survive the # thin-window filter. An empty list is the *correct* walk-forward @@ -181,13 +207,15 @@ def load_factor_records( "rolling_registry.py for walk-forward integrity.", as_of, ) - return _load_from_full_sample(engine) + return _load_from_full_sample(engine, fsi_value=fsi_value) finally: if owns_engine: engine.dispose() -def _load_from_history(engine: Engine, as_of: date) -> list[FactorRecord] | None: +def _load_from_history( + engine: Engine, as_of: date, *, fsi_value: float | None = None +) -> list[FactorRecord] | None: """Return rolling factor records for ``as_of``. Returns ``None`` when the history table contains no rows for any signal at @@ -207,21 +235,31 @@ def _load_from_history(engine: Engine, as_of: date) -> list[FactorRecord] | None for r in rows: if r.mean_ic is None or r.std_ic is None or r.n_periods is None or r.n_periods < 2: continue - records.append( - FactorRecord( - name=r.name, - mean_ic=float(r.mean_ic), - std_ic=abs(float(r.std_ic)), - trailing_mean_ic=( - float(r.trailing_12_mean_ic) if r.trailing_12_mean_ic is not None else None - ), - declining=bool(r.declining), - ) + record = FactorRecord( + name=r.name, + mean_ic=float(r.mean_ic), + std_ic=abs(float(r.std_ic)), + trailing_mean_ic=( + float(r.trailing_12_mean_ic) if r.trailing_12_mean_ic is not None else None + ), + declining=bool(r.declining), ) + regime_profile = r.regime_profile + if isinstance(regime_profile, str): + regime_profile = json.loads(regime_profile) + gated = _apply_regime_gate(record, regime_profile, fsi_value) + if gated is not record and gated.mean_ic == 0.0: + log.info( + "regime gate fired: factor=%s as_of=%s fsi=%+.4f", + record.name, as_of, fsi_value, + ) + records.append(gated) return records -def _load_from_full_sample(engine: Engine) -> list[FactorRecord]: +def _load_from_full_sample( + engine: Engine, *, fsi_value: float | None = None +) -> list[FactorRecord]: with engine.connect() as conn: rows = conn.execute(_FULL_SAMPLE_QUERY).fetchall() records: list[FactorRecord] = [] @@ -243,13 +281,21 @@ def _load_from_full_sample(engine: Engine) -> list[FactorRecord]: # std_ic = sqrt(N) * mean_ic / t_stat, derived from # t = sqrt(N) * mean_ic / std_ic. std_ic = (float(n_periods) ** 0.5) * float(mean_ic) / t_stat - records.append( - FactorRecord( - name=r.name, - mean_ic=float(mean_ic), - std_ic=abs(std_ic), - trailing_mean_ic=float(trailing) if trailing is not None else None, - declining=declining, - ) + record = FactorRecord( + name=r.name, + mean_ic=float(mean_ic), + std_ic=abs(std_ic), + trailing_mean_ic=float(trailing) if trailing is not None else None, + declining=declining, ) + regime_profile = r.regime_profile + if isinstance(regime_profile, str): + regime_profile = json.loads(regime_profile) + gated = _apply_regime_gate(record, regime_profile, fsi_value) + if gated is not record and gated.mean_ic == 0.0: + log.info( + "regime gate fired (full-sample): factor=%s fsi=%+.4f", + record.name, fsi_value, + ) + records.append(gated) return records diff --git a/tests/test_aggregator.py b/tests/test_aggregator.py index 49121c5..957e7b2 100644 --- a/tests/test_aggregator.py +++ b/tests/test_aggregator.py @@ -8,9 +8,11 @@ from nexus.signals.aggregator import ( FactorRecord, + _apply_regime_gate, compose_alpha, compute_factor_weights, ) +from nexus.signals.backtest import CALM_FSI_THRESHOLD def _rec( @@ -114,6 +116,54 @@ def test_all_factors_excluded_returns_empty_weights(): assert sum(w.values()) == 0.0 +# --------------------------------------------------------------------------- +# _apply_regime_gate — regime-conditional zeroing of mean_ic +# --------------------------------------------------------------------------- + + +def test_regime_gate_zeros_factor_in_non_calm(): + rec = _rec("fundamental_margin_compression", mean_ic=0.05, trailing_mean_ic=0.04) + profile = {"non_calm_action": "zero"} + out = _apply_regime_gate(rec, profile, fsi_value=0.3) + assert out.mean_ic == 0.0 + assert out.name == rec.name + assert out.std_ic == rec.std_ic + assert out.trailing_mean_ic == rec.trailing_mean_ic + assert out.declining == rec.declining + + +def test_regime_gate_passes_factor_in_calm(): + rec = _rec("fundamental_margin_compression", mean_ic=0.05, trailing_mean_ic=0.04) + profile = {"non_calm_action": "zero"} + out = _apply_regime_gate(rec, profile, fsi_value=-0.5) + assert out.mean_ic == 0.05 + assert out == rec + + +def test_regime_gate_absent_when_fsi_none(): + """Backward compatibility: callers that pass fsi_value=None see no change.""" + rec = _rec("fundamental_margin_compression", mean_ic=0.05, trailing_mean_ic=0.04) + profile = {"non_calm_action": "zero"} + out = _apply_regime_gate(rec, profile, fsi_value=None) + assert out == rec + + +def test_regime_gate_ignores_factors_without_flag(): + rec = _rec("graph_delta_eigenvector", mean_ic=0.05, trailing_mean_ic=0.04) + # Factor with no non_calm_action key is unaffected regardless of FSI. + assert _apply_regime_gate(rec, {}, fsi_value=2.0) == rec + assert _apply_regime_gate(rec, None, fsi_value=2.0) == rec + assert _apply_regime_gate(rec, {"role": "graph_alpha"}, fsi_value=2.0) == rec + + +def test_regime_gate_threshold_boundary(): + """fsi_value == threshold (0.0) is treated as NON-CALM (fsi >= threshold).""" + rec = _rec("fundamental_margin_compression", mean_ic=0.05, trailing_mean_ic=0.04) + profile = {"non_calm_action": "zero"} + out = _apply_regime_gate(rec, profile, fsi_value=CALM_FSI_THRESHOLD) + assert out.mean_ic == 0.0 + + # --------------------------------------------------------------------------- # compose_alpha — pure z-score blending # --------------------------------------------------------------------------- From ae5afbace7e0b025f99694e22f9a8dcfae636504 Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Sun, 17 May 2026 22:34:16 +0200 Subject: [PATCH 2/2] =?UTF-8?q?docs(phase10):=20session=201=20=E2=80=94=20?= =?UTF-8?q?regime=20gate=20negative=20result=20+=20root=20cause=20analysis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- PROGRESS.md | 1 + scripts/register_margin_compression.py | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/PROGRESS.md b/PROGRESS.md index 8475bbf..1199dc6 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -39,3 +39,4 @@ Phase docs live in `docs/progress/` (untracked, local only). | 8 | Factor backtest Session 3 | COMPLETE — multi-horizon IC backtest on 140-ticker universe (21d/63d/126d); Phase 5 t-stats shown to be N=30 artifacts (momentum_12_1 126d: 4.149→1.506); graph_delta_eigenvector sign-flipped (t=−3.194 at 63d) → wired as centrality penalty overlay in portfolio.py; graph_customer_momentum definitively null (CLOSED); rolling_registry rebuilt (406 rows); **Phase 8 paper trader CAGR +5.97%, Sharpe 0.374, Max DD −36.68%** — degradation vs Phase 7A reflects accurate N=140 factor gates + extended cash periods | `docs/progress/phase_8.md` | | 9 | EDGAR XBRL fundamentals Session 1 | COMPLETE — migration 0011 adds form_type/accession_number/UNIQUE; `form_xbrl.py` parser (filed≥end integrity invariant, 5 metrics, 4 revenue aliases); 74,662 rows / 137 of 140 tickers covered; 4 factors; **`fundamental_margin_compression` t=+4.834 at 126d — first NEXUS factor to PASS HLZ M=400 Bonferroni** (composer sign-flipped vs literature: compression = buy; CALM-regime t=+6.35; sub-window late-third t=+5.13 — strengthening, not decaying); registered status='approved' in signal_registry; rolling_registry refreshed (464 rows); ROA decayed (late-third t=-0.06) → NOT registered; rd_intensity / asset_growth null; **paper trader CAGR +8.72%, Sharpe 0.488, Max DD -32.68%** (vs Phase 8 baseline +5.97% / 0.374 / -36.68%) | `docs/progress/phase_9.md` | | 9 | HGT retraining Session 2 | COMPLETE — retrained HGT on 140-ticker graph (no code changes; metadata extracted dynamically); val AUC **0.9807** at epoch 280 (vs 0.9803 / e=240 on the prior 30-ticker run); 7m 03s wall-clock; `MODEL_VERSION` bumped `hgt_link_pred_v1` → **`hgt_link_pred_v2`**; node_embeddings re-backfilled at 58 monthly snapshots → **8,120 rows** (140 × 58, dim=64); embedding validation passed (cos(NVDA,AMD)=0.98 > cos(NVDA,ARW)=0.63; per-component std median 0.05); **`graph_gnn_embedding_drift` IC backtest NULL at all horizons** (t=+0.382 @ 21d / +0.524 @ 63d / +0.368 @ 126d on N=52..57; HLZ fail by 10×); registered `status='rejected'` in signal_registry with full evidence record; paper trader unchanged from Phase 9 Session 1 | `docs/progress/phase_9.md` | +| 10 | Regime-aware aggregator Session 1 | COMPLETE — **NEGATIVE result, hypothesis refuted, flag rolled back**. Built `_apply_regime_gate` in aggregator + `fsi_value` param on `load_factor_records` + FSI wiring in paper_trader (5 new unit tests, 155/156 suite pass). Tested `non_calm_action: 'zero'` on `fundamental_margin_compression` (126d NON-CALM t=−2.31, N=8). Paper trader: **CAGR +8.72% → +7.68%, Sharpe 0.488 → 0.450, Max DD −32.68% → −35.39%** — all three metrics worsened. Monthly-horizon audit: factor made money in 4 of 6 NON-CALM forward months (gated rebalances sat at start of late-2022 recovery). 126d drag is a horizon artifact; doesn't translate to monthly rebalancing. Registry flag rolled back; regime-aware *infrastructure* retained as opt-in capability for future factors | `docs/progress/phase_10.md` | diff --git a/scripts/register_margin_compression.py b/scripts/register_margin_compression.py index a0e2e3f..4a2543e 100644 --- a/scripts/register_margin_compression.py +++ b/scripts/register_margin_compression.py @@ -45,6 +45,13 @@ "Strengthens over time: thirds at 126d horizon t = +0.21 / -4.33 / -5.13 " "(early/mid/late). Late window is strongest — opposite of momentum-style decay." ), + # Phase 10 Session 1 tested `non_calm_action: 'zero'` and rolled it back: + # the NON-CALM drag is a 126d artifact (N=8) that does not hold at the + # paper trader's monthly horizon. Of 6 NON-CALM rebalances in the + # backtest window, 4 produced positive forward-month returns under the + # baseline (no-gate) trader. Gating reduced CAGR +8.72% -> +7.68%, + # Sharpe 0.488 -> 0.450, deepened Max DD -32.68% -> -35.39%. See + # docs/progress/phase_10.md. }