FlowMesh workers subscribe to Redis topics published by the orchestrator, execute the requested task using the appropriate executor (Transformers, TRL, vLLM, RAG, agent pipelines, and more), and persist results plus optional artifacts to either shared storage or the orchestrator via HTTP callbacks.
Set up the worker environment with uv:

```bash
# Install uv if it is not already available
pip install uv

# Create and activate a virtual environment
uv venv .venv
source .venv/bin/activate

# Sync the worker runtime (baseline inference stack)
uv sync --extra inference

# Optional executors:
# uv sync --extra inference --extra training   # enable PPO/DPO/SFT trainers
# uv sync --extra inference --extra rag        # enable the RAG executor
# uv sync --extra inference --extra agent      # enable agent pipelines
# uv sync --all-extras                         # install every optional component
```

Configure the environment and start the worker:

```bash
export REDIS_URL="redis://localhost:6379/0"
export RESULTS_DIR=./results_workers
export ORCHESTRATOR_BASE_URL="http://127.0.0.1:8000"  # required for HTTP artifact uploads
uv run python worker/main.py
```

At startup, the worker:
- Collects hardware information and registers itself in Redis.
- Streams heartbeats that include load and power metrics.
- Listens for tasks, selects the right executor, and writes outputs under `RESULTS_DIR/<task_id>/`.
- Archives `final_model/` or `final_lora/` and uploads the bundle when the task requests HTTP artifact delivery.
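The task-handling step can be sketched in Python. This is a minimal illustration, not FlowMesh's worker code: the payload fields `task_id` and `executor` and the `executors` registry are hypothetical; only the `RESULTS_DIR/<task_id>/responses.json` layout comes from this README. Messages are assumed to have the dict shape that redis-py's pub/sub `listen()` yields.

```python
import json
from pathlib import Path
from typing import Any, Callable, Dict, Optional

# Hypothetical executor type: takes the task payload, returns a JSON-serializable summary.
Executor = Callable[[Dict[str, Any]], Dict[str, Any]]


def handle_task_message(
    message: Dict[str, Any],
    executors: Dict[str, Executor],
    results_dir: Path,
) -> Optional[Path]:
    """Run one task from a pub/sub message and persist its JSON summary."""
    # redis-py pub/sub yields dicts with "type", "channel" and "data";
    # subscription confirmations and other control messages carry no task.
    if message.get("type") != "message":
        return None
    data = message["data"]
    if isinstance(data, bytes):
        data = data.decode("utf-8")
    task = json.loads(data)

    # Assumption: the payload names its executor and carries a task_id.
    executor = executors[task["executor"]]
    summary = executor(task)

    # Layout from the README: RESULTS_DIR/<task_id>/responses.json
    task_dir = results_dir / task["task_id"]
    task_dir.mkdir(parents=True, exist_ok=True)
    out_path = task_dir / "responses.json"
    out_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    return out_path
```

In a live loop, each item would come from `redis.Redis.from_url(REDIS_URL).pubsub()` after `subscribe(TASK_TOPIC)`; subscription confirmations have `type == "subscribe"` and are skipped.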
| Variable | Default | Description |
|---|---|---|
| `REDIS_URL` | – | Redis connection string (required). |
| `RESULTS_DIR` | `./results_workers` | Root directory for task outputs. |
| `TASK_TOPIC` | `tasks` | Redis pub/sub topic the worker subscribes to. |
| `HEARTBEAT_INTERVAL_SEC` | `30` | Interval between heartbeats, in seconds. |
| `WORKER_ID` | random | Override to pin a stable worker identifier. |
| `WORKER_TAGS` | empty | Comma-separated tags used by the scheduler. |
| `LOG_LEVEL` | `INFO` | Worker log level. |
| `WORKER_COST_PER_HOUR` | `1.0` | Hourly cost in USD; reported with heartbeats. |
| `ORCHESTRATOR_BASE_URL` | empty | Required to build artifact download links when using HTTP results. |
| `MODEL_ARCHIVE_USE_PIGZ` | `1` | Enable multithreaded pigz compression (set `0`/`false` to disable). |
| `MODEL_ARCHIVE_COMPRESSION_LEVEL` | `6` | Gzip compression level (0-9). |
| `MODEL_ARCHIVE_PIGZ_THREADS` | – | Force a specific thread count for pigz; defaults to all CPUs. |
| `MODEL_ARCHIVE_PIGZ_BIN` | `pigz` | Path to the pigz binary. |
| `MODEL_ARCHIVE_TAR_BIN` | `tar` | Tar executable used before compression. |
| `WORKER_NETWORK_BANDWIDTH_BYTES_PER_SEC` | empty | Throttle HTTP uploads to emulate limited bandwidth. |
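A sketch of how these variables might be loaded into a typed settings object, assuming the defaults in the table. `WorkerSettings` and its field names are hypothetical, and the worker's real parsing may differ (for example in which strings count as "false").

```python
import os
import uuid
from dataclasses import dataclass, field
from typing import List, Mapping, Optional


def _flag(value: str) -> bool:
    # The table documents 0/false as "off"; "no"/"off" are an extra assumption.
    return value.strip().lower() not in {"0", "false", "no", "off"}


@dataclass
class WorkerSettings:
    redis_url: str
    results_dir: str = "./results_workers"
    task_topic: str = "tasks"
    heartbeat_interval_sec: float = 30.0
    worker_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    worker_tags: List[str] = field(default_factory=list)
    log_level: str = "INFO"
    cost_per_hour: float = 1.0
    orchestrator_base_url: Optional[str] = None
    use_pigz: bool = True
    compression_level: int = 6
    pigz_threads: Optional[int] = None  # None means "use all CPUs"
    pigz_bin: str = "pigz"
    tar_bin: str = "tar"
    bandwidth_bytes_per_sec: Optional[float] = None

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "WorkerSettings":
        if not env.get("REDIS_URL"):
            raise ValueError("REDIS_URL is required")
        level = int(env.get("MODEL_ARCHIVE_COMPRESSION_LEVEL", "6"))
        if not 0 <= level <= 9:
            raise ValueError("MODEL_ARCHIVE_COMPRESSION_LEVEL must be between 0 and 9")
        threads = env.get("MODEL_ARCHIVE_PIGZ_THREADS")
        bandwidth = env.get("WORKER_NETWORK_BANDWIDTH_BYTES_PER_SEC")
        settings = cls(
            redis_url=env["REDIS_URL"],
            results_dir=env.get("RESULTS_DIR", "./results_workers"),
            task_topic=env.get("TASK_TOPIC", "tasks"),
            heartbeat_interval_sec=float(env.get("HEARTBEAT_INTERVAL_SEC", "30")),
            worker_tags=[t.strip() for t in env.get("WORKER_TAGS", "").split(",") if t.strip()],
            log_level=env.get("LOG_LEVEL", "INFO"),
            cost_per_hour=float(env.get("WORKER_COST_PER_HOUR", "1.0")),
            orchestrator_base_url=env.get("ORCHESTRATOR_BASE_URL") or None,
            use_pigz=_flag(env.get("MODEL_ARCHIVE_USE_PIGZ", "1")),
            compression_level=level,
            pigz_threads=int(threads) if threads else None,
            pigz_bin=env.get("MODEL_ARCHIVE_PIGZ_BIN", "pigz"),
            tar_bin=env.get("MODEL_ARCHIVE_TAR_BIN", "tar"),
            bandwidth_bytes_per_sec=float(bandwidth) if bandwidth else None,
        )
        # WORKER_ID defaults to a random identifier unless pinned.
        if env.get("WORKER_ID"):
            settings.worker_id = env["WORKER_ID"]
        return settings
```

Passing an explicit mapping instead of `os.environ` keeps the parser easy to test.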
The heartbeat TTL is computed automatically as `max(HEARTBEAT_INTERVAL_SEC * 4, 120)` seconds; with the default 30-second interval, this gives 120 seconds.
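The TTL rule and the `worker:<worker_id>` hash (mentioned in the troubleshooting notes) suggest a heartbeat along these lines. This sketch assumes a redis-py-style client exposing `hset(name, mapping=...)` and `expire(name, time)`; the metric field names and `last_heartbeat` are hypothetical.

```python
import math
import time
from typing import Any, Dict


def heartbeat_ttl(interval_sec: float) -> int:
    """TTL rule from the README: max(HEARTBEAT_INTERVAL_SEC * 4, 120), in seconds."""
    return math.ceil(max(interval_sec * 4, 120))


def publish_heartbeat(client: Any, worker_id: str, metrics: Dict[str, Any], interval_sec: float) -> int:
    """Refresh the worker:<worker_id> hash and its expiry; returns the TTL used.

    `client` is assumed to be a redis-py Redis instance (or anything with
    hset(name, mapping=...) and expire(name, seconds)).
    """
    key = f"worker:{worker_id}"
    # Hypothetical field names; the real heartbeat payload is not documented here.
    client.hset(key, mapping={**metrics, "last_heartbeat": time.time()})
    ttl = heartbeat_ttl(interval_sec)
    client.expire(key, ttl)
    return ttl
```

Against a real server, `client` would be `redis.Redis.from_url(os.environ["REDIS_URL"])`, called every `HEARTBEAT_INTERVAL_SEC` seconds. Because the TTL spans at least four intervals, a worker must miss several heartbeats before its hash expires.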
When a task sets `spec.output.destination.type: http`, configure `ORCHESTRATOR_BASE_URL` so the worker can upload artifacts to `POST /api/v1/results/{task_id}/files` and expose the generated download link.
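Both endpoints can be assembled from `ORCHESTRATOR_BASE_URL` as follows. This is a minimal sketch: the helper names are hypothetical, and because the multipart field name for the upload itself is not documented here, it stops at building URLs.

```python
from urllib.parse import quote


def upload_url(base_url: str, task_id: str) -> str:
    """POST target for artifacts: <base>/api/v1/results/{task_id}/files."""
    if not base_url:
        raise ValueError("ORCHESTRATOR_BASE_URL must be set for HTTP destinations")
    return f"{base_url.rstrip('/')}/api/v1/results/{quote(task_id, safe='')}/files"


def download_url(base_url: str, task_id: str, archive_name: str) -> str:
    """GET link the orchestrator serves for an uploaded bundle: .../files/<archive>."""
    return f"{upload_url(base_url, task_id)}/{quote(archive_name, safe='')}"
```

Stripping a trailing slash from the base URL avoids `//api` when the variable is written as `http://127.0.0.1:8000/`.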
Task outputs are laid out as follows:

- Every task receives a dedicated subdirectory under `RESULTS_DIR`.
- Executors write their JSON summary to `<task_id>/responses.json`.
- Training executors produce checkpoints and, when HTTP uploads are enabled, create `final_model.tar.gz` or `final_lora.tar.gz`.
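Producing `final_model.tar.gz` can be sketched with the `MODEL_ARCHIVE_*` settings from the configuration table: a `tar | pigz` pipeline when pigz is enabled and installed, otherwise Python's `tarfile` module at the same gzip level. The function names, placing the archive next to the source directory, and the exact pigz flags (`-<level>`, `-p <threads>`) are assumptions about how the worker might do it.

```python
import os
import shutil
import subprocess
import tarfile
from pathlib import Path
from typing import List, Optional, Tuple


def pigz_commands(src: Path, level: int = 6, threads: Optional[int] = None,
                  tar_bin: str = "tar", pigz_bin: str = "pigz") -> Tuple[List[str], List[str]]:
    """argv pair for `tar -cf - -C <parent> <name> | pigz -<level> -p <threads>`."""
    threads = threads or os.cpu_count() or 1  # "defaults to all CPUs"
    tar_cmd = [tar_bin, "-cf", "-", "-C", str(src.parent), src.name]
    pigz_cmd = [pigz_bin, f"-{level}", "-p", str(threads)]
    return tar_cmd, pigz_cmd


def archive_model_dir(src: Path, use_pigz: bool = True, level: int = 6,
                      threads: Optional[int] = None, tar_bin: str = "tar",
                      pigz_bin: str = "pigz") -> Path:
    """Pack e.g. <task_dir>/final_model into <task_dir>/final_model.tar.gz."""
    dest = src.with_name(src.name + ".tar.gz")
    if use_pigz and shutil.which(pigz_bin) and shutil.which(tar_bin):
        tar_cmd, pigz_cmd = pigz_commands(src, level, threads, tar_bin, pigz_bin)
        with open(dest, "wb") as out:
            tar_proc = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE)
            pigz_proc = subprocess.Popen(pigz_cmd, stdin=tar_proc.stdout, stdout=out)
            tar_proc.stdout.close()  # lets tar see SIGPIPE if pigz exits early
            pigz_rc = pigz_proc.wait()
            tar_rc = tar_proc.wait()
            if pigz_rc != 0 or tar_rc != 0:
                raise RuntimeError("tar | pigz pipeline failed")
    else:
        # Single-threaded fallback from the standard library.
        with tarfile.open(dest, "w:gz", compresslevel=level) as tar:
            tar.add(src, arcname=src.name)
    return dest
```

Both paths produce a standard gzip-compressed tarball, so consumers do not need to know which one ran.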
In a multi-stage pipeline, model artifacts move between stages like this:

- Stage 1 declares `spec.output.destination.type: http`.
- The worker keeps a local copy and uploads the archive to the orchestrator.
- The orchestrator serves the bundle at `GET /api/v1/results/{task_id}/files/<archive>`.
- Downstream stages reference the URL via `${stage.result.final_model_archive_url}` (or the LoRA equivalent).
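Resolving such a placeholder can be sketched as a dotted lookup into the upstream stage's results. This assumes results are exposed as nested dicts keyed by stage name (`stage` here stands in for that name); `resolve_placeholders` is hypothetical, and FlowMesh's template engine may behave differently, for example on missing keys.

```python
import re
from typing import Any, Mapping

# Matches ${dotted.path} placeholders such as ${stage.result.final_model_archive_url}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z0-9_.\-]+)\}")


def resolve_placeholders(template: str, context: Mapping[str, Any]) -> str:
    """Replace each ${a.b.c} with context["a"]["b"]["c"]; unknown paths raise KeyError."""

    def lookup(match) -> str:
        node: Any = context
        for part in match.group(1).split("."):
            if not isinstance(node, Mapping) or part not in node:
                raise KeyError(f"unresolved placeholder {match.group(0)}")
            node = node[part]
        return str(node)

    return _PLACEHOLDER.sub(lookup, template)
```

Failing loudly on an unresolved path surfaces a missing `final_*_archive_url` at submission time rather than as a stalled downstream stage.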
Prefer shared storage? Point both the orchestrator and the workers at the same mount and disable HTTP uploads. The executors still emit artifacts locally, and templates can reference absolute paths.
Troubleshooting:

- Inspect `worker.log` for executor output and upload diagnostics.
- Use `redis-cli` to check `worker:<worker_id>` hashes and heartbeat TTLs.
- If a pipeline stalls, confirm that the Stage 1 task produced the expected `final_*_archive_url` and that the orchestrator results directory contains the uploaded `.tar.gz`.
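The Redis check can also be scripted. This sketch assumes a redis-py client (`scan_iter`, `ttl`) and flags workers whose remaining TTL implies more than one missed heartbeat, using the TTL formula documented above; the helper names and the threshold are hypothetical.

```python
from typing import Any, Dict, List


def worker_ttls(client: Any) -> Dict[str, int]:
    """Map every worker:<id> key to its remaining TTL in seconds.

    Uses redis-py semantics: ttl() returns -1 when no expiry is set and
    -2 when the key has disappeared (e.g. it expired between SCAN and TTL).
    """
    report: Dict[str, int] = {}
    for key in client.scan_iter(match="worker:*"):
        name = key.decode() if isinstance(key, bytes) else key
        report[name] = client.ttl(name)
    return report


def overdue_workers(ttls: Dict[str, int], interval_sec: float) -> List[str]:
    """Workers whose TTL has dropped by more than one heartbeat interval."""
    full_ttl = max(interval_sec * 4, 120)  # the documented TTL formula
    threshold = full_ttl - interval_sec
    # Negative TTLs (-1, -2) also fall below the threshold and are reported.
    return sorted(name for name, ttl in ttls.items() if ttl < threshold)
```

From the shell, `redis-cli --scan --pattern 'worker:*'`, `redis-cli TTL worker:<worker_id>`, and `redis-cli HGETALL worker:<worker_id>` give the same information.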
Additional features:

- Elastic scaling is controlled by `ENABLE_ELASTIC_SCALING` (default `true`). The orchestrator can temporarily disable idle workers and reactivate them when the backlog grows.
- Task merge lets the orchestrator coalesce duplicate inference/RAG requests. Workers receive a parent payload with `merge_children`, executors (e.g., vLLM) emit per-child outputs under `result.children`, and the runner writes each child result to its own directory.
- Multi-GPU execution: when multiple GPUs are available, vLLM automatically sets `tensor_parallel_size`, and PPO/DPO/SFT executors launch distributed jobs via `torchrun`. Set `training.allow_multi_gpu=false` or override `training.nproc_per_node` to constrain the world size.
- Bandwidth throttling: set `WORKER_NETWORK_BANDWIDTH_BYTES_PER_SEC` to simulate limited HTTP throughput; the worker reports the value and delays callbacks accordingly.
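The task-merge fan-out can be sketched as follows, assuming `result["children"]` is a list of dicts that each carry their own `task_id`, and that each child's summary is written to `responses.json` like a regular task. Both assumptions, and the helper name `write_child_results`, are hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Dict, List


def write_child_results(results_dir: Path, result: Dict[str, Any]) -> List[Path]:
    """Fan a merged parent result out to one directory per child task."""
    written: List[Path] = []
    for child in result.get("children", []):
        # Assumption: each child entry carries the task_id of the request it answers.
        child_dir = results_dir / child["task_id"]
        child_dir.mkdir(parents=True, exist_ok=True)
        path = child_dir / "responses.json"
        path.write_text(json.dumps(child, indent=2), encoding="utf-8")
        written.append(path)
    return written
```

Writing each child under its own task ID means clients that submitted the duplicate requests read their results exactly as if the tasks had run separately.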