Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ target/
!config_examples/config_freqai.example.json

docker-compose-*.yml

# Workflow results (regenerable via make e2e-*)
results/
157 changes: 157 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# PortfolioBench — Development & Testing Makefile
# Usage: make help

# ───────────────────── Configuration ─────────────────────

# Tunable knobs; all can be overridden from the environment or the command
# line, e.g. `make FLOWMESH_URL=http://gpu-host:8000 workflow-echo`.
PYTHON       := python3
FLOWMESH_URL ?= http://localhost:8000
FLOWMESH_KEY ?= flm-demo-00000000000000000000000000000000
# LumidOS is expected as a sibling checkout of this repository.
# $(abspath …) resolves the path at parse time without forking a shell
# (the previous `$(shell cd .. && pwd)` spawned a process on every parse),
# and `?=` lets CI point at a different checkout.
LUMIDOS_DIR  ?= $(abspath $(CURDIR)/../LumidOS)

WORKFLOW_DIR := workflow/examples
DATA_DIR     := user_data/data/usstock

# Declare every command-style target as phony so a stray file or directory
# with the same name (e.g. ./test, ./data, ./clean) can never shadow it.
# Previously test-workflow, download-data, the workflow-local-* variants,
# benchmark-all and the e2e-* targets were missing from this list.
.PHONY: help install install-dev test test-workflow lint \
	data generate-data download-data \
	workflow-local workflow-local-rsi workflow-local-macd workflow-local-all \
	workflow-echo workflow-inference workflow-ai-analysis \
	benchmark benchmark-quick benchmark-all \
	e2e-local e2e-flowmesh \
	check-flowmesh check-data check-lumidos \
	clean

# Self-documenting help: every `target: ## description` line in this file
# is rendered as an aligned, colourised row.
help: ## Show this help
	@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf " \033[36m%-24s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)

# ───────────────────── Setup ─────────────────────

# Editable install of the top-level package, followed by the vendored
# freqtrade checkout (each recipe line runs in its own shell, so the
# `cd` is chained with `&&` on a single line).
install: ## Install PortfolioBench and dependencies
	$(PYTHON) -m pip install -e .
	cd freqtrade && $(PYTHON) -m pip install -e .

# Prefer the optional [dev] extra; silently fall back to a plain editable
# install when the extra is not defined in pyproject.
install-dev: ## Install with test dependencies
	$(PYTHON) -m pip install -e ".[dev]" 2>/dev/null || $(PYTHON) -m pip install -e .
	cd freqtrade && $(PYTHON) -m pip install -e .

# ───────────────────── Preflight Checks ─────────────────────
# Each check prints a one-line OK message, or an actionable error and
# exits non-zero so dependent targets abort early.

check-data: ## Verify market data exists
	@if test -d $(DATA_DIR); then \
		echo "Data dir OK: $$(ls $(DATA_DIR)/*.feather 2>/dev/null | wc -l) feather files"; \
	else \
		echo "No data at $(DATA_DIR). Run: make generate-data"; \
		exit 1; \
	fi

check-lumidos: ## Verify LumidOS is available as sibling directory
	@if test -d $(LUMIDOS_DIR)/adapters/portbench; then \
		echo "LumidOS OK at $(LUMIDOS_DIR)"; \
	else \
		echo "LumidOS not found at $(LUMIDOS_DIR)"; \
		exit 1; \
	fi

check-flowmesh: ## Verify FlowMesh is reachable
	@if curl -sf $(FLOWMESH_URL)/healthz >/dev/null 2>&1; then \
		echo "FlowMesh OK at $(FLOWMESH_URL)"; \
	else \
		echo "FlowMesh not reachable at $(FLOWMESH_URL). Start it with: cd ../FlowMesh_dev && make up"; \
		exit 1; \
	fi

# ───────────────────── Data ─────────────────────

# Synthetic data is deterministic and suitable for CI; real market data
# (download-data) requires network access.
generate-data: ## Generate synthetic OHLCV test data for all 119 instruments
	portbench generate-data

data: generate-data ## Alias for generate-data

