Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion adapters/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
## 0.11.0

No changes.
Enhancements:

- Adds `cf.Prompt` and `cf.Input` for requesting structured input from users mid-execution (with optional Pydantic model for typed responses, per-run memoisation, and `requires` tags for routing).
- Adds `cf.select` for waiting on the first of multiple handles (executions and/or inputs) to resolve, with optional cancellation of the rest.
- Adds `cf.cancel` (and `.cancel()` on handles) for atomic cancellation of executions and inputs.
- Supports `async def` functions in `@task` and `@workflow` decorators.
- Adds fluent `with_*` methods to `Target` for overriding decorator options at a call site.

Changes:

- Wait-expiry from `cf.suspense(timeout=...)` now raises the standard `TimeoutError`; `ExecutionTimeout` is reserved for executions exceeding their configured `timeout`.

## 0.10.0

Expand Down
5 changes: 4 additions & 1 deletion cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## 0.11.0

No changes.
Enhancements:

- Adds `--drain-timeout` flag to `worker` (default: 2 minutes) for gracefully draining in-flight executions on shutdown or reload. A second signal aborts the drain early; a third forces exit.
- Adds `inputs list`, `inputs inspect`, `inputs respond` and `inputs dismiss` commands for managing input requests from the CLI.

## 0.10.0

Expand Down
14 changes: 14 additions & 0 deletions docs/docs/cli_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ Start a worker.
| `--accepts` | Tags executions must have |
| `--adapter` | Adapter command |
| `--session` | Session ID (for pool-launched workers) |
| `--drain-timeout` | How long to wait for in-flight executions to finish on shutdown or reload (default: `2m`; `0` = wait indefinitely) |

On `SIGINT`/`SIGTERM` the worker stops accepting new executions and drains the in-flight ones (up to `--drain-timeout`). Sending the signal a second time aborts the drain early; a third time forces an immediate exit. The same drain runs between reloads in `--watch` / `--dev` mode.

## `coflux setup`

Expand Down Expand Up @@ -211,6 +214,17 @@ See [pools](./pools.md) for launcher-specific fields.
| `tokens create` | Create a token (`--name`, `--workspaces`) |
| `tokens revoke <id>` | Revoke a token |

## `coflux inputs`

| Command | Description |
|---------|-------------|
| `inputs list` | List pending inputs (`--run` to filter by run ID) |
| `inputs inspect <input-id>` | Show a prompt's template, schema, and current state |
| `inputs respond <input-id> <value>` | Submit a response (value as JSON) |
| `inputs dismiss <input-id>` | Dismiss a prompt |

See [inputs](./inputs.md) for the workflow side of the API.

## `coflux assets`

| Command | Description |
Expand Down
4 changes: 4 additions & 0 deletions docs/docs/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ Workflows need to be registered with a project and workspace so that they appear

When a workflow is submitted, this initiates a _run_. A run is made up of _steps_, which each correspond to a target to be executed. The target (a workflow or task) can call other tasks, which cause those to be scheduled as steps. Each step has at least one associated _execution_. Steps can be retried (manually or automatically), which will lead to multiple executions being associated with the step.

## Inputs

Tasks can request _input_ from a user mid-execution - for example, an approval before a deployment, or a structured form to fill in. Prompts are responded to from Studio (or the CLI), and the execution resumes once a response is provided. See [inputs](./inputs.md).

## Assets

Executions can persist _assets_ (a collection of files) which can be passed between executions and restored as needed, or viewed in Studio.
178 changes: 178 additions & 0 deletions docs/docs/inputs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# Inputs

Tasks and workflows request _input_ from a user mid-execution. This is useful for human-in-the-loop steps - approving a deployment, reviewing flagged content, supplying credentials that aren't safe to store, or any other point where the workflow needs information that isn't available up-front.

A prompt is presented in Studio (or fetched/answered from the CLI). The execution waits for a response, then resumes with the result.

## Defining a prompt

Prompts are defined with `cf.Prompt`. Submitting a prompt returns an `Input` handle (much like submitting a task returns an `Execution` handle):

```python
import coflux as cf

approve = cf.Prompt("Approve deployment of {service}?", title="Deployment")

@cf.workflow()
def deploy(service: str):
approve(service=service) # blocks until approved; raises InputDismissed if dismissed
do_deploy(service)
```

