Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions PROGRESS.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ Phase docs live in `docs/progress/` (untracked, local only).
| Tooling | Graph explorer frontend | COMPLETE — `frontend/` (React 18 + Vite + Sigma.js v3 + Graphology); force-settled layout, node drag, equity click → detail panel, time slider (`/graph/snapshot`), edge-type filters, ticker search highlight, FSI gauge | — |
| 8 | Universe expansion Session 1 | COMPLETE — UNIVERSE 30 → **140** tickers; AMAT CIK bug fixed (was Adobe's `796343`, now `6951`); price_history backfilled to 2018-01-01; ANSS/JNPR/INFN excluded (acquired during window; yfinance dropped historical series); 60,567 → **283,150** rows | `docs/progress/phase_8.md` |
| 8 | Universe expansion Session 2 | COMPLETE — graph + filings re-ingestion on 140 tickers; supply_edges 247→**1,659**, ownership_edges 7,446→**34,843** (140/140 covered), board_edges 104→**326** (39/140), centrality_history 1,740→**8,120**; ISSUER_NAME_MAP expanded 37→150 fragments, plus the AMAT CIK correction left over from Session 1; HGT embeddings deferred to Session 3 retrain | `docs/progress/phase_8.md` |
| 8 | Factor backtest Session 3 | COMPLETE — multi-horizon IC backtest on 140-ticker universe (21d/63d/126d); Phase 5 t-stats shown to be N=30 artifacts (momentum_12_1 126d: 4.149→1.506); graph_delta_eigenvector sign-flipped (t=−3.194 at 63d) → wired as centrality penalty overlay in portfolio.py; graph_customer_momentum definitively null (CLOSED); rolling_registry rebuilt (406 rows); **Phase 8 paper trader CAGR +5.97%, Sharpe 0.374, Max DD −36.68%** — degradation vs Phase 7A reflects accurate N=140 factor gates + extended cash periods | `docs/progress/phase_8.md` |
24 changes: 23 additions & 1 deletion nexus/execution/paper_trader.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def _factor_panel(
std at this snapshot contribute zeros (they cannot rank anything).
"""
dispatch = _factor_xs_dispatch()
dispatch.pop("graph_gnn_embedding_drift", None) # stale N=30 embeddings; 110 new tickers would get z=0.0
cols: dict[str, dict[str, float]] = {}
all_tickers: set[str] = set()
for name, fn in dispatch.items():
Expand Down Expand Up @@ -178,6 +179,26 @@ def _factor_panel(
return pl.DataFrame(out)


def _centrality_penalty_df(z_panel: pl.DataFrame) -> pl.DataFrame | None:
    """Turn graph_delta_eigenvector z-scores into cross-sectional percentiles.

    Rows whose z-score is null or NaN are dropped, the remaining scores are
    ranked, and each rank is rescaled as ``(rank - 1) / (n - 1)`` so the
    lowest score maps to 0 and the highest to 1.

    Args:
        z_panel: Factor panel with a 'ticker' column and, when available, a
            'graph_delta_eigenvector' column.

    Returns:
        A frame with columns ['ticker', 'centrality_percentile'], or None when
        the factor column is missing or fewer than two usable rows remain.
    """
    factor = "graph_delta_eigenvector"
    if factor not in z_panel.columns:
        return None

    z = pl.col(factor)
    usable = z_panel.select(["ticker", factor]).filter(
        z.is_not_nan() & z.is_not_null()
    )
    count = usable.height
    if count < 2:
        return None

    # Ranks start at 1; dividing by (count - 1) spreads them over [0, 1].
    denominator = float(max(count - 1, 1))
    percentile = ((z.rank() - 1) / denominator).alias("centrality_percentile")
    return usable.with_columns(percentile).select(["ticker", "centrality_percentile"])


def _realised_vol(
snap: date, price: _PricePanel, lookback: int = VOL_LOOKBACK_TD
) -> pl.DataFrame:
Expand Down Expand Up @@ -410,7 +431,8 @@ def run_backtest(
alpha = compose_alpha(z_panel, weights_by_factor)
vol_df = _realised_vol(d, price)
crowd_df = _crowding_as_of(d, crowding_by_q)
target = build_portfolio(alpha, vol_df, crowd_df)
cent_pen = _centrality_penalty_df(z_panel)
target = build_portfolio(alpha, vol_df, crowd_df, centrality_penalty=cent_pen)

nav_so_far = pl.DataFrame(
{
Expand Down
26 changes: 22 additions & 4 deletions nexus/execution/portfolio.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,19 @@ def build_portfolio(
alpha: pl.DataFrame,
vol: pl.DataFrame,
crowding: pl.DataFrame,
centrality_penalty: pl.DataFrame | None = None,
lambda_centrality: float = 0.30,
) -> pl.DataFrame:
"""Return target weights for one rebalance date.

Inputs (each frame must carry a 'ticker' column):
alpha : columns ['ticker', 'composite_alpha']
vol : columns ['ticker', 'realised_vol_63d']
crowding : columns ['ticker', 'composite_crowding'] (may be partial;
missing tickers are treated as un-crowded)
alpha : columns ['ticker', 'composite_alpha']
vol : columns ['ticker', 'realised_vol_63d']
crowding : columns ['ticker', 'composite_crowding'] (may be partial;
missing tickers are treated as un-crowded)
centrality_penalty : optional columns ['ticker', 'centrality_percentile'] (floats
in [0, 1]). Missing tickers treated as percentile=0.5.
When None, no centrality adjustment is applied.

Output: columns ['ticker', 'weight'] sorted by weight desc.
Sum of weights is exactly 1.0 if any selection exists; empty frame if
Expand Down Expand Up @@ -72,6 +77,19 @@ def build_portfolio(
)

df = df.with_columns((pl.col("inv_vol") * pl.col("crowding_mult")).alias("raw_weight"))

if centrality_penalty is not None:
df = df.join(centrality_penalty, on="ticker", how="left")
df = df.with_columns(
pl.col("centrality_percentile").fill_null(0.5).alias("centrality_percentile")
)
df = df.with_columns(
(
pl.col("raw_weight")
* (1.0 - lambda_centrality * pl.col("centrality_percentile"))
).alias("raw_weight")
)

total = df["raw_weight"].sum()
if total <= 0:
# All names zeroed by crowding (e.g. every selected name at crowding 1.7+).
Expand Down
213 changes: 213 additions & 0 deletions research/phase8_backtest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
"""
Phase 8 Session 3 — Multi-horizon factor backtest on 140-ticker universe.

Runs 8 factors (7 original minus graph_gnn_embedding_drift, plus
graph_customer_momentum) at 21d / 63d / 126d horizons and reports IC
statistics in Phase 5 Diagnostic A format.

graph_gnn_embedding_drift is excluded: node_embeddings covers only the
original 30 equities (HGT retrain deferred in Session 2); including it
would evaluate on a stale N=30 sub-cross-section rather than the expanded
140-ticker universe.

Read-only — no DB writes.
"""
from __future__ import annotations

import math
import sys
from datetime import date

import numpy as np
from sqlalchemy import create_engine

from nexus.config import settings
from nexus.signals.backtest import (
CALM_FSI_THRESHOLD,
FactorBacktest,
_aggregate,
_compute_forward_returns,
_factor_xs_dispatch,
_load_fsi_by_date,
_load_supply_edge_panel,
compute_factor_ics,
load_all_panels,
)
from nexus.signals.hlz import format_table, hlz_correct

# Forward-return horizons, in days, that the backtest is run at.
HORIZONS: list[int] = [21, 63, 126]

# Phase 5 reference t-stats (N=30 baseline), keyed by factor and then horizon.
# Each row below lists the values in HORIZONS order: the 21d value comes from
# the Phase 5 primary backtest, the 63d and 126d values from Phase 5
# Diagnostic A.  graph_customer_momentum carries NaN at every horizon.
PHASE5_TSTAT: dict[str, dict[int, float]] = {
    factor: dict(zip(HORIZONS, t_by_horizon))
    for factor, t_by_horizon in (
        ("price_momentum_12_1", (1.05, 1.72, 4.15)),
        ("price_reversal_1m", (-0.83, -0.42, 0.43)),
        ("price_low_volatility_63d", (0.31, -0.41, -1.13)),
        ("price_size_proxy", (0.28, 1.47, 2.24)),
        ("graph_delta_betweenness", (0.12, 0.21, 0.09)),
        ("graph_delta_eigenvector", (0.44, 0.78, 1.12)),
        ("graph_customer_momentum", (float("nan"),) * 3),
    )
}


def _regime_split(
    factor_name: str,
    all_periods: list[date],
    all_ics: list[float],
    fsi_by_date: dict[date, float | None],
) -> None:
    """Print the CALM vs NON-CALM IC split for one factor.

    Each (period, IC) pair is assigned to a regime from the period's FSI
    value: below ``CALM_FSI_THRESHOLD`` is CALM, anything else is NON-CALM.
    Periods with no FSI value are left out of both buckets.  For each bucket
    the sample size, mean IC and t-stat (``sqrt(N) * mean / std`` with the
    sample standard deviation) are printed; a bucket with fewer than two
    observations is reported as insufficient.

    Args:
        factor_name: Factor label shown in the section header.
        all_periods: Period dates, paired element-wise with ``all_ics``.
        all_ics: Per-period IC values.
        fsi_by_date: FSI value per date; a missing key or ``None`` means the
            period is skipped.
    """
    calm_ics: list[float] = []
    stressed_ics: list[float] = []
    for period, ic in zip(all_periods, all_ics):
        fsi = fsi_by_date.get(period)
        if fsi is None:
            continue
        if fsi < CALM_FSI_THRESHOLD:
            calm_ics.append(ic)
        else:
            stressed_ics.append(ic)

    # Build the labels from the threshold actually used for the split, and pad
    # both to the longer one: the NON-CALM label is wider than the previous
    # fixed 14-character field, which pushed its columns out of alignment.
    calm_label = f"CALM (FSI<{CALM_FSI_THRESHOLD:g})"
    stressed_label = f"NON-CALM(FSI>={CALM_FSI_THRESHOLD:g})"
    label_width = max(len(calm_label), len(stressed_label))

    def _summarise(label: str, ics: list[float]) -> str:
        prefix = f" {label:{label_width}s} N={len(ics):>3d} "
        if len(ics) < 2:
            return f"{prefix}(insufficient)"
        arr = np.asarray(ics)
        mean = arr.mean()
        std = arr.std(ddof=1)
        # A zero spread makes the t-stat undefined; report NaN instead of dividing.
        t = math.sqrt(len(ics)) * mean / std if std > 0 else float("nan")
        return f"{prefix}mean_IC={mean:>+7.4f} t={t:>+6.3f}"

    print(f" Regime split — {factor_name}")
    print(_summarise(calm_label, calm_ics))
    print(_summarise(stressed_label, stressed_ics))


def _print_ic_table(
    results: list[FactorBacktest],
    horizon_days: int,
    fsi_by_date: dict[date, float | None],
    per_period_data: dict[str, tuple[list[float], list[date]]],
) -> list[tuple[str, float]]:
    """Render the per-factor IC table for one horizon.

    Prints a banner, a header and one row per factor, each row carrying the
    Phase 5 reference t-stat for the same horizon (or n/a when none is
    recorded).  Afterwards a CALM / NON-CALM regime split is printed for every
    factor whose finite t-stat exceeds 1.5 in absolute value.

    Args:
        results: Aggregated backtest result per factor.
        horizon_days: Forward-return horizon the results were computed at.
        fsi_by_date: FSI value per date, passed through to the regime split.
        per_period_data: Per-factor ``(ics, periods)`` series, keyed by name.

    Returns:
        The ``(name, t_stat)`` pairs with a finite t-stat, in input order, for
        use as HLZ input.
    """
    banner = "=" * 90
    print(f"\n{banner}")
    print(f" Horizon: {horizon_days}d ({horizon_days/21:.1f} months)")
    print(banner)
    print(
        f" {'factor':32s} {'N':>4s} {'mean_IC':>8s} "
        f"{'std_IC':>8s} {'t_stat':>7s} {'avg_xs':>7s} "
        f"{'t_P5':>6s} period"
    )
    print(" " + "-" * 85)

    finite_t: list[tuple[str, float]] = []
    regime_rows = []
    for row in results:
        reference_t = PHASE5_TSTAT.get(row.name, {}).get(horizon_days, float("nan"))
        if math.isfinite(reference_t):
            reference_txt = f"{reference_t:>+6.3f}"
        else:
            reference_txt = " n/a"

        if row.first_period and row.last_period:
            span_txt = f"{row.first_period} -> {row.last_period}"
        else:
            span_txt = "(insufficient)"

        print(
            f" {row.name:32s} {row.n_periods:>4d} {row.mean_ic:>+8.4f} "
            f"{row.std_ic:>8.4f} {row.t_stat:>+7.3f} {row.avg_xs_size:>7.1f} "
            f"{reference_txt} {span_txt}"
        )

        # Only finite t-stats feed HLZ; of those, |t| > 1.5 earns a regime split.
        if not math.isfinite(row.t_stat):
            continue
        finite_t.append((row.name, row.t_stat))
        if abs(row.t_stat) > 1.5:
            regime_rows.append(row)

    if regime_rows:
        print("\n Regime analysis (|t| > 1.5 at this horizon):")
        for row in regime_rows:
            ics, periods = per_period_data[row.name]
            _regime_split(row.name, periods, ics, fsi_by_date)

    return finite_t


def main() -> None:
    """Run the multi-horizon IC backtest and print the report to stdout.

    Loads the price, centrality, embedding and supply-edge panels plus the FSI
    series, removes ``graph_gnn_embedding_drift`` from the factor dispatch,
    and then for each horizon in ``HORIZONS`` computes per-factor ICs, prints
    the IC table with regime splits, and applies the HLZ correction to the
    finite t-stats.  The database engine is only used while loading and is
    disposed before any factor is evaluated.
    """
    engine = create_engine(settings.database_url_sync)
    # Every database read sits inside the try block so the engine is disposed
    # even when one of the loaders raises.
    try:
        print("[*] Loading panels...", flush=True)
        price_panel, centrality_panel, embedding_panel = load_all_panels(engine)
        supply_panel = _load_supply_edge_panel(engine)

        snap_dates = centrality_panel.sorted_dates
        if len(snap_dates) == 0:
            # Without snapshots there is no first/last date to report and
            # nothing to evaluate.
            print(
                "[!] No centrality snapshots found — nothing to backtest.",
                file=sys.stderr,
            )
            return
        print(
            f"[*] Snapshots: {len(snap_dates)} ({snap_dates[0]} -> {snap_dates[-1]})",
            flush=True,
        )
        # The size of the first cross-section stands in for each panel's
        # width; an empty panel reports 0 rather than raising StopIteration.
        xs_counts = {
            "price": len(price_panel.by_ticker),
            "centrality": len(next(iter(centrality_panel.by_date.values()), ())),
            "embedding": len(next(iter(embedding_panel.by_date.values()), ())),
        }
        print(
            f"[*] Cross-section sizes — price: {xs_counts['price']} "
            f"centrality: {xs_counts['centrality']} "
            f"embedding: {xs_counts['embedding']} (stale — N=30, excluded from run)",
            flush=True,
        )

        fsi_by_date = _load_fsi_by_date(snap_dates, engine)
    finally:
        engine.dispose()

    # Build dispatch: all factors including customer_momentum, minus gnn_embedding_drift
    dispatch = _factor_xs_dispatch(supply_panel=supply_panel)
    excluded = dispatch.pop("graph_gnn_embedding_drift", None)
    factor_names = list(dispatch)
    print(f"[*] Factors in run ({len(factor_names)}): {', '.join(factor_names)}")
    if excluded is None:
        print(
            "[!] graph_gnn_embedding_drift not found in dispatch — check backtest.py",
            file=sys.stderr,
        )
    else:
        print("[*] Excluded: graph_gnn_embedding_drift (stale N=30 embeddings)\n", flush=True)

    for horizon_days in HORIZONS:
        fwd = _compute_forward_returns(snap_dates, price_panel, forward_days=horizon_days)
        n_pairs = sum(len(v) for v in fwd.values())
        print(
            f"[*] horizon={horizon_days}d — forward return pairs: {n_pairs}",
            flush=True,
        )

        results: list[FactorBacktest] = []
        per_period_data: dict[str, tuple[list[float], list[date]]] = {}

        for name in factor_names:
            ics, sizes, periods = compute_factor_ics(
                name,
                snap_dates,
                fwd,
                price_panel,
                centrality_panel,
                embedding_panel,
                supply_panel=supply_panel,
            )
            per_period_data[name] = (ics, periods)
            results.append(_aggregate(name, ics, sizes, periods))

        t_pairs = _print_ic_table(results, horizon_days, fsi_by_date, per_period_data)

        # HLZ multiple-testing correction
        if t_pairs:
            t_stats_map = dict(t_pairs)
            # Degrees of freedom come from the median period count among
            # factors with more than one period; fall back to df=1 when no
            # factor qualifies instead of taking the median of an empty list.
            usable_n = [r.n_periods for r in results if r.n_periods > 1]
            median_n = int(np.median(usable_n)) if usable_n else 0
            df_for_t = max(median_n - 1, 1)
            print(f"\n HLZ correction (df={df_for_t}, median N - 1)")
            hlz_results = hlz_correct(t_stats_map, df=df_for_t)
            print("\n" + format_table(hlz_results))
        else:
            print("\n [!] No finite t-stats — skipping HLZ.")

    print("\n[*] Done. No DB writes performed.")


if __name__ == "__main__":
    main()
75 changes: 75 additions & 0 deletions scripts/update_registry_phase8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Phase 8 Session 3 — idempotent signal_registry update.

Updates graph_delta_eigenvector with N=140 backtest statistics:
- t_stat = -3.194 (63d horizon, N=140 universe)
- hlz_passes = false (Bonferroni threshold |t| >= 4.12)
- status = 'monitoring' (fragility indicator, not alpha)
- regime_profile: records the role and N=140 findings

Run idempotently: re-running produces the same state. The update is a
data change (not a schema change) so it does not warrant an alembic migration.
"""
import json

from sqlalchemy import create_engine, text

from nexus.config import settings

# Value matched against signal_registry.name to select the row to update.
_FACTOR_NAME = "graph_delta_eigenvector"

# Payload that main() serialises with json.dumps and stores in the
# regime_profile column.  It records the factor's role and its N=140 backtest
# statistics; "n140_t_stat" is the 63d-horizon value and equals the t_stat
# that main() writes to the row.
_REGIME_PROFILE = {
"role": "fragility_indicator",
"horizon_days": 63,
"n140_mean_ic": -0.049,
"n140_t_stat": -3.194,
# t-stats at the other two horizons, kept alongside the headline 63d value.
"n140_t_21d": -2.144,
"n140_t_126d": -2.192,
"use": "portfolio_centrality_penalty_not_alpha",
"sign_note": (
"negative_ic_high_eigenvector_predicts_underperformance — "
"connectivity implies fragility under stress, not pricing power"
),
"phase5_sign": "positive (noise at N=30)",
"phase8_sign": "robustly_negative_across_all_horizons_N=140",
}


def main() -> None:
    """Write the Phase 8 statistics to the graph_delta_eigenvector registry row.

    Executes one ``UPDATE ... RETURNING`` inside a single transaction and then
    reports every row it changed.  The statement assigns fixed values, so
    running the script again leaves the row in the same state.  When no row
    matches the factor name a notice is printed and nothing is changed.
    """
    # The headline t-stat is the 63d value already recorded in the regime
    # profile; reading it from there keeps the column, the JSON payload and
    # the log line from drifting apart.
    t_stat = _REGIME_PROFILE["n140_t_stat"]

    engine = create_engine(settings.database_url_sync)
    # Dispose the engine even when the UPDATE raises.
    try:
        with engine.begin() as conn:
            result = conn.execute(
                text(
                    """
                    UPDATE signal_registry
                    SET t_stat = :t_stat,
                        hlz_passes = :hlz_passes,
                        status = :status,
                        regime_profile = :regime_profile
                    WHERE name = :name
                    RETURNING signal_id, name, status
                    """
                ),
                {
                    "t_stat": t_stat,
                    "hlz_passes": False,
                    "status": "monitoring",
                    "regime_profile": json.dumps(_REGIME_PROFILE),
                    "name": _FACTOR_NAME,
                },
            )
            # Collect every returned row so that more than one match is
            # reported rather than only the first.
            rows = result.fetchall()
    finally:
        engine.dispose()

    if not rows:
        print(f"[!] No row found for name='{_FACTOR_NAME}' — nothing updated.")
        return

    for row in rows:
        print(
            f"[+] Updated signal_registry: "
            f"signal_id={row.signal_id} name={row.name} status={row.status}"
        )
    print(f" t_stat={t_stat} hlz_passes=False regime_profile set.")


if __name__ == "__main__":
    main()
Loading
Loading