diff --git a/.Jules/palette.md b/.Jules/palette.md new file mode 100644 index 0000000..3c216f9 --- /dev/null +++ b/.Jules/palette.md @@ -0,0 +1,3 @@ +## 2026-03-08 - Contextual Tooltips for Financial Inputs +**Learning:** Users often lack context on acceptable formats or magnitudes for statistical parameters like quantiles or basis points when interacting with Streamlit widgets. +**Action:** Add `help` tooltips to all critical statistical and financial inputs to provide concrete examples (e.g., '10 bps = 0.10%') directly within the widget parameters rather than using separate text blocks. \ No newline at end of file diff --git a/src/dashboard.py b/src/dashboard.py index 4156c1d..8680d60 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -1,74 +1,76 @@ -import streamlit as st -import pandas as pd +import hashlib +from datetime import datetime, timedelta + import numpy as np -import plotly.graph_objects as go +import pandas as pd import plotly.express as px -from datetime import datetime, timedelta -import hashlib +import plotly.graph_objects as go +import streamlit as st # Import custom modules try: from modules import ( - data_model, - signals, + alerts, backtester, - portfolio, - risk, + data_model, factors, - scenario, liquidity, - alerts, - reporting, + portfolio, regime_analysis, + reporting, + risk, + scenario, + signals, sweep, ) from modules.config import ( - PRESET_UNIVERSE, - DEFAULT_SMA_WINDOW, - DEFAULT_MOMENTUM_WINDOW, - DEFAULT_VOL_QUANTILE_HIGH, - DEFAULT_COST_BPS, - MIN_DATA_POINTS, + DEFAULT_ADV_PCT, DEFAULT_BENCHMARK, + DEFAULT_BOOTSTRAP_ITER, + DEFAULT_COST_BPS, + DEFAULT_MOMENTUM_WINDOW, DEFAULT_PORTFOLIO_VALUE, - DEFAULT_ADV_PCT, + DEFAULT_SMA_SWEEP, + DEFAULT_SMA_WINDOW, + DEFAULT_VOL_QUANTILE_HIGH, FACTOR_PROXIES, MACRO_PROXIES, - DEFAULT_BOOTSTRAP_ITER, - DEFAULT_SMA_SWEEP, + MIN_DATA_POINTS, + PRESET_UNIVERSE, ) except ImportError: - import sys import os - sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + import sys + + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.modules import ( - data_model, - signals, + alerts, backtester, - portfolio, - risk, + data_model, factors, - scenario, liquidity, - alerts, - reporting, + portfolio, regime_analysis, + reporting, + risk, + scenario, + signals, sweep, ) from src.modules.config import ( - PRESET_UNIVERSE, - DEFAULT_SMA_WINDOW, - DEFAULT_MOMENTUM_WINDOW, - DEFAULT_VOL_QUANTILE_HIGH, - DEFAULT_COST_BPS, - MIN_DATA_POINTS, + DEFAULT_ADV_PCT, DEFAULT_BENCHMARK, + DEFAULT_BOOTSTRAP_ITER, + DEFAULT_COST_BPS, + DEFAULT_MOMENTUM_WINDOW, DEFAULT_PORTFOLIO_VALUE, - DEFAULT_ADV_PCT, + DEFAULT_SMA_SWEEP, + DEFAULT_SMA_WINDOW, + DEFAULT_VOL_QUANTILE_HIGH, FACTOR_PROXIES, MACRO_PROXIES, - DEFAULT_BOOTSTRAP_ITER, - DEFAULT_SMA_SWEEP, + MIN_DATA_POINTS, + PRESET_UNIVERSE, ) @@ -79,9 +81,9 @@ def get_cache_key(*args) -> str: # Initialize session state for caching expensive computations -if 'computed_signals' not in st.session_state: +if "computed_signals" not in st.session_state: st.session_state.computed_signals = {} -if 'backtest_results' not in st.session_state: +if "backtest_results" not in st.session_state: st.session_state.backtest_results = {} @@ -90,11 +92,12 @@ def get_cache_key(*args) -> str: page_title="Quantitative Research Dashboard", page_icon="โ™Ÿ๏ธ", layout="wide", - initial_sidebar_state="expanded" + initial_sidebar_state="expanded", ) # --- CSS Styling --- -st.markdown(""" +st.markdown( + """ -""", unsafe_allow_html=True) +""", + unsafe_allow_html=True, +) # --- Sidebar Inputs --- with st.sidebar: @@ -139,9 +144,7 @@ def get_cache_key(*args) -> str: preset_select = st.multiselect("Preset Universe", PRESET_UNIVERSE, default=[]) weights_input = st.text_input("Weights (comma-separated, optional)") portfolio_value = st.number_input( - "Portfolio Value (USD)", - value=float(DEFAULT_PORTFOLIO_VALUE), - step=100000.0 + "Portfolio Value (USD)", value=float(DEFAULT_PORTFOLIO_VALUE), step=100000.0 ) benchmark_ticker = st.text_input("Benchmark Ticker", value=DEFAULT_BENCHMARK).upper() @@ -150,7 +153,7 @@ def get_cache_key(*args) -> str: if date_mode == "Custom": d_col1, d_col2 = st.columns(2) - start_date = d_col1.date_input("Start", value=datetime.today() - timedelta(days=365*2)) + start_date = d_col1.date_input("Start", value=datetime.today() - timedelta(days=365 * 2)) end_date = d_col2.date_input("End", value=datetime.today()) period_arg = "max" else: @@ -160,24 +163,39 @@ def get_cache_key(*args) -> str: st.subheader("3. Signal Parameters") if mode == "Single-Asset": sma_window = st.slider( - "Trend SMA Window", 10, 200, DEFAULT_SMA_WINDOW, 10, - help="Lookback days for Simple Moving Average trend signal." + "Trend SMA Window", + 10, + 200, + DEFAULT_SMA_WINDOW, + 10, + help="Lookback days for Simple Moving Average trend signal.", ) mom_window = st.slider( - "Momentum Lookback (Months)", 1, 24, DEFAULT_MOMENTUM_WINDOW, 1, - help="Lookback months for Momentum signal." + "Momentum Lookback (Months)", + 1, + 24, + DEFAULT_MOMENTUM_WINDOW, + 1, + help="Lookback months for Momentum signal.", ) else: factor_window = st.slider("Factor Beta Window (days)", 20, 252, 63, 7) vol_window = st.slider("Regime Vol Window (days)", 10, 60, 21, 5) - adv_pct = st.slider("ADV Participation %", 0.01, 0.30, float(DEFAULT_ADV_PCT), 0.01) + adv_pct = st.slider( + "ADV Participation %", + 0.01, + 0.30, + float(DEFAULT_ADV_PCT), + 0.01, + help="Average Daily Volume participation limit (e.g., 0.10 = 10% of ADV).", + ) st.markdown("---") st.subheader("4. Research Rigor") use_oos = st.toggle( "Out-of-Sample Mode", value=False, - help="Uses expanding-window quantiles for regime classification to avoid look-ahead bias. Enable for rigorous backtesting." + help="Uses expanding-window quantiles for regime classification to avoid look-ahead bias. Enable for rigorous backtesting.", ) if use_oos: st.success("โœ“ Look-ahead bias removed") @@ -185,12 +203,25 @@ def get_cache_key(*args) -> str: st.info("Using full-sample quantiles (exploratory mode)") vol_q_high = st.slider( - "High Volatility Quantile", 0.5, 0.95, DEFAULT_VOL_QUANTILE_HIGH, 0.05 + "High Volatility Quantile", + 0.5, + 0.95, + DEFAULT_VOL_QUANTILE_HIGH, + 0.05, + help="Quantile threshold for high volatility (e.g., 0.80 = top 20% most volatile days).", ) if mode == "Single-Asset": st.subheader("5. Backtest Settings") - bt_cost = st.number_input("Transaction Cost (bps)", value=DEFAULT_COST_BPS, step=1) / 10000 + bt_cost = ( + st.number_input( + "Transaction Cost (bps)", + value=DEFAULT_COST_BPS, + step=1, + help="Transaction cost per trade in basis points (e.g., 10 bps = 0.10%).", + ) + / 10000 + ) allow_short = st.checkbox("Allow Short Selling?", value=False) else: st.subheader("5. Alert Thresholds") @@ -260,7 +291,9 @@ def get_cache_key(*args) -> str: volume_df = volume_df.loc[mask].copy() if len(price_df) < MIN_DATA_POINTS: - st.warning(f"Not enough data points for selected range/period (need at least {MIN_DATA_POINTS}).") + st.warning( + f"Not enough data points for selected range/period (need at least {MIN_DATA_POINTS})." + ) st.stop() # Align weights to available tickers @@ -280,7 +313,11 @@ def get_cache_key(*args) -> str: if date_mode == "Custom": bmask = (benchmark_df.index.date >= start_date) & (benchmark_df.index.date <= end_date) benchmark_df = benchmark_df.loc[bmask].copy() - benchmark_returns = benchmark_df["Close"].pct_change().dropna() if not benchmark_df.empty else pd.Series(dtype=float) + benchmark_returns = ( + benchmark_df["Close"].pct_change().dropna() + if not benchmark_df.empty + else pd.Series(dtype=float) + ) # Risk metrics ann_vol = port_returns.std() * np.sqrt(252) @@ -295,7 +332,9 @@ def get_cache_key(*args) -> str: max_dttl = liquidity_df["DaysToLiquidate"].max() if not liquidity_df.empty else np.nan # Factor attribution - factor_data = data_model.fetch_multi_asset_data(tuple(FACTOR_PROXIES.values()), period=period_arg) + factor_data = data_model.fetch_multi_asset_data( + tuple(FACTOR_PROXIES.values()), period=period_arg + ) factor_prices = data_model.align_close_prices(factor_data) if date_mode == "Custom" and not factor_prices.empty: fmask = (factor_prices.index.date >= start_date) & (factor_prices.index.date <= end_date) @@ -314,13 +353,17 @@ def get_cache_key(*args) -> str: macro_betas = factors.compute_factor_betas(port_returns, macro_returns, window=factor_window) # Alpha series - alpha_series = factors.compute_alpha_series(port_returns, benchmark_returns, window=factor_window) + alpha_series = factors.compute_alpha_series( + port_returns, benchmark_returns, window=factor_window + ) # Regime classification on benchmark regime_label = "N/A" benchmark_regimes = None if not benchmark_df.empty: - bench_ind = signals.add_technical_indicators(benchmark_df, sma_window=200, mom_window=DEFAULT_MOMENTUM_WINDOW, vol_window=vol_window) + bench_ind = signals.add_technical_indicators( + benchmark_df, sma_window=200, mom_window=DEFAULT_MOMENTUM_WINDOW, vol_window=vol_window + ) bench_ind = signals.detect_volatility_regime( bench_ind, vol_col=f"Vol_{vol_window}d", @@ -347,13 +390,17 @@ def get_cache_key(*args) -> str: temp.columns = ["Return", "Vol_Regime"] stats = backtester.calculate_regime_stats(temp, "Return", "Vol_Regime") sens = regime_analysis.compute_regime_sensitivity(stats) - rows.append({ - "Asset": asset, - "Sharpe_Diff": sens.get("Sharpe_Diff", np.nan), - "CAGR_Diff": sens.get("CAGR_Diff", np.nan), - }) + rows.append( + { + "Asset": asset, + "Sharpe_Diff": sens.get("Sharpe_Diff", np.nan), + "CAGR_Diff": sens.get("CAGR_Diff", np.nan), + } + ) if rows: - regime_sensitivity_df = pd.DataFrame(rows).set_index("Asset").sort_values("Sharpe_Diff", ascending=False) + regime_sensitivity_df = ( + pd.DataFrame(rows).set_index("Asset").sort_values("Sharpe_Diff", ascending=False) + ) port_temp = pd.concat([port_returns, benchmark_regimes], axis=1).dropna() if not port_temp.empty: @@ -401,7 +448,15 @@ def get_cache_key(*args) -> str: # --- Portfolio Tabs --- tab_ov, tab_risk, tab_attr, tab_scen, tab_sig, tab_alert, tab_rep = st.tabs( - ["๐Ÿ“ˆ Overview", "๐Ÿ›ก๏ธ Risk & Liquidity", "๐Ÿงฌ Attribution", "๐Ÿงช Scenario", "๐Ÿ“ก Signals Health", "๐Ÿšจ Alerts", "๐Ÿ“„ Report"] + [ + "๐Ÿ“ˆ Overview", + "๐Ÿ›ก๏ธ Risk & Liquidity", + "๐Ÿงฌ Attribution", + "๐Ÿงช Scenario", + "๐Ÿ“ก Signals Health", + "๐Ÿšจ Alerts", + "๐Ÿ“„ Report", + ] ) # Overview @@ -417,7 +472,9 @@ def get_cache_key(*args) -> str: q = query.lower() if "top" in q and "mover" in q: last_returns = price_df.pct_change().iloc[-1].sort_values(ascending=False) - st.write(last_returns.head(5).to_frame("Last Day Return").style.format("{:.2%}")) + st.write( + last_returns.head(5).to_frame("Last Day Return").style.format("{:.2%}") + ) elif "factor" in q and "drift" in q and not factor_betas.empty: drift = factor_betas.iloc[-1] - factor_betas.iloc[-min(21, len(factor_betas))] st.write(drift.to_frame("Beta Drift (20d)").style.format("{:.2f}")) @@ -430,10 +487,21 @@ def get_cache_key(*args) -> str: st.subheader("Equity Curve vs Benchmark") fig_eq = go.Figure() - fig_eq.add_trace(go.Scatter(x=port_equity.index, y=port_equity, name="Portfolio", line=dict(color="#00ff00"))) + fig_eq.add_trace( + go.Scatter( + x=port_equity.index, y=port_equity, name="Portfolio", line={"color": "#00ff00"} + ) + ) if not benchmark_returns.empty: bench_equity = (1 + benchmark_returns).cumprod() - fig_eq.add_trace(go.Scatter(x=bench_equity.index, y=bench_equity, name=benchmark_ticker, line=dict(color="#888"))) + fig_eq.add_trace( + go.Scatter( + x=bench_equity.index, + y=bench_equity, + name=benchmark_ticker, + line={"color": "#888"}, + ) + ) fig_eq.update_layout(template="plotly_dark", height=420) st.plotly_chart(fig_eq, use_container_width=True) @@ -461,12 +529,22 @@ def get_cache_key(*args) -> str: st.subheader("Risk Posture") score = risk.risk_posture_score(ann_vol, max_dd, beta, max_dttl) st.metric("Risk Posture Score", f"{score:.0f}/100") - st.caption("Higher is better. Penalizes high vol, deep drawdowns, high beta, and illiquidity.") + st.caption( + "Higher is better. Penalizes high vol, deep drawdowns, high beta, and illiquidity." + ) st.subheader("Drawdown") dd_series = risk.compute_drawdown_series(port_equity) fig_dd = go.Figure() - fig_dd.add_trace(go.Scatter(x=dd_series.index, y=dd_series * 100, name="Drawdown", fill="tozeroy", line=dict(color="#ff4b4b"))) + fig_dd.add_trace( + go.Scatter( + x=dd_series.index, + y=dd_series * 100, + name="Drawdown", + fill="tozeroy", + line={"color": "#ff4b4b"}, + ) + ) fig_dd.update_layout(template="plotly_dark", height=300, yaxis_title="Drawdown (%)") st.plotly_chart(fig_dd, use_container_width=True) @@ -474,12 +552,16 @@ def get_cache_key(*args) -> str: if liquidity_df.empty: st.info("Liquidity data not available.") else: - st.dataframe(liquidity_df.style.format({ - "Weight": "{:.2%}", - "PositionValue": "${:,.0f}", - "ADV$": "${:,.0f}", - "DaysToLiquidate": "{:.1f}", - })) + st.dataframe( + liquidity_df.style.format( + { + "Weight": "{:.2%}", + "PositionValue": "${:,.0f}", + "ADV$": "${:,.0f}", + "DaysToLiquidate": "{:.1f}", + } + ) + ) st.subheader("Correlation Matrix") corr = risk.compute_correlation_matrix(price_df.pct_change().dropna()) @@ -506,7 +588,11 @@ def get_cache_key(*args) -> str: st.subheader("Rolling Alpha") if not alpha_series.empty: fig_alpha = go.Figure() - fig_alpha.add_trace(go.Scatter(x=alpha_series.index, y=alpha_series, name="Alpha", line=dict(color="#00ff00"))) + fig_alpha.add_trace( + go.Scatter( + x=alpha_series.index, y=alpha_series, name="Alpha", line={"color": "#00ff00"} + ) + ) fig_alpha.update_layout(template="plotly_dark", height=300) st.plotly_chart(fig_alpha, use_container_width=True) @@ -540,7 +626,9 @@ def get_cache_key(*args) -> str: else: bench_tmp = benchmark_df.copy() bench_tmp["Daily_Return"] = bench_tmp["Close"].pct_change() - bench_tmp["Signal"] = np.sign(bench_tmp["Close"] - bench_tmp["Close"].rolling(50).mean()) + bench_tmp["Signal"] = np.sign( + bench_tmp["Close"] - bench_tmp["Close"].rolling(50).mean() + ) for h in [21, 63, 126]: bench_tmp[f"Fwd_{h}"] = bench_tmp["Close"].pct_change(h).shift(-h) decay = { @@ -554,7 +642,7 @@ def get_cache_key(*args) -> str: st.subheader("Rolling IC (Signal vs 1M Forward Return)") ic = bench_tmp["Signal"].rolling(63).corr(bench_tmp["Fwd_21"]) fig_ic = go.Figure() - fig_ic.add_trace(go.Scatter(x=ic.index, y=ic, name="IC", line=dict(color="#ff9f43"))) + fig_ic.add_trace(go.Scatter(x=ic.index, y=ic, name="IC", line={"color": "#ff9f43"})) fig_ic.update_layout(template="plotly_dark", height=300) st.plotly_chart(fig_ic, use_container_width=True) @@ -584,14 +672,18 @@ def get_cache_key(*args) -> str: "Beta": f"{beta:.2f}" if not np.isnan(beta) else "N/A", "Regime Sensitivity (Sharpe Diff)": f"{portfolio_sensitivity.get('Sharpe_Diff', np.nan):.2f}", "Benchmark Sensitivity (Sharpe Diff)": f"{benchmark_sensitivity.get('Sharpe_Diff', np.nan):.2f}", - "VaR (95%)": f"{var_cvar['VaR']:.2%}" if not np.isnan(var_cvar['VaR']) else "N/A", - "CVaR (95%)": f"{var_cvar['CVaR']:.2%}" if not np.isnan(var_cvar['CVaR']) else "N/A", + "VaR (95%)": f"{var_cvar['VaR']:.2%}" if not np.isnan(var_cvar["VaR"]) else "N/A", + "CVaR (95%)": f"{var_cvar['CVaR']:.2%}" if not np.isnan(var_cvar["CVaR"]) else "N/A", } tables = { "Weights": weights.to_frame("Weight"), "Liquidity": liquidity_df, - "Latest Factor Betas": factor_betas.tail(1) if not factor_betas.empty else pd.DataFrame(), - "Regime Sensitivity": regime_sensitivity_df if not regime_sensitivity_df.empty else pd.DataFrame(), + "Latest Factor Betas": ( + factor_betas.tail(1) if not factor_betas.empty else pd.DataFrame() + ), + "Regime Sensitivity": ( + regime_sensitivity_df if not regime_sensitivity_df.empty else pd.DataFrame() + ), } payload = reporting.build_report_payload(summary, tables) st.markdown(payload["markdown"]) @@ -634,7 +726,9 @@ def get_cache_key(*args) -> str: df = raw_df.copy() if len(df) < MIN_DATA_POINTS: - st.warning(f"Not enough data points for selected range/period (need at least {MIN_DATA_POINTS}).") + st.warning( + f"Not enough data points for selected range/period (need at least {MIN_DATA_POINTS})." + ) st.stop() # --- Signal Calculation (with session state caching) --- @@ -642,7 +736,9 @@ def get_cache_key(*args) -> str: if signal_cache_key not in st.session_state.computed_signals: with st.spinner("Computing technical indicators..."): - computed_df = signals.add_technical_indicators(df, sma_window=sma_window, mom_window=mom_window) + computed_df = signals.add_technical_indicators( + df, sma_window=sma_window, mom_window=mom_window + ) st.session_state.computed_signals[signal_cache_key] = computed_df df = st.session_state.computed_signals[signal_cache_key].copy() @@ -650,64 +746,59 @@ def get_cache_key(*args) -> str: # --- Regime Detection --- # Using 21-day annualized vol with option for out-of-sample analysis df = signals.detect_volatility_regime( - df, - vol_col='Vol_21d', - quantile_high=vol_q_high, + df, + vol_col="Vol_21d", + quantile_high=vol_q_high, quantile_low=0.25, - use_expanding=use_oos # Toggle between in-sample and out-of-sample + use_expanding=use_oos, # Toggle between in-sample and out-of-sample ) # --- Dashboard Header --- st.markdown("## ๐Ÿ” Research Question") -st.markdown("> **How sensitive is trend-following performance to volatility regimes in US equities?**") +st.markdown( + "> **How sensitive is trend-following performance to volatility regimes in US equities?**" +) latest = df.iloc[-1] prev = df.iloc[-2] -chg_pct = latest['Daily_Return'] +chg_pct = latest["Daily_Return"] h1, h2, h3, h4 = st.columns(4) h1.metric("Asset", f"{ticker} (${latest['Close']:.2f})", f"{chg_pct:.2%}") -h2.metric("Current Regime", latest['Vol_Regime']) +h2.metric("Current Regime", latest["Vol_Regime"]) h3.metric(f"Volatility ({vol_q_high:.0%}-tile)", f"{latest['Vol_21d']:.2%}") -h4.metric("Trend Status", "BULLISH" if latest['Close'] > latest[f'SMA_{sma_window}'] else "BEARISH") +h4.metric("Trend Status", "BULLISH" if latest["Close"] > latest[f"SMA_{sma_window}"] else "BEARISH") # --- Backtest (cached for reuse) --- -df['Signal_Trend'] = np.where(df['Close'] > df[f'SMA_{sma_window}'], 1, -1 if allow_short else 0) -bt_cache_key = get_cache_key( - signal_cache_key, bt_cost, allow_short, use_oos, vol_q_high -) +df["Signal_Trend"] = np.where(df["Close"] > df[f"SMA_{sma_window}"], 1, -1 if allow_short else 0) +bt_cache_key = get_cache_key(signal_cache_key, bt_cost, allow_short, use_oos, vol_q_high) if bt_cache_key not in st.session_state.backtest_results: with st.spinner("Running backtest simulation..."): - res_df = backtester.run_backtest(df, 'Signal_Trend', cost_bps=bt_cost, rebalance_freq='M') + res_df = backtester.run_backtest(df, "Signal_Trend", cost_bps=bt_cost, rebalance_freq="M") st.session_state.backtest_results[bt_cache_key] = res_df res_df = st.session_state.backtest_results[bt_cache_key] if not res_df.empty: res_df = res_df.copy() - res_df['Vol_Regime'] = df['Vol_Regime'] + res_df["Vol_Regime"] = df["Vol_Regime"] - cond_stats = backtester.calculate_conditional_stats( - res_df, 'Strategy_Net_Return', 'Vol_Regime' - ) - bench_cond = backtester.calculate_conditional_stats( - res_df, 'Daily_Return', 'Vol_Regime' - ) - stats_by_regime = backtester.calculate_regime_stats( - res_df, 'Strategy_Net_Return', 'Vol_Regime' - ) + cond_stats = backtester.calculate_conditional_stats(res_df, "Strategy_Net_Return", "Vol_Regime") + bench_cond = backtester.calculate_conditional_stats(res_df, "Daily_Return", "Vol_Regime") + stats_by_regime = backtester.calculate_regime_stats(res_df, "Strategy_Net_Return", "Vol_Regime") regime_sensitivity = regime_analysis.compute_regime_sensitivity(stats_by_regime) bootstrap_sharpe = regime_analysis.bootstrap_regime_diff( - res_df['Strategy_Net_Return'], res_df['Vol_Regime'], metric="Sharpe", n_boot=DEFAULT_BOOTSTRAP_ITER + res_df["Strategy_Net_Return"], + res_df["Vol_Regime"], + metric="Sharpe", + n_boot=DEFAULT_BOOTSTRAP_ITER, ) - transition_matrix = regime_analysis.compute_transition_matrix(df['Vol_Regime']) + transition_matrix = regime_analysis.compute_transition_matrix(df["Vol_Regime"]) transition_stats = regime_analysis.compute_transition_stats( - res_df['Strategy_Net_Return'], res_df['Vol_Regime'] - ) - sweep_df = sweep.run_sma_regime_sweep( - df, DEFAULT_SMA_SWEEP, mom_window, vol_q_high, use_oos + res_df["Strategy_Net_Return"], res_df["Vol_Regime"] ) + sweep_df = sweep.run_sma_regime_sweep(df, DEFAULT_SMA_SWEEP, mom_window, vol_q_high, use_oos) else: cond_stats = pd.DataFrame() bench_cond = pd.DataFrame() @@ -727,23 +818,42 @@ def get_cache_key(*args) -> str: with tab_ov: # Interactive Price Chart fig = go.Figure() - fig.add_trace(go.Scatter(x=df.index, y=df['Close'], name='Close Price', line=dict(color='white', width=1))) - fig.add_trace(go.Scatter(x=df.index, y=df[f'SMA_{sma_window}'], name=f'{sma_window}-Day SMA', line=dict(color='#ff9f43', width=1))) - + fig.add_trace( + go.Scatter( + x=df.index, y=df["Close"], name="Close Price", line={"color": "white", "width": 1} + ) + ) + fig.add_trace( + go.Scatter( + x=df.index, + y=df[f"SMA_{sma_window}"], + name=f"{sma_window}-Day SMA", + line={"color": "#ff9f43", "width": 1}, + ) + ) + # Highlight High Volatility Regimes # Filter high vol periods - high_vol_mask = df['Vol_Regime'] == 'High' + high_vol_mask = df["Vol_Regime"] == "High" # We can plot markers or shade areas. Shading is valid but tricky in Plotly without shapes list. # Let's plot points high_vol_pts = df[high_vol_mask] - fig.add_trace(go.Scatter(x=high_vol_pts.index, y=high_vol_pts['Close'], mode='markers', name='High Volatility', marker=dict(color='red', size=2))) - + fig.add_trace( + go.Scatter( + x=high_vol_pts.index, + y=high_vol_pts["Close"], + mode="markers", + name="High Volatility", + marker={"color": "red", "size": 2}, + ) + ) + fig.update_layout( title=f"{ticker} Price History & Regime Context", yaxis_title="Price ($)", template="plotly_dark", height=500, - hovermode="x unified" + hovermode="x unified", ) st.plotly_chart(fig, use_container_width=True) st.caption("Red dots indicate days classified as 'High Volatility' regime.") @@ -751,25 +861,38 @@ def get_cache_key(*args) -> str: # --- TAB 2: REGIME ANALYSIS --- with tab_regime: st.subheader("Volatility Regime Classification") - + c1, c2 = st.columns(2) with c1: # Scatter: Vol vs Returns needed? Maybe just distribution - fig_hist = px.histogram(df, x="Vol_21d", color="Vol_Regime", nbins=50, title="Volatility Distribution", template="plotly_dark", - color_discrete_map={"High": "#ff4b4b", "Low": "#00ff00", "Normal": "#888888"}) + fig_hist = px.histogram( + df, + x="Vol_21d", + color="Vol_Regime", + nbins=50, + title="Volatility Distribution", + template="plotly_dark", + color_discrete_map={"High": "#ff4b4b", "Low": "#00ff00", "Normal": "#888888"}, + ) st.plotly_chart(fig_hist, use_container_width=True) - + with c2: # Pie chart of time spent in regimes - regime_counts = df['Vol_Regime'].value_counts() - fig_pie = px.pie(values=regime_counts, names=regime_counts.index, title="Time Spent in Regimes", template="plotly_dark", - color=regime_counts.index, color_discrete_map={"High": "#ff4b4b", "Low": "#00ff00", "Normal": "#888888"}) + regime_counts = df["Vol_Regime"].value_counts() + fig_pie = px.pie( + values=regime_counts, + names=regime_counts.index, + title="Time Spent in Regimes", + template="plotly_dark", + color=regime_counts.index, + color_discrete_map={"High": "#ff4b4b", "Low": "#00ff00", "Normal": "#888888"}, + ) st.plotly_chart(fig_pie, use_container_width=True) - + st.markdown("### Regime Characteristics") - stats = df.groupby('Vol_Regime')[['Daily_Return', 'Vol_21d']].mean() + stats = df.groupby("Vol_Regime")[["Daily_Return", "Vol_21d"]].mean() # Annualize return - stats['Ann_Return'] = stats['Daily_Return'] * 252 + stats["Ann_Return"] = stats["Daily_Return"] * 252 st.dataframe(stats.style.format("{:.2%}")) # --- TAB 3: REGIME LAB --- @@ -778,28 +901,39 @@ def get_cache_key(*args) -> str: if transition_matrix.empty: st.info("Not enough data to compute transition matrix.") else: - fig_tm = go.Figure(data=go.Heatmap( - z=transition_matrix.values, - x=transition_matrix.columns, - y=transition_matrix.index, - colorscale="Blues", - zmin=0, - zmax=1 - )) - fig_tm.update_layout(template="plotly_dark", height=350, xaxis_title="Current Regime", yaxis_title="Previous Regime") + fig_tm = go.Figure( + data=go.Heatmap( + z=transition_matrix.values, + x=transition_matrix.columns, + y=transition_matrix.index, + colorscale="Blues", + zmin=0, + zmax=1, + ) + ) + fig_tm.update_layout( + template="plotly_dark", + height=350, + xaxis_title="Current Regime", + yaxis_title="Previous Regime", + ) st.plotly_chart(fig_tm, use_container_width=True) st.subheader("Transition Impact") if transition_stats.empty: st.info("Not enough data to compute transition performance.") else: - st.dataframe(transition_stats.style.format({ - "Mean": "{:.2%}", - "Sharpe": "{:.2f}", - "WinRate": "{:.1%}", - "CAGR": "{:.2%}", - "Count": "{:.0f}", - })) + st.dataframe( + transition_stats.style.format( + { + "Mean": "{:.2%}", + "Sharpe": "{:.2f}", + "WinRate": "{:.1%}", + "CAGR": "{:.2%}", + "Count": "{:.0f}", + } + ) + ) st.subheader("Regime Sensitivity") c_s1, c_s2, c_s3 = st.columns(3) @@ -820,7 +954,7 @@ def get_cache_key(*args) -> str: sweep_sharpe, aspect="auto", color_continuous_scale="RdYlGn", - title="Sharpe by SMA Window and Regime" + title="Sharpe by SMA Window and Regime", ) fig_sweep.update_layout(template="plotly_dark", height=350) st.plotly_chart(fig_sweep, use_container_width=True) @@ -828,146 +962,192 @@ def get_cache_key(*args) -> str: # --- TAB 3: BACKTEST --- with tab_bt: st.subheader("Strategy Simulation") - + # Out-of-sample mode indicator if use_oos: - st.success("๐Ÿ”ฌ **Out-of-Sample Mode Active** - Regime classification uses only past data at each point") + st.success( + "๐Ÿ”ฌ **Out-of-Sample Mode Active** - Regime classification uses only past data at each point" + ) if not res_df.empty: - + # 1. Global Metrics with Bootstrap CI strat_metrics = backtester.calculate_perf_metrics( - res_df['Equity_Strategy'], - include_bootstrap_ci=True, - n_bootstrap=500 + res_df["Equity_Strategy"], include_bootstrap_ci=True, n_bootstrap=500 ) - bench_metrics = backtester.calculate_perf_metrics(res_df['Equity_Benchmark']) - + bench_metrics = backtester.calculate_perf_metrics(res_df["Equity_Benchmark"]) + col_m1, col_m2, col_m3, col_m4 = st.columns(4) col_m1.metric("Global CAGR", f"{strat_metrics['CAGR']:.2%}") - + # Show Sharpe with CI if available sharpe_display = f"{strat_metrics['Sharpe']:.2f}" - if strat_metrics.get('Sharpe_CI_Lower') is not None: - sharpe_display += f" [{strat_metrics['Sharpe_CI_Lower']:.2f}, {strat_metrics['Sharpe_CI_Upper']:.2f}]" + if strat_metrics.get("Sharpe_CI_Lower") is not None: + sharpe_display += ( + f" [{strat_metrics['Sharpe_CI_Lower']:.2f}, {strat_metrics['Sharpe_CI_Upper']:.2f}]" + ) col_m2.metric("Sharpe (95% CI)", sharpe_display) - + col_m3.metric("Max Drawdown", f"{strat_metrics['MaxDD']:.2%}") col_m4.metric("Max DD Duration", f"{strat_metrics.get('MaxDD_Duration', 0)} days") - + # Additional metrics row col_a1, col_a2, col_a3, col_a4 = st.columns(4) col_a1.metric("Sortino", f"{strat_metrics.get('Sortino', 0):.2f}") col_a2.metric("Calmar", f"{strat_metrics.get('Calmar', 0):.2f}") col_a3.metric("Win Rate", f"{strat_metrics.get('WinRate', 0):.1%}") col_a4.metric("Avg DD Duration", f"{strat_metrics.get('AvgDD_Duration', 0):.0f} days") - + # 2. Equity Curve fig_eq = go.Figure() - fig_eq.add_trace(go.Scatter(x=res_df.index, y=res_df['Equity_Strategy'], name='Trend Strategy', line=dict(color='#00ff00'))) - fig_eq.add_trace(go.Scatter(x=res_df.index, y=res_df['Equity_Benchmark'], name='Buy & Hold', line=dict(color='gray', dash='dot'))) + fig_eq.add_trace( + go.Scatter( + x=res_df.index, + y=res_df["Equity_Strategy"], + name="Trend Strategy", + line={"color": "#00ff00"}, + ) + ) + fig_eq.add_trace( + go.Scatter( + x=res_df.index, + y=res_df["Equity_Benchmark"], + name="Buy & Hold", + line={"color": "gray", "dash": "dot"}, + ) + ) fig_eq.update_layout(title="Equity Curve", template="plotly_dark", height=400) st.plotly_chart(fig_eq, use_container_width=True) - + # 3. Drawdown Chart with st.expander("๐Ÿ“‰ Drawdown Analysis", expanded=False): fig_dd = go.Figure() - fig_dd.add_trace(go.Scatter( - x=res_df.index, y=res_df['DD_Strategy'] * 100, - name='Strategy Drawdown', fill='tozeroy', - line=dict(color='#ff4b4b') - )) - fig_dd.add_trace(go.Scatter( - x=res_df.index, y=res_df['DD_Benchmark'] * 100, - name='Benchmark Drawdown', - line=dict(color='gray', dash='dot') - )) + fig_dd.add_trace( + go.Scatter( + x=res_df.index, + y=res_df["DD_Strategy"] * 100, + name="Strategy Drawdown", + fill="tozeroy", + line={"color": "#ff4b4b"}, + ) + ) + fig_dd.add_trace( + go.Scatter( + x=res_df.index, + y=res_df["DD_Benchmark"] * 100, + name="Benchmark Drawdown", + line={"color": "gray", "dash": "dot"}, + ) + ) fig_dd.update_layout( title="Underwater Equity (Drawdown %)", yaxis_title="Drawdown (%)", template="plotly_dark", - height=300 + height=300, ) st.plotly_chart(fig_dd, use_container_width=True) - + # 4. Conditional Analysis st.markdown("### ๐Ÿ”ฌ Conditional Performance by Regime") st.info("Does the strategy outperform during High Volatility?") - + # Merge - comparison = pd.concat([cond_stats.add_suffix('_Strat'), bench_cond.add_suffix('_Bench')], axis=1) - + comparison = pd.concat( + [cond_stats.add_suffix("_Strat"), bench_cond.add_suffix("_Bench")], axis=1 + ) + # Reorder columns - handle missing columns gracefully available_cols = [] - for col in ['Ann_Return_Strat', 'Ann_Return_Bench', 'Sharpe_Strat', 'Sharpe_Bench', 'WinRate_Strat']: + for col in [ + "Ann_Return_Strat", + "Ann_Return_Bench", + "Sharpe_Strat", + "Sharpe_Bench", + "WinRate_Strat", + ]: if col in comparison.columns: available_cols.append(col) comparison = comparison[available_cols] - - st.dataframe(comparison.style.background_gradient(cmap='RdYlGn', subset=['Ann_Return_Strat', 'Sharpe_Strat']).format("{:.2f}")) - - st.markdown("**Key Insight:** Compare 'Sharpe_Strat' vs 'Sharpe_Bench' in the **High** volatility row.") - + + st.dataframe( + comparison.style.background_gradient( + cmap="RdYlGn", subset=["Ann_Return_Strat", "Sharpe_Strat"] + ).format("{:.2f}") + ) + + st.markdown( + "**Key Insight:** Compare 'Sharpe_Strat' vs 'Sharpe_Bench' in the **High** volatility row." + ) + # 5. Walk-Forward Validation (Advanced) with st.expander("๐Ÿš€ Walk-Forward Validation (Advanced)", expanded=False): st.markdown(""" - Walk-forward validation splits data into rolling train/test windows to evaluate + Walk-forward validation splits data into rolling train/test windows to evaluate out-of-sample performance. This is more rigorous than a single full-sample backtest. """) - + wf_col1, wf_col2 = st.columns(2) - wf_train = wf_col1.number_input("Training Window (months)", value=24, min_value=6, max_value=60) - wf_test = wf_col2.number_input("Test Window (months)", value=6, min_value=1, max_value=12) - + wf_train = wf_col1.number_input( + "Training Window (months)", value=24, min_value=6, max_value=60 + ) + wf_test = wf_col2.number_input( + "Test Window (months)", value=6, min_value=1, max_value=12 + ) + if st.button("Run Walk-Forward Analysis"): with st.spinner("Running walk-forward validation..."): wf_results = backtester.walk_forward_backtest( - df, 'Signal_Trend', + df, + "Signal_Trend", train_months=wf_train, test_months=wf_test, cost_bps=bt_cost, - rebalance_freq='M' + rebalance_freq="M", ) - + if wf_results: st.success(f"โœ… Completed {wf_results['n_periods']} walk-forward periods") - - wf_summary = wf_results['summary'] + + wf_summary = wf_results["summary"] wf_c1, wf_c2, wf_c3 = st.columns(3) wf_c1.metric("OOS CAGR", f"{wf_summary.get('CAGR', 0):.2%}") wf_c2.metric("OOS Sharpe", f"{wf_summary.get('Sharpe', 0):.2f}") wf_c3.metric("OOS Max DD", f"{wf_summary.get('MaxDD', 0):.2%}") - + # Show per-period results st.markdown("#### Per-Period Results") period_data = [] - for p in wf_results['periods']: - period_data.append({ - 'Test Period': f"{p['test_start']} to {p['test_end']}", - 'CAGR': p['metrics'].get('CAGR', 0), - 'Sharpe': p['metrics'].get('Sharpe', 0), - 'MaxDD': p['metrics'].get('MaxDD', 0) - }) - st.dataframe(pd.DataFrame(period_data).style.format({ - 'CAGR': '{:.2%}', - 'Sharpe': '{:.2f}', - 'MaxDD': '{:.2%}' - })) + for p in wf_results["periods"]: + period_data.append( + { + "Test Period": f"{p['test_start']} to {p['test_end']}", + "CAGR": p["metrics"].get("CAGR", 0), + "Sharpe": p["metrics"].get("Sharpe", 0), + "MaxDD": p["metrics"].get("MaxDD", 0), + } + ) + st.dataframe( + pd.DataFrame(period_data).style.format( + {"CAGR": "{:.2%}", "Sharpe": "{:.2f}", "MaxDD": "{:.2%}"} + ) + ) else: - st.warning("Insufficient data for walk-forward validation with current settings.") + st.warning( + "Insufficient data for walk-forward validation with current settings." + ) # --- TAB 4: REPORT --- with tab_rep: st.subheader("Research Note Generation") - + st.markdown("### Findings Summary") st.write(f"**Asset**: {ticker}") st.write(f"**Trend Model**: {sma_window}-Day SMA") - + if not res_df.empty: # Create text summary - high_vol_perf = cond_stats.loc['High', 'Sharpe'] if 'High' in cond_stats.index else 0 - normal_vol_perf = cond_stats.loc['Normal', 'Sharpe'] if 'Normal' in cond_stats.index else 0 + high_vol_perf = cond_stats.loc["High", "Sharpe"] if "High" in cond_stats.index else 0 + normal_vol_perf = cond_stats.loc["Normal", "Sharpe"] if "Normal" in cond_stats.index else 0 transition_risk = "N/A" if not transition_stats.empty and "Sharpe" in transition_stats.columns: @@ -978,16 +1158,18 @@ def get_cache_key(*args) -> str: sweep_std = sweep_df.groupby("Regime")["Sharpe"].std().dropna() if not sweep_std.empty: sweep_stability = ", ".join([f"{k}: {v:.2f}" for k, v in sweep_std.items()]) - + st.success(f"Strategy Sharpe in High Vol: **{high_vol_perf:.2f}**") st.info(f"Strategy Sharpe in Normal Vol: **{normal_vol_perf:.2f}**") - st.write(f"**Regime Sensitivity (Sharpe High - Normal)**: {regime_sensitivity.get('Sharpe_Diff', np.nan):.2f}") + st.write( + f"**Regime Sensitivity (Sharpe High - Normal)**: {regime_sensitivity.get('Sharpe_Diff', np.nan):.2f}" + ) st.write(f"**Top Transition Risk**: {transition_risk}") st.write(f"**Sweep Stability (Sharpe Std)**: {sweep_stability}") - + st.download_button( label="Download Full Research Data (CSV)", - data=res_df.to_csv().encode('utf-8'), + data=res_df.to_csv().encode("utf-8"), file_name=f"{ticker}_research_data.csv", - mime="text/csv" + mime="text/csv", ) diff --git a/src/import_prices.py b/src/import_prices.py index 9bfcab9..b6b0238 100644 --- a/src/import_prices.py +++ b/src/import_prices.py @@ -5,34 +5,34 @@ def analyze_stock(ticker_symbol, period="1y"): Demonstrates using the new modular architecture. """ print(f"\n--- Analyzing {ticker_symbol} ---") - + # 1. Use the data model df = data_model.fetch_stock_data(ticker_symbol, period=period) - + if df.empty: print(f"No data found for {ticker_symbol}") return None # 2. Use the signals module df = signals.add_technical_indicators(df) - + latest = df.iloc[-1] - + print(f"Current Price: ${latest['Close']:.2f}") print(f"50-Day SMA: ${latest['SMA_50']:.2f}") print(f"Momentum: {latest['Momentum_12M_1M']:.2%}") print(f"RSI (14): {latest['RSI_14']:.2f}") - + if latest['Close'] > latest['SMA_50']: print("Trend: BULLISH ๐Ÿ‚") else: print("Trend: BEARISH ๐Ÿป") - + return df if __name__ == "__main__": # Add your favorite stocks here! portfolio = ["AAPL", "MSFT", "GOOGL", "TSLA"] - + for ticker in portfolio: analyze_stock(ticker) \ No newline at end of file diff --git a/src/modules/backtester.py b/src/modules/backtester.py index 949aaed..c523c2a 100644 --- a/src/modules/backtester.py +++ b/src/modules/backtester.py @@ -39,7 +39,7 @@ class PerformanceMetrics: avg_dd_duration: float = 0.0 # Days calmar: float = 0.0 win_rate: float = 0.0 - + def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for backward compatibility.""" return { @@ -58,47 +58,47 @@ def to_dict(self) -> Dict[str, Any]: def bootstrap_sharpe_ci( - returns: pd.Series, - n_bootstrap: int = 1000, + returns: pd.Series, + n_bootstrap: int = 1000, confidence_level: float = 0.95, random_state: Optional[int] = None ) -> Tuple[float, float]: """ Calculate bootstrap confidence interval for Sharpe ratio. - + Args: returns: Series of daily returns. n_bootstrap: Number of bootstrap samples. confidence_level: Confidence level (e.g., 0.95 for 95% CI). random_state: Random seed for reproducibility. - + Returns: Tuple of (lower_bound, upper_bound) for the CI. """ if len(returns) < 30: logger.warning("Insufficient data for reliable bootstrap CI (n < 30)") return (np.nan, np.nan) - + rng = np.random.default_rng(random_state) sharpes = [] - + returns_arr = returns.dropna().values n = len(returns_arr) - + for _ in range(n_bootstrap): sample = rng.choice(returns_arr, size=n, replace=True) sample_std = sample.std() if sample_std > 0: sample_sharpe = sample.mean() / sample_std * np.sqrt(TRADING_DAYS_PER_YEAR) sharpes.append(sample_sharpe) - + if not sharpes: return (np.nan, np.nan) - + alpha = 1 - confidence_level lower = np.percentile(sharpes, alpha / 2 * 100) upper = np.percentile(sharpes, (1 - alpha / 2) * 100) - + logger.debug(f"Bootstrap Sharpe CI ({confidence_level:.0%}): [{lower:.3f}, {upper:.3f}]") return (lower, upper) @@ -106,23 +106,23 @@ def bootstrap_sharpe_ci( def calculate_drawdown_duration(equity_curve: pd.Series) -> Tuple[int, float]: """ Calculate maximum and average drawdown duration. - + Args: equity_curve: Series of equity values (cumulative returns). - + Returns: Tuple of (max_duration_days, avg_duration_days). """ if equity_curve.empty: return (0, 0.0) - + rolling_max = equity_curve.cummax() underwater = equity_curve < rolling_max - + # Find contiguous underwater periods underwater_periods = [] current_duration = 0 - + for is_underwater in underwater: if is_underwater: current_duration += 1 @@ -130,36 +130,36 @@ def calculate_drawdown_duration(equity_curve: pd.Series) -> Tuple[int, float]: if current_duration > 0: underwater_periods.append(current_duration) current_duration = 0 - + # Don't forget the last period if still underwater if current_duration > 0: underwater_periods.append(current_duration) - + if not underwater_periods: return (0, 0.0) - + max_duration = max(underwater_periods) avg_duration = sum(underwater_periods) / len(underwater_periods) - + logger.debug(f"Drawdown durations: max={max_duration} days, avg={avg_duration:.1f} days") return (max_duration, avg_duration) def calculate_perf_metrics( - equity_curve: pd.Series, + equity_curve: pd.Series, freq: int = TRADING_DAYS_PER_YEAR, include_bootstrap_ci: bool = False, n_bootstrap: int = 1000 ) -> Dict[str, Any]: """ Calculate comprehensive performance metrics. - + Args: equity_curve: Series of equity values (starting from 1.0). freq: Trading days per year for annualization. include_bootstrap_ci: Whether to compute bootstrap CI for Sharpe. n_bootstrap: Number of bootstrap samples. - + Returns: Dictionary of performance metrics. """ @@ -169,7 +169,7 @@ def calculate_perf_metrics( # Returns daily_rets = equity_curve.pct_change().dropna() - + # Total Time try: years = (equity_curve.index[-1] - equity_curve.index[0]).days / 365.25 @@ -205,13 +205,13 @@ def calculate_perf_metrics( rolling_max = equity_curve.cummax() drawdown = (equity_curve - rolling_max) / rolling_max max_dd = drawdown.min() - + # Drawdown Duration max_dd_duration, avg_dd_duration = calculate_drawdown_duration(equity_curve) - + # Calmar Ratio calmar = cagr / abs(max_dd) if max_dd != 0 else 0 - + # Win Rate (Daily) win_rate = (daily_rets > 0).mean() @@ -235,20 +235,20 @@ def calculate_perf_metrics( def run_backtest( - df: pd.DataFrame, - signal_col: str, - cost_bps: float = 0.0010, + df: pd.DataFrame, + signal_col: str, + cost_bps: float = 0.0010, rebalance_freq: Literal['D', 'W', 'M'] = 'M' ) -> pd.DataFrame: """ Run a vectorized backtest based on a signal column. - + Args: df: DataFrame with date index, 'Close', 'Daily_Return' and the signal column. signal_col: Name of column with 1 (Long), 0 (Cash), -1 (Short). cost_bps: Cost per trade (e.g., 0.0010 for 10bps). rebalance_freq: 'D' for daily, 'W' for weekly, 'M' for monthly. - + Returns: DataFrame with backtest results including equity curves and drawdowns. """ @@ -257,14 +257,14 @@ def run_backtest( return pd.DataFrame() logger.info(f"Running backtest: signal={signal_col}, cost={cost_bps*10000:.0f}bps, freq={rebalance_freq}") - + bt_df = df.copy() - + # 1. Signal Processing if rebalance_freq == 'D': # Daily Rebalance: Position today is determined by Signal yesterday bt_df['Position'] = bt_df[signal_col].shift(1).fillna(0) - + elif rebalance_freq == 'W': # Weekly Rebalance bt_df['Period'] = bt_df.index.to_period('W') @@ -272,7 +272,7 @@ def run_backtest( weekly_positions = weekly_signals.shift(1) bt_df['Position'] = bt_df['Period'].map(weekly_positions) bt_df['Position'] = bt_df['Position'].fillna(0) - + elif rebalance_freq == 'M': # Monthly Rebalance bt_df['Period'] = bt_df.index.to_period('M') @@ -283,65 +283,65 @@ def run_backtest( else: logger.error(f"Invalid rebalance frequency: {rebalance_freq}") return pd.DataFrame() - + # 2. Strategy Returns bt_df['Strategy_Return'] = bt_df['Position'] * bt_df['Daily_Return'] - + # 3. Transaction Costs bt_df['Position_Change'] = bt_df['Position'].diff().abs().fillna(0) bt_df['Cost'] = bt_df['Position_Change'] * cost_bps bt_df['Strategy_Net_Return'] = bt_df['Strategy_Return'] - bt_df['Cost'] - + # 4. Equity Curves bt_df['Equity_Benchmark'] = (1 + bt_df['Daily_Return']).cumprod() bt_df['Equity_Strategy'] = (1 + bt_df['Strategy_Net_Return']).cumprod() - + # 5. Drawdown Curves bt_df['DD_Benchmark'] = (bt_df['Equity_Benchmark'] / bt_df['Equity_Benchmark'].cummax()) - 1 bt_df['DD_Strategy'] = (bt_df['Equity_Strategy'] / bt_df['Equity_Strategy'].cummax()) - 1 - + logger.info( f"Backtest complete: {len(bt_df)} days, " f"Final equity: {bt_df['Equity_Strategy'].iloc[-1]:.2f}" ) - + return bt_df def calculate_conditional_stats( - df: pd.DataFrame, - strategy_col: str, + df: pd.DataFrame, + strategy_col: str, regime_col: str ) -> pd.DataFrame: """ Calculate performance stats conditioned on a regime column. - + Args: df: DataFrame with strategy returns and regime column. strategy_col: Column name of strategy returns. regime_col: Column name of regime classification. - + Returns: DataFrame with metrics per regime. """ if df.empty or regime_col not in df.columns: logger.warning(f"Invalid input for conditional stats: missing '{regime_col}'") return pd.DataFrame() - + regimes = df[regime_col].unique() results = [] - + for reg in regimes: subset = df[df[regime_col] == reg][strategy_col] - + if subset.empty: continue - + avg_ret = subset.mean() * TRADING_DAYS_PER_YEAR vol = subset.std() * (TRADING_DAYS_PER_YEAR ** 0.5) sharpe = avg_ret / vol if vol != 0 else 0 win_rate = (subset > 0).mean() - + results.append({ "Regime": reg, "Ann_Return": avg_ret, @@ -350,7 +350,7 @@ def calculate_conditional_stats( "WinRate": win_rate, "Count": len(subset) }) - + logger.debug(f"Conditional stats calculated for {len(results)} regimes") return pd.DataFrame(results).set_index("Regime") @@ -409,11 +409,11 @@ def walk_forward_backtest( ) -> Dict[str, Any]: """ Perform walk-forward validation with rolling training windows. - + This method splits the data into overlapping train/test periods, evaluates the strategy on each out-of-sample test period, and aggregates the results. - + Args: df: DataFrame with date index, 'Close', 'Daily_Return', and signal column. signal_col: Name of column with 1 (Long), 0 (Cash), -1 (Short). @@ -421,7 +421,7 @@ def walk_forward_backtest( test_months: Number of months for test window. cost_bps: Transaction cost in basis points. rebalance_freq: Rebalancing frequency. - + Returns: Dictionary containing: - 'summary': Aggregated performance metrics @@ -431,60 +431,60 @@ def walk_forward_backtest( if df.empty or signal_col not in df.columns: logger.error("Invalid input for walk-forward backtest") return {} - + logger.info( f"Walk-forward validation: train={train_months}m, test={test_months}m" ) - + # Convert to monthly periods for slicing df = df.copy() df['YearMonth'] = df.index.to_period('M') unique_months = df['YearMonth'].unique() - + total_months = len(unique_months) min_required = train_months + test_months - + if total_months < min_required: logger.warning( f"Insufficient data: {total_months} months < {min_required} required" ) return {} - + periods_results: List[Dict[str, Any]] = [] all_oos_returns: List[pd.Series] = [] - + # Walk forward through the data start_idx = 0 while start_idx + min_required <= total_months: # Define train and test periods train_end_idx = start_idx + train_months test_end_idx = train_end_idx + test_months - + train_months_range = unique_months[start_idx:train_end_idx] test_months_range = unique_months[train_end_idx:test_end_idx] - + # Filter data train_mask = df['YearMonth'].isin(train_months_range) test_mask = df['YearMonth'].isin(test_months_range) - + train_df = df[train_mask].copy() test_df = df[test_mask].copy() - + if len(test_df) == 0: break - + # Run backtest on test period only (signal already generated) bt_results = run_backtest( test_df, signal_col, cost_bps=cost_bps, rebalance_freq=rebalance_freq ) - + if bt_results.empty: start_idx += test_months continue - + # Calculate metrics for this period period_metrics = calculate_perf_metrics(bt_results['Equity_Strategy']) - + periods_results.append({ "train_start": str(train_months_range[0]), "train_end": str(train_months_range[-1]), @@ -492,30 +492,30 @@ def walk_forward_backtest( "test_end": str(test_months_range[-1]), "metrics": period_metrics }) - + all_oos_returns.append(bt_results['Strategy_Net_Return']) - + # Slide forward by test_months start_idx += test_months - + if not periods_results: logger.warning("No valid walk-forward periods") return {} - + # Aggregate out-of-sample returns oos_returns = pd.concat(all_oos_returns) oos_equity = (1 + oos_returns).cumprod() - + # Calculate aggregate metrics aggregate_metrics = calculate_perf_metrics( oos_equity, include_bootstrap_ci=True ) - + logger.info( f"Walk-forward complete: {len(periods_results)} periods, " f"OOS Sharpe={aggregate_metrics.get('Sharpe', 0):.2f}" ) - + return { "summary": aggregate_metrics, "periods": periods_results, diff --git a/src/modules/config.py b/src/modules/config.py index 5afd2ae..fddda8a 100644 --- a/src/modules/config.py +++ b/src/modules/config.py @@ -54,7 +54,7 @@ # === Asset Universe === PRESET_UNIVERSE = [ - "SPY", "QQQ", "IWM", "GLD", "TLT", + "SPY", "QQQ", "IWM", "GLD", "TLT", "XLK", "XLE", "BTC-USD", "ETH-USD" ] diff --git a/src/modules/data_model.py b/src/modules/data_model.py index 2ef90b7..c0e7109 100644 --- a/src/modules/data_model.py +++ b/src/modules/data_model.py @@ -21,43 +21,43 @@ @st.cache_data(ttl=CACHE_TTL_SECONDS) def fetch_stock_data( - ticker: str, + ticker: str, period: str = "10y", interval: str = "1d" ) -> pd.DataFrame: """ Fetch historical OHLCV data from Yahoo Finance with caching. - + Args: ticker: The asset symbol (e.g., 'SPY', 'BTC-USD'). period: Time period string - '1y', '5y', '10y', 'max', etc. interval: Data interval - '1d', '1wk', '1mo'. - + Returns: DataFrame with Date index and columns: Open, High, Low, Close, Volume. Returns empty DataFrame on error. """ logger.info(f"Fetching data for {ticker}, period={period}, interval={interval}") - + try: stock = yf.Ticker(ticker) df = stock.history(period=period, interval=interval) - + if df.empty: logger.warning(f"No data returned for {ticker}") return df - + # Ensure index is datetime if not isinstance(df.index, pd.DatetimeIndex): df.index = pd.to_datetime(df.index) - + # Drop timezone if present for consistency if df.index.tz is not None: df.index = df.index.tz_localize(None) - + logger.info(f"Fetched {len(df)} rows for {ticker}") return df - + except Exception as e: logger.error(f"Error fetching data for {ticker}: {e}") st.error(f"Error fetching data for {ticker}: {e}") @@ -128,10 +128,10 @@ def align_volume(data: dict) -> pd.DataFrame: def validate_ticker(ticker: str) -> bool: """ Validate if a ticker symbol exists and has data. - + Args: ticker: The ticker symbol to validate. - + Returns: True if ticker is valid and has data, False otherwise. """ @@ -148,10 +148,10 @@ def validate_ticker(ticker: str) -> bool: def get_ticker_info(ticker: str) -> Optional[dict]: """ Get basic info about a ticker. - + Args: ticker: The ticker symbol. - + Returns: Dictionary with ticker info or None on error. """ diff --git a/src/modules/signals.py b/src/modules/signals.py index 73dd7a9..e4e236f 100644 --- a/src/modules/signals.py +++ b/src/modules/signals.py @@ -18,75 +18,75 @@ def add_technical_indicators( - df: pd.DataFrame, - sma_window: int = 50, - mom_window: int = 12, + df: pd.DataFrame, + sma_window: int = 50, + mom_window: int = 12, vol_window: int = 21 ) -> pd.DataFrame: """ Adds technical indicators to the dataframe. - + Args: df: OHLCV data with 'Close' column. sma_window: Trend lookback in days. mom_window: Momentum lookback in months. vol_window: Volatility lookback in days. - + Returns: DataFrame with added indicator columns. """ if df.empty: return df - + df = df.copy() - + # 1. Trend: Moving Averages df[f'SMA_{sma_window}'] = df['Close'].rolling(window=sma_window).mean() df['SMA_200'] = df['Close'].rolling(window=200).mean() # Standard long-term benchmark - + # 2. Momentum (12-1 Month equivalent) # We approximate months as 21 trading days. # Momentum 12-1 = Return from 12 months ago to 1 month ago. lag_start = TRADING_DAYS_PER_MONTH # Skip most recent month lag_end_custom = mom_window * TRADING_DAYS_PER_MONTH - + df[f'Momentum_{mom_window}M_1M'] = ( df['Close'].shift(lag_start) / df['Close'].shift(lag_end_custom) - 1 ) - + # 3. Volatility (Annualized) df['Daily_Return'] = df['Close'].pct_change() df[f'Vol_{vol_window}d'] = ( - df['Daily_Return'].rolling(window=vol_window).std() + df['Daily_Return'].rolling(window=vol_window).std() * (TRADING_DAYS_PER_YEAR ** 0.5) ) - + # 4. Relative Strength Index (RSI) - Vectorized calculation delta = df['Close'].diff() gain = delta.where(delta > 0, 0.0).rolling(window=DEFAULT_RSI_WINDOW).mean() loss = (-delta.where(delta < 0, 0.0)).rolling(window=DEFAULT_RSI_WINDOW).mean() rs = gain / loss.replace(0, np.nan) # Avoid division by zero df['RSI_14'] = 100 - (100 / (1 + rs)) - + # 5. Distance from SMA (Trend Strength) df['Trend_Strength_Pct'] = ( (df['Close'] - df[f'SMA_{sma_window}']) / df[f'SMA_{sma_window}'] ) - + return df def detect_volatility_regime( - df: pd.DataFrame, - vol_col: str = 'Vol_21d', - quantile_high: float = 0.75, + df: pd.DataFrame, + vol_col: str = 'Vol_21d', + quantile_high: float = 0.75, quantile_low: float = 0.25, use_expanding: bool = False, min_periods: Optional[int] = None ) -> pd.DataFrame: """ Classifies periods into Volatility Regimes (Low, Normal, High). - + Args: df: DataFrame containing the volatility column. vol_col: Name of the volatility column. @@ -97,13 +97,13 @@ def detect_volatility_regime( If False, uses full-sample quantiles (faster, for exploratory analysis). min_periods: Minimum periods required for expanding window calculation. Only used if use_expanding=True. Defaults to MIN_PERIODS_FOR_EXPANDING. - + Returns: DataFrame with 'Vol_Regime' column: 'High' if vol > quantile_high threshold 'Low' if vol < quantile_low threshold 'Normal' otherwise - + Note: When use_expanding=False (default), regime classification uses full-sample quantiles which introduces look-ahead bias. This is acceptable for exploratory @@ -111,23 +111,23 @@ def detect_volatility_regime( """ if df.empty or vol_col not in df.columns: return df - + df = df.copy() - + if min_periods is None: min_periods = MIN_PERIODS_FOR_EXPANDING - + if use_expanding: # OUT-OF-SAMPLE: Expanding window quantiles (no look-ahead bias) # At each point in time, we only use data available up to that point thresh_high = df[vol_col].expanding(min_periods=min_periods).quantile(quantile_high) thresh_low = df[vol_col].expanding(min_periods=min_periods).quantile(quantile_low) - + # Vectorized regime classification with expanding thresholds df['Vol_Regime'] = 'Normal' df.loc[df[vol_col] > thresh_high, 'Vol_Regime'] = 'High' df.loc[df[vol_col] < thresh_low, 'Vol_Regime'] = 'Low' - + # Mark early periods as 'Unknown' where we don't have enough data df.loc[thresh_high.isna(), 'Vol_Regime'] = 'Unknown' else: @@ -135,37 +135,37 @@ def detect_volatility_regime( # Use this for exploratory analysis and visualization thresh_high = df[vol_col].quantile(quantile_high) thresh_low = df[vol_col].quantile(quantile_low) - + conditions = [ (df[vol_col] > thresh_high), (df[vol_col] < thresh_low) ] choices = ['High', 'Low'] - + df['Vol_Regime'] = np.select(conditions, choices, default='Normal') - + return df def detect_volatility_regime_oos( - df: pd.DataFrame, - vol_col: str = 'Vol_21d', - quantile_high: float = 0.75, + df: pd.DataFrame, + vol_col: str = 'Vol_21d', + quantile_high: float = 0.75, quantile_low: float = 0.25, min_periods: int = MIN_PERIODS_FOR_EXPANDING ) -> pd.DataFrame: """ Convenience wrapper for out-of-sample regime detection. - + This function should be used for backtesting to ensure no look-ahead bias. - + Args: df: DataFrame containing the volatility column. vol_col: Name of the volatility column. quantile_high: Percentile threshold for High Volatility. quantile_low: Percentile threshold for Low Volatility. min_periods: Minimum periods required before classification starts. - + Returns: DataFrame with 'Vol_Regime' column using expanding-window quantiles. """ diff --git a/src/modules/signals_advanced.py b/src/modules/signals_advanced.py index d49db0b..12593c2 100644 --- a/src/modules/signals_advanced.py +++ b/src/modules/signals_advanced.py @@ -32,34 +32,34 @@ def calculate_bollinger_bands( ) -> pd.DataFrame: """ Calculate Bollinger Bands for mean reversion signals. - + Args: df: DataFrame with price data. window: Lookback window for moving average. num_std: Number of standard deviations for bands. price_col: Column name for price data. - + Returns: DataFrame with added columns: BB_Middle, BB_Upper, BB_Lower, BB_Width, BB_Position. """ if df.empty or price_col not in df.columns: logger.warning(f"Invalid input for Bollinger Bands: missing '{price_col}'") return df - + df = df.copy() - + # Calculate bands df['BB_Middle'] = df[price_col].rolling(window=window).mean() rolling_std = df[price_col].rolling(window=window).std() df['BB_Upper'] = df['BB_Middle'] + (rolling_std * num_std) df['BB_Lower'] = df['BB_Middle'] - (rolling_std * num_std) - + # Band width (volatility measure) df['BB_Width'] = (df['BB_Upper'] - df['BB_Lower']) / df['BB_Middle'] - + # Price position within bands (-1 at lower, 0 at middle, +1 at upper) df['BB_Position'] = (df[price_col] - df['BB_Middle']) / (df['BB_Upper'] - df['BB_Middle']) - + logger.debug(f"Bollinger Bands calculated: window={window}, std={num_std}") return df @@ -74,12 +74,12 @@ def generate_mean_reversion_signal( ) -> pd.Series: """ Generate mean reversion signal based on RSI and optionally Bollinger Bands. - + Signal Logic: - Buy (1): RSI oversold AND (optionally) price near lower BB - Sell (-1): RSI overbought AND (optionally) price near upper BB - Hold (0): Otherwise - + Args: df: DataFrame with RSI and optionally BB columns. rsi_col: Column name for RSI values. @@ -87,32 +87,32 @@ def generate_mean_reversion_signal( overbought: RSI threshold for overbought condition. use_bollinger: Whether to incorporate Bollinger Band position. bb_position_col: Column name for BB position. - + Returns: Series with signal values: 1 (buy), -1 (sell), 0 (hold). """ if df.empty or rsi_col not in df.columns: logger.error(f"Missing required column: {rsi_col}") return pd.Series(0, index=df.index) - + signal = pd.Series(0, index=df.index) - + # RSI conditions rsi_oversold = df[rsi_col] < oversold rsi_overbought = df[rsi_col] > overbought - + if use_bollinger and bb_position_col in df.columns: # Combine with Bollinger Band position bb_low = df[bb_position_col] < -0.8 # Near lower band bb_high = df[bb_position_col] > 0.8 # Near upper band - + signal[rsi_oversold & bb_low] = 1 # Strong buy signal[rsi_overbought & bb_high] = -1 # Strong sell else: # RSI only signal[rsi_oversold] = 1 signal[rsi_overbought] = -1 - + logger.info(f"Mean reversion signal: {(signal == 1).sum()} buy, {(signal == -1).sum()} sell") return signal @@ -125,30 +125,30 @@ def generate_volatility_breakout_signal( ) -> pd.Series: """ Generate volatility breakout signal. - + Signal Logic: - When volatility spikes above threshold, follow the trend direction - This aims to capture momentum during volatility expansion - + Args: df: DataFrame with volatility data. vol_col: Column name for volatility. vol_threshold_percentile: Percentile threshold for "high" vol. trend_col: Optional column indicating trend (1=up, -1=down). - + Returns: Series with signal values. """ if df.empty or vol_col not in df.columns: logger.error(f"Missing required column: {vol_col}") return pd.Series(0, index=df.index) - + df = df.copy() - + # Expanding threshold (no look-ahead bias) vol_threshold = df[vol_col].expanding(min_periods=60).quantile(vol_threshold_percentile) high_vol = df[vol_col] > vol_threshold - + # Determine trend direction from recent returns if not provided if trend_col is None or trend_col not in df.columns: # Use 5-day return direction @@ -156,15 +156,15 @@ def generate_volatility_breakout_signal( trend = df['_temp_trend'] else: trend = df[trend_col] - + # Signal: follow trend when vol is high signal = pd.Series(0, index=df.index) signal[high_vol] = trend[high_vol] - + # Clean up if '_temp_trend' in df.columns: df.drop('_temp_trend', axis=1, inplace=True) - + logger.info(f"Volatility breakout signal: {(signal != 0).sum()} active days") return signal @@ -177,29 +177,29 @@ def generate_dual_momentum_signal( ) -> pd.Series: """ Generate dual momentum signal (absolute + relative momentum). - + Signal Logic: - Long (1): Positive absolute momentum AND better than benchmark - Cash (0): Otherwise - + This is based on Gary Antonacci's dual momentum research. - + Args: df: DataFrame with momentum column. abs_mom_col: Column for absolute momentum. rel_benchmark_return: Optional benchmark return series for relative momentum. abs_threshold: Threshold for considering momentum "positive". - + Returns: Series with signal values: 1 (long) or 0 (cash). """ if df.empty or abs_mom_col not in df.columns: logger.error(f"Missing required column: {abs_mom_col}") return pd.Series(0, index=df.index) - + # Absolute momentum: is the asset trending up? abs_mom_positive = df[abs_mom_col] > abs_threshold - + if rel_benchmark_return is not None: # Relative momentum: is the asset beating the benchmark? rel_mom_positive = df[abs_mom_col] > rel_benchmark_return @@ -209,7 +209,7 @@ def generate_dual_momentum_signal( # Just absolute momentum signal = pd.Series(0, index=df.index) signal[abs_mom_positive] = 1 - + logger.info(f"Dual momentum signal: {(signal == 1).sum()} long days, {(signal == 0).sum()} cash days") return signal @@ -222,39 +222,39 @@ def generate_composite_signal( ) -> pd.Series: """ Combine multiple signals into a composite signal. - + Args: df: DataFrame (used for index). signals: Dictionary of {name: signal_series}. weights: Optional dictionary of {name: weight}. Defaults to equal weights. threshold: Threshold for composite signal to trigger position. - + Returns: Series with signal values: 1 (long), -1 (short), 0 (neutral). """ if not signals: logger.error("No signals provided") return pd.Series(0, index=df.index) - + # Default to equal weights if weights is None: weights = {name: 1.0 / len(signals) for name in signals} - + # Normalize weights total_weight = sum(weights.values()) weights = {k: v / total_weight for k, v in weights.items()} - + # Calculate weighted average composite = pd.Series(0.0, index=df.index) for name, signal in signals.items(): weight = weights.get(name, 0) composite += signal * weight - + # Convert to discrete signal final_signal = pd.Series(0, index=df.index) final_signal[composite >= threshold] = 1 final_signal[composite <= -threshold] = -1 - + logger.info( f"Composite signal: {(final_signal == 1).sum()} long, " f"{(final_signal == -1).sum()} short, {(final_signal == 0).sum()} neutral" @@ -268,32 +268,32 @@ def calculate_atr( ) -> pd.Series: """ Calculate Average True Range (ATR) for position sizing and stops. - + Args: df: DataFrame with High, Low, Close columns. window: ATR lookback period. - + Returns: Series with ATR values. """ if df.empty or not all(col in df.columns for col in ['High', 'Low', 'Close']): logger.warning("Missing required columns for ATR calculation") return pd.Series(dtype=float, index=df.index if not df.empty else None) - + high = df['High'] low = df['Low'] close = df['Close'] - + # True Range tr1 = high - low tr2 = abs(high - close.shift(1)) tr3 = abs(low - close.shift(1)) - + true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) - + # Average True Range atr = true_range.rolling(window=window).mean() - + logger.debug(f"ATR calculated: window={window}, current={atr.iloc[-1]:.4f}") return atr @@ -307,28 +307,28 @@ def calculate_position_size( ) -> Tuple[int, float]: """ Calculate position size based on ATR volatility. - + Args: account_value: Total account value. risk_per_trade: Fraction of account to risk (e.g., 0.01 for 1%). atr: Current ATR value. atr_multiplier: Multiplier for stop distance (e.g., 2 ATR). price: Current asset price. - + Returns: Tuple of (shares, stop_distance). """ if atr <= 0 or price <= 0: logger.warning("Invalid ATR or price for position sizing") return (0, 0.0) - + risk_amount = account_value * risk_per_trade stop_distance = atr * atr_multiplier - + # Position size = Risk Amount / Risk per Share position_value = risk_amount / (stop_distance / price) shares = int(position_value / price) - + logger.debug( f"Position size: {shares} shares, " f"stop_distance={stop_distance:.2f}, risk=${risk_amount:.2f}" diff --git a/tests/test_backtester.py b/tests/test_backtester.py index 5b569cc..8d4442e 100644 --- a/tests/test_backtester.py +++ b/tests/test_backtester.py @@ -10,7 +10,7 @@ class TestBacktester(unittest.TestCase): - + def setUp(self): """Create test dataframes with known properties.""" dates = pd.date_range(start='2020-01-01', periods=100) @@ -19,7 +19,7 @@ def setUp(self): 'Daily_Return': np.full(100, 0.01), # Constant 1% return 'Signal': np.full(100, 1) # Always Long }, index=dates) - + # Create larger dataset for walk-forward tests dates_long = pd.date_range(start='2018-01-01', periods=1000) self.df_long = pd.DataFrame({ @@ -27,107 +27,107 @@ def setUp(self): 'Daily_Return': np.random.randn(1000) * 0.01, 'Signal': np.where(np.random.rand(1000) > 0.5, 1, 0) }, index=dates_long) - + def test_run_backtest_daily(self): """Test basic daily rebalancing backtest.""" data = self.df.copy() res = backtester.run_backtest(data, 'Signal', cost_bps=0.0, rebalance_freq='D') - + self.assertIn('Strategy_Return', res.columns) self.assertIn('Equity_Strategy', res.columns) self.assertIn('DD_Strategy', res.columns) - + # Since return is positive and we are long, equity should grow self.assertTrue(res['Equity_Strategy'].iloc[-1] > 1.0) - + def test_run_backtest_weekly(self): """Test weekly rebalancing backtest.""" data = self.df.copy() res = backtester.run_backtest(data, 'Signal', cost_bps=0.0, rebalance_freq='W') - + self.assertIn('Equity_Strategy', res.columns) self.assertTrue(res['Equity_Strategy'].iloc[-1] > 1.0) - + def test_run_backtest_monthly(self): """Test monthly rebalancing backtest.""" data = self.df.copy() res = backtester.run_backtest(data, 'Signal', cost_bps=0.0, rebalance_freq='M') - + self.assertIn('Equity_Strategy', res.columns) - + def test_metrics_basic(self): """Test basic performance metrics calculation.""" equity = pd.Series([1.0, 1.1, 1.21], index=pd.date_range('2020-01-01', periods=3)) metrics = backtester.calculate_perf_metrics(equity) - + self.assertIn('CAGR', metrics) self.assertIn('Sharpe', metrics) self.assertIn('MaxDD', metrics) self.assertIn('WinRate', metrics) self.assertTrue(metrics['CAGR'] > 0) - + def test_metrics_with_bootstrap_ci(self): """Test performance metrics with bootstrap CI.""" # Need more data for reliable CI dates = pd.date_range('2020-01-01', periods=100) returns = np.random.randn(100) * 0.01 + 0.001 # Slight positive drift equity = pd.Series((1 + returns).cumprod(), index=dates) - + metrics = backtester.calculate_perf_metrics( equity, include_bootstrap_ci=True, n_bootstrap=100 ) - + self.assertIn('Sharpe_CI_Lower', metrics) self.assertIn('Sharpe_CI_Upper', metrics) - + # CI bounds should exist and be ordered if not np.isnan(metrics['Sharpe_CI_Lower']): self.assertLessEqual(metrics['Sharpe_CI_Lower'], metrics['Sharpe_CI_Upper']) - + def test_drawdown_duration(self): """Test drawdown duration calculation.""" # Create equity curve with known drawdown equity = pd.Series([1.0, 1.1, 1.0, 0.9, 0.85, 0.9, 1.0, 1.1]) - + max_dd, avg_dd = backtester.calculate_drawdown_duration(equity) - + self.assertGreater(max_dd, 0) self.assertGreater(avg_dd, 0) - + def test_bootstrap_sharpe_ci(self): """Test bootstrap Sharpe CI directly.""" np.random.seed(42) returns = pd.Series(np.random.randn(100) * 0.01) - + lower, upper = backtester.bootstrap_sharpe_ci( returns, n_bootstrap=500, confidence_level=0.95, random_state=42 ) - + self.assertFalse(np.isnan(lower)) self.assertFalse(np.isnan(upper)) self.assertLess(lower, upper) - + def test_bootstrap_sharpe_ci_insufficient_data(self): """Test bootstrap CI with insufficient data.""" returns = pd.Series([0.01, 0.02, 0.01]) # Only 3 points - + lower, upper = backtester.bootstrap_sharpe_ci(returns) - + # Should return NaN due to insufficient data self.assertTrue(np.isnan(lower)) self.assertTrue(np.isnan(upper)) - + def test_conditional_stats(self): """Test conditional statistics calculation.""" df = pd.DataFrame({ 'Strategy_Net_Return': np.random.randn(100) * 0.01, 'Vol_Regime': ['High'] * 30 + ['Normal'] * 40 + ['Low'] * 30 }) - + stats = backtester.calculate_conditional_stats( df, 'Strategy_Net_Return', 'Vol_Regime' ) - + self.assertIn('High', stats.index) self.assertIn('Normal', stats.index) self.assertIn('Low', stats.index) @@ -147,24 +147,24 @@ def test_regime_stats(self): self.assertIn('High', stats.index) self.assertIn('Normal', stats.index) self.assertIn('CAGR', stats.columns) - + def test_walk_forward_backtest(self): """Test walk-forward validation.""" # Use the longer dataset result = backtester.walk_forward_backtest( - self.df_long, + self.df_long, 'Signal', train_months=12, test_months=3, cost_bps=0.001, rebalance_freq='M' ) - + self.assertIn('summary', result) self.assertIn('periods', result) self.assertIn('n_periods', result) self.assertGreater(result['n_periods'], 0) - + def test_walk_forward_insufficient_data(self): """Test walk-forward with insufficient data.""" result = backtester.walk_forward_backtest( @@ -173,20 +173,20 @@ def test_walk_forward_insufficient_data(self): train_months=24, test_months=6 ) - + # Should return empty dict due to insufficient data self.assertEqual(result, {}) - + def test_empty_dataframe_handling(self): """Test that functions handle empty dataframes gracefully.""" empty_df = pd.DataFrame() - + result = backtester.run_backtest(empty_df, 'Signal') self.assertTrue(result.empty) - + metrics = backtester.calculate_perf_metrics(pd.Series(dtype=float)) self.assertEqual(metrics, {}) - + if __name__ == '__main__': unittest.main() diff --git a/tests/test_data_model.py b/tests/test_data_model.py index 0fb4573..8c89650 100644 --- a/tests/test_data_model.py +++ b/tests/test_data_model.py @@ -13,7 +13,7 @@ class TestFetchStockData(unittest.TestCase): """Tests for fetch_stock_data function.""" - + @patch('src.modules.data_model.yf.Ticker') def test_successful_fetch(self, mock_ticker): """Test successful data fetch.""" @@ -26,32 +26,32 @@ def test_successful_fetch(self, mock_ticker): 'Close': [102] * 10, 'Volume': [1000000] * 10 }, index=dates) - + mock_ticker_instance = MagicMock() mock_ticker_instance.history.return_value = mock_df mock_ticker.return_value = mock_ticker_instance - + # Clear cache to ensure fresh call data_model.fetch_stock_data.clear() - + result = data_model.fetch_stock_data('TEST', period='1y') - + self.assertFalse(result.empty) self.assertEqual(len(result), 10) self.assertIn('Close', result.columns) - + @patch('src.modules.data_model.yf.Ticker') def test_empty_data_handling(self, mock_ticker): """Test handling of empty data response.""" mock_ticker_instance = MagicMock() mock_ticker_instance.history.return_value = pd.DataFrame() mock_ticker.return_value = mock_ticker_instance - + data_model.fetch_stock_data.clear() result = data_model.fetch_stock_data('INVALID', period='1y') - + self.assertTrue(result.empty) - + @patch('src.modules.data_model.yf.Ticker') def test_timezone_handling(self, mock_ticker): """Test that timezone is removed from index.""" @@ -59,21 +59,21 @@ def test_timezone_handling(self, mock_ticker): mock_df = pd.DataFrame({ 'Close': [100, 101, 102, 103, 104] }, index=dates) - + mock_ticker_instance = MagicMock() mock_ticker_instance.history.return_value = mock_df mock_ticker.return_value = mock_ticker_instance - + data_model.fetch_stock_data.clear() result = data_model.fetch_stock_data('SPY', period='1y') - + # Timezone should be removed self.assertIsNone(result.index.tz) class TestValidateTicker(unittest.TestCase): """Tests for validate_ticker function.""" - + @patch('src.modules.data_model.yf.Ticker') def test_valid_ticker(self, mock_ticker): """Test validation of a valid ticker.""" @@ -81,11 +81,11 @@ def test_valid_ticker(self, mock_ticker): mock_ticker_instance = MagicMock() mock_ticker_instance.info = mock_info mock_ticker.return_value = mock_ticker_instance - + result = data_model.validate_ticker('AAPL') - + self.assertTrue(result) - + @patch('src.modules.data_model.yf.Ticker') def test_invalid_ticker(self, mock_ticker): """Test validation of an invalid ticker.""" @@ -93,24 +93,24 @@ def test_invalid_ticker(self, mock_ticker): mock_ticker_instance = MagicMock() mock_ticker_instance.info = mock_info mock_ticker.return_value = mock_ticker_instance - + result = data_model.validate_ticker('INVALIDTICKER123') - + self.assertFalse(result) - + @patch('src.modules.data_model.yf.Ticker') def test_api_error_handling(self, mock_ticker): """Test handling of API errors.""" mock_ticker.side_effect = Exception("API Error") - + result = data_model.validate_ticker('ERROR') - + self.assertFalse(result) class TestGetTickerInfo(unittest.TestCase): """Tests for get_ticker_info function.""" - + @patch('src.modules.data_model.yf.Ticker') def test_successful_info_fetch(self, mock_ticker): """Test successful ticker info fetch.""" @@ -124,13 +124,13 @@ def test_successful_info_fetch(self, mock_ticker): mock_ticker_instance = MagicMock() mock_ticker_instance.info = mock_info mock_ticker.return_value = mock_ticker_instance - + result = data_model.get_ticker_info('AAPL') - + self.assertIsNotNone(result) self.assertEqual(result['name'], 'Apple Inc.') self.assertEqual(result['sector'], 'Technology') - + @patch('src.modules.data_model.yf.Ticker') def test_missing_info_fields(self, mock_ticker): """Test handling of missing info fields.""" @@ -138,20 +138,20 @@ def test_missing_info_fields(self, mock_ticker): mock_ticker_instance = MagicMock() mock_ticker_instance.info = mock_info mock_ticker.return_value = mock_ticker_instance - + result = data_model.get_ticker_info('TEST') - + self.assertIsNotNone(result) self.assertEqual(result['name'], 'Test Company') self.assertEqual(result['sector'], 'N/A') # Default value - + @patch('src.modules.data_model.yf.Ticker') def test_api_error_returns_none(self, mock_ticker): """Test that API errors return None.""" mock_ticker.side_effect = Exception("API Error") - + result = data_model.get_ticker_info('ERROR') - + self.assertIsNone(result) diff --git a/tests/test_signals.py b/tests/test_signals.py index e629f4d..8b0930d 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -10,22 +10,22 @@ class TestSignals(unittest.TestCase): - + def setUp(self): """Create test dataframes with known properties.""" # Create a dummy dataframe with 300 days of upward trending data dates = pd.date_range(start='2020-01-01', periods=300) prices = np.linspace(100, 200, 300) self.df = pd.DataFrame({'Close': prices}, index=dates) - + def test_sma_calculation(self): """Test that SMA is calculated correctly.""" result = signals.add_technical_indicators(self.df, sma_window=50, mom_window=12) - + self.assertIn('SMA_50', result.columns) # SMA should not be nan at the end self.assertFalse(np.isnan(result['SMA_50'].iloc[-1])) - + # Check logic: In a perfect linear uptrend, Price > SMA self.assertTrue(result['Close'].iloc[-1] > result['SMA_50'].iloc[-1]) @@ -39,7 +39,7 @@ def test_momentum_calculation(self): result = signals.add_technical_indicators(self.df, sma_window=50, mom_window=12) col_name = 'Momentum_12M_1M' self.assertIn(col_name, result.columns) - + # Momentum should be positive for uptrend self.assertTrue(result[col_name].iloc[-1] > 0) @@ -47,7 +47,7 @@ def test_rsi_bounds(self): """Test that RSI stays within 0-100 bounds.""" result = signals.add_technical_indicators(self.df, sma_window=50, mom_window=12) self.assertIn('RSI_14', result.columns) - + valid_rsi = result['RSI_14'].dropna() self.assertTrue((valid_rsi >= 0).all()) self.assertTrue((valid_rsi <= 100).all()) @@ -57,15 +57,15 @@ def test_volatility_regime_in_sample(self): # Create a df with varying volatility dates = pd.date_range('2020-01-01', periods=100) df = pd.DataFrame({'Vol_21d': np.random.rand(100)}, index=dates) - + # Force some high and low values df.iloc[0:10, 0] = 0.01 # Low df.iloc[90:100, 0] = 1.0 # High - + res = signals.detect_volatility_regime( df, 'Vol_21d', 0.8, 0.2, use_expanding=False ) - + self.assertIn('Vol_Regime', res.columns) # Check that we have High, Low, and Normal labels unique_regimes = res['Vol_Regime'].unique() @@ -76,15 +76,15 @@ def test_volatility_regime_out_of_sample(self): """Test out-of-sample regime detection (expanding-window quantiles).""" dates = pd.date_range('2020-01-01', periods=100) df = pd.DataFrame({'Vol_21d': np.random.rand(100)}, index=dates) - + # Force some high and low values df.iloc[0:10, 0] = 0.01 # Low df.iloc[90:100, 0] = 1.0 # High - + res = signals.detect_volatility_regime( df, 'Vol_21d', 0.8, 0.2, use_expanding=True, min_periods=20 ) - + self.assertIn('Vol_Regime', res.columns) # Early periods should be 'Unknown' due to insufficient data self.assertIn('Unknown', res['Vol_Regime'].iloc[:20].values) @@ -93,11 +93,11 @@ def test_volatility_regime_oos_wrapper(self): """Test the convenience wrapper for out-of-sample regime detection.""" dates = pd.date_range('2020-01-01', periods=100) df = pd.DataFrame({'Vol_21d': np.random.rand(100)}, index=dates) - + res = signals.detect_volatility_regime_oos( df, 'Vol_21d', min_periods=20 ) - + self.assertIn('Vol_Regime', res.columns) # Verify it uses expanding window (early periods should be 'Unknown') self.assertIn('Unknown', res['Vol_Regime'].iloc[:20].values) @@ -105,14 +105,13 @@ def test_volatility_regime_oos_wrapper(self): def test_empty_dataframe_handling(self): """Test that functions handle empty dataframes gracefully.""" empty_df = pd.DataFrame() - + result = signals.add_technical_indicators(empty_df) self.assertTrue(result.empty) - + result = signals.detect_volatility_regime(empty_df) self.assertTrue(result.empty) if __name__ == '__main__': unittest.main() - diff --git a/tests/test_signals_advanced.py b/tests/test_signals_advanced.py index 1fb03e3..7ad36ec 100644 --- a/tests/test_signals_advanced.py +++ b/tests/test_signals_advanced.py @@ -13,7 +13,7 @@ class TestBollingerBands(unittest.TestCase): """Tests for Bollinger Bands calculation.""" - + def setUp(self): """Create test data.""" np.random.seed(42) @@ -22,26 +22,26 @@ def setUp(self): returns = np.random.randn(100) * 0.02 prices = 100 * np.cumprod(1 + returns) self.df = pd.DataFrame({'Close': prices}, index=dates) - + def test_bollinger_bands_columns(self): """Test that all BB columns are created.""" result = signals_advanced.calculate_bollinger_bands(self.df) - + self.assertIn('BB_Middle', result.columns) self.assertIn('BB_Upper', result.columns) self.assertIn('BB_Lower', result.columns) self.assertIn('BB_Width', result.columns) self.assertIn('BB_Position', result.columns) - + def test_bollinger_bands_order(self): """Test that upper > middle > lower.""" result = signals_advanced.calculate_bollinger_bands(self.df) - + # After warmup period valid_data = result.iloc[25:] self.assertTrue((valid_data['BB_Upper'] > valid_data['BB_Middle']).all()) self.assertTrue((valid_data['BB_Middle'] > valid_data['BB_Lower']).all()) - + def test_bollinger_bands_width_positive(self): """Test that band width is always positive.""" result = signals_advanced.calculate_bollinger_bands(self.df) @@ -51,7 +51,7 @@ def test_bollinger_bands_width_positive(self): class TestMeanReversionSignal(unittest.TestCase): """Tests for mean reversion signal generation.""" - + def setUp(self): """Create test data with RSI and BB columns.""" dates = pd.date_range(start='2020-01-01', periods=100) @@ -59,13 +59,13 @@ def setUp(self): 'RSI_14': np.linspace(20, 80, 100), # RSI from oversold to overbought 'BB_Position': np.linspace(-1.5, 1.5, 100) # BB from below to above }, index=dates) - + def test_signal_values(self): """Test that signals are in valid range.""" signal = signals_advanced.generate_mean_reversion_signal(self.df) - + self.assertTrue(signal.isin([-1, 0, 1]).all()) - + def test_oversold_buy_signal(self): """Test that oversold conditions generate buy signals.""" # Create oversold data @@ -73,32 +73,32 @@ def test_oversold_buy_signal(self): 'RSI_14': [25, 28, 29], 'BB_Position': [-0.9, -0.85, -0.95] }) - + signal = signals_advanced.generate_mean_reversion_signal( df, oversold=30, overbought=70 ) - + # Should have buy signals self.assertTrue((signal == 1).any()) - + def test_overbought_sell_signal(self): """Test that overbought conditions generate sell signals.""" df = pd.DataFrame({ 'RSI_14': [75, 78, 80], 'BB_Position': [0.9, 0.85, 0.95] }) - + signal = signals_advanced.generate_mean_reversion_signal( df, oversold=30, overbought=70 ) - + # Should have sell signals self.assertTrue((signal == -1).any()) class TestVolatilityBreakoutSignal(unittest.TestCase): """Tests for volatility breakout signal.""" - + def setUp(self): """Create test data with volatility.""" np.random.seed(42) @@ -107,40 +107,40 @@ def setUp(self): 'Close': np.cumprod(1 + np.random.randn(200) * 0.01) * 100, 'Vol_21d': np.abs(np.random.randn(200) * 0.1) + 0.1 }, index=dates) - + # Spike volatility at end self.df.loc[self.df.index[-20:], 'Vol_21d'] = 0.5 - + def test_signal_generation(self): """Test that signal is generated.""" signal = signals_advanced.generate_volatility_breakout_signal(self.df) - + self.assertEqual(len(signal), len(self.df)) self.assertTrue(signal.isin([-1, 0, 1]).all()) class TestDualMomentumSignal(unittest.TestCase): """Tests for dual momentum signal.""" - + def setUp(self): """Create test data with momentum.""" dates = pd.date_range(start='2020-01-01', periods=100) self.df = pd.DataFrame({ 'Momentum_12M_1M': np.linspace(-0.2, 0.3, 100) # -20% to +30% }, index=dates) - + def test_positive_momentum_long(self): """Test that positive momentum generates long signal.""" signal = signals_advanced.generate_dual_momentum_signal(self.df) - + # Positive momentum should have some long signals positive_mom_mask = self.df['Momentum_12M_1M'] > 0 self.assertTrue((signal[positive_mom_mask] == 1).any()) - + def test_negative_momentum_cash(self): """Test that negative momentum is cash.""" signal = signals_advanced.generate_dual_momentum_signal(self.df) - + # Negative momentum should be cash (0) negative_mom_mask = self.df['Momentum_12M_1M'] < 0 self.assertTrue((signal[negative_mom_mask] == 0).all()) @@ -148,71 +148,71 @@ def test_negative_momentum_cash(self): class TestCompositeSignal(unittest.TestCase): """Tests for composite signal generation.""" - + def setUp(self): """Create test signals.""" dates = pd.date_range(start='2020-01-01', periods=10) self.df = pd.DataFrame(index=dates) - + self.signals = { 'trend': pd.Series([1, 1, 1, 0, -1, -1, 1, 1, 0, 0], index=dates), 'momentum': pd.Series([1, 1, 0, 0, 0, -1, 1, 0, 0, 1], index=dates), } - + def test_equal_weight_combination(self): """Test equal weight signal combination.""" signal = signals_advanced.generate_composite_signal( self.df, self.signals, threshold=0.5 ) - + self.assertEqual(len(signal), len(self.df)) self.assertTrue(signal.isin([-1, 0, 1]).all()) - + def test_weighted_combination(self): """Test weighted signal combination.""" weights = {'trend': 0.7, 'momentum': 0.3} signal = signals_advanced.generate_composite_signal( self.df, self.signals, weights=weights, threshold=0.5 ) - + self.assertTrue(signal.isin([-1, 0, 1]).all()) class TestATR(unittest.TestCase): """Tests for ATR calculation.""" - + def setUp(self): """Create OHLC test data.""" np.random.seed(42) dates = pd.date_range(start='2020-01-01', periods=50) close = np.cumprod(1 + np.random.randn(50) * 0.01) * 100 - + self.df = pd.DataFrame({ 'Open': close * (1 + np.random.randn(50) * 0.005), 'High': close * (1 + np.abs(np.random.randn(50) * 0.01)), 'Low': close * (1 - np.abs(np.random.randn(50) * 0.01)), 'Close': close }, index=dates) - + def test_atr_positive(self): """Test that ATR is always positive.""" atr = signals_advanced.calculate_atr(self.df) - + valid_atr = atr.dropna() self.assertTrue((valid_atr > 0).all()) - + def test_atr_window(self): """Test custom ATR window.""" atr_14 = signals_advanced.calculate_atr(self.df, window=14) atr_7 = signals_advanced.calculate_atr(self.df, window=7) - + # Shorter window should have values earlier self.assertTrue(atr_7.first_valid_index() <= atr_14.first_valid_index()) class TestPositionSizing(unittest.TestCase): """Tests for position sizing.""" - + def test_basic_position_size(self): """Test basic position sizing calculation.""" shares, stop = signals_advanced.calculate_position_size( @@ -222,10 +222,10 @@ def test_basic_position_size(self): atr_multiplier=2.0, price=50.0 ) - + self.assertGreater(shares, 0) self.assertEqual(stop, 4.0) # 2 * 2 ATR - + def test_zero_atr_handling(self): """Test handling of zero ATR.""" shares, stop = signals_advanced.calculate_position_size( @@ -234,7 +234,7 @@ def test_zero_atr_handling(self): atr=0, price=50.0 ) - + self.assertEqual(shares, 0) self.assertEqual(stop, 0.0)