A prompt with no `model` is an _approval_ prompt - it has no payload, just an approve/dismiss decision.

## Templates and placeholders

Prompt templates are rendered as Markdown, with placeholder keys substituted with the values passed as submission. Three placeholder forms control how each value is rendered:

| Form | Rendering |
|------|-----------|
| `{key}` | Inline - renders a formatted value inline. |
| `{{key}}` | Block - renders the value as a block-level element. |
| `{{{key}}}` | Markdown - renders the string as Markdown. |

```python
summarise = cf.Prompt(
"""
## Summarise {meeting}

Transcript:

{{transcript}}
""",
model=Summary,
)

summarise(meeting="Weekly sync", transcript=long_text)
```

Placeholder values are serialised when the prompt is submitted and resolved when it's displayed - so references to other executions, blobs, or assets remain live at render time, following the same behaviour as in log messages.

## Typed inputs (Pydantic models)

To collect structured data, pass a Pydantic model (or any class with `model_json_schema()` and `model_validate()`) as `model`:

```python
from typing import Literal
from pydantic import BaseModel, Field

class Label(BaseModel):
sentiment: Literal["positive", "neutral", "negative"]
topic: Literal["product", "shipping", "support", "other"]
confidence: int = Field(ge=1, le=5)
notes: str | None = None

label = cf.Prompt("Label this feedback:\n\n{text}", model=Label, title="Label")

@cf.workflow()
def annotate(sample_id: str, text: str):
result = label(text=text) # result is a Label instance
store(sample_id, sentiment=result.sentiment, topic=result.topic)
```

The model's JSON schema is used to render a form in Studio, and to validate the response. Primitive types (`str`, `int`, `float`, `bool`, `date`, `time`, `datetime`) can also be specified as the `model`.

## Submitting without blocking

Calling the prompt (`prompt(...)`) blocks the execution until a response arrives. Use `prompt.submit(...)` to get an `Input` handle without blocking, then resolve it later - or combine it with [`cf.select`](./select.md) to wait alongside other handles:

```python
handle = label.submit(text="Shipping was slow but the product is great.")

# ...do other work...

result = handle.result()
```

To check whether a response has arrived without blocking, use `.poll()`. It returns the value if the input has resolved, or `default` (`None` by default) otherwise:

```python
handle = label.submit(text=text)

if (result := handle.poll()) is not None:
process(result)
else:
cf.log_info("Still waiting for a label")
```

`.poll(timeout=...)` will wait up to the given number of seconds for a response before returning `default`.

## Suspending while waiting

A response might take minutes, hours, or days to arrive - holding a worker slot open the whole time is wasteful. Resolving an input inside a [`cf.suspense`](./suspense.md) scope lets the execution suspend while it waits, and be resumed once the response is available:

```python
@cf.workflow()
def annotate(sample_id: str, text: str):
handle = label.submit(text=text)
with cf.suspense():
result = handle.result()
...
```

Because the execution re-runs from the beginning on resumption, any work performed before the suspense block needs to be idempotent - typically by [memoising](./memoizing.md) the tasks it calls. Input memoisation (below) makes the prompt itself safe to re-submit.

## Memoisation

Inputs are automatically memoised within a run by their template and placeholder values, so a prompt that's submitted twice (e.g., after a re-run of a step) returns the same input handle and reuses the existing response.

If a placeholder varies between submissions - say a timestamp shown in the prompt - the auto-generated key will vary with it, and deduplication won't kick in. Set an explicit key with `with_key` to dedupe on a stable subset of the submission:

```python
from datetime import datetime

label.with_key(f"sample-{sample_id}").submit(
text=text,
requested_at=datetime.now().isoformat(), # shown in the prompt; excluded from the key
)
```

## Routing

Prompts can be tagged with `requires` to control who can respond - for example, restricting an approval to a specific role or user. Tags are matched against the responding user's tags in Studio.

```python
deploy_prod = cf.Prompt(
"Deploy {service} to production?",
requires={"role": "release-manager"},
)
```

## Customising the prompt

