Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions PROGRESS.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ Phase docs live in `docs/progress/` (untracked, local only).
| 8 | Factor backtest Session 3 | COMPLETE — multi-horizon IC backtest on 140-ticker universe (21d/63d/126d); Phase 5 t-stats shown to be N=30 artifacts (momentum_12_1 126d: 4.149→1.506); graph_delta_eigenvector sign-flipped (t=−3.194 at 63d) → wired as centrality penalty overlay in portfolio.py; graph_customer_momentum definitively null (CLOSED); rolling_registry rebuilt (406 rows); **Phase 8 paper trader CAGR +5.97%, Sharpe 0.374, Max DD −36.68%** — degradation vs Phase 7A reflects accurate N=140 factor gates + extended cash periods | `docs/progress/phase_8.md` |
| 9 | EDGAR XBRL fundamentals Session 1 | COMPLETE — migration 0011 adds form_type/accession_number/UNIQUE; `form_xbrl.py` parser (filed≥end integrity invariant, 5 metrics, 4 revenue aliases); 74,662 rows / 137 of 140 tickers covered; 4 factors; **`fundamental_margin_compression` t=+4.834 at 126d — first NEXUS factor to PASS HLZ M=400 Bonferroni** (composer sign-flipped vs literature: compression = buy; CALM-regime t=+6.35; sub-window late-third t=+5.13 — strengthening, not decaying); registered status='approved' in signal_registry; rolling_registry refreshed (464 rows); ROA decayed (late-third t=-0.06) → NOT registered; rd_intensity / asset_growth null; **paper trader CAGR +8.72%, Sharpe 0.488, Max DD -32.68%** (vs Phase 8 baseline +5.97% / 0.374 / -36.68%) | `docs/progress/phase_9.md` |
| 9 | HGT retraining Session 2 | COMPLETE — retrained HGT on 140-ticker graph (no code changes; metadata extracted dynamically); val AUC **0.9807** at epoch 280 (vs 0.9803 / e=240 on the prior 30-ticker run); 7m 03s wall-clock; `MODEL_VERSION` bumped `hgt_link_pred_v1` → **`hgt_link_pred_v2`**; node_embeddings re-backfilled at 58 monthly snapshots → **8,120 rows** (140 × 58, dim=64); embedding validation passed (cos(NVDA,AMD)=0.98 > cos(NVDA,ARW)=0.63; per-component std median 0.05); **`graph_gnn_embedding_drift` IC backtest NULL at all horizons** (t=+0.382 @ 21d / +0.524 @ 63d / +0.368 @ 126d on N=52..57; HLZ fail by 10×); registered `status='rejected'` in signal_registry with full evidence record; paper trader unchanged from Phase 9 Session 1 | `docs/progress/phase_9.md` |
| 10 | Regime-aware aggregator Session 1 | COMPLETE — **NEGATIVE result, hypothesis refuted, flag rolled back**. Built `_apply_regime_gate` in aggregator + `fsi_value` param on `load_factor_records` + FSI wiring in paper_trader (5 new unit tests, 155/156 suite pass). Tested `non_calm_action: 'zero'` on `fundamental_margin_compression` (126d NON-CALM t=−2.31, N=8). Paper trader: **CAGR +8.72% → +7.68%, Sharpe 0.488 → 0.450, Max DD −32.68% → −35.39%** — all three metrics worsened. Monthly-horizon audit: factor made money in 4 of 6 NON-CALM forward months (gated rebalances sat at start of late-2022 recovery). 126d drag is a horizon artifact; doesn't translate to monthly rebalancing. Registry flag rolled back; regime-aware *infrastructure* retained as opt-in capability for future factors | `docs/progress/phase_10.md` |
22 changes: 21 additions & 1 deletion nexus/execution/paper_trader.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@

