diff --git a/.gitignore b/.gitignore index dae9a11..3858e46 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,6 @@ target/ !config_examples/config_freqai.example.json docker-compose-*.yml + +# Workflow results (regenerable via make e2e-*) +results/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ed286c5 --- /dev/null +++ b/Makefile @@ -0,0 +1,157 @@ +# PortfolioBench — Development & Testing Makefile +# Usage: make help + +# ───────────────────── Configuration ───────────────────── + +PYTHON := python3 +FLOWMESH_URL ?= http://localhost:8000 +FLOWMESH_KEY ?= flm-demo-00000000000000000000000000000000 +LUMIDOS_DIR := $(shell cd .. && pwd)/LumidOS + +WORKFLOW_DIR := workflow/examples +DATA_DIR := user_data/data/usstock + +.PHONY: help install install-dev test test-workflow lint \ + data generate-data download-data \ + workflow-local workflow-local-rsi workflow-local-macd workflow-local-all \ + workflow-echo workflow-inference workflow-ai-analysis \ + benchmark benchmark-quick benchmark-all e2e-local e2e-flowmesh \ + check-flowmesh check-data check-lumidos clean + +help: ## Show this help + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf " \033[36m%-24s\033[0m %s\n", $$1, $$2}' + +# ───────────────────── Setup ───────────────────── + +install: ## Install PortfolioBench and dependencies + $(PYTHON) -m pip install -e . + cd freqtrade && $(PYTHON) -m pip install -e . + +install-dev: ## Install with test dependencies + $(PYTHON) -m pip install -e ".[dev]" 2>/dev/null || $(PYTHON) -m pip install -e . + cd freqtrade && $(PYTHON) -m pip install -e . + +# ───────────────────── Preflight Checks ───────────────────── + +check-data: ## Verify market data exists + @test -d $(DATA_DIR) && echo "Data dir OK: $$(ls $(DATA_DIR)/*.feather 2>/dev/null | wc -l) feather files" \ + || (echo "No data at $(DATA_DIR). 
Run: make generate-data" && exit 1) + +check-lumidos: ## Verify LumidOS is available as sibling directory + @test -d $(LUMIDOS_DIR)/adapters/portbench && echo "LumidOS OK at $(LUMIDOS_DIR)" \ + || (echo "LumidOS not found at $(LUMIDOS_DIR)" && exit 1) + +check-flowmesh: ## Verify FlowMesh is reachable + @curl -sf $(FLOWMESH_URL)/healthz >/dev/null 2>&1 \ + && echo "FlowMesh OK at $(FLOWMESH_URL)" \ + || (echo "FlowMesh not reachable at $(FLOWMESH_URL). Start it with: cd ../FlowMesh_dev && make up" && exit 1) + +# ───────────────────── Data ───────────────────── + +generate-data: ## Generate synthetic OHLCV test data for all 119 instruments + portbench generate-data + +data: generate-data ## Alias for generate-data + +download-data: ## Download real market data from Google Drive + portbench download-data --exchange portfoliobench + +# ───────────────────── Tests ───────────────────── + +test: ## Run unit tests + $(PYTHON) -m pytest tests/ -v + +test-workflow: check-data check-lumidos ## Run workflow unit tests (local only, no FlowMesh) + $(PYTHON) -m pytest tests/test_workflow.py -v + +# ───────────────────── Local Workflows (no FlowMesh) ───────────────────── + +workflow-local: check-data check-lumidos ## Run default EMA blend workflow (local only) + portbench workflow $(WORKFLOW_DIR)/ema_ons_blend.json + +workflow-local-rsi: check-data check-lumidos ## Run RSI equal-weight workflow (local only) + portbench workflow $(WORKFLOW_DIR)/rsi_equal_weight.json + +workflow-local-macd: check-data check-lumidos ## Run MACD ONS workflow (local only) + portbench workflow $(WORKFLOW_DIR)/macd_ons_pure.json + +workflow-local-all: check-data check-lumidos ## Run all local workflows + @mkdir -p results && echo "=== EMA Blend ===" && portbench workflow $(WORKFLOW_DIR)/ema_ons_blend.json --output-json results/ema_blend.json + @echo "" + @echo "=== RSI Equal Weight ===" && portbench workflow $(WORKFLOW_DIR)/rsi_equal_weight.json --output-json results/rsi_equal.json + @echo "" + @echo "=== MACD ONS ===" 
&& portbench workflow $(WORKFLOW_DIR)/macd_ons_pure.json --output-json results/macd_ons.json + @echo "" + @echo "=== Bollinger Blend ===" && portbench workflow $(WORKFLOW_DIR)/bollinger_blend.json --output-json results/bollinger.json + @echo "" + @echo "All local workflows complete. Results in results/" + +# ───────────────────── FlowMesh Workflows (GPU) ───────────────────── + +workflow-echo: check-lumidos check-flowmesh ## Run echo connectivity test via FlowMesh + portbench workflow $(WORKFLOW_DIR)/flowmesh_echo_test.json \ + --flowmesh-url $(FLOWMESH_URL) \ + --flowmesh-key $(FLOWMESH_KEY) + +workflow-inference: check-lumidos check-flowmesh ## Run pure GPU inference via FlowMesh + portbench workflow $(WORKFLOW_DIR)/flowmesh_inference_only.json \ + --flowmesh-url $(FLOWMESH_URL) \ + --flowmesh-key $(FLOWMESH_KEY) \ + --output-json results/inference_only.json + +workflow-ai-analysis: check-data check-lumidos check-flowmesh ## Run full pipeline: local alpha/strategy/portfolio + FlowMesh AI analysis + portbench workflow $(WORKFLOW_DIR)/ema_blend_ai_analysis.json \ + --flowmesh-url $(FLOWMESH_URL) \ + --flowmesh-key $(FLOWMESH_KEY) \ + --output-json results/ai_analysis.json + +# ───────────────────── Benchmarking ───────────────────── + +benchmark-quick: ## Run quick benchmark suite (subset of strategies) + portbench benchmark --quick + +benchmark: ## Run full benchmark suite + portbench benchmark + +benchmark-all: ## Run complete benchmark matrix (all strategies x all asset classes) + portbench benchmark-all + +# ───────────────────── E2E Pipeline ───────────────────── + +e2e-local: check-data check-lumidos ## Run full local E2E pipeline (no FlowMesh) + @echo "================================================================" + @echo "E2E LOCAL PIPELINE" + @echo "================================================================" + @mkdir -p results + portbench workflow $(WORKFLOW_DIR)/ema_ons_blend.json --output-json results/e2e_local.json + @echo "" + @echo "E2E 
local pipeline PASSED" + @echo "Results: results/e2e_local.json" + +e2e-flowmesh: check-data check-lumidos check-flowmesh ## Run full E2E pipeline through FlowMesh GPU + @echo "================================================================" + @echo "E2E FLOWMESH PIPELINE" + @echo "================================================================" + @mkdir -p results + @echo "Step 1: Echo test..." + portbench workflow $(WORKFLOW_DIR)/flowmesh_echo_test.json \ + --flowmesh-url $(FLOWMESH_URL) --flowmesh-key $(FLOWMESH_KEY) + @echo "" + @echo "Step 2: GPU inference test..." + portbench workflow $(WORKFLOW_DIR)/flowmesh_inference_only.json \ + --flowmesh-url $(FLOWMESH_URL) --flowmesh-key $(FLOWMESH_KEY) \ + --output-json results/e2e_inference.json + @echo "" + @echo "Step 3: Full pipeline (local compute + FlowMesh AI)..." + portbench workflow $(WORKFLOW_DIR)/ema_blend_ai_analysis.json \ + --flowmesh-url $(FLOWMESH_URL) --flowmesh-key $(FLOWMESH_KEY) \ + --output-json results/e2e_ai_analysis.json + @echo "" + @echo "E2E FlowMesh pipeline PASSED" + @echo "Results in results/" + +# ───────────────────── Cleanup ───────────────────── + +clean: ## Remove build artifacts and result files + rm -rf build/ dist/ *.egg-info .pytest_cache results/ + find . -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true diff --git a/freqtrade b/freqtrade index d57e6c5..ab093ff 160000 --- a/freqtrade +++ b/freqtrade @@ -1 +1 @@ -Subproject commit d57e6c5e5bca53cfc63ee10063710bf7e46d8df0 +Subproject commit ab093ff0e1af445f0b8491ea1168c46e1a51b2c0 diff --git a/workflow/__init__.py b/workflow/__init__.py index 6b777fe..a22db87 100644 --- a/workflow/__init__.py +++ b/workflow/__init__.py @@ -1,6 +1,10 @@ """Workflow-driven CLI mode for PortfolioBench. This package provides a CLI mode where the alpha -> strategy -> portfolio -> -backtest pipeline is orchestrated by a LumidStack workflow JSON, executed -locally on a single node via :class:`LocalWorkflowRunner`. 
+backtest pipeline is orchestrated by a lumid/v1 workflow JSON, executed via +LumidOS's :class:`LocalWorkflowRunner`. + +Stages with ``portbench.*`` templates run locally; stages with ``flowmesh.*`` +templates are dispatched to a FlowMesh instance for GPU-accelerated execution +when ``--flowmesh-url`` is provided. """ diff --git a/workflow/cli_workflow.py b/workflow/cli_workflow.py index 9278e09..da61978 100644 --- a/workflow/cli_workflow.py +++ b/workflow/cli_workflow.py @@ -5,11 +5,17 @@ portbench workflow path/to/workflow.json portbench workflow path/to/workflow.json --output-json results.json + # With FlowMesh GPU backend (via LumidOS): + portbench workflow path/to/workflow.json \\ + --flowmesh-url http://localhost:8000 \\ + --flowmesh-key flm-demo-00000000000000000000000000000000 + The workflow JSON follows the ``lumid/v1`` API version with ``kind: Workflow``. The runner resolves stage dependencies, -executes each pipeline step (alpha -> strategy -> portfolio) via the -:class:`LocalWorkflowRunner` from LumidStack, then backtests the -resulting portfolio weights using PortfolioBench's backtest engine. +executes each pipeline step via the :class:`LocalWorkflowRunner` from +LumidOS. Stages with ``portbench.*`` templates run locally; stages +with ``flowmesh.*`` templates are dispatched to FlowMesh for +GPU-accelerated execution. """ from __future__ import annotations @@ -31,11 +37,13 @@ if os.path.isdir(os.path.join(_FT_ROOT, "freqtrade")) and _FT_ROOT not in sys.path: sys.path.insert(0, _FT_ROOT) -# LumidStack lives alongside PortfolioBench in the parent directory or can -# be installed as a package. Try the sibling-directory layout first. -_LUMIDSTACK_ROOT = os.path.join(os.path.dirname(_PROJECT_ROOT), "LumidStack") -if os.path.isdir(_LUMIDSTACK_ROOT) and _LUMIDSTACK_ROOT not in sys.path: - sys.path.insert(0, _LUMIDSTACK_ROOT) +# LumidOS lives alongside PortfolioBench in the parent directory. +# Try both "LumidOS" and legacy "LumidStack" directory names. 
+_PARENT = os.path.dirname(_PROJECT_ROOT) +for _sibling_name in ("LumidOS", "LumidStack"): + _sibling = os.path.join(_PARENT, _sibling_name) + if os.path.isdir(_sibling) and _sibling not in sys.path: + sys.path.insert(0, _sibling) logger = logging.getLogger(__name__) @@ -44,6 +52,8 @@ def run_workflow_cli( workflow_file: str, *, output_json: str | None = None, + flowmesh_url: str | None = None, + flowmesh_key: str | None = None, ) -> dict[str, Any]: """Load a workflow JSON, execute via LumidStack, and backtest the output. @@ -53,6 +63,12 @@ def run_workflow_cli( Path to the ``lumid/v1`` workflow JSON. output_json If given, write the backtest metrics to this path as JSON. + flowmesh_url + FlowMesh host URL (e.g. ``http://localhost:8000``). When provided, + ``flowmesh.*`` stage templates are dispatched to this FlowMesh + instance for GPU-accelerated execution. + flowmesh_key + FlowMesh API key (Bearer token). Returns ------- @@ -71,17 +87,38 @@ def run_workflow_cli( if not path.is_file(): raise FileNotFoundError(f"Workflow file not found: {workflow_file}") + # Resolve FlowMesh config from args → env → defaults + fm_url = flowmesh_url or os.environ.get("FLOWMESH_URL") + fm_key = flowmesh_key or os.environ.get("FLOWMESH_API_KEY", "") + use_flowmesh = bool(fm_url) + print("=" * 60) print("PORTFOLIOBENCH — WORKFLOW MODE") print("=" * 60) print(f" Workflow file : {path.resolve()}") + if use_flowmesh: + print(f" FlowMesh URL : {fm_url}") + else: + print(" FlowMesh : disabled (local-only mode)") print() # ── 1. 
Parse workflow ────────────────────────────────────────────── runner = LocalWorkflowRunner.from_file(path) register_all_handlers(runner) - wf_name = runner.workflow.name or path.stem + # Register FlowMesh handlers if configured + fm_client = None + if use_flowmesh: + from adapters.portbench.flowmesh_stages import register_flowmesh_handlers + + fm_client = register_flowmesh_handlers( + runner, + flowmesh_url=fm_url, + api_key=fm_key, + ) + print(" FlowMesh handlers registered: flowmesh.inference, flowmesh.analysis, flowmesh.echo") + + wf_name = runner.workflow.metadata.name or path.stem # Parse backtest config from the extra spec (domain-specific fields # stored alongside stages in the workflow JSON). @@ -121,9 +158,13 @@ def run_workflow_cli( # ── 3. Execute workflow via LumidStack runner ────────────────────── print() print("=" * 60) - print("EXECUTING WORKFLOW via LumidStack LocalWorkflowRunner") + print("EXECUTING WORKFLOW via LumidOS") print("=" * 60) - wf_result = runner.run(context=context) + try: + wf_result = runner.run(context=context) + finally: + if fm_client is not None: + fm_client.close() for sname, sres in wf_result.stages.items(): print(f" Stage {sname!r}: {sres.duration_s:.3f}s — {sres.data}") @@ -132,37 +173,58 @@ def run_workflow_cli( prices = context.get("prices") weights = context.get("weights") - if prices is None or weights is None: - raise RuntimeError( - "Workflow did not produce prices and weights in context. " - "Ensure the workflow has alpha, strategy, and portfolio stages." 
+ if prices is not None and weights is not None: + print() + print("=" * 60) + print("BACKTESTING — PortfolioBench") + print("=" * 60) + + result_df = backtest_portfolio(prices, weights, bt_capital) + metrics = compute_metrics(result_df) + + print(f"\n{'=' * 60}") + print("PORTFOLIO RESULTS") + print(f"{'=' * 60}") + print(f" Workflow : {wf_name}") + print(f" Alpha : {context.get('alpha_type', 'n/a')}") + print(f" Strategy : {context.get('strategy_type', 'n/a')}") + print(f" Portfolio : {context.get('portfolio_type', 'n/a')}") + print(f" Initial capital : ${bt_capital:,.2f}") + print(f" Final value : ${result_df['portfolio_value'].iloc[-1]:,.2f}") + print(f" Total return : {metrics['total_return_pct']:.2f}%") + print(f" Annualised return : {metrics['annualised_return_pct']:.2f}%") + print(f" Annualised Sharpe : {metrics['annualised_sharpe']:.4f}") + print(f" Max drawdown : {metrics['max_drawdown_pct']:.2f}%") + print(f" Number of bars : {metrics['n_bars']}") + print(f" Workflow time : {wf_result.total_duration_s:.3f}s") + else: + metrics = {} + logger.info( + "No prices/weights in context — skipping backtest " + "(workflow may be inference-only)." 
) - print() - print("=" * 60) - print("BACKTESTING — PortfolioBench") - print("=" * 60) - - result_df = backtest_portfolio(prices, weights, bt_capital) - metrics = compute_metrics(result_df) - - print(f"\n{'=' * 60}") - print("PORTFOLIO RESULTS") - print(f"{'=' * 60}") - print(f" Workflow : {wf_name}") - print(f" Alpha : {context.get('alpha_type', 'n/a')}") - print(f" Strategy : {context.get('strategy_type', 'n/a')}") - print(f" Portfolio : {context.get('portfolio_type', 'n/a')}") - print(f" Initial capital : ${bt_capital:,.2f}") - print(f" Final value : ${result_df['portfolio_value'].iloc[-1]:,.2f}") - print(f" Total return : {metrics['total_return_pct']:.2f}%") - print(f" Annualised return : {metrics['annualised_return_pct']:.2f}%") - print(f" Annualised Sharpe : {metrics['annualised_sharpe']:.4f}") - print(f" Max drawdown : {metrics['max_drawdown_pct']:.2f}%") - print(f" Number of bars : {metrics['n_bars']}") - print(f" Workflow time : {wf_result.total_duration_s:.3f}s") - - # ── 5. Optionally write results to JSON ──────────────────────────── + # ── 5. Collect AI analysis results (if any) ─────────────────────── + ai_results: dict[str, Any] = {} + for key, value in context.items(): + if key.endswith("_analysis") and isinstance(value, str): + ai_results[key] = value + if key.endswith("_response") and isinstance(value, str): + ai_results[key] = value + + if ai_results: + print(f"\n{'=' * 60}") + print("AI ANALYSIS RESULTS") + print(f"{'=' * 60}") + for key, text in ai_results.items(): + print(f"\n [{key}]") + # Print first 500 chars of each AI response + for line in text[:500].split("\n"): + print(f" {line}") + if len(text) > 500: + print(f" ... ({len(text) - 500} more chars)") + + # ── 6. 
Optionally write results to JSON ──────────────────────────── full_result = { "workflow": wf_name, "alpha_type": context.get("alpha_type"), @@ -172,6 +234,8 @@ def run_workflow_cli( "timeframe": bt_timeframe, "initial_capital": bt_capital, "metrics": metrics, + "ai_results": {k: v[:1000] for k, v in ai_results.items()}, + "flowmesh_enabled": use_flowmesh, "stage_durations": { name: round(sr.duration_s, 4) for name, sr in wf_result.stages.items() }, diff --git a/workflow/examples/ema_blend_ai_analysis.json b/workflow/examples/ema_blend_ai_analysis.json new file mode 100644 index 0000000..8180c35 --- /dev/null +++ b/workflow/examples/ema_blend_ai_analysis.json @@ -0,0 +1,59 @@ +{ + "apiVersion": "lumid/v1", + "kind": "Workflow", + "metadata": { + "name": "ema-blend-ai-analysis", + "annotations": { + "description": "EMA alpha + blended portfolio + AI-powered analysis via FlowMesh GPU inference." + } + }, + "spec": { + "stages": { + "alpha": { + "template": "portbench.alpha", + "params": { + "type": "ema" + } + }, + "strategy": { + "template": "portbench.strategy", + "dependsOn": ["alpha"], + "params": { + "type": "ema_cross" + } + }, + "portfolio": { + "template": "portbench.portfolio", + "dependsOn": ["strategy"], + "params": { + "type": "blend", + "ons": { + "eta": 0.0, + "beta": 1.0, + "delta": 0.125 + }, + "blend_weights": { + "equal": 0.34, + "ons": 0.33, + "signal": 0.33 + } + } + }, + "ai_summary": { + "template": "flowmesh.analysis", + "dependsOn": ["portfolio"], + "params": { + "model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0", + "analysis_type": "portfolio_summary", + "max_tokens": 512, + "gpu_memory": "4GB" + } + } + }, + "backtest": { + "pairs": ["BTC/USDT", "ETH/USDT", "SOL/USDT", "XRP/USDT", "MSFT/USD"], + "timeframe": "1d", + "initial_capital": 10000 + } + } +} diff --git a/workflow/examples/flowmesh_echo_test.json b/workflow/examples/flowmesh_echo_test.json new file mode 100644 index 0000000..f35b303 --- /dev/null +++ 
b/workflow/examples/flowmesh_echo_test.json @@ -0,0 +1,23 @@ +{ + "apiVersion": "lumid/v1", + "kind": "Workflow", + "metadata": { + "name": "flowmesh-echo-test", + "annotations": { + "description": "Connectivity test: submits an echo task to FlowMesh (no GPU required)." + } + }, + "spec": { + "stages": { + "echo": { + "template": "flowmesh.echo", + "params": { + "payload": { + "test": "hello from PortfolioBench", + "timestamp": "auto" + } + } + } + } + } +} diff --git a/workflow/examples/flowmesh_inference_only.json b/workflow/examples/flowmesh_inference_only.json new file mode 100644 index 0000000..96e154c --- /dev/null +++ b/workflow/examples/flowmesh_inference_only.json @@ -0,0 +1,25 @@ +{ + "apiVersion": "lumid/v1", + "kind": "Workflow", + "metadata": { + "name": "flowmesh-inference-only", + "annotations": { + "description": "Pure GPU inference via FlowMesh — no local PortfolioBench stages. Useful for testing the FlowMesh integration end-to-end." + } + }, + "spec": { + "stages": { + "inference": { + "template": "flowmesh.inference", + "params": { + "model": "meta-llama/Llama-3.1-8B-Instruct", + "system_prompt": "You are a helpful financial analyst.", + "prompt": "What are the key risk factors to consider when constructing a multi-asset portfolio that includes cryptocurrencies, US equities, and global indices?", + "temperature": 0.7, + "max_tokens": 512, + "gpu_memory": "16GB" + } + } + } + } +}