download-data: ## Download real market data from Google Drive
	portbench download-data --exchange portfoliobench

# ───────────────────── Tests ─────────────────────

test: ## Run unit tests
	$(PYTHON) -m pytest tests/ -v

# Workflow tests need market data and the LumidOS sibling checkout, but
# deliberately do not require a running FlowMesh instance.
test-workflow: check-data check-lumidos ## Run workflow unit tests (local only, no FlowMesh)
	$(PYTHON) -m pytest tests/test_workflow.py -v

# ───────────────────── Local Workflows (no FlowMesh) ─────────────────────

workflow-local: check-data check-lumidos ## Run default EMA blend workflow (local only)
	portbench workflow $(WORKFLOW_DIR)/ema_ons_blend.json

workflow-local-rsi: check-data check-lumidos ## Run RSI equal-weight workflow (local only)
	portbench workflow $(WORKFLOW_DIR)/rsi_equal_weight.json

workflow-local-macd: check-data check-lumidos ## Run MACD ONS workflow (local only)
	portbench workflow $(WORKFLOW_DIR)/macd_ons_pure.json

workflow-local-all: check-data check-lumidos ## Run all local workflows
	@mkdir -p results  # --output-json fails if results/ does not exist (fix: the e2e targets already do this)
	@echo "=== EMA Blend ===" && portbench workflow $(WORKFLOW_DIR)/ema_ons_blend.json --output-json results/ema_blend.json
	@echo ""
	@echo "=== RSI Equal Weight ===" && portbench workflow $(WORKFLOW_DIR)/rsi_equal_weight.json --output-json results/rsi_equal.json
	@echo ""
	@echo "=== MACD ONS ===" && portbench workflow $(WORKFLOW_DIR)/macd_ons_pure.json --output-json results/macd_ons.json
	@echo ""
	@echo "=== Bollinger Blend ===" && portbench workflow $(WORKFLOW_DIR)/bollinger_blend.json --output-json results/bollinger.json
	@echo ""
	@echo "All local workflows complete. Results in results/"

# ───────────────────── FlowMesh Workflows (GPU) ─────────────────────

workflow-echo: check-lumidos check-flowmesh ## Run echo connectivity test via FlowMesh
	portbench workflow $(WORKFLOW_DIR)/flowmesh_echo_test.json \
		--flowmesh-url $(FLOWMESH_URL) \
		--flowmesh-key $(FLOWMESH_KEY)

workflow-inference: check-lumidos check-flowmesh ## Run pure GPU inference via FlowMesh
	@mkdir -p results  # --output-json fails if results/ does not exist
	portbench workflow $(WORKFLOW_DIR)/flowmesh_inference_only.json \
		--flowmesh-url $(FLOWMESH_URL) \
		--flowmesh-key $(FLOWMESH_KEY) \
		--output-json results/inference_only.json

workflow-ai-analysis: check-data check-lumidos check-flowmesh ## Run full pipeline: local alpha/strategy/portfolio + FlowMesh AI analysis
	@mkdir -p results  # --output-json fails if results/ does not exist
	portbench workflow $(WORKFLOW_DIR)/ema_blend_ai_analysis.json \
		--flowmesh-url $(FLOWMESH_URL) \
		--flowmesh-key $(FLOWMESH_KEY) \
		--output-json results/ai_analysis.json

# ───────────────────── Benchmarking ─────────────────────
# Thin wrappers around the portbench CLI; ordered fastest to slowest.

benchmark-quick: ## Run quick benchmark suite (subset of strategies)
	portbench benchmark --quick

benchmark: ## Run full benchmark suite
	portbench benchmark

benchmark-all: ## Run complete benchmark matrix (all strategies x all asset classes)
	portbench benchmark-all

# ───────────────────── E2E Pipeline ─────────────────────