log = logging.getLogger(__name__)
from nexus.signals.backtest import (
CALM_FSI_THRESHOLD,
_factor_xs_dispatch,
_load_centrality_panel,
_load_embedding_panel,
_load_fsi_by_date,
_load_fundamentals_panel,
_load_price_panel,
_CentralityPanel,
Expand Down Expand Up @@ -389,6 +391,7 @@ class BacktestResult:
max_drawdown: float
monthly_nav: pl.DataFrame
vol_scalar_log: list[tuple[date, float]] = field(default_factory=list)
regime_log: list[tuple[date, float | None, str]] = field(default_factory=list)


def run_backtest(
Expand All @@ -404,12 +407,14 @@ def run_backtest(
emb = _load_embedding_panel(engine)
fund = _load_fundamentals_panel(engine)
crowding_by_q = _load_crowding(engine)
fsi_by_date: dict[date, float | None] = {}

rebalance_grid = _rebalance_dates(cent, start, end)
if not rebalance_grid:
raise RuntimeError(
"No rebalance dates in window — check centrality_history coverage"
)
fsi_by_date = _load_fsi_by_date(rebalance_grid, engine)
trading_dates = _all_trading_dates(price, start, end)
if not trading_dates:
raise RuntimeError("No trading dates in window")
Expand All @@ -425,12 +430,22 @@ def run_backtest(
rebalance_set = set(rebalance_grid)

vol_scalar_log: list[tuple[date, float]] = []
regime_log: list[tuple[date, float | None, str]] = []

for d in trading_dates:
if d in rebalance_set:
fsi_at_d = fsi_by_date.get(d)
regime = "NON-CALM" if (fsi_at_d is not None and fsi_at_d >= CALM_FSI_THRESHOLD) else "CALM"
regime_log.append((d, fsi_at_d, regime))
log.info(
"[rebalance] %s fsi=%s regime=%s",
d,
f"{fsi_at_d:+.3f}" if fsi_at_d is not None else "NA",
regime,
)
z_panel = _factor_panel(d, price, cent, emb, fund=fund)
if z_panel.height > 0:
records = load_factor_records(as_of=d, engine=engine)
records = load_factor_records(as_of=d, engine=engine, fsi_value=fsi_at_d)
weights_by_factor = compute_factor_weights(records)
alpha = compose_alpha(z_panel, weights_by_factor)
vol_df = _realised_vol(d, price)
Expand Down Expand Up @@ -479,6 +494,7 @@ def run_backtest(
max_drawdown=max_dd,
monthly_nav=monthly,
vol_scalar_log=vol_scalar_log,
regime_log=regime_log,
)


Expand Down Expand Up @@ -625,6 +641,10 @@ def _print_results(result: BacktestResult) -> None:
print(f" Vol scalar < 1.0 : {scalars_below_one:>4d} / {len(result.vol_scalar_log)} rebalances")
print(f" Avg vol scalar : {avg_scalar:>8.3f}")
print(f" NAV observations : {len(result.nav_series):>8d}")
non_calm = [(d, f) for (d, f, r) in result.regime_log if r == "NON-CALM"]
print(f" NON-CALM rebals : {len(non_calm):>4d} / {len(result.regime_log)} (regime gate eligible)")
for d, f in non_calm:
print(f" {d!s:>12} fsi={f:+.4f}")
print("\n=== Monthly NAV ===")
hdr = f"{'month_end':>12} {'nav':>14} {'cash':>14} {'gross':>14} {'drawdown':>10}"
print(hdr)
Expand Down
96 changes: 71 additions & 25 deletions nexus/signals/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@

import json
import logging
from dataclasses import dataclass
from dataclasses import dataclass, replace
from datetime import date

import polars as pl
from sqlalchemy import Engine, create_engine, text

from nexus.config import settings
from nexus.signals.backtest import CALM_FSI_THRESHOLD

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -119,15 +120,15 @@ def compose_alpha(z_scores: pl.DataFrame, weights: dict[str, float]) -> pl.DataF

_FULL_SAMPLE_QUERY = text(
"""
SELECT name, t_stat, decay_profile
SELECT name, t_stat, decay_profile, regime_profile
FROM signal_registry
WHERE status IN ('testing', 'approved', 'monitoring')
"""
)

_ROLLING_QUERY = text(
"""
SELECT sr.name,
SELECT sr.name, sr.regime_profile,
srh.mean_ic, srh.std_ic, srh.t_stat, srh.n_periods,
srh.declining, srh.trailing_12_mean_ic
FROM signal_registry sr
Expand All @@ -143,10 +144,35 @@ def compose_alpha(z_scores: pl.DataFrame, weights: dict[str, float]) -> pl.DataF
)


def _apply_regime_gate(
record: FactorRecord,
regime_profile: dict | None,
fsi_value: float | None,
) -> FactorRecord:
"""Zero a factor's mean_ic when its regime profile flags it for the current regime.

Activates when ``regime_profile['non_calm_action'] == 'zero'`` and
``fsi_value >= CALM_FSI_THRESHOLD``. The downstream ``compute_factor_weights``
rule (``mean_ic <= 0 -> weight = 0``) then naturally excludes the factor at
this rebalance without any change to the weighting logic itself.

Backward compatible: when ``fsi_value`` is None or the profile lacks the
flag, the record is returned unchanged.
"""
if fsi_value is None or not regime_profile:
return record
if regime_profile.get("non_calm_action") != "zero":
return record
if fsi_value < CALM_FSI_THRESHOLD:
return record
return replace(record, mean_ic=0.0)


def load_factor_records(
as_of: date | None = None,
*,
engine: Engine | None = None,
fsi_value: float | None = None,
) -> list[FactorRecord]:
"""Load FactorRecords for the aggregator.

Expand All @@ -168,7 +194,7 @@ def load_factor_records(
engine = create_engine(settings.database_url_sync)
try:
if as_of is not None:
records = _load_from_history(engine, as_of)
records = _load_from_history(engine, as_of, fsi_value=fsi_value)
if records is not None:
# History table populated — return whatever rows survive the
# thin-window filter. An empty list is the *correct* walk-forward
Expand All @@ -181,13 +207,15 @@ def load_factor_records(
"rolling_registry.py for walk-forward integrity.",
as_of,
)
return _load_from_full_sample(engine)
return _load_from_full_sample(engine, fsi_value=fsi_value)
finally:
if owns_engine:
engine.dispose()


def _load_from_history(engine: Engine, as_of: date) -> list[FactorRecord] | None:
def _load_from_history(
engine: Engine, as_of: date, *, fsi_value: float | None = None
) -> list[FactorRecord] | None:
"""Return rolling factor records for ``as_of``.

Returns ``None`` when the history table contains no rows for any signal at
Expand All @@ -207,21 +235,31 @@ def _load_from_history(engine: Engine, as_of: date) -> list[FactorRecord] | None
for r in rows:
if r.mean_ic is None or r.std_ic is None or r.n_periods is None or r.n_periods < 2:
continue
records.append(
FactorRecord(
name=r.name,
mean_ic=float(r.mean_ic),
std_ic=abs(float(r.std_ic)),
trailing_mean_ic=(
float(r.trailing_12_mean_ic) if r.trailing_12_mean_ic is not None else None
),
declining=bool(r.declining),
)
record = FactorRecord(
name=r.name,
mean_ic=float(r.mean_ic),
std_ic=abs(float(r.std_ic)),
trailing_mean_ic=(
float(r.trailing_12_mean_ic) if r.trailing_12_mean_ic is not None else None
),
declining=bool(r.declining),
)
regime_profile = r.regime_profile
if isinstance(regime_profile, str):
regime_profile = json.loads(regime_profile)
gated = _apply_regime_gate(record, regime_profile, fsi_value)
if gated is not record and gated.mean_ic == 0.0:
log.info(
"regime gate fired: factor=%s as_of=%s fsi=%+.4f",
record.name, as_of, fsi_value,
)
records.append(gated)
return records


def _load_from_full_sample(engine: Engine) -> list[FactorRecord]:
def _load_from_full_sample(
engine: Engine, *, fsi_value: float | None = None
) -> list[FactorRecord]:
with engine.connect() as conn:
rows = conn.execute(_FULL_SAMPLE_QUERY).fetchall()
records: list[FactorRecord] = []
Expand All @@ -243,13 +281,21 @@ def _load_from_full_sample(engine: Engine) -> list[FactorRecord]:
# std_ic = sqrt(N) * mean_ic / t_stat, derived from
# t = sqrt(N) * mean_ic / std_ic.
std_ic = (float(n_periods) ** 0.5) * float(mean_ic) / t_stat
records.append(
FactorRecord(
name=r.name,
mean_ic=float(mean_ic),
std_ic=abs(std_ic),
trailing_mean_ic=float(trailing) if trailing is not None else None,
declining=declining,
)
record = FactorRecord(
name=r.name,
mean_ic=float(mean_ic),
std_ic=abs(std_ic),
trailing_mean_ic=float(trailing) if trailing is not None else None,
declining=declining,
)
regime_profile = r.regime_profile
if isinstance(regime_profile, str):
regime_profile = json.loads(regime_profile)
gated = _apply_regime_gate(record, regime_profile, fsi_value)
if gated is not record and gated.mean_ic == 0.0:
log.info(
"regime gate fired (full-sample): factor=%s fsi=%+.4f",
record.name, fsi_value,
)
records.append(gated)
return records
7 changes: 7 additions & 0 deletions scripts/register_margin_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
"Strengthens over time: thirds at 126d horizon t = +0.21 / -4.33 / -5.13 "
"(early/mid/late). Late window is strongest — opposite of momentum-style decay."
),
# Phase 10 Session 1 tested `non_calm_action: 'zero'` and rolled it back:
# the NON-CALM drag is a 126d artifact (N=8) that does not hold at the
# paper trader's monthly horizon. Of 6 NON-CALM rebalances in the
# backtest window, 4 produced positive forward-month returns under the
# baseline (no-gate) trader. Gating reduced CAGR +8.72% -> +7.68%,
# Sharpe 0.488 -> 0.450, deepened Max DD -32.68% -> -35.39%. See
# docs/progress/phase_10.md.
}


Expand Down
50 changes: 50 additions & 0 deletions tests/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

from nexus.signals.aggregator import (
FactorRecord,
_apply_regime_gate,
compose_alpha,
compute_factor_weights,
)
from nexus.signals.backtest import CALM_FSI_THRESHOLD


def _rec(
Expand Down Expand Up @@ -114,6 +116,54 @@ def test_all_factors_excluded_returns_empty_weights():
assert sum(w.values()) == 0.0


# ---------------------------------------------------------------------------
# _apply_regime_gate — regime-conditional zeroing of mean_ic
# ---------------------------------------------------------------------------


def test_regime_gate_zeros_factor_in_non_calm():
rec = _rec("fundamental_margin_compression", mean_ic=0.05, trailing_mean_ic=0.04)
profile = {"non_calm_action": "zero"}
out = _apply_regime_gate(rec, profile, fsi_value=0.3)
assert out.mean_ic == 0.0
assert out.name == rec.name
assert out.std_ic == rec.std_ic
assert out.trailing_mean_ic == rec.trailing_mean_ic
assert out.declining == rec.declining


def test_regime_gate_passes_factor_in_calm():
rec = _rec("fundamental_margin_compression", mean_ic=0.05, trailing_mean_ic=0.04)
profile = {"non_calm_action": "zero"}
out = _apply_regime_gate(rec, profile, fsi_value=-0.5)
assert out.mean_ic == 0.05
assert out == rec


def test_regime_gate_absent_when_fsi_none():
"""Backward compatibility: callers that pass fsi_value=None see no change."""
rec = _rec("fundamental_margin_compression", mean_ic=0.05, trailing_mean_ic=0.04)
profile = {"non_calm_action": "zero"}
out = _apply_regime_gate(rec, profile, fsi_value=None)
assert out == rec


def test_regime_gate_ignores_factors_without_flag():
rec = _rec("graph_delta_eigenvector", mean_ic=0.05, trailing_mean_ic=0.04)
# Factor with no non_calm_action key is unaffected regardless of FSI.
assert _apply_regime_gate(rec, {}, fsi_value=2.0) == rec
assert _apply_regime_gate(rec, None, fsi_value=2.0) == rec
assert _apply_regime_gate(rec, {"role": "graph_alpha"}, fsi_value=2.0) == rec


def test_regime_gate_threshold_boundary():
"""fsi_value == threshold (0.0) is treated as NON-CALM (fsi >= threshold)."""
rec = _rec("fundamental_margin_compression", mean_ic=0.05, trailing_mean_ic=0.04)
profile = {"non_calm_action": "zero"}
out = _apply_regime_gate(rec, profile, fsi_value=CALM_FSI_THRESHOLD)
assert out.mean_ic == 0.0


# ---------------------------------------------------------------------------
# compose_alpha — pure z-score blending
# ---------------------------------------------------------------------------
Expand Down
Loading