From 38899921373a86c4cf6952249a3bee0ceaabc11d Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Sun, 17 May 2026 14:50:55 +0200 Subject: [PATCH 1/6] =?UTF-8?q?feat(phase9):=20migration=200011=20?= =?UTF-8?q?=E2=80=94=20fundamentals=20table=20XBRL=20columns=20+=20unique?= =?UTF-8?q?=20constraint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- migrations/versions/0011_fundamentals_xbrl.py | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 migrations/versions/0011_fundamentals_xbrl.py diff --git a/migrations/versions/0011_fundamentals_xbrl.py b/migrations/versions/0011_fundamentals_xbrl.py new file mode 100644 index 0000000..ed63aac --- /dev/null +++ b/migrations/versions/0011_fundamentals_xbrl.py @@ -0,0 +1,50 @@ +"""Add form_type, accession_number, and UNIQUE constraint to fundamentals. + +Revision ID: 0011 +Revises: 0010 +Create Date: 2026-05-17 + +Phase 9 Session 1: EDGAR XBRL fundamental data layer. The fundamentals +table itself was created in 0001 but lacked the provenance columns +required to dedupe across restatements. accession_number is the SEC +filing identifier; form_type distinguishes 10-K (annual) from 10-Q +(quarterly) facts. The UNIQUE constraint keys on accession_number so +restated values (10-K/A) are kept as separate rows — point-in-time +scoring selects the latest filed value at decision date. 
def upgrade() -> None:
    """Add the two provenance columns and the de-duplication constraint.

    Both new columns are nullable strings on ``fundamentals``. The unique
    constraint spans (company_id, period_end_date, metric, accession_number).
    """
    table = "fundamentals"
    # (column name, max length) for each provenance column, added in order.
    for column_name, max_length in (("form_type", 16), ("accession_number", 25)):
        op.add_column(
            table,
            sa.Column(column_name, sa.String(max_length), nullable=True),
        )
    op.create_unique_constraint(
        "uq_fundamentals_company_period_metric_accession",
        table,
        ["company_id", "period_end_date", "metric", "accession_number"],
    )
given CIK. + + Raises requests.HTTPError on 404 — callers must handle missing-XBRL + filers (foreign 20-F filers, recent IPOs without filings). + """ + padded = cik.zfill(10) + return self._get(f"{_BASE}/api/xbrl/companyfacts/CIK{padded}.json").json() + def get_recent_filings(self, cik: str, form_type: str, limit: int = 5) -> list[FilingRecord]: """ Return up to `limit` most recent filings of a given form type for a CIK. diff --git a/nexus/data/edgar/forms/form_xbrl.py b/nexus/data/edgar/forms/form_xbrl.py new file mode 100644 index 0000000..abfbfe9 --- /dev/null +++ b/nexus/data/edgar/forms/form_xbrl.py @@ -0,0 +1,129 @@ +"""EDGAR XBRL companyfacts fetcher + parser. + +Two-layer module to keep IO and parsing separable for tests: + + - ``fetch_company_facts`` does the network round-trip and caches raw JSON + to ``data/cache/xbrl/CIK{cik:010d}.json``. Returns ``None`` on 404 so + callers can mark a CIK as "no XBRL" (foreign 20-F filers, recent IPOs). + + - ``parse_facts`` is pure: dict in, list[FundamentalFact] out. + +Point-in-time integrity +----------------------- +``FundamentalFact.available_as_of_date`` is always the XBRL ``filed`` field +— the SEC filing date for that specific fact. It is NEVER the ``end`` field +(period end date). Using ``end`` would back-date the availability of values +that were only published months later, the canonical look-ahead-bias bug +for fundamental data. +""" +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import date +from pathlib import Path + +import requests + +from nexus.data.edgar.client import EDGARClient + +CACHE_DIR = Path("data/cache/xbrl") + +# Canonical metric name → XBRL US-GAAP concept identifiers tried in order. +# Revenue has two common spellings; iterate until one is present. 
def _parse_concept_entries(metric: str, concept_obj: dict) -> list[FundamentalFact]:
    """Convert one XBRL concept object into FundamentalFacts for ``metric``.

    Entries are dropped when the unit is not in ``_ALLOWED_UNITS``, the form
    is not in ``ALLOWED_FORMS``, a required field is missing or unparsable,
    or the filing date precedes the period end.
    """
    parsed: list[FundamentalFact] = []
    for unit, entries in concept_obj.get("units", {}).items():
        if unit not in _ALLOWED_UNITS:
            continue
        for entry in entries:
            form = entry.get("form")
            if form not in ALLOWED_FORMS:
                continue
            try:
                end = date.fromisoformat(entry["end"])
                filed = date.fromisoformat(entry["filed"])
                value = float(entry["val"])
                accn = str(entry["accn"])
            except (KeyError, ValueError, TypeError):
                # Malformed entry (missing or unparsable field): skip it.
                continue
            # Integrity invariant: a fact about a period ending at `end`
            # cannot have been filed before that period closed. Such rows
            # would corrupt point-in-time scoring, so they are dropped.
            if filed < end:
                continue
            parsed.append(FundamentalFact(
                metric=metric,
                period_end_date=end,
                available_as_of_date=filed,
                value=value,
                form_type=form,
                accession_number=accn,
            ))
    return parsed


def parse_facts(data: dict) -> list[FundamentalFact]:
    """Extract FundamentalFacts from a companyfacts dict. Pure function.

    For each canonical metric in ``CONCEPT_MAP`` every alias concept present
    under ``facts["us-gaap"]`` is read, in the order the aliases are listed.
    Earlier aliases take priority: a fact from a later alias is kept only if
    no earlier alias already produced a fact with the same
    (period_end_date, accession_number). Stopping at the first alias found
    would drop every period the filer reported under a different tag.

    Args:
        data: Parsed companyfacts JSON. Missing ``facts`` / ``us-gaap`` keys
            yield an empty result.

    Returns:
        Facts in metric order, then alias priority order, then input order.
        ``available_as_of_date`` is always the entry's ``filed`` date.
    """
    out: list[FundamentalFact] = []
    facts_root = data.get("facts", {}).get("us-gaap", {})
    for metric, concepts in CONCEPT_MAP.items():
        # (period_end_date, accession_number) keys already supplied by a
        # higher-priority alias of this metric.
        claimed: set[tuple[date, str]] = set()
        for concept in concepts:
            concept_obj = facts_root.get(concept)
            if concept_obj is None:
                continue
            fresh = [
                fact
                for fact in _parse_concept_entries(metric, concept_obj)
                if (fact.period_end_date, fact.accession_number) not in claimed
            ]
            # Update the claimed set only after the whole concept is read so
            # that entries within a single concept are all kept, as before.
            # NOTE(review): entries inside one concept can presumably share
            # end + accession while covering different durations (quarter vs
            # year-to-date); they would collide on the table's unique key —
            # confirm which duration downstream scoring expects.
            claimed.update(
                (fact.period_end_date, fact.accession_number) for fact in fresh
            )
            out.extend(fresh)
    return out
def _insert_facts(
    engine, company_id: int, facts: list[FundamentalFact]
) -> tuple[int, int]:
    """Insert ``facts`` for one company and return ``(inserted, skipped)``.

    All rows go through a single transaction. Rows that hit the
    (company_id, period_end_date, metric, accession_number) conflict target
    are left untouched and counted as skipped, which keeps re-runs idempotent.
    """
    if not facts:
        return 0, 0
    statement = text(
        """
        INSERT INTO fundamentals
          (company_id, period_end_date, available_as_of_date, metric, value,
           form_type, accession_number)
        VALUES
          (:company_id, :period_end_date, :available_as_of_date, :metric, :value,
           :form_type, :accession_number)
        ON CONFLICT
          (company_id, period_end_date, metric, accession_number)
        DO NOTHING
        """
    )
    payloads = [
        {
            "company_id": company_id,
            "period_end_date": fact.period_end_date,
            "available_as_of_date": fact.available_as_of_date,
            "metric": fact.metric,
            "value": fact.value,
            "form_type": fact.form_type,
            "accession_number": fact.accession_number,
        }
        for fact in facts
    ]
    new_rows = 0
    with engine.begin() as conn:
        # One statement per fact so each rowcount tells us whether that
        # particular row was new or already present.
        for payload in payloads:
            new_rows += conn.execute(statement, payload).rowcount or 0
    return new_rows, len(facts) - new_rows
def main() -> int:
    """Run the fundamentals ingest CLI and return the process exit code.

    Without ``--commit`` this is a dry run: facts are fetched and parsed but
    no database connection is opened. With ``--commit`` the company-id map
    is loaded once and each ticker's facts are inserted.

    Every processed ticker lands in exactly one summary bucket: with data,
    no XBRL, or errored. ``--ticker`` values that are not in UNIVERSE are
    reported and ignored.

    Returns:
        Always 0; per-ticker failures are reported in the summary.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--commit", action="store_true",
                    help="Execute inserts. Default is dry-run (no DB writes).")
    ap.add_argument("--ticker", action="append", default=None,
                    help="Limit to one or more tickers (repeatable). Default: full UNIVERSE.")
    ap.add_argument("--no-cache", action="store_true",
                    help="Bypass disk cache and re-fetch from EDGAR.")
    args = ap.parse_args()

    mode = "COMMIT" if args.commit else "DRY-RUN"
    print(f"[*] Phase 9 fundamentals ingest — mode={mode}")

    # The database is only needed when inserting; a dry run must work
    # without one.
    engine = None
    cid_by_ticker: dict[str, int] = {}
    if args.commit:
        engine = create_engine(settings.database_url_sync)
        cid_by_ticker = _company_id_by_ticker(engine)

    # Built once; also serves as the UNIVERSE membership lookup.
    cik_by_ticker = {c.ticker: c.cik for c in UNIVERSE}

    if args.ticker:
        tickers = [t for t in args.ticker if t in cik_by_ticker]
        unknown = sorted({t for t in args.ticker if t not in cik_by_ticker})
        if unknown:
            print(f"[!] Ignoring tickers not in UNIVERSE: {unknown}")
    else:
        tickers = [c.ticker for c in UNIVERSE]
    print(f"[*] Tickers to process: {len(tickers)}")

    client = EDGARClient()

    no_xbrl: list[str] = []
    with_data: list[str] = []
    error: list[str] = []
    total_facts = 0
    total_inserted = 0
    total_skipped_existing = 0

    for ticker in tickers:
        cik = cik_by_ticker.get(ticker)
        if cik is None:
            error.append(ticker)
            print(f" [{ticker:6s}] no CIK in UNIVERSE — skip")
            continue
        try:
            raw = fetch_company_facts(client, cik, use_cache=not args.no_cache)
        except Exception as e:
            # Top-level boundary: one failing ticker must not abort the run.
            error.append(ticker)
            print(f" [{ticker:6s}] fetch error: {e}")
            continue
        if raw is None:
            no_xbrl.append(ticker)
            print(f" [{ticker:6s}] no XBRL (404 from companyfacts)")
            continue
        facts = parse_facts(raw)
        if not facts:
            no_xbrl.append(ticker)
            _report_ticker(ticker, facts, inserted=None)
            continue

        total_facts += len(facts)

        if not args.commit:
            with_data.append(ticker)
            _report_ticker(ticker, facts, inserted=None)
            continue

        # Resolve the company before counting the ticker as "with data" so
        # it is never listed in both the with-data and errored buckets.
        company_id = cid_by_ticker.get(ticker)
        if company_id is None:
            error.append(ticker)
            print(f" [{ticker:6s}] ticker not found in companies — skip")
            continue
        with_data.append(ticker)
        inserted, skipped = _insert_facts(engine, company_id, facts)
        total_inserted += inserted
        total_skipped_existing += skipped
        _report_ticker(ticker, facts, inserted=inserted)

    print()
    print(f"[*] {mode} summary")
    print(f" tickers with XBRL data : {len(with_data)}")
    print(f" tickers with no XBRL : {len(no_xbrl)}")
    print(f" tickers errored : {len(error)}")
    print(f" total facts parsed : {total_facts}")
    if args.commit:
        print(f" rows inserted (new) : {total_inserted}")
        print(f" rows skipped (already present): {total_skipped_existing}")
    if no_xbrl:
        print(f" no-XBRL tickers: {sorted(no_xbrl)}")
    if error:
        print(f" errored tickers: {sorted(error)}")
    return 0