# Full local pipeline: alpha -> strategy -> portfolio -> backtest, with
# the JSON summary written under results/.
e2e-local: check-data check-lumidos ## Run full local E2E pipeline (no FlowMesh)
	@echo "================================================================"
	@echo "E2E LOCAL PIPELINE"
	@echo "================================================================"
	@mkdir -p results
	portbench workflow $(WORKFLOW_DIR)/ema_ons_blend.json --output-json results/e2e_local.json
	@echo ""
	@echo "E2E local pipeline PASSED"
	@echo "Results: results/e2e_local.json"

# Three escalating FlowMesh checks: connectivity echo, pure GPU inference,
# then the full local-compute + FlowMesh-AI pipeline.
e2e-flowmesh: check-data check-lumidos check-flowmesh ## Run full E2E pipeline through FlowMesh GPU
	@echo "================================================================"
	@echo "E2E FLOWMESH PIPELINE"
	@echo "================================================================"
	@mkdir -p results
	@echo "Step 1: Echo test..."
	portbench workflow $(WORKFLOW_DIR)/flowmesh_echo_test.json \
		--flowmesh-url $(FLOWMESH_URL) --flowmesh-key $(FLOWMESH_KEY)
	@echo ""
	@echo "Step 2: GPU inference test..."
	portbench workflow $(WORKFLOW_DIR)/flowmesh_inference_only.json \
		--flowmesh-url $(FLOWMESH_URL) --flowmesh-key $(FLOWMESH_KEY) \
		--output-json results/e2e_inference.json
	@echo ""
	@echo "Step 3: Full pipeline (local compute + FlowMesh AI)..."
	portbench workflow $(WORKFLOW_DIR)/ema_blend_ai_analysis.json \
		--flowmesh-url $(FLOWMESH_URL) --flowmesh-key $(FLOWMESH_KEY) \
		--output-json results/e2e_ai_analysis.json
	@echo ""
	@echo "E2E FlowMesh pipeline PASSED"
	@echo "Results in results/"

# ───────────────────── Cleanup ─────────────────────

# Removes only artifacts this repo generates; results/ is regenerable via
# the e2e-* targets (and is gitignored). Stray find errors (e.g. a dir
# deleted mid-walk) are deliberately ignored.
clean: ## Remove build artifacts and result files
	$(RM) -r build/ dist/ *.egg-info .pytest_cache results/
	find . -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
8 changes: 6 additions & 2 deletions workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Workflow-driven CLI mode for PortfolioBench.

This package provides a CLI mode where the alpha -> strategy -> portfolio ->
backtest pipeline is orchestrated by a LumidStack workflow JSON, executed
locally on a single node via :class:`LocalWorkflowRunner`.
backtest pipeline is orchestrated by a lumid/v1 workflow JSON, executed via
LumidOS's :class:`LocalWorkflowRunner`.

Stages with ``portbench.*`` templates run locally; stages with ``flowmesh.*``
templates are dispatched to a FlowMesh instance for GPU-accelerated execution
when ``--flowmesh-url`` is provided.
"""
144 changes: 104 additions & 40 deletions workflow/cli_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
portbench workflow path/to/workflow.json
portbench workflow path/to/workflow.json --output-json results.json

# With FlowMesh GPU backend (via LumidOS):
portbench workflow path/to/workflow.json \\
--flowmesh-url http://localhost:8000 \\
--flowmesh-key flm-demo-00000000000000000000000000000000

The workflow JSON follows the ``lumid/v1`` API version with
``kind: Workflow``. The runner resolves stage dependencies,
executes each pipeline step (alpha -> strategy -> portfolio) via the
:class:`LocalWorkflowRunner` from LumidStack, then backtests the
resulting portfolio weights using PortfolioBench's backtest engine.
executes each pipeline step via the :class:`LocalWorkflowRunner` from
LumidOS. Stages with ``portbench.*`` templates run locally; stages
with ``flowmesh.*`` templates are dispatched to FlowMesh for
GPU-accelerated execution.
"""

from __future__ import annotations
Expand All @@ -31,11 +37,13 @@
if os.path.isdir(os.path.join(_FT_ROOT, "freqtrade")) and _FT_ROOT not in sys.path:
sys.path.insert(0, _FT_ROOT)

