Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def main():
"--mode",
choices=[
"live", "once", "morning", "midday", "close", "evening",
"intra_check", "earnings_preprocess", "meta",
"intra_check", "earnings_preprocess", "meta", "weekly",
],
default="once", help="Run mode",
)
Expand Down Expand Up @@ -134,6 +134,8 @@ def main():
result = pipeline.run_earnings_preprocess()
elif args.mode == "meta":
result = pipeline.run_quarterly_meta_reflection(force=args.force)
elif args.mode == "weekly":
result = pipeline.run_weekly()
except BaseException as exc:
# Catch broadly (incl. SystemExit / KeyboardInterrupt) so a
# wrapper-kill or ctrl-C still gets a notification — but
Expand Down
35 changes: 35 additions & 0 deletions src/execution/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,41 @@ def get_recent_daily_closes(self, lookback_days: int = 10) -> list[tuple[str, fl
out.append((d, eq))
return out

def get_full_portfolio_history(self) -> list[tuple[str, float]]:
"""All available 1D equity history from Alpaca portfolio_history.

Returns [(et_date_str, equity), ...] oldest-first, skipping zero
rows (pre-funding). Best-effort — never raises.
"""
from datetime import datetime, timedelta, timezone
from src.util.time import ET
try:
from alpaca.trading.requests import GetPortfolioHistoryRequest
now = datetime.now(timezone.utc)
req = GetPortfolioHistoryRequest(
timeframe="1D", extended_hours=False,
start=now - timedelta(days=365 * 5), end=now,
)
history = self.client.get_portfolio_history(history_filter=req)
except Exception as exc:
logger.warning("get_full_portfolio_history failed: %s", exc)
return []
timestamps = getattr(history, "timestamp", None) or []
equities = getattr(history, "equity", None) or []
out: list[tuple[str, float]] = []
for i, ts in enumerate(timestamps):
if i >= len(equities) or equities[i] is None:
continue
try:
d = datetime.fromtimestamp(int(ts), tz=timezone.utc).astimezone(ET).strftime("%Y-%m-%d")
eq = float(equities[i])
except (TypeError, ValueError, OSError):
continue
if eq == 0.0:
continue # skip pre-funding rows
out.append((d, eq))
return out

def get_positions(self) -> list[Position]:
raw_positions = self.client.get_all_positions()
positions = []
Expand Down
82 changes: 82 additions & 0 deletions src/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,23 @@ def send(self, text: str) -> bool:
logger.warning("Telegram notify failed: %s", exc)
return False

def send_document(self, csv_bytes: bytes, filename: str, caption: str = "") -> bool:
"""Send a file (e.g. CSV) via Telegram sendDocument. Best-effort."""
if not self.enabled:
return False
try:
response = requests.post(
f"https://api.telegram.org/bot{self.token}/sendDocument",
data={"chat_id": self.chat_id, "caption": caption},
files={"document": (filename, csv_bytes, "text/csv")},
timeout=30.0,
)
response.raise_for_status()
return True
except Exception as exc:
logger.warning("Telegram send_document failed: %s", exc)
return False


# === Session result formatting ===
# Built as a free function (not a TelegramNotifier method) so it's
Expand Down Expand Up @@ -193,6 +210,10 @@ def format_session_result(
_append_intra_check_body(lines, result)
elif mode == "meta":
_append_meta_body(lines, result)
elif mode == "weekly":
rows = result.get("rows", "?")
filename = result.get("filename", "")
lines.append(f"📊 {rows} rows → {filename}")

lines.append(f"elapsed: {elapsed_str}")
return "\n".join(lines)
Expand Down Expand Up @@ -820,6 +841,67 @@ def _fmt_elapsed(seconds: float) -> str:
return f"{minutes}m {secs}s"


def build_weekly_csv(closes: list[tuple[str, float]]) -> bytes:
"""Build a P&L history CSV from portfolio_history closes.

Columns: Date, NAV, Daily P&L, Daily Return %, SPY Close, SPY Return %

SPY data is fetched via yfinance for the same date range. On any
yfinance failure the SPY columns are left blank.
"""
import io, csv
from datetime import datetime, timedelta

if not closes:
return b""

# Fetch SPY closes for the same date range.
spy_closes: dict[str, float] = {}
try:
import yfinance as yf
import pandas as pd
earliest = closes[0][0]
start = (datetime.strptime(earliest, "%Y-%m-%d") - timedelta(days=5)).strftime("%Y-%m-%d")
end_dt = datetime.strptime(closes[-1][0], "%Y-%m-%d") + timedelta(days=2)
end = end_dt.strftime("%Y-%m-%d")
df = yf.download("SPY", start=start, end=end, progress=False, auto_adjust=True)
if not df.empty:
if hasattr(df.columns, "get_level_values"):
df.columns = df.columns.get_level_values(0)
for dt_idx, row in df["Close"].items():
spy_closes[str(dt_idx.date())] = float(row)
except Exception as exc:
logger.warning("build_weekly_csv: SPY fetch failed: %s", exc)

buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["Date", "NAV", "Daily P&L", "Daily Return %", "Drawdown %", "SPY Close", "SPY Return %"])

prev_nav: float | None = None
prev_spy: float | None = None
peak_nav: float | None = None
for date, nav in closes:
daily_pnl = nav - prev_nav if prev_nav is not None else 0.0
daily_ret = (daily_pnl / prev_nav * 100) if prev_nav else 0.0
peak_nav = max(peak_nav, nav) if peak_nav is not None else nav
drawdown = (nav - peak_nav) / peak_nav * 100 if peak_nav else 0.0
spy_close = spy_closes.get(date)
spy_ret = ((spy_close - prev_spy) / prev_spy * 100) if (spy_close and prev_spy) else ""
writer.writerow([
date,
f"{nav:.2f}",
f"{daily_pnl:+.2f}",
f"{daily_ret:+.4f}",
f"{drawdown:+.4f}",
f"{spy_close:.2f}" if spy_close else "",
f"{spy_ret:+.4f}" if spy_ret != "" else "",
])
prev_nav = nav
prev_spy = spy_close if spy_close else prev_spy

return buf.getvalue().encode("utf-8")


def _attr_or_key(obj: Any, name: str) -> Any:
"""Get `name` from either an attribute (Pydantic model) or a
dict key (raw JSON). Returns None on miss without raising."""
Expand Down
26 changes: 26 additions & 0 deletions src/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6162,3 +6162,29 @@ def run_quarterly_meta_reflection(
"proposed_learnings_count": len(reflection.proposed_learnings),
"editor_report": editor_report,
}

def run_weekly(self) -> dict:
"""Fetch full portfolio history from Alpaca, build a CSV, and send
via Telegram. No LLM calls — pure data export. Runs on Saturdays.

Returns {"status": "sent", "rows": N} on success, or
{"status": "error", "error": "..."} on failure.
"""
from src.notifier import build_weekly_csv, TelegramNotifier
from src.trading_calendar import et_today
try:
closes = self.broker.get_full_portfolio_history()
if not closes:
logger.warning("run_weekly: no portfolio history returned")
return {"status": "error", "error": "no data from portfolio_history"}
csv_bytes = build_weekly_csv(closes)
date_str = et_today().strftime("%Y-%m-%d")
filename = f"pnl_history_{date_str}.csv"
caption = f"📊 P&L History export — {date_str} ({len(closes)} trading days)"
TelegramNotifier().send_document(csv_bytes, filename, caption)
logger.info("run_weekly: sent %d rows as %s", len(closes), filename)
return {"status": "sent", "rows": len(closes), "filename": filename}
except Exception as exc:
logger.error("run_weekly failed: %s", exc, exc_info=True)
return {"status": "error", "error": str(exc)}

Loading