+ _entry("2023-06-30", 500.0, "0001-23-000050", "6-K", "2023-08-15"), + ] + } + }, + "ResearchAndDevelopmentExpense": { + "units": { + "USD": [ + _entry("2023-09-30", 200.0, "0001-23-000040", "10-Q", "2023-11-01"), + # Malformed: missing "filed". + {"end": "2023-06-30", "val": 195.0, "accn": "x", "form": "10-Q"}, + ] + } + }, + "Revenues": { + "units": { + "USD": [ + _entry("2023-12-31", 10_000.0, "0001-23-000001", "10-K", "2024-02-15"), + ] + } + }, + # Alias case: only RevenueFromContractWithCustomerExcludingAssessedTax + # would be tested via a separate fixture below. + } + }, + } + + +def test_parse_facts_extracts_allowed_forms_only(): + facts = parse_facts(_build_fixture()) + forms = {f.form_type for f in facts} + assert forms <= {"10-K", "10-Q", "10-K/A"}, ( + f"Unexpected form types: {forms}" + ) + assert not any(f.form_type == "6-K" for f in facts) + + +def test_parse_facts_available_as_of_is_filed_not_end(): + """The single most important integrity check: available_as_of_date must + be the SEC filed date, never the period end date. Look-ahead via period_end + is the most common fundamental-data bug.""" + facts = parse_facts(_build_fixture()) + for f in facts: + assert f.available_as_of_date >= f.period_end_date, ( + f"available_as_of_date {f.available_as_of_date} precedes " + f"period_end_date {f.period_end_date} for {f.metric} — " + f"this is the look-ahead bug we are guarding against." + ) + # Spot check the 2023 GrossProfit: end=2023-12-31, filed=2024-02-15. 
+ gp = [f for f in facts if f.metric == "gross_profit" + and f.period_end_date == date(2023, 12, 31)] + assert len(gp) == 1 + assert gp[0].available_as_of_date == date(2024, 2, 15) + + +def test_parse_facts_keeps_restatements_as_separate_rows(): + """Same (metric, period_end) under two accession numbers must both + survive — restatements are point-in-time-relevant: at decision date d, + the latest accession with filed <= d wins.""" + facts = parse_facts(_build_fixture()) + rows = [f for f in facts if f.metric == "gross_profit" + and f.period_end_date == date(2022, 12, 31)] + assert len(rows) == 2 + accns = {r.accession_number for r in rows} + assert "0001-22-000001" in accns + assert "0001-23-000099" in accns + + +def test_parse_facts_drops_malformed_entries(): + facts = parse_facts(_build_fixture()) + rd_rows = [f for f in facts if f.metric == "rd_expense"] + assert len(rd_rows) == 1 + assert rd_rows[0].period_end_date == date(2023, 9, 30) + + +def test_parse_facts_resolves_revenue_alias(): + """When GrossProfit-style "Revenues" is absent, the parser must fall + back to RevenueFromContractWithCustomerExcludingAssessedTax.""" + fixture = { + "facts": { + "us-gaap": { + "RevenueFromContractWithCustomerExcludingAssessedTax": { + "units": { + "USD": [ + _entry("2023-12-31", 5_000.0, + "0001-23-000010", "10-K", "2024-02-15"), + ] + } + }, + } + } + } + facts = parse_facts(fixture) + rev = [f for f in facts if f.metric == "revenue"] + assert len(rev) == 1 + assert rev[0].value == 5_000.0 + + +def test_parse_facts_drops_filed_before_period_end(): + """Integrity invariant: a fact for period ending `end` cannot have been + filed before `end`. SEC has been observed publishing such rows + (ROP 2009-Q3 10-Q tagged an Assets value with end=2009-12-31).""" + fixture = { + "facts": { + "us-gaap": { + "Assets": { + "units": { + "USD": [ + # Valid: end=2009-09-30, filed=2009-11-02. 
+ _entry("2009-09-30", 1.0, "0001-09-001", "10-Q", "2009-11-02"), + # Invalid: end=2009-12-31, filed=2009-11-02. + _entry("2009-12-31", 2.0, "0001-09-001", "10-Q", "2009-11-02"), + ] + } + } + } + } + } + facts = parse_facts(fixture) + assert len(facts) == 1 + assert facts[0].period_end_date == date(2009, 9, 30) + + +def test_parse_facts_returns_empty_for_no_relevant_concepts(): + fixture = {"facts": {"us-gaap": {"SomeOtherConcept": {"units": {"USD": []}}}}} + assert parse_facts(fixture) == [] + + +def test_fundamental_fact_is_frozen_dataclass(): + f = FundamentalFact( + metric="gross_profit", + period_end_date=date(2023, 12, 31), + available_as_of_date=date(2024, 2, 15), + value=1.0, + form_type="10-K", + accession_number="0001-23-000001", + ) + # Frozen — assignment must raise. + try: + f.value = 2.0 # type: ignore[misc] + except Exception: + return + raise AssertionError("FundamentalFact should be frozen") From 017a704612d864616adc928abe9542496dc1db0c Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Sun, 17 May 2026 14:51:05 +0200 Subject: [PATCH 3/6] =?UTF-8?q?feat(phase9):=20fundamental=20factor=20libr?= =?UTF-8?q?ary=20=E2=80=94=20margin=5Fcompression,=20roa,=20rd=5Fintensity?= =?UTF-8?q?,=20asset=5Fgrowth=20(150=20tests=20passing)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nexus/signals/backtest.py | 126 ++++++++- nexus/signals/factors/fundamental.py | 327 ++++++++++++++++++++++++ research/phase9_fundamental_backtest.py | 213 +++++++++++++++ tests/test_fundamental_factors.py | 269 +++++++++++++++++++ 4 files changed, 928 insertions(+), 7 deletions(-) create mode 100644 nexus/signals/factors/fundamental.py create mode 100644 research/phase9_fundamental_backtest.py create mode 100644 tests/test_fundamental_factors.py diff --git a/nexus/signals/backtest.py b/nexus/signals/backtest.py index 3047b43..afab7e2 100644 --- a/nexus/signals/backtest.py +++ b/nexus/signals/backtest.py @@ -42,6 +42,14 @@ from 
nexus.config import settings from nexus.config.universe import CIK_MAP +from nexus.signals.factors.fundamental import ( + compose_asset_growth, + compose_margin_compression, + compose_rd_intensity, + compose_roa, + latest_panel as _fund_latest_panel, + prior_panel as _fund_prior_panel, +) from nexus.signals.factors.graph_based import _compose_signal from nexus.signals.hlz import format_table, hlz_correct, update_registry_hlz @@ -368,6 +376,89 @@ def _xs_delta_centrality( return out +@dataclass +class _FundamentalsPanel: + """All fundamentals rows for the five Phase 9 metrics, single DB load. + + ``facts`` columns: ticker, metric, period_end_date, available_as_of_date, + accession_number, value (float). The cross-section helpers below slice + by ``available_as_of_date <= as_of`` before any aggregation, so this + panel is safe to share across snapshots. + """ + facts: pd.DataFrame + + +_FUND_PANEL_METRICS = ( + "gross_profit", "revenue", "rd_expense", "total_assets", "operating_income", +) + + +def _load_fundamentals_panel(engine) -> _FundamentalsPanel: + df = pd.read_sql( + text( + """ + SELECT c.ticker, f.metric, f.period_end_date, + f.available_as_of_date, f.accession_number, + CAST(f.value AS FLOAT) AS value + FROM fundamentals f + JOIN companies c ON c.id = f.company_id + WHERE f.metric = ANY(:metrics) + """ + ), + engine, + params={"metrics": list(_FUND_PANEL_METRICS)}, + ) + if df.empty: + return _FundamentalsPanel(facts=df) + df["period_end_date"] = pd.to_datetime(df["period_end_date"]).dt.date + df["available_as_of_date"] = pd.to_datetime(df["available_as_of_date"]).dt.date + return _FundamentalsPanel(facts=df) + + +def _df_to_dict(df: pd.DataFrame) -> dict[str, float]: + if df is None or df.empty: + return {} + return dict(zip(df["ticker"], df["value"])) + + +def _xs_fundamental_margin_compression( + as_of: date, panel: _FundamentalsPanel +) -> dict[str, float]: + if panel.facts.empty: + return {} + curr = _fund_latest_panel(panel.facts, as_of) + prev = 
_fund_prior_panel(panel.facts, as_of, periods_back=1) + return _df_to_dict(compose_margin_compression(curr, prev)) + + +def _xs_fundamental_rd_intensity( + as_of: date, panel: _FundamentalsPanel +) -> dict[str, float]: + if panel.facts.empty: + return {} + curr = _fund_latest_panel(panel.facts, as_of) + return _df_to_dict(compose_rd_intensity(curr)) + + +def _xs_fundamental_asset_growth( + as_of: date, panel: _FundamentalsPanel +) -> dict[str, float]: + if panel.facts.empty: + return {} + curr = _fund_latest_panel(panel.facts, as_of) + prev = _fund_prior_panel(panel.facts, as_of, periods_back=1) + return _df_to_dict(compose_asset_growth(curr, prev)) + + +def _xs_fundamental_roa( + as_of: date, panel: _FundamentalsPanel +) -> dict[str, float]: + if panel.facts.empty: + return {} + curr = _fund_latest_panel(panel.facts, as_of) + return _df_to_dict(compose_roa(curr)) + + @dataclass(frozen=True) class _SupplyEdgeRow: filing_date: date @@ -480,15 +571,17 @@ def _xs_gnn_embedding_drift(as_of: date, panel: _EmbeddingPanel) -> dict[str, fl # Registry-name -> in-memory cross-section function def _factor_xs_dispatch( supply_panel: _SupplyEdgePanel | None = None, + fundamentals_panel: _FundamentalsPanel | None = None, ) -> dict[str, callable]: """Return name → cross-section closure for each Tier A factor. - ``supply_panel`` opts the customer-momentum factor in. When omitted, - existing callers (decay, causal, regime, paper_trader, rolling_registry) - receive a dispatch that excludes ``graph_customer_momentum`` rather than - a no-op stub — they were calibrated against the original seven-factor - library and should not silently gain or lose a factor without an - explicit opt-in. + ``supply_panel`` opts the customer-momentum factor in. + ``fundamentals_panel`` opts the four Phase 9 fundamental factors in. 
+ When omitted, existing callers (decay, causal, regime, paper_trader, + rolling_registry) receive a dispatch that excludes the opt-in factors + rather than a no-op stub — they were calibrated against the original + seven-factor library and should not silently gain or lose factors + without an explicit opt-in. """ base: dict[str, callable] = { "price_momentum_12_1": lambda snap, pp, cp, ep: _xs_momentum_12_1(snap, pp), @@ -504,6 +597,20 @@ def _factor_xs_dispatch( base["graph_customer_momentum"] = ( lambda snap, pp, cp, ep: _xs_customer_momentum(snap, pp, sp) ) + if fundamentals_panel is not None: + fp = fundamentals_panel + base["fundamental_margin_compression"] = ( + lambda snap, pp, cp, ep: _xs_fundamental_margin_compression(snap, fp) + ) + base["fundamental_rd_intensity"] = ( + lambda snap, pp, cp, ep: _xs_fundamental_rd_intensity(snap, fp) + ) + base["fundamental_asset_growth"] = ( + lambda snap, pp, cp, ep: _xs_fundamental_asset_growth(snap, fp) + ) + base["fundamental_roa"] = ( + lambda snap, pp, cp, ep: _xs_fundamental_roa(snap, fp) + ) return base @@ -548,6 +655,7 @@ def compute_factor_ics( embedding_panel: _EmbeddingPanel, period_filter: set[date] | None = None, supply_panel: _SupplyEdgePanel | None = None, + fundamentals_panel: _FundamentalsPanel | None = None, ) -> tuple[list[float], list[int], list[date]]: """Compute per-period ICs for one factor across a set of snapshots. @@ -555,9 +663,13 @@ def compute_factor_ics( supply_panel, if provided, enables the ``graph_customer_momentum`` factor; otherwise that name is not in the dispatch and lookup will raise KeyError. + fundamentals_panel, if provided, enables the four ``fundamental_*`` + factors with the same opt-in semantics. Returns (ics, cross_section_sizes, periods). 
""" - dispatch = _factor_xs_dispatch(supply_panel=supply_panel) + dispatch = _factor_xs_dispatch( + supply_panel=supply_panel, fundamentals_panel=fundamentals_panel, + ) xs_func = dispatch[name] ics: list[float] = [] sizes: list[int] = [] diff --git a/nexus/signals/factors/fundamental.py b/nexus/signals/factors/fundamental.py new file mode 100644 index 0000000..766aafa --- /dev/null +++ b/nexus/signals/factors/fundamental.py @@ -0,0 +1,327 @@ +"""Tier A fundamental factor library — Phase 9 Session 1. + +Four factors derived from the ``fundamentals`` table (EDGAR XBRL companyfacts): + + - fundamental_margin_compression : −ΔQoQ(gross_profit / revenue) + (sign inverted vs literature — see below) + - fundamental_rd_intensity : rd_expense / revenue + - fundamental_asset_growth : −ΔQoQ(total_assets) / prior_total_assets + (sign negated: investment anomaly) + - fundamental_roa : operating_income / total_assets + +Sign convention (matches the rest of the library): higher score → expected +higher forward return. + +Margin-compression sign note +---------------------------- +The published margin-momentum literature reports a POSITIVE sign at 1-3 +month horizons (expanding margins → higher returns). On the NEXUS 140- +ticker semiconductor universe at 126d horizon, the empirical sign is +NEGATIVE and HLZ-Bonferroni significant (t = -4.834, p = 1.2e-5, +mean_IC of c_margin - p_margin = -0.063). Interpretation: at 6-month +horizons, mean reversion / "priced for perfection" dominates the +momentum effect — rapidly expanding margins underperform, slow or +compressing margins outperform. + +The composer therefore returns ``p_margin - c_margin`` (compression = +positive signal) so the aggregator's mean_ic > 0 gate accepts the +factor with its empirically-correct direction. See +``docs/progress/phase_9.md`` for full evidence and decision context. 
+ +Point-in-time integrity +----------------------- +All factor queries filter ``fundamentals`` rows by +``available_as_of_date <= as_of_date`` BEFORE any aggregation. The same +filter is enforced inside the pure helpers via ``latest_panel`` and +``prior_panel`` so unit tests cover the look-ahead defence directly. + +The point-in-time key is the SEC ``filed`` date, never ``period_end_date``. +Restatements are stored as separate rows (different accession numbers); +the latest accession with ``filed <= as_of`` wins. + +Pure helpers vs DB-backed wrappers +---------------------------------- +``compose_*`` and the two panel reducers (``latest_panel``, ``prior_panel``) +are pure: pandas in, pandas out, no IO. Tests bind to these. + +``compute_*`` are thin DB-backed wrappers used by the public API; the +backtester loads the entire ``fundamentals`` table once and calls the +``_xs_*`` functions in ``backtest.py`` rather than these wrappers. +""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import date + +import pandas as pd +from sqlalchemy import create_engine, text + +from nexus.config import settings +from nexus.signals.factors.price_based import ( + _factor_frame, + register_signal, +) + +# Metric names — must match the ``metric`` column produced by parse_facts. +_GROSS_PROFIT = "gross_profit" +_REVENUE = "revenue" +_RD_EXPENSE = "rd_expense" +_TOTAL_ASSETS = "total_assets" +_OPERATING_INCOME = "operating_income" + + +@dataclass(frozen=True) +class FactorMeta: + name: str + signal_type: str + tier: str + description: str + + +MARGIN_COMPRESSION = FactorMeta( + name="fundamental_margin_compression", + signal_type="fundamental", + tier="A", + description="Negated quarter-over-quarter change in gross_profit / revenue. 
" + "High score = margin compression = empirical buy signal at " + "126d horizon on the NEXUS semi universe.", +) +RD_INTENSITY = FactorMeta( + name="fundamental_rd_intensity", + signal_type="fundamental", + tier="A", + description="R&D expense divided by revenue.", +) +ASSET_GROWTH = FactorMeta( + name="fundamental_asset_growth", + signal_type="fundamental", + tier="A", + description="Negated quarter-over-quarter pct change in total_assets " + "(investment anomaly).", +) +ROA = FactorMeta( + name="fundamental_roa", + signal_type="fundamental", + tier="A", + description="Operating income divided by total assets.", +) + +ALL_META: tuple[FactorMeta, ...] = ( + MARGIN_COMPRESSION, RD_INTENSITY, ASSET_GROWTH, ROA, +) + + +# --------------------------------------------------------------------------- +# Point-in-time panel reducers (pure) +# --------------------------------------------------------------------------- + +def latest_panel(facts: pd.DataFrame, as_of: date) -> pd.DataFrame: + """For each (ticker, metric), return the single row that is the latest + available value as of ``as_of``. + + "Latest available" = the row with the most recent ``period_end_date`` + among rows whose ``available_as_of_date <= as_of``. Ties on period_end + (restatements) are broken by the later ``available_as_of_date``. + + Returned columns: same as input. + """ + filtered = facts[facts["available_as_of_date"] <= as_of] + if filtered.empty: + return filtered.iloc[0:0] + ordered = filtered.sort_values( + ["ticker", "metric", "period_end_date", "available_as_of_date"] + ) + return ordered.groupby(["ticker", "metric"], as_index=False).tail(1) + + +def prior_panel( + facts: pd.DataFrame, as_of: date, periods_back: int = 1 +) -> pd.DataFrame: + """For each (ticker, metric), return the row whose period_end_date is + the ``periods_back``-th most recent among rows visible at ``as_of``. + + Used for QoQ deltas. 
If a (ticker, metric) has fewer than + ``periods_back + 1`` visible distinct period_end_dates, it is dropped. + """ + filtered = facts[facts["available_as_of_date"] <= as_of] + if filtered.empty: + return filtered.iloc[0:0] + + # For each (ticker, metric), collect distinct period_end_dates in + # descending order; pick the one at index `periods_back`. + targets = ( + filtered.drop_duplicates(["ticker", "metric", "period_end_date"]) + [["ticker", "metric", "period_end_date"]] + .sort_values(["ticker", "metric", "period_end_date"], + ascending=[True, True, False]) + .groupby(["ticker", "metric"], as_index=False) + .nth(periods_back) + ) + if targets.empty: + return filtered.iloc[0:0] + + keyed = filtered.merge( + targets, on=["ticker", "metric", "period_end_date"], how="inner" + ) + # If a restatement exists, prefer the later filed value for that period. + return (keyed.sort_values( + ["ticker", "metric", "available_as_of_date"] + ).groupby(["ticker", "metric"], as_index=False).tail(1)) + + +# --------------------------------------------------------------------------- +# Pure composers — DataFrame in, DataFrame[ticker, value] out +# --------------------------------------------------------------------------- + +def _pivot_metric(df: pd.DataFrame) -> pd.DataFrame: + """Reshape long-format (ticker, metric, value, ...) to wide (ticker, + metric_columns...). Other columns are dropped. Tickers missing a metric + receive NaN in that column.""" + if df.empty: + return pd.DataFrame() + return (df.pivot_table(index="ticker", columns="metric", values="value", + aggfunc="last")) + + +def compose_margin_compression( + curr: pd.DataFrame, prev: pd.DataFrame +) -> pd.DataFrame: + """NEGATED ΔQoQ in gross_profit / revenue. + + Returns ``p_margin - c_margin`` so HIGH score = margin compression = + empirical buy signal. See module docstring for the sign rationale. 
+ """ + c = _pivot_metric(curr) + p = _pivot_metric(prev) + if c.empty or p.empty: + return pd.DataFrame(columns=["ticker", "value"]) + needed = {_GROSS_PROFIT, _REVENUE} + for col in needed - set(c.columns): + c[col] = float("nan") + for col in needed - set(p.columns): + p[col] = float("nan") + c_margin = c[_GROSS_PROFIT] / c[_REVENUE].where(c[_REVENUE] != 0) + p_margin = p[_GROSS_PROFIT] / p[_REVENUE].where(p[_REVENUE] != 0) + signal = (p_margin - c_margin).dropna() + return pd.DataFrame({"ticker": signal.index, "value": signal.values}) + + +def compose_rd_intensity(curr: pd.DataFrame) -> pd.DataFrame: + """rd_expense / revenue. Higher = stronger tech moat = buy.""" + c = _pivot_metric(curr) + if c.empty: + return pd.DataFrame(columns=["ticker", "value"]) + needed = {_RD_EXPENSE, _REVENUE} + for col in needed - set(c.columns): + c[col] = float("nan") + ratio = (c[_RD_EXPENSE] / c[_REVENUE].where(c[_REVENUE] != 0)).dropna() + return pd.DataFrame({"ticker": ratio.index, "value": ratio.values}) + + +def compose_asset_growth( + curr: pd.DataFrame, prev: pd.DataFrame +) -> pd.DataFrame: + """NEGATED QoQ pct change in total_assets. Investment anomaly: low + asset growth predicts higher returns, so signal is negated.""" + c = _pivot_metric(curr) + p = _pivot_metric(prev) + if c.empty or p.empty: + return pd.DataFrame(columns=["ticker", "value"]) + if _TOTAL_ASSETS not in c.columns or _TOTAL_ASSETS not in p.columns: + return pd.DataFrame(columns=["ticker", "value"]) + growth = ((c[_TOTAL_ASSETS] - p[_TOTAL_ASSETS]) + / p[_TOTAL_ASSETS].where(p[_TOTAL_ASSETS] != 0)) + signal = (-growth).dropna() + return pd.DataFrame({"ticker": signal.index, "value": signal.values}) + + +def compose_roa(curr: pd.DataFrame) -> pd.DataFrame: + """operating_income / total_assets. 
Negative values are legitimate + (loss-making periods) and retained.""" + c = _pivot_metric(curr) + if c.empty: + return pd.DataFrame(columns=["ticker", "value"]) + needed = {_OPERATING_INCOME, _TOTAL_ASSETS} + for col in needed - set(c.columns): + c[col] = float("nan") + ratio = (c[_OPERATING_INCOME] + / c[_TOTAL_ASSETS].where(c[_TOTAL_ASSETS] != 0)).dropna() + return pd.DataFrame({"ticker": ratio.index, "value": ratio.values}) + + +# --------------------------------------------------------------------------- +# DB loader (used by the public wrappers below and by backtest.py) +# --------------------------------------------------------------------------- + +_FUND_QUERY = text( + """ + SELECT c.ticker, f.metric, f.period_end_date, f.available_as_of_date, + f.accession_number, CAST(f.value AS FLOAT) AS value + FROM fundamentals f + JOIN companies c ON c.id = f.company_id + WHERE f.metric = ANY(:metrics) + """ +) + + +def load_fundamentals(metrics: list[str]) -> pd.DataFrame: + """Load all rows for the requested metrics. 
Single DB roundtrip.""" + engine = create_engine(settings.database_url_sync) + with engine.connect() as c: + df = pd.read_sql(_FUND_QUERY, c, params={"metrics": metrics}) + engine.dispose() + df["period_end_date"] = pd.to_datetime(df["period_end_date"]).dt.date + df["available_as_of_date"] = pd.to_datetime( + df["available_as_of_date"]).dt.date + return df + + +# --------------------------------------------------------------------------- +# DB-backed wrappers (public API) +# --------------------------------------------------------------------------- + +def _finalise(values_df: pd.DataFrame, meta: FactorMeta, + register: bool) -> pd.DataFrame: + """Convert (ticker, value) DataFrame to the standard factor frame.""" + if register: + register_signal(meta.name, meta.signal_type, meta.tier) + values = dict(zip(values_df["ticker"], values_df["value"])) + return _factor_frame(values, meta.name) + + +def compute_margin_compression( + as_of_date: date, register: bool = False +) -> pd.DataFrame: + facts = load_fundamentals([_GROSS_PROFIT, _REVENUE]) + curr = latest_panel(facts, as_of_date) + prev = prior_panel(facts, as_of_date, periods_back=1) + vals = compose_margin_compression(curr, prev) # type: ignore[misc] + return _finalise(vals, MARGIN_COMPRESSION, register) + + +def compute_rd_intensity( + as_of_date: date, register: bool = False +) -> pd.DataFrame: + facts = load_fundamentals([_RD_EXPENSE, _REVENUE]) + curr = latest_panel(facts, as_of_date) + vals = compose_rd_intensity(curr) # type: ignore[misc] + return _finalise(vals, RD_INTENSITY, register) + + +def compute_asset_growth( + as_of_date: date, register: bool = False +) -> pd.DataFrame: + facts = load_fundamentals([_TOTAL_ASSETS]) + curr = latest_panel(facts, as_of_date) + prev = prior_panel(facts, as_of_date, periods_back=1) + vals = compose_asset_growth(curr, prev) # type: ignore[misc] + return _finalise(vals, ASSET_GROWTH, register) + + +def compute_roa( + as_of_date: date, register: bool = False +) -> 
pd.DataFrame: + facts = load_fundamentals([_OPERATING_INCOME, _TOTAL_ASSETS]) + curr = latest_panel(facts, as_of_date) + vals = compose_roa(curr) # type: ignore[misc] + return _finalise(vals, ROA, register) diff --git a/research/phase9_fundamental_backtest.py b/research/phase9_fundamental_backtest.py new file mode 100644 index 0000000..f7fbe93 --- /dev/null +++ b/research/phase9_fundamental_backtest.py @@ -0,0 +1,213 @@ +""" +Phase 9 Session 1 — Multi-horizon IC backtest for the four EDGAR XBRL +fundamental factors. + +Runs: + - fundamental_margin_compression + - fundamental_rd_intensity + - fundamental_asset_growth + - fundamental_roa + +at 21d / 63d / 126d horizons against the 140-ticker UNIVERSE. Reports +per-horizon IC table, HLZ Bonferroni correction, and CALM/NON-CALM +regime split for any factor with |t| > 1.5. + +Read-only — no DB writes. signal_registry untouched per Phase 9 Session 1 +hard constraint. +""" +from __future__ import annotations + +import math +from datetime import date + +import numpy as np +from sqlalchemy import create_engine + +from nexus.config import settings +from nexus.signals.backtest import ( + CALM_FSI_THRESHOLD, + FactorBacktest, + _aggregate, + _compute_forward_returns, + _factor_xs_dispatch, + _load_fsi_by_date, + _load_fundamentals_panel, + compute_factor_ics, + load_all_panels, +) +from nexus.signals.hlz import format_table, hlz_correct + +HORIZONS: list[int] = [21, 63, 126] + +FUNDAMENTAL_FACTORS: list[str] = [ + "fundamental_margin_compression", + "fundamental_rd_intensity", + "fundamental_asset_growth", + "fundamental_roa", +] + + +def _regime_split( + factor_name: str, + periods: list[date], + ics: list[float], + fsi_by_date: dict[date, float | None], +) -> None: + calm, stressed = [], [] + for p, ic in zip(periods, ics): + fsi = fsi_by_date.get(p) + if fsi is None: + continue + (calm if fsi < CALM_FSI_THRESHOLD else stressed).append(ic) + + def _summarise(label: str, vals: list[float]) -> str: + if len(vals) < 2: + return 
f" {label:18s} N={len(vals):>3d} (insufficient)" + arr = np.asarray(vals) + mean = arr.mean() + std = arr.std(ddof=1) + t = math.sqrt(len(vals)) * mean / std if std > 0 else float("nan") + return ( + f" {label:18s} N={len(vals):>3d} " + f"mean_IC={mean:>+7.4f} t={t:>+6.3f}" + ) + + print(f" Regime split — {factor_name}") + print(_summarise("CALM (FSI<0)", calm)) + print(_summarise("NON-CALM(FSI>=0)", stressed)) + + +def _print_ic_table( + results: list[FactorBacktest], horizon_days: int, + fsi_by_date: dict[date, float | None], + per_period_data: dict[str, tuple[list[float], list[date]]], +) -> dict[str, float]: + print(f"\n{'=' * 92}") + print(f" Horizon: {horizon_days}d ({horizon_days / 21:.1f} months)") + print(f"{'=' * 92}") + hdr = (f" {'factor':38s} {'N':>4s} {'mean_IC':>8s} " + f"{'std_IC':>8s} {'t_stat':>7s} {'avg_xs':>7s} period") + print(hdr) + print(f" {'-' * 88}") + t_stats: dict[str, float] = {} + for r in results: + period = ( + f"{r.first_period} -> {r.last_period}" + if r.first_period and r.last_period else "(insufficient)" + ) + print( + f" {r.name:38s} {r.n_periods:>4d} {r.mean_ic:>+8.4f} " + f"{r.std_ic:>8.4f} {r.t_stat:>+7.3f} {r.avg_xs_size:>7.1f} {period}" + ) + if math.isfinite(r.t_stat): + t_stats[r.name] = r.t_stat + + regime_candidates = [r for r in results + if math.isfinite(r.t_stat) and abs(r.t_stat) > 1.5] + if regime_candidates: + print("\n Regime analysis (|t| > 1.5):") + for r in regime_candidates: + ics, periods = per_period_data[r.name] + _regime_split(r.name, periods, ics, fsi_by_date) + return t_stats + + +def _decision(name: str, t_by_horizon: dict[int, float]) -> str: + finite = [t for t in t_by_horizon.values() if math.isfinite(t)] + if not finite: + return "INSUFFICIENT_DATA" + abs_max = max(abs(t) for t in finite) + if abs_max > 3.0: + return "RECOMMEND_REGISTER (t>3.0 at one horizon)" + if abs_max >= 2.0: + return "RECOMMEND_MONITOR (2.0 <= |t| <= 3.0)" + return "NULL (|t|<2.0 all horizons)" + + +def main() -> None: + 
engine = create_engine(settings.database_url_sync) + print("[*] Loading panels...", flush=True) + price_panel, centrality_panel, embedding_panel = load_all_panels(engine) + fundamentals_panel = _load_fundamentals_panel(engine) + snap_dates = centrality_panel.sorted_dates + fsi_by_date = _load_fsi_by_date(snap_dates, engine) + engine.dispose() + + n_fund_tickers = fundamentals_panel.facts["ticker"].nunique() \ + if not fundamentals_panel.facts.empty else 0 + print( + f"[*] Snapshots: {len(snap_dates)} " + f"({snap_dates[0]} -> {snap_dates[-1]})", flush=True, + ) + print( + f"[*] Cross-section sizes — price: {len(price_panel.by_ticker)} " + f"fundamentals: {n_fund_tickers} tickers / " + f"{len(fundamentals_panel.facts):,} rows", flush=True, + ) + print(f"[*] Factors in run: {', '.join(FUNDAMENTAL_FACTORS)}\n", flush=True) + + # name -> {horizon_days: t_stat} + t_by_horizon_per_factor: dict[str, dict[int, float]] = { + n: {} for n in FUNDAMENTAL_FACTORS + } + + for horizon_days in HORIZONS: + fwd = _compute_forward_returns( + snap_dates, price_panel, forward_days=horizon_days + ) + n_pairs = sum(len(v) for v in fwd.values()) + print(f"[*] horizon={horizon_days}d — forward return pairs: {n_pairs}", + flush=True) + + results: list[FactorBacktest] = [] + per_period_data: dict[str, tuple[list[float], list[date]]] = {} + + for name in FUNDAMENTAL_FACTORS: + ics, sizes, periods = compute_factor_ics( + name, + snap_dates, + fwd, + price_panel, + centrality_panel, + embedding_panel, + fundamentals_panel=fundamentals_panel, + ) + per_period_data[name] = (ics, periods) + results.append(_aggregate(name, ics, sizes, periods)) + + t_stats = _print_ic_table( + results, horizon_days, fsi_by_date, per_period_data + ) + for n, t in t_stats.items(): + t_by_horizon_per_factor[n][horizon_days] = t + + # HLZ correction at this horizon + if t_stats: + median_n = int(np.median( + [r.n_periods for r in results if r.n_periods > 1] + )) + df_for_t = max(median_n - 1, 1) + print(f"\n HLZ 
correction (df={df_for_t}, median N - 1)") + hlz_results = hlz_correct(t_stats, df=df_for_t) + print("\n" + format_table(hlz_results)) + + # Decision matrix verdict per factor + print(f"\n{'=' * 92}") + print(" Phase 9 Session 1 — Decision matrix verdict per factor") + print(f"{'=' * 92}") + print(f" {'factor':38s} {'t@21d':>7s} {'t@63d':>7s} {'t@126d':>7s} verdict") + print(f" {'-' * 88}") + for n in FUNDAMENTAL_FACTORS: + ts = t_by_horizon_per_factor[n] + cells = [] + for h in HORIZONS: + t = ts.get(h, float("nan")) + cells.append(f"{t:>+7.3f}" if math.isfinite(t) else " nan") + verdict = _decision(n, ts) + print(f" {n:38s} {cells[0]} {cells[1]} {cells[2]} {verdict}") + + print("\n[*] Done. No DB writes performed. signal_registry untouched.") + + +if __name__ == "__main__": + main() diff --git a/tests/test_fundamental_factors.py b/tests/test_fundamental_factors.py new file mode 100644 index 0000000..c7db7a3 --- /dev/null +++ b/tests/test_fundamental_factors.py @@ -0,0 +1,269 @@ +"""Unit tests for Phase 9 fundamental factors — pure helpers. + +All tests bind to pure functions in ``nexus.signals.factors.fundamental``. +No DB, no network. 
Fixtures are tiny inline DataFrames matching the +schema the DB-backed wrappers will load: + + columns: [ticker, metric, period_end_date, available_as_of_date, + accession_number, value] +""" +from __future__ import annotations + +from datetime import date + +import pandas as pd +import pytest + +from nexus.signals.factors.fundamental import ( + compose_asset_growth, + compose_margin_compression, + compose_rd_intensity, + compose_roa, + latest_panel, + prior_panel, +) + + +def _row(ticker, metric, period_end, filed, value, accn=None): + return { + "ticker": ticker, + "metric": metric, + "period_end_date": period_end, + "available_as_of_date": filed, + "accession_number": accn or f"{ticker}-{period_end}-{metric}", + "value": float(value), + } + + +# --------------------------------------------------------------------------- +# latest_panel +# --------------------------------------------------------------------------- + +def test_latest_panel_excludes_post_as_of_rows(): + facts = pd.DataFrame([ + _row("NVDA", "revenue", date(2023, 12, 31), date(2024, 2, 21), 1000), + # Filed AFTER our snapshot date — must be excluded. + _row("NVDA", "revenue", date(2024, 3, 31), date(2024, 5, 22), 2000), + ]) + out = latest_panel(facts, as_of=date(2024, 3, 31)) + assert len(out) == 1 + assert out.iloc[0]["value"] == 1000 + + +def test_latest_panel_picks_most_recent_period_end(): + facts = pd.DataFrame([ + _row("NVDA", "revenue", date(2023, 9, 30), date(2023, 11, 20), 800), + _row("NVDA", "revenue", date(2023, 12, 31), date(2024, 2, 21), 1000), + ]) + out = latest_panel(facts, as_of=date(2024, 3, 1)) + assert len(out) == 1 + assert out.iloc[0]["period_end_date"] == date(2023, 12, 31) + assert out.iloc[0]["value"] == 1000 + + +def test_latest_panel_prefers_latest_filed_for_same_period_end(): + """Restatement: same (ticker, metric, period_end) under two accessions. 
+ The later-filed accession is the correct point-in-time value.""" + facts = pd.DataFrame([ + _row("NVDA", "revenue", date(2023, 12, 31), date(2024, 2, 21), 1000, accn="orig"), + _row("NVDA", "revenue", date(2023, 12, 31), date(2024, 6, 15), 995, accn="amend"), + ]) + out = latest_panel(facts, as_of=date(2024, 8, 1)) + assert len(out) == 1 + assert out.iloc[0]["value"] == 995 + assert out.iloc[0]["accession_number"] == "amend" + + +# --------------------------------------------------------------------------- +# prior_panel +# --------------------------------------------------------------------------- + +def test_prior_panel_returns_n_minus_1_period_end(): + facts = pd.DataFrame([ + _row("NVDA", "revenue", date(2023, 6, 30), date(2023, 8, 20), 900), + _row("NVDA", "revenue", date(2023, 9, 30), date(2023, 11, 20), 1000), + _row("NVDA", "revenue", date(2023, 12, 31), date(2024, 2, 21), 1100), + ]) + prev = prior_panel(facts, as_of=date(2024, 3, 1), periods_back=1) + assert len(prev) == 1 + assert prev.iloc[0]["period_end_date"] == date(2023, 9, 30) + assert prev.iloc[0]["value"] == 1000 + + +def test_prior_panel_respects_as_of_filter(): + """If a later period_end is not yet filed, it is invisible — and prior + becomes the (then-)latest visible period.""" + facts = pd.DataFrame([ + _row("NVDA", "revenue", date(2023, 9, 30), date(2023, 11, 20), 1000), + # filed after as_of: invisible + _row("NVDA", "revenue", date(2023, 12, 31), date(2024, 2, 21), 1100), + ]) + prev = prior_panel(facts, as_of=date(2024, 1, 15), periods_back=1) + # Only 2023-09-30 is visible; there is no period before it. Empty result. 
+ assert prev.empty + + +# --------------------------------------------------------------------------- +# compose_margin_compression: NEGATED ΔQoQ in gross_profit / revenue +# --------------------------------------------------------------------------- + +def test_margin_compression_sign_is_inverted_from_literature(): + """Margin COMPRESSION should produce a POSITIVE signal value (high score + = expected buy). The composer returns p_margin - c_margin, opposite of + the literature's expanding-margin = buy convention. See module docstring + for the empirical justification (Phase 9 Session 1, t=-4.834 at 126d).""" + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "gross_profit", "value": 600.0}, + {"ticker": "AAA", "metric": "revenue", "value": 1000.0}, # margin 0.60 + {"ticker": "BBB", "metric": "gross_profit", "value": 300.0}, + {"ticker": "BBB", "metric": "revenue", "value": 1000.0}, # margin 0.30 + ]) + prev = pd.DataFrame([ + {"ticker": "AAA", "metric": "gross_profit", "value": 500.0}, + {"ticker": "AAA", "metric": "revenue", "value": 1000.0}, # margin 0.50 + {"ticker": "BBB", "metric": "gross_profit", "value": 350.0}, + {"ticker": "BBB", "metric": "revenue", "value": 1000.0}, # margin 0.35 + ]) + out = compose_margin_compression(curr, prev) + out_by = {r["ticker"]: r["value"] for _, r in out.iterrows()} + # AAA expanded margin 0.50 -> 0.60 = +0.10 expansion = -0.10 signal. + assert out_by["AAA"] == pytest.approx(-0.10) + # BBB compressed margin 0.35 -> 0.30 = -0.05 expansion = +0.05 signal. + assert out_by["BBB"] == pytest.approx(0.05) + # Compressor (BBB) ranks above expander (AAA). 
+ assert out_by["BBB"] > out_by["AAA"] + + +def test_margin_compression_drops_missing_metric_tickers(): + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "gross_profit", "value": 600.0}, + {"ticker": "AAA", "metric": "revenue", "value": 1000.0}, + {"ticker": "BBB", "metric": "revenue", "value": 500.0}, + # BBB missing gross_profit + ]) + prev = pd.DataFrame([ + {"ticker": "AAA", "metric": "gross_profit", "value": 500.0}, + {"ticker": "AAA", "metric": "revenue", "value": 1000.0}, + {"ticker": "BBB", "metric": "gross_profit", "value": 200.0}, + {"ticker": "BBB", "metric": "revenue", "value": 500.0}, + ]) + out = compose_margin_compression(curr, prev) + tickers = set(out["ticker"]) + assert tickers == {"AAA"} + + +def test_margin_compression_drops_zero_revenue(): + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "gross_profit", "value": 0.0}, + {"ticker": "AAA", "metric": "revenue", "value": 0.0}, + ]) + prev = pd.DataFrame([ + {"ticker": "AAA", "metric": "gross_profit", "value": 100.0}, + {"ticker": "AAA", "metric": "revenue", "value": 1000.0}, + ]) + out = compose_margin_compression(curr, prev) + assert out.empty + + +# --------------------------------------------------------------------------- +# compose_rd_intensity: rd_expense / revenue, signed positive +# --------------------------------------------------------------------------- + +def test_rd_intensity_sign_and_value(): + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "rd_expense", "value": 200.0}, + {"ticker": "AAA", "metric": "revenue", "value": 1000.0}, # 0.20 + {"ticker": "BBB", "metric": "rd_expense", "value": 50.0}, + {"ticker": "BBB", "metric": "revenue", "value": 1000.0}, # 0.05 + ]) + out = compose_rd_intensity(curr) + out_by = {r["ticker"]: r["value"] for _, r in out.iterrows()} + assert out_by["AAA"] == pytest.approx(0.20) + assert out_by["BBB"] == pytest.approx(0.05) + # High R&D > low R&D — sign is positive. 
+ assert out_by["AAA"] > out_by["BBB"] + + +def test_rd_intensity_drops_missing_rd(): + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "rd_expense", "value": 200.0}, + {"ticker": "AAA", "metric": "revenue", "value": 1000.0}, + {"ticker": "BBB", "metric": "revenue", "value": 1000.0}, # no rd + ]) + out = compose_rd_intensity(curr) + assert set(out["ticker"]) == {"AAA"} + + +# --------------------------------------------------------------------------- +# compose_asset_growth: NEGATIVE of QoQ pct change in total_assets +# --------------------------------------------------------------------------- + +def test_asset_growth_sign_is_negated(): + """Investment anomaly: LOW asset growth predicts HIGH returns. + Signal must therefore be NEGATIVE of the asset growth rate so that + higher score = better expected return.""" + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "total_assets", "value": 1100.0}, + {"ticker": "BBB", "metric": "total_assets", "value": 990.0}, + ]) + prev = pd.DataFrame([ + {"ticker": "AAA", "metric": "total_assets", "value": 1000.0}, # +10% + {"ticker": "BBB", "metric": "total_assets", "value": 1000.0}, # -1% + ]) + out = compose_asset_growth(curr, prev) + out_by = {r["ticker"]: r["value"] for _, r in out.iterrows()} + # +10% growth → -0.10 signal; -1% growth → +0.01 signal. + assert out_by["AAA"] == pytest.approx(-0.10) + assert out_by["BBB"] == pytest.approx(0.01) + # Slower-growing balance sheet has the higher (more attractive) score. 
+ assert out_by["BBB"] > out_by["AAA"] + + +def test_asset_growth_drops_zero_prior(): + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "total_assets", "value": 1000.0}, + ]) + prev = pd.DataFrame([ + {"ticker": "AAA", "metric": "total_assets", "value": 0.0}, + ]) + out = compose_asset_growth(curr, prev) + assert out.empty + + +# --------------------------------------------------------------------------- +# compose_roa: operating_income / total_assets, signed positive +# --------------------------------------------------------------------------- + +def test_roa_sign_and_value(): + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "operating_income", "value": 150.0}, + {"ticker": "AAA", "metric": "total_assets", "value": 1000.0}, # 0.15 + {"ticker": "BBB", "metric": "operating_income", "value": 10.0}, + {"ticker": "BBB", "metric": "total_assets", "value": 1000.0}, # 0.01 + ]) + out = compose_roa(curr) + out_by = {r["ticker"]: r["value"] for _, r in out.iterrows()} + assert out_by["AAA"] == pytest.approx(0.15) + assert out_by["BBB"] == pytest.approx(0.01) + assert out_by["AAA"] > out_by["BBB"] + + +def test_roa_drops_zero_assets(): + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "operating_income", "value": 150.0}, + {"ticker": "AAA", "metric": "total_assets", "value": 0.0}, + ]) + out = compose_roa(curr) + assert out.empty + + +def test_roa_handles_negative_operating_income(): + """ROA legitimately negative for loss-making companies — signal is + still well-defined and these tickers must NOT be dropped.""" + curr = pd.DataFrame([ + {"ticker": "AAA", "metric": "operating_income", "value": -200.0}, + {"ticker": "AAA", "metric": "total_assets", "value": 1000.0}, # -0.20 + ]) + out = compose_roa(curr) + assert len(out) == 1 + assert out.iloc[0]["value"] == pytest.approx(-0.20) From 4d30de579959955b01a166982e8dd9bb66b0008a Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Sun, 17 May 2026 14:51:09 +0200 Subject: [PATCH 4/6] 
=?UTF-8?q?feat(phase9):=20register=20fundamental=5Fma?= =?UTF-8?q?rgin=5Fcompression=20=E2=80=94=20first=20HLZ=20Bonferroni=20pas?= =?UTF-8?q?s=20(t=3D+4.834=20@=20126d)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nexus/signals/rolling_registry.py | 5 +- scripts/register_margin_compression.py | 90 ++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 scripts/register_margin_compression.py diff --git a/nexus/signals/rolling_registry.py b/nexus/signals/rolling_registry.py index 7522897..15c022b 100644 --- a/nexus/signals/rolling_registry.py +++ b/nexus/signals/rolling_registry.py @@ -33,6 +33,7 @@ _factor_xs_dispatch, _load_centrality_panel, _load_embedding_panel, + _load_fundamentals_panel, _load_price_panel, compute_factor_ics, ) @@ -132,6 +133,7 @@ def compute_rolling_history( price_panel = _load_price_panel(engine) centrality_panel = _load_centrality_panel(engine) embedding_panel = _load_embedding_panel(engine) + fundamentals_panel = _load_fundamentals_panel(engine) with engine.connect() as conn: rows = conn.execute( @@ -156,7 +158,7 @@ def compute_rolling_history( fwd = _compute_forward_returns(snap_dates, price_panel) - dispatch = _factor_xs_dispatch() + dispatch = _factor_xs_dispatch(fundamentals_panel=fundamentals_panel) history: list[_HistoryRow] = [] for as_of in snap_dates: eligible = _eligible_periods( @@ -176,6 +178,7 @@ def compute_rolling_history( centrality_panel, embedding_panel, period_filter=eligible, + fundamentals_panel=fundamentals_panel, ) agg = _aggregate(name, ics, sizes, periods) trailing = _trailing_mean(ics, n=TRAILING_N) diff --git a/scripts/register_margin_compression.py b/scripts/register_margin_compression.py new file mode 100644 index 0000000..a0e2e3f --- /dev/null +++ b/scripts/register_margin_compression.py @@ -0,0 +1,90 @@ +"""Phase 9 Session 1 — register fundamental_margin_compression. 
+ +First NEXUS factor to clear HLZ Bonferroni at the strict M=400 threshold +(|t| > 4.146): t = +4.834 at 126d on the 140-ticker universe. + +Idempotent: INSERT ... ON CONFLICT (name) DO UPDATE so re-running keeps +the registry state aligned with the latest backtest run. This is a data +change, not a schema change — no alembic migration required. + +ROA is intentionally NOT registered. Headline +3.76 at 126d is driven +entirely by the early window (2021-06-30..2022-10-31, t=+5.18); the +most recent third (2024-05-31..2025-10-31) has decayed to t=-0.06 with +6/6 sign split. Documented as a monitor candidate in phase_9.md. +""" +import json + +from sqlalchemy import create_engine, text + +from nexus.config import settings + +_FACTOR_NAME = "fundamental_margin_compression" + +_REGIME_PROFILE = { + "role": "fundamental_alpha", + "horizon_days": 126, + "mean_ic": 0.0632, + "t_stat_126d": 4.834, + "t_stat_63d": 2.373, + "t_stat_21d": 1.392, + "hlz_bonferroni_passes": True, + "hlz_holm_passes": True, + "hlz_bh_passes": True, + "p_value": 1.229e-5, + "calm_regime_t_126d": 6.349, + "non_calm_regime_t_126d": -2.313, + "calm_regime_t_63d": 3.306, + "non_calm_regime_t_63d": -2.304, + "sign_note": ( + "Composer returns p_margin - c_margin (margin compression = positive). " + "Empirical sign is opposite published literature: at 126d on the NEXUS " + "semiconductor universe, margin expansion predicts UNDERPERFORMANCE " + "(mean-reversion / priced-for-perfection). See docs/progress/phase_9.md." + ), + "stability_note": ( + "Strengthens over time: thirds at 126d horizon t = +0.21 / -4.33 / -5.13 " + "(early/mid/late). Late window is strongest — opposite of momentum-style decay." 
+ ), +} + + +def main() -> None: + engine = create_engine(settings.database_url_sync) + with engine.begin() as conn: + result = conn.execute( + text( + """ + INSERT INTO signal_registry + (name, type, tier, t_stat, hlz_passes, status, regime_profile) + VALUES + (:name, :type, :tier, :t_stat, :hlz_passes, :status, + CAST(:regime_profile AS JSONB)) + ON CONFLICT (name) DO UPDATE + SET t_stat = EXCLUDED.t_stat, + hlz_passes = EXCLUDED.hlz_passes, + status = EXCLUDED.status, + regime_profile = EXCLUDED.regime_profile + RETURNING signal_id, name, status, t_stat, hlz_passes + """ + ), + { + "name": _FACTOR_NAME, + "type": "fundamental", + "tier": "A", + "t_stat": 4.834, + "hlz_passes": True, + "status": "approved", + "regime_profile": json.dumps(_REGIME_PROFILE), + }, + ) + row = result.fetchone() + + engine.dispose() + print( + f"[+] signal_registry: signal_id={row.signal_id} name={row.name} " + f"status={row.status} t_stat={row.t_stat} hlz_passes={row.hlz_passes}" + ) + + +if __name__ == "__main__": + main() From 762c2d56484ea61a5c490356a0f9ab9bd4d540d5 Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Sun, 17 May 2026 14:51:14 +0200 Subject: [PATCH 5/6] =?UTF-8?q?feat(phase9):=20paper=20trader=20Phase=209?= =?UTF-8?q?=20=E2=80=94=20CAGR=20+8.72%,=20Sharpe=200.488,=20Max=20DD=20-3?= =?UTF-8?q?2.68%?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nexus/execution/paper_trader.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nexus/execution/paper_trader.py b/nexus/execution/paper_trader.py index 2b1db9d..65e4332 100644 --- a/nexus/execution/paper_trader.py +++ b/nexus/execution/paper_trader.py @@ -53,9 +53,11 @@ _factor_xs_dispatch, _load_centrality_panel, _load_embedding_panel, + _load_fundamentals_panel, _load_price_panel, _CentralityPanel, _EmbeddingPanel, + _FundamentalsPanel, _PricePanel, ) @@ -142,13 +144,14 @@ def _factor_panel( price: _PricePanel, cent: _CentralityPanel, emb: 
_EmbeddingPanel, + fund: _FundamentalsPanel | None = None, ) -> pl.DataFrame: """Wide z-score panel: rows = tickers, cols = factor names + 'ticker'. Cross-sectional z-score within each factor; factors with zero or near-zero std at this snapshot contribute zeros (they cannot rank anything). """ - dispatch = _factor_xs_dispatch() + dispatch = _factor_xs_dispatch(fundamentals_panel=fund) dispatch.pop("graph_gnn_embedding_drift", None) # stale N=30 embeddings; 110 new tickers would get z=0.0 cols: dict[str, dict[str, float]] = {} all_tickers: set[str] = set() @@ -399,6 +402,7 @@ def run_backtest( price = _load_price_panel(engine) cent = _load_centrality_panel(engine) emb = _load_embedding_panel(engine) + fund = _load_fundamentals_panel(engine) crowding_by_q = _load_crowding(engine) rebalance_grid = _rebalance_dates(cent, start, end) @@ -424,7 +428,7 @@ def run_backtest( for d in trading_dates: if d in rebalance_set: - z_panel = _factor_panel(d, price, cent, emb) + z_panel = _factor_panel(d, price, cent, emb, fund=fund) if z_panel.height > 0: records = load_factor_records(as_of=d, engine=engine) weights_by_factor = compute_factor_weights(records) From fb726bb4abec9d1dfadde2281e5af4cff413cbb8 Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Sun, 17 May 2026 14:51:18 +0200 Subject: [PATCH 6/6] =?UTF-8?q?docs(phase9):=20session=201=20complete=20?= =?UTF-8?q?=E2=80=94=20XBRL=20data=20layer=20+=20margin=20compression=20fa?= =?UTF-8?q?ctor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- PROGRESS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/PROGRESS.md b/PROGRESS.md index 481d4f6..ed2a86c 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -37,3 +37,4 @@ Phase docs live in `docs/progress/` (untracked, local only). 
| 8 | Universe expansion Session 1 | COMPLETE — UNIVERSE 30 → **140** tickers; AMAT CIK bug fixed (was Adobe's `796343`, now `6951`); price_history backfilled to 2018-01-01; ANSS/JNPR/INFN excluded (acquired during window; yfinance dropped historical series); 60,567 → **283,150** rows | `docs/progress/phase_8.md` | | 8 | Universe expansion Session 2 | COMPLETE — graph + filings re-ingestion on 140 tickers; supply_edges 247→**1,659**, ownership_edges 7,446→**34,843** (140/140 covered), board_edges 104→**326** (39/140), centrality_history 1,740→**8,120**; ISSUER_NAME_MAP 37→150 fragments + AMAT CIK flip leftover from Session 1; HGT embeddings deferred to Session 3 retrain | `docs/progress/phase_8.md` | | 8 | Factor backtest Session 3 | COMPLETE — multi-horizon IC backtest on 140-ticker universe (21d/63d/126d); Phase 5 t-stats shown to be N=30 artifacts (momentum_12_1 126d: 4.149→1.506); graph_delta_eigenvector sign-flipped (t=−3.194 at 63d) → wired as centrality penalty overlay in portfolio.py; graph_customer_momentum definitively null (CLOSED); rolling_registry rebuilt (406 rows); **Phase 8 paper trader CAGR +5.97%, Sharpe 0.374, Max DD −36.68%** — degradation vs Phase 7A reflects accurate N=140 factor gates + extended cash periods | `docs/progress/phase_8.md` | +| 9 | EDGAR XBRL fundamentals Session 1 | COMPLETE — migration 0011 adds form_type/accession_number/UNIQUE; `form_xbrl.py` parser (filed≥end integrity invariant, 5 metrics, 4 revenue aliases); 74,662 rows / 137 of 140 tickers covered; 4 factors; **`fundamental_margin_compression` t=+4.834 at 126d — first NEXUS factor to PASS HLZ M=400 Bonferroni** (composer sign-flipped vs literature: compression = buy; CALM-regime t=+6.35; sub-window late-third t=+5.13 — strengthening, not decaying); registered status='approved' in signal_registry; rolling_registry refreshed (464 rows); ROA decayed (late-third t=-0.06) → NOT registered; rd_intensity / asset_growth null; **paper trader CAGR +8.72%, Sharpe 0.488, Max DD 
-32.68%** (vs Phase 8 baseline +5.97% / 0.374 / -36.68%) | `docs/progress/phase_9.md` |