diff --git a/main.py b/main.py index dfb7828..d0bb9a4 100644 --- a/main.py +++ b/main.py @@ -50,7 +50,7 @@ def main(): "--mode", choices=[ "live", "once", "morning", "midday", "close", "evening", - "intra_check", "earnings_preprocess", "meta", + "intra_check", "earnings_preprocess", "meta", "weekly", ], default="once", help="Run mode", ) @@ -134,6 +134,8 @@ def main(): result = pipeline.run_earnings_preprocess() elif args.mode == "meta": result = pipeline.run_quarterly_meta_reflection(force=args.force) + elif args.mode == "weekly": + result = pipeline.run_weekly() except BaseException as exc: # Catch broadly (incl. SystemExit / KeyboardInterrupt) so a # wrapper-kill or ctrl-C still gets a notification — but diff --git a/src/execution/broker.py b/src/execution/broker.py index 22bd08d..194da77 100644 --- a/src/execution/broker.py +++ b/src/execution/broker.py @@ -227,6 +227,41 @@ def get_recent_daily_closes(self, lookback_days: int = 10) -> list[tuple[str, fl out.append((d, eq)) return out + def get_full_portfolio_history(self) -> list[tuple[str, float]]: + """All available 1D equity history from Alpaca portfolio_history. + + Returns [(et_date_str, equity), ...] oldest-first, skipping zero + rows (pre-funding). Best-effort — never raises. + """ + from datetime import datetime, timedelta, timezone + from src.util.time import ET + try: + from alpaca.trading.requests import GetPortfolioHistoryRequest + now = datetime.now(timezone.utc) + req = GetPortfolioHistoryRequest( + timeframe="1D", extended_hours=False, + start=now - timedelta(days=365 * 5), end=now, + ) + history = self.client.get_portfolio_history(history_filter=req) + except Exception as exc: + logger.warning("get_full_portfolio_history failed: %s", exc) + return [] + timestamps = getattr(history, "timestamp", None) or [] + equities = getattr(history, "equity", None) or [] + out: list[tuple[str, float]] = [] + for i, ts in enumerate(timestamps): + if i >= len(equities) or equities[i] is None: + continue + try: + d = datetime.fromtimestamp(int(ts), tz=timezone.utc).astimezone(ET).strftime("%Y-%m-%d") + eq = float(equities[i]) + except (TypeError, ValueError, OSError): + continue + if eq == 0.0: + continue # skip pre-funding rows + out.append((d, eq)) + return out + def get_positions(self) -> list[Position]: raw_positions = self.client.get_all_positions() positions = [] diff --git a/src/notifier.py b/src/notifier.py index c46e2a6..48efb64 100644 --- a/src/notifier.py +++ b/src/notifier.py @@ -103,6 +103,23 @@ def send(self, text: str) -> bool: logger.warning("Telegram notify failed: %s", exc) return False + def send_document(self, csv_bytes: bytes, filename: str, caption: str = "") -> bool: + """Send a file (e.g. CSV) via Telegram sendDocument. Best-effort.""" + if not self.enabled: + return False + try: + response = requests.post( + f"https://api.telegram.org/bot{self.token}/sendDocument", + data={"chat_id": self.chat_id, "caption": caption}, + files={"document": (filename, csv_bytes, "text/csv")}, + timeout=30.0, + ) + response.raise_for_status() + return True + except Exception as exc: + logger.warning("Telegram send_document failed: %s", exc) + return False + # === Session result formatting === # Built as a free function (not a TelegramNotifier method) so it's @@ -193,6 +210,10 @@ def format_session_result( _append_intra_check_body(lines, result) elif mode == "meta": _append_meta_body(lines, result) + elif mode == "weekly": + rows = result.get("rows", "?") + filename = result.get("filename", "") + lines.append(f"📊 {rows} rows → {filename}") lines.append(f"elapsed: {elapsed_str}") return "\n".join(lines) @@ -820,6 +841,77 @@ def _fmt_elapsed(seconds: float) -> str: return f"{minutes}m {secs}s" +def build_weekly_csv(closes: list[tuple[str, float]]) -> bytes: + """Build a P&L history CSV from portfolio_history closes. + + Columns: Date, NAV, Daily P&L, Daily Return %, Drawdown %, SPY Close, + SPY Return % + + SPY data is fetched via yfinance for the same date range. On any + yfinance failure the SPY columns are left blank. + """ + import io, csv, math + from datetime import datetime, timedelta + + if not closes: + return b"" + + # Fetch SPY closes for the same date range. + spy_closes: dict[str, float] = {} + try: + import yfinance as yf + import pandas as pd + earliest = closes[0][0] + start = (datetime.strptime(earliest, "%Y-%m-%d") - timedelta(days=5)).strftime("%Y-%m-%d") + end_dt = datetime.strptime(closes[-1][0], "%Y-%m-%d") + timedelta(days=2) + end = end_dt.strftime("%Y-%m-%d") + df = yf.download("SPY", start=start, end=end, progress=False, auto_adjust=True) + if not df.empty: + if hasattr(df.columns, "get_level_values"): + df.columns = df.columns.get_level_values(0) + # dropna()+isfinite: a NaN close (data gap / halt) is truthy as a + # float, so it would slip past the `spy_close and prev_spy` guard, + # render as "+nan" in the CSV, AND poison prev_spy for every later + # row. Keep only valid finite closes out of the dict entirely. + for dt_idx, row in df["Close"].dropna().items(): + val = float(row) + if math.isfinite(val): + spy_closes[str(dt_idx.date())] = val + except Exception as exc: + logger.warning("build_weekly_csv: SPY fetch failed: %s", exc) + + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(["Date", "NAV", "Daily P&L", "Daily Return %", "Drawdown %", "SPY Close", "SPY Return %"]) + + prev_nav: float | None = None + prev_spy: float | None = None + peak_nav: float | None = None + for date, nav in closes: + daily_pnl = nav - prev_nav if prev_nav is not None else 0.0 + daily_ret = (daily_pnl / prev_nav * 100) if prev_nav else 0.0 + peak_nav = max(peak_nav, nav) if peak_nav is not None else nav + drawdown = (nav - peak_nav) / peak_nav * 100 if peak_nav else 0.0 + spy_close = spy_closes.get(date) + if spy_close is not None and math.isfinite(spy_close) and prev_spy: + spy_ret = (spy_close - prev_spy) / prev_spy * 100 + else: + spy_ret = "" + writer.writerow([ + date, + f"{nav:.2f}", + f"{daily_pnl:+.2f}", + f"{daily_ret:+.4f}", + f"{drawdown:+.4f}", + f"{spy_close:.2f}" if spy_close else "", + f"{spy_ret:+.4f}" if spy_ret != "" else "", + ]) + prev_nav = nav + prev_spy = spy_close if spy_close else prev_spy + + return buf.getvalue().encode("utf-8") + + def _attr_or_key(obj: Any, name: str) -> Any: """Get `name` from either an attribute (Pydantic model) or a dict key (raw JSON). Returns None on miss without raising.""" diff --git a/src/pipeline.py b/src/pipeline.py index eb97a82..ba23eeb 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -6162,3 +6162,47 @@ def run_quarterly_meta_reflection( "proposed_learnings_count": len(reflection.proposed_learnings), "editor_report": editor_report, } + + def run_weekly(self) -> dict: + """Fetch full portfolio history from Alpaca, build a CSV, and send + via Telegram. No LLM calls — pure data export. Runs on Saturdays. + + Returns {"status": "sent", "rows": N, "filename": ...} on delivery, + {"status": "skipped", ...} when Telegram is disabled (CSV built but no + sink), {"status": "error", ...} on a real failure. The status must be + honest: previously it reported "sent" even when the upload failed or + the notifier was disabled, so the operator couldn't tell a delivered + export from a silently-dropped one. + """ + from src.notifier import build_weekly_csv, TelegramNotifier + from src.trading_calendar import et_today + try: + closes = self.broker.get_full_portfolio_history() + if not closes: + logger.warning("run_weekly: no portfolio history returned") + return {"status": "error", "error": "no data from portfolio_history"} + csv_bytes = build_weekly_csv(closes) + date_str = et_today().strftime("%Y-%m-%d") + filename = f"pnl_history_{date_str}.csv" + caption = f"📊 P&L History export — {date_str} ({len(closes)} trading days)" + notifier = TelegramNotifier() + delivered = notifier.send_document(csv_bytes, filename, caption) + base = {"rows": len(closes), "filename": filename} + if delivered: + logger.info("run_weekly: sent %d rows as %s", len(closes), filename) + return {"status": "sent", **base} + if not notifier.enabled: + # CSV built fine; Telegram simply isn't configured — not a + # failure, just nowhere to deliver it. + logger.info( + "run_weekly: built %d-row CSV %s but Telegram is disabled", + len(closes), filename, + ) + return {"status": "skipped", **base} + # Enabled but the upload failed (network / API / rate limit). + logger.error("run_weekly: Telegram delivery failed for %s", filename) + return {"status": "error", "error": "telegram delivery failed", **base} + except Exception as exc: + logger.error("run_weekly failed: %s", exc, exc_info=True) + return {"status": "error", "error": str(exc)} + diff --git a/tests/test_weekly_report.py b/tests/test_weekly_report.py new file mode 100644 index 0000000..95c64ef --- /dev/null +++ b/tests/test_weekly_report.py @@ -0,0 +1,186 @@ +"""Weekly Saturday P&L CSV export (PR #98). + +Covers: build_weekly_csv settlement math (close-to-close, drawdown, return), +SPY column population + graceful degradation, broker.get_full_portfolio_history +ET-date mapping + pre-funding skip, send_document, run_weekly orchestration, +and the format_session_result weekly body. +""" +import csv +import io +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest + + +def _parse_csv(b: bytes) -> list[dict]: + return list(csv.DictReader(io.StringIO(b.decode("utf-8")))) + + +def test_build_weekly_csv_close_to_close_pnl_and_drawdown(monkeypatch): + """Per-row Daily P&L = consecutive close diff; drawdown vs running peak.""" + from src import notifier + # No network: force the SPY fetch to fail → SPY columns blank. + monkeypatch.setattr("yfinance.download", lambda *a, **k: (_ for _ in ()).throw(RuntimeError("no net"))) + closes = [ + ("2026-05-26", 100_000.0), + ("2026-05-27", 100_500.0), # +500 + ("2026-05-28", 100_200.0), # -300, drawdown from 100500 + ] + out = _parse_csv(notifier.build_weekly_csv(closes)) + assert [r["Date"] for r in out] == ["2026-05-26", "2026-05-27", "2026-05-28"] + assert out[0]["Daily P&L"] == "+0.00" # first row has no predecessor + assert out[1]["Daily P&L"] == "+500.00" + assert out[2]["Daily P&L"] == "-300.00" + assert out[1]["NAV"] == "100500.00" + # drawdown: row1 at peak → 0; row2 = (100200-100500)/100500 = -0.2985% + assert out[1]["Drawdown %"] == "+0.0000" + assert float(out[2]["Drawdown %"]) == pytest.approx(-0.2985, abs=1e-3) + # daily return row1 = 500/100000 = +0.5000% + assert float(out[1]["Daily Return %"]) == pytest.approx(0.5, abs=1e-3) + # SPY blank on fetch failure + assert out[1]["SPY Close"] == "" and out[1]["SPY Return %"] == "" + + +def test_build_weekly_csv_empty_returns_empty_bytes(): + from src import notifier + assert notifier.build_weekly_csv([]) == b"" + + +def test_build_weekly_csv_populates_spy(monkeypatch): + """SPY Close + Return % populated when yfinance returns data.""" + import pandas as pd + from src import notifier + idx = pd.to_datetime(["2026-05-26", "2026-05-27", "2026-05-28"]) + df = pd.DataFrame({"Close": [500.0, 505.0, 503.0]}, index=idx) + monkeypatch.setattr("yfinance.download", lambda *a, **k: df) + closes = [("2026-05-26", 100_000.0), ("2026-05-27", 100_500.0), ("2026-05-28", 100_200.0)] + out = _parse_csv(notifier.build_weekly_csv(closes)) + assert out[0]["SPY Close"] == "500.00" + assert out[1]["SPY Close"] == "505.00" + # SPY return row1 = (505-500)/500 = +1.0000% + assert float(out[1]["SPY Return %"]) == pytest.approx(1.0, abs=1e-3) + + +@patch("src.execution.broker.TradingClient") +def test_get_full_portfolio_history_maps_dates_and_skips_prefunding(mock_tc_cls): + from datetime import datetime, timezone + from src.execution.broker import AlpacaBroker + ts = lambda d: int(datetime(d[0], d[1], d[2], 20, 0, tzinfo=timezone.utc).timestamp()) + mock_client = MagicMock() + mock_client.get_portfolio_history.return_value = SimpleNamespace( + timestamp=[ts((2026, 5, 25)), ts((2026, 5, 26)), ts((2026, 5, 27))], + equity=[0.0, 100_000.0, 100_500.0], # first row = pre-funding → skipped + ) + mock_tc_cls.return_value = mock_client + broker = AlpacaBroker(api_key="k", secret_key="s", paper=True) + out = broker.get_full_portfolio_history() + assert out == [("2026-05-26", 100_000.0), ("2026-05-27", 100_500.0)] + + +@patch("src.execution.broker.TradingClient") +def test_get_full_portfolio_history_swallows_errors(mock_tc_cls): + from src.execution.broker import AlpacaBroker + mock_client = MagicMock() + mock_client.get_portfolio_history.side_effect = RuntimeError("api down") + mock_tc_cls.return_value = mock_client + broker = AlpacaBroker(api_key="k", secret_key="s", paper=True) + assert broker.get_full_portfolio_history() == [] + + +def test_send_document_posts_and_swallows(monkeypatch): + from src.notifier import TelegramNotifier + monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "tok") + monkeypatch.setenv("TELEGRAM_CHAT_ID", "chat") + monkeypatch.delenv("TELEGRAM_DISABLED", raising=False) + n = TelegramNotifier() + with patch("src.notifier.requests.post") as mp: + mp.return_value = MagicMock(raise_for_status=MagicMock()) + assert n.send_document(b"a,b\n1,2", "x.csv", "cap") is True + assert "sendDocument" in mp.call_args.args[0] + assert mp.call_args.kwargs["files"]["document"][0] == "x.csv" + with patch("src.notifier.requests.post", side_effect=RuntimeError("boom")): + assert n.send_document(b"x", "x.csv") is False # swallowed + + +def test_run_weekly_sends_and_reports(monkeypatch): + from src.pipeline import TradingPipeline + pipe = TradingPipeline.__new__(TradingPipeline) + pipe.broker = MagicMock() + pipe.broker.get_full_portfolio_history.return_value = [ + ("2026-05-27", 100_000.0), ("2026-05-28", 100_500.0), + ] + monkeypatch.setattr("yfinance.download", lambda *a, **k: (_ for _ in ()).throw(RuntimeError("no net"))) + sent = {} + with patch("src.notifier.TelegramNotifier") as TN: + TN.return_value.send_document = lambda b, f, c="": sent.update(filename=f, n=len(b)) or True + res = pipe.run_weekly() + assert res["status"] == "sent" + assert res["rows"] == 2 + assert res["filename"].startswith("pnl_history_") and res["filename"].endswith(".csv") + + +def test_run_weekly_error_on_no_data(): + from src.pipeline import TradingPipeline + pipe = TradingPipeline.__new__(TradingPipeline) + pipe.broker = MagicMock() + pipe.broker.get_full_portfolio_history.return_value = [] + res = pipe.run_weekly() + assert res["status"] == "error" + + +def test_format_session_result_weekly_body(): + from src.notifier import format_session_result + msg = format_session_result("weekly", {"status": "sent", "run_id": "run-w", "rows": 42, "filename": "pnl_history_2026-05-30.csv"}, 3.0) + assert msg is not None + assert "42 rows" in msg and "pnl_history_2026-05-30.csv" in msg + + +def test_build_weekly_csv_filters_nan_spy(monkeypatch): + """[Bug 1] A NaN SPY close (data gap/halt) must NOT render as '+nan' and + must NOT poison prev_spy for later rows — the NaN day is dropped and the + next valid day diffs against the last *valid* prior close.""" + import pandas as pd + from src import notifier + idx = pd.to_datetime(["2026-05-26", "2026-05-27", "2026-05-28"]) + df = pd.DataFrame({"Close": [500.0, float("nan"), 503.0]}, index=idx) + monkeypatch.setattr("yfinance.download", lambda *a, **k: df) + closes = [("2026-05-26", 100_000.0), ("2026-05-27", 100_500.0), ("2026-05-28", 100_200.0)] + raw = notifier.build_weekly_csv(closes) + assert b"nan" not in raw.lower() # no '+nan' leak anywhere + out = _parse_csv(raw) + assert out[1]["SPY Close"] == "" and out[1]["SPY Return %"] == "" # NaN day blank + assert out[2]["SPY Close"] == "503.00" + # 05-28 diffs vs the last VALID prior close (05-26 = 500): (503-500)/500 = +0.6% + assert float(out[2]["SPY Return %"]) == pytest.approx(0.6, abs=1e-3) + + +def test_run_weekly_skipped_when_telegram_disabled(monkeypatch): + """[Bug 2] Telegram disabled (no creds) → CSV built but undelivered → + honest 'skipped', not 'sent'.""" + from src.pipeline import TradingPipeline + pipe = TradingPipeline.__new__(TradingPipeline) + pipe.broker = MagicMock() + pipe.broker.get_full_portfolio_history.return_value = [ + ("2026-05-27", 100_000.0), ("2026-05-28", 100_500.0), + ] + monkeypatch.setattr("yfinance.download", lambda *a, **k: (_ for _ in ()).throw(RuntimeError("no net"))) + with patch("src.notifier.TelegramNotifier") as TN: + TN.return_value.enabled = False + TN.return_value.send_document.return_value = False + res = pipe.run_weekly() + assert res["status"] == "skipped" and res["rows"] == 2 + + +def test_run_weekly_error_when_delivery_fails(monkeypatch): + """[Bug 2] Telegram enabled but the upload failed → 'error', not 'sent'.""" + from src.pipeline import TradingPipeline + pipe = TradingPipeline.__new__(TradingPipeline) + pipe.broker = MagicMock() + pipe.broker.get_full_portfolio_history.return_value = [("2026-05-27", 100_000.0)] + monkeypatch.setattr("yfinance.download", lambda *a, **k: (_ for _ in ()).throw(RuntimeError("no net"))) + with patch("src.notifier.TelegramNotifier") as TN: + TN.return_value.enabled = True + TN.return_value.send_document.return_value = False + res = pipe.run_weekly() + assert res["status"] == "error"