Merged
29 changes: 25 additions & 4 deletions README.md
@@ -3,9 +3,9 @@
</p>

<p align="center">
-<strong>Open-source workflow engine for Python</strong><br>
+<strong>Orchestration engine for durable workflows</strong><br>
Orchestrate and observe computational workflows defined in plain Python.<br>
-Suitable for data pipelines, background tasks, AI agents, etc.
+Suitable for data pipelines, background tasks, agentic systems.
</p>

<p align="center">
@@ -19,7 +19,7 @@

- **Plain Python**: workflows are regular Python functions with decorators - no DSLs, no YAML, no static DAGs.
- **Low latency**: millisecond task startup using warm executor processes.
-- **Real-time observability**: watch workflows execute live using the CLI, or in [Coflux Studio](https://studio.coflux.com), with graph visualisation, logs, and results.
+- **Real-time observability**: watch workflows execute live using the CLI, or in [Coflux Studio](https://studio.coflux.com), with graph visualisation, logs, metrics, and results.
- **Self-hosted**: you run the server; your data stays in your infrastructure.
- **Workspace inheritance**: branch production into development workspaces and re-run individual steps with real data.

@@ -118,6 +118,26 @@ def notify(campaign_id):
send_email.submit(r)
```

### Metrics

Record numeric values from tasks and visualise them in Studio:

```python
loss = cf.Metric("loss", group="training")

@cf.task()
def train(epochs):
    for epoch in range(epochs):
        loss.record(run_epoch(epoch), at=epoch)
```

Track iteration progress with a built-in progress bar:

```python
for item in cf.progress(items):
    process(item)
```

### Groups

Organise parallel tasks into groups:
@@ -141,7 +161,8 @@ cf.log_info("Processing {count} items for {user}", count=42, user="alice")
- **Debouncing**: defer execution until a task stops being called (`defer=True`)
- **Recurrence**: automatically re-execute workflows for polling (`recurrent=True`)
- **Suspense**: pause a task and free resources while waiting (`cf.suspend()`)
-- **Worker pools**: auto-launch and manage workers with Docker or process launchers
+- **Timeouts**: kill executions that exceed a time limit (`timeout=30`)
+- **Worker pools**: auto-launch and manage workers with Docker, process, or Kubernetes launchers

## Getting started

2 changes: 2 additions & 0 deletions adapters/python/CHANGELOG.md
@@ -6,6 +6,8 @@ Enhancements:
- Adds `timeout` parameter to `@task` and `@workflow` decorators.
- Adds `ExecutionCancelled` and `ExecutionTimeout` exceptions.
- Adds `.poll()` method to `Execution` for checking execution results without blocking (or suspending).
- Adds `memo` parameter to `@task`, `@workflow`, and `@stub` decorators.
- Adds `requires` parameter to `@task` and `@workflow` decorators.

## 0.9.0

5 changes: 5 additions & 0 deletions cli/CHANGELOG.md
@@ -4,6 +4,11 @@ Enhancements:

- Updates `server` command to set default project ("default"), and improve Docker lifecycle handling.
- Updates `worker` command to infer adapter (to avoid running `setup`).
- Adds `--type kubernetes` support for `pools create` and `pools update`.
- Adds `pools disable` and `pools enable` commands.
- Adds `pools export` and `pools import` commands.
- Adds `--accepts` flag for pool commands.
- Adds `--requires`, `--memo`/`--no-memo`, `--delay`, and `--retries` flags to `submit`.

## 0.9.0

4 changes: 4 additions & 0 deletions docs/docs/cli_config.md
@@ -1,3 +1,7 @@
---
sidebar_label: CLI
---

# CLI configuration

The CLI is configured through a combination of a configuration file, environment variables, and command-line flags. The priority order is: flags > environment variables > config file > defaults.
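
The priority order above can be sketched as a layered lookup. This is a minimal illustration, not the CLI's actual implementation: the `COFLUX_` environment-variable prefix and the `resolve_settings` helper are assumptions made for the example, and the defaults are the ones listed in the CLI reference.

```python
from collections import ChainMap

# Defaults taken from the global flags table (host and workspace only).
DEFAULTS = {"host": "localhost:7777", "workspace": "default"}

# Hypothetical prefix for environment variables; not confirmed by the docs.
ENV_PREFIX = "COFLUX_"


def resolve_settings(flags, environ, file_config, defaults=DEFAULTS):
    """Merge settings so that flags > environment > config file > defaults."""
    # Flags that were not passed (None) must not mask lower-priority sources.
    set_flags = {k: v for k, v in flags.items() if v is not None}
    env = {
        k[len(ENV_PREFIX):].lower(): v
        for k, v in environ.items()
        if k.startswith(ENV_PREFIX)
    }
    # ChainMap returns the value from the first mapping that has the key.
    return dict(ChainMap(set_flags, env, file_config, defaults))


settings = resolve_settings(
    flags={"host": None, "workspace": "dev"},
    environ={"COFLUX_HOST": "example:7777", "COFLUX_WORKSPACE": "staging"},
    file_config={"host": "file:7777", "project": "p1"},
)
print(settings)
```

Here the workspace comes from the flag, the host from the environment (the flag was unset), and the project from the config file.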
235 changes: 235 additions & 0 deletions docs/docs/cli_reference.md
@@ -0,0 +1,235 @@
---
sidebar_label: CLI
---

# CLI reference

## Global flags

These flags are available on all commands:

| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| `--host` | | `localhost:7777` | Server host |
| `--project` | | | Project ID |
| `--token` | | | Authentication token |
| `--team` | `-t` | | Team ID (for Studio auth) |
| `--workspace` | `-w` | `default` | Workspace name |
| `--output` | `-o` | | Output format (`json`) |
| `--config` | `-c` | `coflux.toml` | Configuration file path |
| `--log-level` | | `info` | Log level (`debug`, `info`, `warn`, `error`) |

## `coflux submit`

Submit a workflow run.

```bash
coflux submit <module/target> [arguments...]
```

Arguments are passed as JSON strings.

| Flag | Description |
|------|-------------|
| `--no-wait` | Submit and exit without waiting for completion |
| `--idempotency-key` | Deduplication key |
| `--requires` | Override requires tags (can be repeated) |
| `--no-requires` | Clear requires |
| `--memo` / `--no-memo` | Override memoisation |
| `--delay` | Override delay (seconds) |
| `--retries` | Override retry limit (0 = no retries) |

```bash
coflux submit myapp/process '"arg1"' '42'
coflux submit --no-wait myapp/long_job '"data"'
coflux submit --requires gpu:A100 --memo myapp/train '"config"'
```
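
When calling `coflux submit` from a script, JSON-encoding each argument avoids hand-written quoting. The sketch below only builds the argument list; `submit_command` is a hypothetical helper for illustration, not part of Coflux, and it uses only the `--no-wait` flag documented above.

```python
import json
import shlex


def submit_command(target, *args, no_wait=False):
    """Build an argv list for `coflux submit`, JSON-encoding each argument."""
    argv = ["coflux", "submit"]
    if no_wait:
        argv.append("--no-wait")
    argv.append(target)
    # Each positional argument is serialised to its own JSON string.
    argv.extend(json.dumps(arg) for arg in args)
    return argv


argv = submit_command("myapp/process", "arg1", 42)
# shlex.join shows how the command would be typed in a POSIX shell.
print(shlex.join(argv))
```

Passing the list to `subprocess.run` directly, rather than through a shell, sidesteps shell quoting altogether.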

## `coflux runs`

### `coflux runs inspect <run-id>`

Inspect a run (target, status, timestamps, step/execution counts).

| Flag | Description |
|------|-------------|
| `--no-wait` | Return snapshot without waiting |

### `coflux runs result <target>`

Get the result of a run, step, or execution.

Target format: `<run-id>`, `<run-id>:<step>`, or `<run-id>:<step>:<attempt>`.
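
As a rough illustration of the three accepted shapes, a script could split a target string like this. `Target` and `parse_target` are hypothetical names for the example; the components are kept as strings because the docs do not say how the CLI types them.

```python
from typing import NamedTuple, Optional


class Target(NamedTuple):
    run_id: str
    step: Optional[str] = None
    attempt: Optional[str] = None


def parse_target(value):
    """Split '<run-id>[:<step>[:<attempt>]]' into its components."""
    parts = value.split(":")
    # Between one and three components, none of them empty.
    if not 1 <= len(parts) <= 3 or not all(parts):
        raise ValueError(f"invalid target: {value!r}")
    return Target(*parts)


print(parse_target("R1"))
print(parse_target("R1:2"))
print(parse_target("R1:2:1"))
```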

### `coflux runs rerun <step-id>`

Re-run a step. Step ID format: `<run-id>:<step>`.

| Flag | Description |
|------|-------------|
| `--no-wait` | Re-run and exit without waiting |

### `coflux runs cancel <execution-id>`

Cancel an execution and all its descendants.

## `coflux logs`

```bash
coflux logs <run-id> [<run-id>:<step>:<attempt>]
```

Fetch logs for a run or specific execution.

| Flag | Short | Description |
|------|-------|-------------|
| `--follow` | `-f` | Stream logs in real-time |
| `--from` | | Only include logs after this timestamp (unix ms) |
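
Since `--from` takes a Unix timestamp in milliseconds, a value can be computed from a timezone-aware `datetime`. This is a small sketch using only the standard library; `to_unix_ms` is a helper name chosen for the example.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_ms(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division on timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


# Logs from the last ten minutes: pass this value to `--from`.
since = to_unix_ms(datetime.now(timezone.utc) - timedelta(minutes=10))
print(since)
```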

## `coflux worker`

```bash
coflux worker [modules...]
```

Start a worker.

| Flag | Description |
|------|-------------|
| `--dev` | Development mode (implies `--watch` and `--register`) |
| `--watch` | Watch for file changes and reload |
| `--register` | Register modules with server |
| `--concurrency` | Max concurrent executions (default: CPU count + 4) |
| `--provides` | Features worker provides (e.g., `gpu:A100`) |
| `--accepts` | Tags executions must have |
| `--adapter` | Adapter command |
| `--session` | Session ID (for pool-launched workers) |

## `coflux setup`

Interactive configuration wizard. Creates `coflux.toml`.

| Flag | Description |
|------|-------------|
| `--host` | Server host |
| `--workspace` | Workspace name |
| `--adapter` | Adapter command |
| `--detect` | Auto-detect adapter |

## `coflux server`

Start a local server using Docker.

| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| `--port` | `-p` | `7777` | Server port |
| `--data-dir` | `-d` | `./data` | Data directory |
| `--image` | | | Docker image |
| `--project` | | | Single-project mode |
| `--public-host` | | | Public host (use `%` prefix for subdomain routing) |
| `--no-auth` | | | Disable authentication |
| `--super-token` | | | Super token |
| `--super-token-hash` | | | Pre-hashed super token (SHA-256 hex) |
| `--secret` | | | Server secret for signing service tokens |
| `--team` | | | Allowed team IDs for Studio auth |
| `--launcher` | | | Allowed launcher types (`docker`, `process`, `kubernetes`) |
| `--studio-url` | | | Studio URL |
| `--allow-origin` | | | Allowed CORS origins |
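
A value for `--super-token-hash` could be produced along these lines. The table only says "SHA-256 hex"; that the hash is taken over the UTF-8 bytes of the token, with no salt, is an assumption of this sketch and should be checked against the server before relying on it.

```python
import hashlib


def hash_super_token(token):
    """Return the SHA-256 hex digest of a token.

    Assumption: the server hashes the raw UTF-8 bytes of the token, unsalted.
    """
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


digest = hash_super_token("abc")
print(digest)
```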

## `coflux login` / `coflux logout`

Authenticate with Coflux Studio using a device authorization flow.

| Flag | Description |
|------|-------------|
| `--no-browser` | Don't open browser automatically (`login` only) |

## `coflux workspaces`

| Command | Description |
|---------|-------------|
| `workspaces list` | List workspaces |
| `workspaces create <name>` | Create a workspace (`--base` for inheritance) |
| `workspaces update` | Update workspace (`--name`, `--base`, `--no-base`) |
| `workspaces pause` | Pause a workspace |
| `workspaces resume` | Resume a paused workspace |
| `workspaces archive` | Archive a workspace |

## `coflux manifests`

| Command | Description |
|---------|-------------|
| `manifests discover <modules...>` | Discover targets without registering |
| `manifests register <modules...>` | Register targets with the server |
| `manifests archive <module>` | Archive a module |
| `manifests inspect` | List registered modules and targets (`--watch`) |

All manifest commands accept `--adapter` to specify the adapter command.

## `coflux pools`

| Command | Description |
|---------|-------------|
| `pools list` | List pools |
| `pools get <name>` | Get pool configuration |
| `pools create <name>` | Create a pool |
| `pools update <name>` | Update a pool |
| `pools delete <name>` | Delete a pool |
| `pools disable <name>` | Disable a pool (drain workers) |
| `pools enable <name>` | Re-enable a pool |
| `pools launches <pool>` | View launched workers (`--watch`) |
| `pools export` | Export pool configs as TOML (`-o`, `--only`) |
| `pools import <file>` | Import pool configs from TOML |

### Pool creation flags

| Flag | Description |
|------|-------------|
| `--type` | Launcher type: `kubernetes`, `docker`, `process` (required) |
| `--set` | Set a field (e.g., `--set image=myapp:latest`, `--set env.KEY=VALUE`) |
| `--modules` | Modules to host |
| `--provides` | Features workers provide |
| `--accepts` | Tags executions must have |
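
One way to picture `--set` assignments such as `env.KEY=VALUE` is as dotted paths into a nested configuration. The sketch below is an interpretation for illustration only: `apply_set` is a hypothetical helper, values are kept as strings, and the CLI's real parsing rules may differ.

```python
def apply_set(config, assignment):
    """Apply a 'dotted.key=value' assignment to a nested dict, in place."""
    # Split on the first '=' only, so values may themselves contain '='.
    key, sep, value = assignment.partition("=")
    if not sep or not key:
        raise ValueError(f"expected KEY=VALUE, got {assignment!r}")
    *parents, leaf = key.split(".")
    node = config
    for name in parents:
        # Create intermediate tables on demand.
        node = node.setdefault(name, {})
    node[leaf] = value
    return config


config = {}
apply_set(config, "image=myapp:latest")
apply_set(config, "env.KEY=VALUE")
print(config)
```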

### Pool update flags

| Flag | Description |
|------|-------------|
| `--set` | Set a field |
| `--unset` | Unset a field |
| `--modules` | Modules to host |
| `--provides` / `--no-provides` | Set or clear provides |
| `--accepts` / `--no-accepts` | Set or clear accepts |

See [pools](./pools.md) for launcher-specific fields.

## `coflux tokens`

| Command | Description |
|---------|-------------|
| `tokens list` | List service tokens |
| `tokens create` | Create a token (`--name`, `--workspaces`) |
| `tokens revoke <id>` | Revoke a token |

## `coflux assets`

| Command | Description |
|---------|-------------|
| `assets inspect <id>` | List asset entries (`--match` to filter) |
| `assets download <id>` | Download asset files (`--to`, `--match`, `--force`) |

## `coflux blobs`

### `coflux blobs get <key>`

Retrieve a blob by key. Use `-o` to write to a file (default: stdout).

## `coflux sessions`

### `coflux sessions list`

List active sessions (`--watch` to watch for changes).

## `coflux queue`

Show the execution queue (`--no-watch` for a snapshot).
18 changes: 18 additions & 0 deletions docs/docs/concurrency.md
@@ -96,6 +96,24 @@ def create_order(user_execution, product_execution):
# ...
```

### Polling

Instead of blocking with `.result()`, you can use `.poll()` to check whether a result is ready without suspending the caller:

```python
execution = slow_task.submit()
while (result := execution.poll(timeout=1)) is None:
    print("waiting...")
```

`poll()` returns the result if it's available, or `None` (by default) if the execution is still running. The optional `timeout` parameter specifies how long to wait (in seconds) before returning `None`.

A custom default value can be provided with the `default` keyword argument:

```python
result = execution.poll(default="not ready")
```
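
One consequence of the default is that a task which legitimately returns `None` looks the same as one that is still running. A unique sentinel passed as `default` tells the two apart. The sketch below uses `FakeExecution`, a stand-in written for this example that mimics the behaviour described above; it is not the Coflux `Execution` class, and combining `timeout` with `default` in one call is assumed to be allowed.

```python
import threading


class FakeExecution:
    """Stand-in with the documented poll() semantics, for illustration only."""

    def __init__(self):
        self._done = threading.Event()
        self._value = None

    def complete(self, value):
        self._value = value
        self._done.set()

    def poll(self, timeout=0, default=None):
        # Wait up to `timeout` seconds, then fall back to `default`.
        if self._done.wait(timeout):
            return self._value
        return default


NOT_READY = object()  # unique sentinel that no task can return


def poll_distinct(execution, timeout=0):
    """Return (ready, value) so that a None result is not mistaken for 'running'."""
    result = execution.poll(timeout=timeout, default=NOT_READY)
    if result is NOT_READY:
        return False, None
    return True, result


execution = FakeExecution()
print(poll_distinct(execution))  # still running
execution.complete(None)
print(poll_distinct(execution))  # finished, and the result really is None
```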

### Suspense

A timeout can be imposed on the `.result()` call by surrounding it in a 'suspense' context. See the [suspense](/suspense) page for details.
34 changes: 21 additions & 13 deletions docs/docs/executions.md
@@ -3,18 +3,22 @@
The state transitions of an execution can be represented in a diagram:

```mermaid
-graph TB;
-start((Start))-->queued
-queued[Queued]-->cached[Cached]
-queued-->assigned
-queued-->spawned[Spawned]
-queued-->deferred[Deferred]
-queued-->cancelled
-assigned[Assigned]-->succeeded[Succeeded]
-assigned-->failed[Failed]
-assigned-->abandoned[Abandoned]
-assigned-->suspended[Suspended]
-assigned-->cancelled[Cancelled]
+flowchart TB
+start((Start)) --> queued[Queued]

+queued --> cached[Cached]
+queued --> spawned[Spawned]
+queued --> deferred[Deferred]
+queued --> cancelled_q[Cancelled]
+queued --> assigned[Assigned]

+assigned --> succeeded[Succeeded]
+assigned --> failed[Failed]
+assigned --> timed_out[Timed out]
+assigned --> abandoned[Abandoned]
+assigned --> suspended[Suspended]
+assigned --> cancelled_a[Cancelled]
+assigned --> recurred[Recurred]
```

When an execution is first scheduled, it starts in the _Queued_ state — unless caching is enabled and there is a cache hit, in which case it transitions straight to _Cached_.
@@ -23,4 +27,8 @@ From the _Queued_ state it will transition to _Assigned_ once it is due and a su

Once assigned, the worker will generally execute it until it succeeds (_Succeeded_) or raises an exception (_Failed_). If contact is lost with a worker for more than the timeout period, the tasks that are running will be marked as _Abandoned_ (we don't know whether they completed successfully). Executions may be _Cancelled_ while they're running (or before they've been assigned). An execution may choose to suspend itself (either explicitly, or from timing out while waiting for another execution to complete) — in this case it will be automatically re-run.

-Steps may be configured to automatically retry (from a failed or abandoned state), or they may be re-run manually, in which case a new execution will be started.
If a task has a [timeout](./timeouts.md) configured and exceeds it, the execution transitions to _Timed out_. The process is killed, child executions are cancelled, and any execution waiting on the result receives an `ExecutionTimeout` exception.

A [recurrent](./recurring.md) target that returns `None` transitions to _Recurred_, triggering the next recurrence.

+Steps may be configured to automatically retry (from a failed, abandoned, or timed out state), or they may be re-run manually, in which case a new execution will be started.
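
The updated diagram can also be read as a small transition table. The sketch below is a model written for illustration, not code from Coflux: the `State` enum, `TRANSITIONS`, and the helper functions are names chosen here, and the edges are copied from the new flowchart (which draws _Cached_ as reachable from _Queued_).

```python
from enum import Enum


class State(Enum):
    QUEUED = "Queued"
    CACHED = "Cached"
    SPAWNED = "Spawned"
    DEFERRED = "Deferred"
    ASSIGNED = "Assigned"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    TIMED_OUT = "Timed out"
    ABANDONED = "Abandoned"
    SUSPENDED = "Suspended"
    CANCELLED = "Cancelled"
    RECURRED = "Recurred"


# Outgoing edges as drawn in the flowchart; other states have none.
TRANSITIONS = {
    State.QUEUED: frozenset({
        State.CACHED, State.SPAWNED, State.DEFERRED,
        State.CANCELLED, State.ASSIGNED,
    }),
    State.ASSIGNED: frozenset({
        State.SUCCEEDED, State.FAILED, State.TIMED_OUT, State.ABANDONED,
        State.SUSPENDED, State.CANCELLED, State.RECURRED,
    }),
}

# States from which a step may be configured to retry automatically.
RETRYABLE = frozenset({State.FAILED, State.ABANDONED, State.TIMED_OUT})


def can_transition(current, target):
    """True if the diagram has an edge from `current` to `target`."""
    return target in TRANSITIONS.get(current, frozenset())


def may_auto_retry(state):
    """True if automatic retries can start a new execution from this state."""
    return state in RETRYABLE


print(can_transition(State.ASSIGNED, State.TIMED_OUT))
print(may_auto_retry(State.CANCELLED))
```

A retry or manual re-run starts a new execution rather than moving the old one, which is why the table has no edges leaving the final states.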