# LumidStack lives alongside PortfolioBench in the parent directory or can
# be installed as a package. Try the sibling-directory layout first.
_LUMIDSTACK_ROOT = os.path.join(os.path.dirname(_PROJECT_ROOT), "LumidStack")
if os.path.isdir(_LUMIDSTACK_ROOT) and _LUMIDSTACK_ROOT not in sys.path:
sys.path.insert(0, _LUMIDSTACK_ROOT)
# LumidOS lives alongside PortfolioBench in the parent directory.
# Try both "LumidOS" and legacy "LumidStack" directory names.
_PARENT = os.path.dirname(_PROJECT_ROOT)
for _sibling_name in ("LumidOS", "LumidStack"):
_sibling = os.path.join(_PARENT, _sibling_name)
if os.path.isdir(_sibling) and _sibling not in sys.path:
sys.path.insert(0, _sibling)

logger = logging.getLogger(__name__)

Expand All @@ -44,6 +52,8 @@ def run_workflow_cli(
workflow_file: str,
*,
output_json: str | None = None,
flowmesh_url: str | None = None,
flowmesh_key: str | None = None,
) -> dict[str, Any]:
"""Load a workflow JSON, execute via LumidOS, and backtest the output.

Expand All @@ -53,6 +63,12 @@ def run_workflow_cli(
Path to the ``lumid/v1`` workflow JSON.
output_json
If given, write the backtest metrics to this path as JSON.
flowmesh_url
FlowMesh host URL (e.g. ``http://localhost:8000``). When provided,
``flowmesh.*`` stage templates are dispatched to this FlowMesh
instance for GPU-accelerated execution.
flowmesh_key
FlowMesh API key (Bearer token).

Returns
-------
Expand All @@ -71,17 +87,38 @@ def run_workflow_cli(
if not path.is_file():
raise FileNotFoundError(f"Workflow file not found: {workflow_file}")

# Resolve FlowMesh config from args → env → defaults
fm_url = flowmesh_url or os.environ.get("FLOWMESH_URL")
fm_key = flowmesh_key or os.environ.get("FLOWMESH_API_KEY", "")
use_flowmesh = bool(fm_url)

print("=" * 60)
print("PORTFOLIOBENCH — WORKFLOW MODE")
print("=" * 60)
print(f" Workflow file : {path.resolve()}")
if use_flowmesh:
print(f" FlowMesh URL : {fm_url}")
else:
print(" FlowMesh : disabled (local-only mode)")
print()

# ── 1. Parse workflow ──────────────────────────────────────────────
runner = LocalWorkflowRunner.from_file(path)
register_all_handlers(runner)

wf_name = runner.workflow.name or path.stem
# Register FlowMesh handlers if configured
fm_client = None
if use_flowmesh:
from adapters.portbench.flowmesh_stages import register_flowmesh_handlers

fm_client = register_flowmesh_handlers(
runner,
flowmesh_url=fm_url,
api_key=fm_key,
)
print(" FlowMesh handlers registered: flowmesh.inference, flowmesh.analysis, flowmesh.echo")

wf_name = runner.workflow.metadata.name or path.stem

# Parse backtest config from the extra spec (domain-specific fields
# stored alongside stages in the workflow JSON).
Expand Down Expand Up @@ -121,9 +158,13 @@ def run_workflow_cli(
# ── 3. Execute workflow via LumidOS runner ─────────────────────────
print()
print("=" * 60)
print("EXECUTING WORKFLOW via LumidStack LocalWorkflowRunner")
print("EXECUTING WORKFLOW via LumidOS")
print("=" * 60)
wf_result = runner.run(context=context)
try:
wf_result = runner.run(context=context)
finally:
if fm_client is not None:
fm_client.close()

for sname, sres in wf_result.stages.items():
print(f" Stage {sname!r}: {sres.duration_s:.3f}s — {sres.data}")
Expand All @@ -132,37 +173,58 @@ def run_workflow_cli(
prices = context.get("prices")
weights = context.get("weights")

if prices is None or weights is None:
raise RuntimeError(
"Workflow did not produce prices and weights in context. "
"Ensure the workflow has alpha, strategy, and portfolio stages."
if prices is not None and weights is not None:
print()
print("=" * 60)
print("BACKTESTING — PortfolioBench")
print("=" * 60)

result_df = backtest_portfolio(prices, weights, bt_capital)
metrics = compute_metrics(result_df)

print(f"\n{'=' * 60}")
print("PORTFOLIO RESULTS")
print(f"{'=' * 60}")
print(f" Workflow : {wf_name}")
print(f" Alpha : {context.get('alpha_type', 'n/a')}")
print(f" Strategy : {context.get('strategy_type', 'n/a')}")
print(f" Portfolio : {context.get('portfolio_type', 'n/a')}")
print(f" Initial capital : ${bt_capital:,.2f}")
print(f" Final value : ${result_df['portfolio_value'].iloc[-1]:,.2f}")
print(f" Total return : {metrics['total_return_pct']:.2f}%")
print(f" Annualised return : {metrics['annualised_return_pct']:.2f}%")
print(f" Annualised Sharpe : {metrics['annualised_sharpe']:.4f}")
print(f" Max drawdown : {metrics['max_drawdown_pct']:.2f}%")
print(f" Number of bars : {metrics['n_bars']}")
print(f" Workflow time : {wf_result.total_duration_s:.3f}s")
else:
metrics = {}
logger.info(
"No prices/weights in context — skipping backtest "
"(workflow may be inference-only)."
)

print()
print("=" * 60)
print("BACKTESTING — PortfolioBench")
print("=" * 60)

result_df = backtest_portfolio(prices, weights, bt_capital)
metrics = compute_metrics(result_df)

print(f"\n{'=' * 60}")
print("PORTFOLIO RESULTS")
print(f"{'=' * 60}")
print(f" Workflow : {wf_name}")
print(f" Alpha : {context.get('alpha_type', 'n/a')}")
print(f" Strategy : {context.get('strategy_type', 'n/a')}")
print(f" Portfolio : {context.get('portfolio_type', 'n/a')}")
print(f" Initial capital : ${bt_capital:,.2f}")
print(f" Final value : ${result_df['portfolio_value'].iloc[-1]:,.2f}")
print(f" Total return : {metrics['total_return_pct']:.2f}%")
print(f" Annualised return : {metrics['annualised_return_pct']:.2f}%")
print(f" Annualised Sharpe : {metrics['annualised_sharpe']:.4f}")
print(f" Max drawdown : {metrics['max_drawdown_pct']:.2f}%")
print(f" Number of bars : {metrics['n_bars']}")
print(f" Workflow time : {wf_result.total_duration_s:.3f}s")

# ── 5. Optionally write results to JSON ────────────────────────────
# ── 5. Collect AI analysis results (if any) ───────────────────────
ai_results: dict[str, Any] = {}
for key, value in context.items():
if key.endswith("_analysis") and isinstance(value, str):
ai_results[key] = value
if key.endswith("_response") and isinstance(value, str):
ai_results[key] = value

if ai_results:
print(f"\n{'=' * 60}")
print("AI ANALYSIS RESULTS")
print(f"{'=' * 60}")
for key, text in ai_results.items():
print(f"\n [{key}]")
# Print first 500 chars of each AI response
for line in text[:500].split("\n"):
print(f" {line}")
if len(text) > 500:
print(f" ... ({len(text) - 500} more chars)")

# ── 6. Optionally write results to JSON ────────────────────────────
full_result = {
"workflow": wf_name,
"alpha_type": context.get("alpha_type"),
Expand All @@ -172,6 +234,8 @@ def run_workflow_cli(
"timeframe": bt_timeframe,
"initial_capital": bt_capital,
"metrics": metrics,
"ai_results": {k: v[:1000] for k, v in ai_results.items()},
"flowmesh_enabled": use_flowmesh,
"stage_durations": {
name: round(sr.duration_s, 4) for name, sr in wf_result.stages.items()
},
Expand Down
Loading
Loading