Real-time Dhan market data capture with:
- 4-connection WebSocket streaming for spot, futures, CE options, and PE options
- Parquet tick persistence under
data/ticks/ - A live market dashboard at
/ - A reference-backed moneyflow dashboard at
/moneyflow - DuckDB historical analytics from
market_data.duckdboverlaid with this app's live/parquet data
main.pyloads config, instruments, storage, dashboard, and feed manager.src/feed_manager.pyowns the websocket connections and dynamic option subscriptions.src/storage.pyflushes quotes, OI, and reference ticks to parquet.- Tick files are written under
data/ticks/{YYYY-MM-DD}/.
/serves the lightweight live market dashboard fromsrc/dashboard.py./moneyflowserves the full reference-style moneyflow UI frommoneyflow.htmlthroughsrc/moneyflow_page.py.
- Historical analytics come from
moneyflow.pyagainstmarket_data.duckdb. - Local path/config shims live in
db_paths.pyanddata_service.py. src/reference_moneyflow.pyis the integration layer:- uses DuckDB/reference analytics for historical and baseline calculations
- overlays current live in-memory ticks or parquet replay data for active-session freshness
- returns the field shapes expected by the full moneyflow frontend
src/canonical_moneyflow.pyis still used as the live/parquet mirror layer.
config.yaml: runtime configurationmarket_data.duckdb: historical options/ohlcv analytics databasesector_mapping.json: symbol-to-sector mappingmoneyflow.py: reference moneyflow analytics enginemoneyflow.html: full moneyflow frontendsrc/reference_moneyflow.py: live integration wrappertests/: unit and integration-style tests
- Python 3.10+
- Dhan client id and access token
- Local DuckDB file at
market_data.duckdb
Install dependencies:
pip install -r requirements.txtRuntime configuration is read from config.yaml, not from .env.
Important keys:
dhan:
client_id: "..."
access_token: "..."
storage:
data_dir: "./data"
flush_interval_seconds: 15
retention_days: 7
feed:
options_strike_range: 10
options_max_expiries: 1
atm_refresh_seconds: 60
dashboard:
enabled: true
host: "127.0.0.1"
port: 8765
batch_interval_ms: 500
moneyflow_refresh_ms: 1500
sector_map_path: ""Optional moneyflow-specific environment overrides:
$env:WS_MONEYFLOW_DB_PATH = "C:\path\to\market_data.duckdb"
$env:WS_MONEYFLOW_INTRADAY_DB_PATH = "C:\path\to\live_market.duckdb"
$env:WS_MONEYFLOW_ATM_WINDOW = "10"Default behavior:
WS_MONEYFLOW_DB_PATHdefaults to./market_data.duckdbWS_MONEYFLOW_INTRADAY_DB_PATHdefaults to./data/live_market.duckdb- If no intraday DuckDB exists, the reference engine falls back to the base historical DB
Start the full system:
python main.pyThen open:
- Live dashboard:
http://127.0.0.1:8765/ - Moneyflow dashboard:
http://127.0.0.1:8765/moneyflow
Stop with Ctrl+C.
GET /GET /api/snapshotGET /eventsGET /api/stats
GET /moneyflowGET /api/moneyflow/snapshotGET /api/moneyflow/datesGET /api/moneyflow/expiriesGET /api/moneyflow/allGET /api/moneyflow/summaryGET /api/moneyflow/symbolsGET /api/moneyflow/sectorsGET /api/moneyflow/unusualGET /api/moneyflow/historyGET /api/moneyflow/live-computeGET /api/moneyflow/parquet-computeGET /api/moneyflow/parquet-streamGET /api/moneyflow/prev-zscoresGET /api/ws-live-ohlcvGET /api/ws-live-ohlcv-snapshot
- Uses DuckDB/reference analytics directly from
market_data.duckdb. - Suitable for previous trading dates and history charts.
- Prefers the live in-memory engine when websocket ticks are available.
- Falls back to parquet replay when live in-memory data is unavailable.
- Preserves reference-derived fields such as MFS, z-scores, sector aggregates, and history-compatible symbol rows.
- For the latest available trade date,
/api/ws-live-ohlcvserves current live OHLCV plus aggregated CE/PE OI. - For historical dates, it falls back to the parquet snapshot path.
Run the full test suite:
python -m unittest discover -s tests -p "test_*.py"Notable coverage:
tests/test_dashboard.py: dashboard behavior and SSE/disconnect handlingtests/test_canonical_moneyflow.py: parquet/live canonical parity checkstests/test_reference_moneyflow.py: reference frontend contract, live overlay merge, and OHLCV CE/PE OI snapshot behavior
- The first cold DuckDB moneyflow query can be noticeably slower than subsequent cached queries.
- The first cold parquet replay for a date can also be expensive; repeated calls are much faster because the replay layer caches results.
- Sector mapping is loaded from
sector_mapping.jsonin the repo root when present.