| Option | Description |
|--------|-------------|
| `title` | A short title shown above the prompt. |
| `actions` | Custom labels for the respond/dismiss buttons as `(respond, dismiss)` (e.g. `("Approve", "Reject")`). |
| `schema` | A raw JSON schema (string or dict) - alternative to `model` when you don't have a Pydantic class. |
| `requires` | Routing tags. |

The fluent methods (`with_key`, `with_initial`, `with_actions`, `with_requires`) return a new `Prompt` with overrides applied, leaving the original unchanged. This makes it easy to define a prompt once and reuse it with per-submission tweaks:

```python
label.with_initial(Label(sentiment="positive", topic="product", confidence=4)).submit(text=text)
```

## Lifecycle

An input is in one of these states:

- **Pending** - submitted, awaiting a response.
- **Responded** - a value (or approval) was provided. `Input.result()` returns it.
- **Dismissed** - the responder explicitly dismissed the prompt. `Input.result()` raises `InputDismissed`.
- **Cancelled** - the input was cancelled programmatically (via `Input.cancel()` or `cf.cancel([...])`). Distinct from _dismissed_, which represents a deliberate user decision.

Inputs can be cancelled atomically alongside other handles using [`cf.cancel`](./python_reference.md#cancelhandles).

## Responding from the CLI

Inputs can also be listed and responded to from the CLI:

```bash
coflux inputs list # show pending inputs
coflux inputs inspect <input-id> # show a prompt
coflux inputs respond <input-id> <value> # submit a response (JSON)
coflux inputs dismiss <input-id> # dismiss the prompt
```

This is useful for scripting or for systems where inputs are produced by other automation rather than interactive users.
120 changes: 118 additions & 2 deletions docs/docs/python_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ All exports are available from the `coflux` package (`import coflux as cf`).

## Decorators

Both `@workflow` and `@task` accept `async def` functions in addition to regular functions. The coroutine is run to completion by the executor, and the target's return type is the coroutine's resolved value.

### `@workflow`

Defines a workflow — the entry point for a run.
Expand Down Expand Up @@ -104,6 +106,29 @@ Calls the target synchronously — submits for execution and blocks until the re

Submits the target for asynchronous execution and returns an `Execution` handle. The caller can continue other work and retrieve the result later.

### Per-call-site overrides

Each `with_*` method returns a new `Target` with the corresponding decorator-level option overridden, leaving the original target unchanged. This is useful for one-off variations without re-decorating, and the methods can be chained:

```python
my_task.with_retries(3).with_timeout(30).submit(x)
my_task.with_cache(False).submit(x) # disable caching for this call

cached = my_task.with_cache(60) # stash a configured variant
cached.submit(a)
cached.submit(b)
```

| Method | Description |
|--------|-------------|
| `with_cache(cache)` | Override [caching](./caching.md). Pass `False` to disable. |
| `with_retries(retries)` | Override [retries](./retries.md). Pass `0` or `False` to disable. |
| `with_defer(defer)` | Override [defer](./deferring.md) configuration. |
| `with_memo(memo)` | Override [memoisation](./memoizing.md) configuration. |
| `with_delay(delay)` | Override submission delay (seconds or `timedelta`). |
| `with_timeout(timeout)` | Override execution [timeout](./timeouts.md). |
| `with_requires(requires)` | Override worker routing tags. |

## Execution

Returned by `target.submit()`. Represents a running or completed execution, and acts as a future for its result. Can be passed as an argument to other tasks, or returned from a task/workflow.
Expand Down Expand Up @@ -135,6 +160,74 @@ Checks whether a result is ready without suspending the caller. Returns the resu

Cancels the execution and its descendants.

## Input

Returned by `prompt.submit(...)`. Represents a [requested input](./inputs.md), and acts as a future for the response. Like `Execution`, it can be passed to other tasks and only carries an ID across the wire.

### Properties

| Property | Type | Description |
|----------|------|-------------|
| `id` | `str` | Input ID |

### `input.result() -> T`

Blocks (suspends) until the input is responded to and returns the value. If the prompt was created with a `model`, the value is parsed into that type.

**Raises:** `InputDismissed` if the prompt was dismissed; `ExecutionCancelled` if the input was cancelled. Raises `TimeoutError` if called inside a `cf.suspense(timeout=...)` scope and the timeout expires before a response arrives.

### `input.poll(timeout=None, *, default=None) -> T | D`

Non-suspending check for a response. Returns the value if available, or `default` otherwise.

### `input.cancel() -> None`

Cancels the input, transitioning it to a terminal _cancelled_ state (distinct from _dismissed_).

## Prompt

Defines a prompt for [requesting input](./inputs.md) from a user. Submit it to get an `Input` handle.

```python
cf.Prompt(
template: str,
model: type[T] | None = None,
*,
title: str | None = None,
actions: tuple[str, str] | None = None,
schema: str | dict | None = None,
requires: dict[str, str | bool | list[str]] | None = None,
)
```

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `template` | `str` | required | Message template. May contain `{placeholders}` substituted at submission. |
| `model` | `type \| None` | `None` | Pydantic model (or primitive type) used to render a form and validate the response. Without a model, the prompt is approval-only. |
| `title` | `str \| None` | `None` | Short title shown above the prompt. |
| `actions` | `tuple[str, str] \| None` | `None` | Custom labels for the respond/dismiss buttons, as `(respond, dismiss)`. |
| `schema` | `str \| dict \| None` | `None` | Raw JSON schema (alternative to `model`). |
| `requires` | `dict \| None` | `None` | Routing tags for matching to responders. |

### `prompt(**placeholders) -> T`

Submits the prompt and blocks until a response is available. Equivalent to `prompt.submit(...).result()`.

### `prompt.submit(**placeholders) -> Input[T]`

Submits the prompt and returns an `Input` handle without blocking.

### Per-submission overrides

Each `with_*` method returns a new `Prompt` with overrides applied:

| Method | Description |
|--------|-------------|
| `with_key(key)` | Use an explicit memoisation key (per-run). |
| `with_initial(value)` | Pre-populate the form with an initial value (e.g. a Pydantic instance). |
| `with_actions(respond, dismiss)` | Override button labels. |
| `with_requires(requires)` | Override routing tags. |

## Configuration classes

These are used as values for decorator parameters when more control is needed than the shorthand forms allow.
Expand Down Expand Up @@ -273,6 +366,27 @@ Context manager that sets a timeout on `.result()` calls within its scope. If th

Explicitly suspends the current execution. It will be re-run after the specified delay. `delay` can be `float` (seconds), `timedelta`, or `datetime`.

### `select(handles, *, cancel_remaining=False)`

Wait for the first of one or more handles (`Execution` and/or `Input`) to resolve. Returns `(winner, remaining)` — call `winner.result()` to get the value (or to raise the exception that resolved it). Picks up its timeout from any enclosing `cf.suspense(timeout=...)` scope; raises `TimeoutError` if the wait expires. See [select](./select.md).

```python
winner, remaining = cf.select([a.submit(), b.submit()])
```

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `handles` | `Sequence[Execution \| Input]` | required | Handles to wait on (must be non-empty). |
| `cancel_remaining` | `bool` | `False` | Atomically cancel non-winner `Execution` handles when one resolves. `Input` handles are left pending. |

### `cancel(handles)`

Atomically cancel one or more handles. `Execution` handles are cancelled recursively (descendants too); `Input` handles transition to a terminal _cancelled_ state (distinct from _dismissed_). Already-resolved handles are silently skipped.

```python
cf.cancel([execution_a, execution_b, input_c])
```

### Logging

Structured logging functions that associate key-value pairs with a template string. Values are stored separately from the template, enabling filtering and search in Studio. See [logging](./logging.md).
Expand All @@ -293,5 +407,7 @@ Creates and persists a collection of files as an asset, which can be inspected a
| Exception | Description |
|-----------|-------------|
| `ExecutionError` | Child execution failed. When the original exception type can be resolved, the raised exception subclasses both `ExecutionError` and the original type, so you can catch either. |
| `ExecutionCancelled` | Child execution was cancelled. |
| `ExecutionTimeout` | Child execution exceeded its configured timeout. |
| `ExecutionCancelled` | Child execution (or input) was cancelled. |
| `ExecutionTimeout` | Child execution exceeded its configured `timeout`. |
| `InputDismissed` | A requested input was dismissed by the responder. |
| `TimeoutError` (built-in) | A `cf.suspense(timeout=...)` wait expired before a handle resolved. |
Loading
Loading