diff --git a/adapters/python/CHANGELOG.md b/adapters/python/CHANGELOG.md index 87737610..4a2f8968 100644 --- a/adapters/python/CHANGELOG.md +++ b/adapters/python/CHANGELOG.md @@ -1,6 +1,16 @@ ## 0.11.0 -No changes. +Enhancements: + +- Adds `cf.Prompt` and `cf.Input` for requesting structured input from users mid-execution (with optional Pydantic model for typed responses, per-run memoisation, and `requires` tags for routing). +- Adds `cf.select` for waiting on the first of multiple handles (executions and/or inputs) to resolve, with optional cancellation of the rest. +- Adds `cf.cancel` (and `.cancel()` on handles) for atomic cancellation of executions and inputs. +- Supports `async def` functions in `@task` and `@workflow` decorators. +- Adds fluent `with_*` methods to `Target` for overriding decorator options at a call site. + +Changes: + +- Wait-expiry from `cf.suspense(timeout=...)` now raises the standard `TimeoutError`; `ExecutionTimeout` is reserved for executions exceeding their configured `timeout`. ## 0.10.0 diff --git a/cli/CHANGELOG.md b/cli/CHANGELOG.md index e3b7282c..944c01ff 100644 --- a/cli/CHANGELOG.md +++ b/cli/CHANGELOG.md @@ -1,6 +1,9 @@ ## 0.11.0 -No changes. +Enhancements: + +- Adds `--drain-timeout` flag to `worker` (default: 2 minutes) for gracefully draining in-flight executions on shutdown or reload. A second signal aborts the drain early; a third forces exit. +- Adds `inputs list`, `inputs inspect`, `inputs respond` and `inputs dismiss` commands for managing input requests from the CLI. ## 0.10.0 diff --git a/docs/docs/cli_reference.md b/docs/docs/cli_reference.md index 52ebd955..e871020e 100644 --- a/docs/docs/cli_reference.md +++ b/docs/docs/cli_reference.md @@ -104,6 +104,9 @@ Start a worker. 
| `--accepts` | Tags executions must have | | `--adapter` | Adapter command | | `--session` | Session ID (for pool-launched workers) | +| `--drain-timeout` | How long to wait for in-flight executions to finish on shutdown or reload (default: `2m`; `0` = wait indefinitely) | + +On `SIGINT`/`SIGTERM` the worker stops accepting new executions and drains the in-flight ones (up to `--drain-timeout`). Sending the signal a second time aborts the drain early; a third time forces an immediate exit. The same drain runs between reloads in `--watch` / `--dev` mode. ## `coflux setup` @@ -211,6 +214,17 @@ See [pools](./pools.md) for launcher-specific fields. | `tokens create` | Create a token (`--name`, `--workspaces`) | | `tokens revoke ` | Revoke a token | +## `coflux inputs` + +| Command | Description | +|---------|-------------| +| `inputs list` | List pending inputs (`--run` to filter by run ID) | +| `inputs inspect ` | Show a prompt's template, schema, and current state | +| `inputs respond ` | Submit a response (value as JSON) | +| `inputs dismiss ` | Dismiss a prompt | + +See [inputs](./inputs.md) for the workflow side of the API. + ## `coflux assets` | Command | Description | diff --git a/docs/docs/concepts.md b/docs/docs/concepts.md index 6e9e074d..cb7590cb 100644 --- a/docs/docs/concepts.md +++ b/docs/docs/concepts.md @@ -45,6 +45,10 @@ Workflows need to be registered with a project and workspace so that they appear When a workflow is submitted, this initiates a _run_. A run is made up of _steps_, which each correspond to a target to be executed. The target (a workflow or task) can call other tasks, which cause those to be scheduled as steps. Each step has at least one associated _execution_. Steps can be retried (manually or automatically), which will lead to multiple executions being associated with the step. +## Inputs + +Tasks can request _input_ from a user mid-execution - for example, an approval before a deployment, or a structured form to fill in. 
Prompts are responded to from Studio (or the CLI), and the execution resumes once a response is provided. See [inputs](./inputs.md). + ## Assets Executions can persist _assets_ (a collection of files) which can be passed between executions and restored as needed, or viewed in Studio. diff --git a/docs/docs/inputs.md b/docs/docs/inputs.md new file mode 100644 index 00000000..99d6a9a1 --- /dev/null +++ b/docs/docs/inputs.md @@ -0,0 +1,178 @@ +# Inputs + +Tasks and workflows can request _input_ from a user mid-execution. This is useful for human-in-the-loop steps - approving a deployment, reviewing flagged content, supplying credentials that aren't safe to store, or any other point where the workflow needs information that isn't available up-front. + +A prompt is presented in Studio (or fetched/answered from the CLI). The execution waits for a response, then resumes with the result. + +## Defining a prompt + +Prompts are defined with `cf.Prompt`. Submitting a prompt returns an `Input` handle (much like submitting a task returns an `Execution` handle): + +```python +import coflux as cf + +approve = cf.Prompt("Approve deployment of {service}?", title="Deployment") + +@cf.workflow() +def deploy(service: str): + approve(service=service) # blocks until approved; raises InputDismissed if dismissed + do_deploy(service) +``` + +A prompt with no `model` is an _approval_ prompt - it has no payload, just an approve/dismiss decision. + +## Templates and placeholders + +Prompt templates are rendered as Markdown, with placeholder keys substituted with the values passed at submission. Three placeholder forms control how each value is rendered: + +| Form | Rendering | +|------|-----------| +| `{key}` | Inline - renders a formatted value inline. | +| `{{key}}` | Block - renders the value as a block-level element. | +| `{{{key}}}` | Markdown - renders the string as Markdown. 
| + +```python +summarise = cf.Prompt( + """ + ## Summarise {meeting} + + Transcript: + + {{transcript}} + """, + model=Summary, +) + +summarise(meeting="Weekly sync", transcript=long_text) +``` + +Placeholder values are serialised when the prompt is submitted and resolved when it's displayed - so references to other executions, blobs, or assets remain live at render time, following the same behaviour as in log messages. + +## Typed inputs (Pydantic models) + +To collect structured data, pass a Pydantic model (or any class with `model_json_schema()` and `model_validate()`) as `model`: + +```python +from typing import Literal +from pydantic import BaseModel, Field + +class Label(BaseModel): + sentiment: Literal["positive", "neutral", "negative"] + topic: Literal["product", "shipping", "support", "other"] + confidence: int = Field(ge=1, le=5) + notes: str | None = None + +label = cf.Prompt("Label this feedback:\n\n{text}", model=Label, title="Label") + +@cf.workflow() +def annotate(sample_id: str, text: str): + result = label(text=text) # result is a Label instance + store(sample_id, sentiment=result.sentiment, topic=result.topic) +``` + +The model's JSON schema is used to render a form in Studio, and to validate the response. Primitive types (`str`, `int`, `float`, `bool`, `date`, `time`, `datetime`) can also be specified as the `model`. + +## Submitting without blocking + +Calling the prompt (`prompt(...)`) blocks the execution until a response arrives. Use `prompt.submit(...)` to get an `Input` handle without blocking, then resolve it later - or combine it with [`cf.select`](./select.md) to wait alongside other handles: + +```python +handle = label.submit(text="Shipping was slow but the product is great.") + +# ...do other work... + +result = handle.result() +``` + +To check whether a response has arrived without blocking, use `.poll()`. 
It returns the value if the input has resolved, or `default` (`None` by default) otherwise: + +```python +handle = label.submit(text=text) + +if (result := handle.poll()) is not None: + process(result) +else: + cf.log_info("Still waiting for a label") +``` + +`.poll(timeout=...)` will wait up to the given number of seconds for a response before returning `default`. + +## Suspending while waiting + +A response might take minutes, hours, or days to arrive - holding a worker slot open the whole time is wasteful. Resolving an input inside a [`cf.suspense`](./suspense.md) scope lets the execution suspend while it waits, and be resumed once the response is available: + +```python +@cf.workflow() +def annotate(sample_id: str, text: str): + handle = label.submit(text=text) + with cf.suspense(): + result = handle.result() + ... +``` + +Because the execution re-runs from the beginning on resumption, any work performed before the suspense block needs to be idempotent - typically by [memoising](./memoizing.md) the tasks it calls. Input memoisation (below) makes the prompt itself safe to re-submit. + +## Memoisation + +Inputs are automatically memoised within a run by their template and placeholder values, so a prompt that's submitted twice (e.g., after a re-run of a step) returns the same input handle and reuses the existing response. + +If a placeholder varies between submissions - say a timestamp shown in the prompt - the auto-generated key will vary with it, and deduplication won't kick in. Set an explicit key with `with_key` to dedupe on a stable subset of the submission: + +```python +from datetime import datetime + +label.with_key(f"sample-{sample_id}").submit( + text=text, + requested_at=datetime.now().isoformat(), # shown in the prompt; excluded from the key +) +``` + +## Routing + +Prompts can be tagged with `requires` to control who can respond - for example, restricting an approval to a specific role or user. 
Tags are matched against the responding user's tags in Studio. + +```python +deploy_prod = cf.Prompt( + "Deploy {service} to production?", + requires={"role": "release-manager"}, +) +``` + +## Customising the prompt + +| Option | Description | +|--------|-------------| +| `title` | A short title shown above the prompt. | +| `actions` | Custom labels for the respond/dismiss buttons as `(respond, dismiss)` (e.g. `("Approve", "Reject")`). | +| `schema` | A raw JSON schema (string or dict) - alternative to `model` when you don't have a Pydantic class. | +| `requires` | Routing tags. | + +The fluent methods (`with_key`, `with_initial`, `with_actions`, `with_requires`) return a new `Prompt` with overrides applied, leaving the original unchanged. This makes it easy to define a prompt once and reuse it with per-submission tweaks: + +```python +label.with_initial(Label(sentiment="positive", topic="product", confidence=4)).submit(text=text) +``` + +## Lifecycle + +An input is in one of these states: + +- **Pending** - submitted, awaiting a response. +- **Responded** - a value (or approval) was provided. `Input.result()` returns it. +- **Dismissed** - the responder explicitly dismissed the prompt. `Input.result()` raises `InputDismissed`. +- **Cancelled** - the input was cancelled programmatically (via `Input.cancel()` or `cf.cancel([...])`). Distinct from _dismissed_, which represents a deliberate user decision. + +Inputs can be cancelled atomically alongside other handles using [`cf.cancel`](./python_reference.md#cancelhandles). + +## Responding from the CLI + +Inputs can also be listed and responded to from the CLI: + +```bash +coflux inputs list # show pending inputs +coflux inputs inspect # show a prompt +coflux inputs respond # submit a response (JSON) +coflux inputs dismiss # dismiss the prompt +``` + +This is useful for scripting or for systems where inputs are produced by other automation rather than interactive users. 
diff --git a/docs/docs/python_reference.md b/docs/docs/python_reference.md index 17221f66..0d45876f 100644 --- a/docs/docs/python_reference.md +++ b/docs/docs/python_reference.md @@ -8,6 +8,8 @@ All exports are available from the `coflux` package (`import coflux as cf`). ## Decorators +Both `@workflow` and `@task` accept `async def` functions in addition to regular functions. The coroutine is run to completion by the executor, and the target's return type is the coroutine's resolved value. + ### `@workflow` Defines a workflow — the entry point for a run. @@ -104,6 +106,29 @@ Calls the target synchronously — submits for execution and blocks until the re Submits the target for asynchronous execution and returns an `Execution` handle. The caller can continue other work and retrieve the result later. +### Per-call-site overrides + +Each `with_*` method returns a new `Target` with the corresponding decorator-level option overridden, leaving the original target unchanged. This is useful for one-off variations without re-decorating, and the methods can be chained: + +```python +my_task.with_retries(3).with_timeout(30).submit(x) +my_task.with_cache(False).submit(x) # disable caching for this call + +cached = my_task.with_cache(60) # stash a configured variant +cached.submit(a) +cached.submit(b) +``` + +| Method | Description | +|--------|-------------| +| `with_cache(cache)` | Override [caching](./caching.md). Pass `False` to disable. | +| `with_retries(retries)` | Override [retries](./retries.md). Pass `0` or `False` to disable. | +| `with_defer(defer)` | Override [defer](./deferring.md) configuration. | +| `with_memo(memo)` | Override [memoisation](./memoizing.md) configuration. | +| `with_delay(delay)` | Override submission delay (seconds or `timedelta`). | +| `with_timeout(timeout)` | Override execution [timeout](./timeouts.md). | +| `with_requires(requires)` | Override worker routing tags. | + ## Execution Returned by `target.submit()`. 
Represents a running or completed execution, and acts as a future for its result. Can be passed as an argument to other tasks, or returned from a task/workflow. @@ -135,6 +160,74 @@ Checks whether a result is ready without suspending the caller. Returns the resu Cancels the execution and its descendants. +## Input + +Returned by `prompt.submit(...)`. Represents a [requested input](./inputs.md), and acts as a future for the response. Like `Execution`, it can be passed to other tasks and only carries an ID across the wire. + +### Properties + +| Property | Type | Description | +|----------|------|-------------| +| `id` | `str` | Input ID | + +### `input.result() -> T` + +Blocks (suspends) until the input is responded to and returns the value. If the prompt was created with a `model`, the value is parsed into that type. + +**Raises:** `InputDismissed` if the prompt was dismissed; `ExecutionCancelled` if the input was cancelled. Raises `TimeoutError` if called inside a `cf.suspense(timeout=...)` scope and the timeout expires before a response arrives. + +### `input.poll(timeout=None, *, default=None) -> T | D` + +Non-suspending check for a response. Returns the value if available, or `default` otherwise. + +### `input.cancel() -> None` + +Cancels the input, transitioning it to a terminal _cancelled_ state (distinct from _dismissed_). + +## Prompt + +Defines a prompt for [requesting input](./inputs.md) from a user. Submit it to get an `Input` handle. + +```python +cf.Prompt( + template: str, + model: type[T] | None = None, + *, + title: str | None = None, + actions: tuple[str, str] | None = None, + schema: str | dict | None = None, + requires: dict[str, str | bool | list[str]] | None = None, +) +``` + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `template` | `str` | required | Message template. May contain `{placeholders}` substituted at submission. 
| +| `model` | `type \| None` | `None` | Pydantic model (or primitive type) used to render a form and validate the response. Without a model, the prompt is approval-only. | +| `title` | `str \| None` | `None` | Short title shown above the prompt. | +| `actions` | `tuple[str, str] \| None` | `None` | Custom labels for the respond/dismiss buttons, as `(respond, dismiss)`. | +| `schema` | `str \| dict \| None` | `None` | Raw JSON schema (alternative to `model`). | +| `requires` | `dict \| None` | `None` | Routing tags for matching to responders. | + +### `prompt(**placeholders) -> T` + +Submits the prompt and blocks until a response is available. Equivalent to `prompt.submit(...).result()`. + +### `prompt.submit(**placeholders) -> Input[T]` + +Submits the prompt and returns an `Input` handle without blocking. + +### Per-submission overrides + +Each `with_*` method returns a new `Prompt` with overrides applied: + +| Method | Description | +|--------|-------------| +| `with_key(key)` | Use an explicit memoisation key (per-run). | +| `with_initial(value)` | Pre-populate the form with an initial value (e.g. a Pydantic instance). | +| `with_actions(respond, dismiss)` | Override button labels. | +| `with_requires(requires)` | Override routing tags. | + ## Configuration classes These are used as values for decorator parameters when more control is needed than the shorthand forms allow. @@ -273,6 +366,27 @@ Context manager that sets a timeout on `.result()` calls within its scope. If th Explicitly suspends the current execution. It will be re-run after the specified delay. `delay` can be `float` (seconds), `timedelta`, or `datetime`. +### `select(handles, *, cancel_remaining=False)` + +Wait for the first of one or more handles (`Execution` and/or `Input`) to resolve. Returns `(winner, remaining)` — call `winner.result()` to get the value (or to raise the exception that resolved it). 
Picks up its timeout from any enclosing `cf.suspense(timeout=...)` scope; raises `TimeoutError` if the wait expires. See [select](./select.md). + +```python +winner, remaining = cf.select([a.submit(), b.submit()]) +``` + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `handles` | `Sequence[Execution \| Input]` | required | Handles to wait on (must be non-empty). | +| `cancel_remaining` | `bool` | `False` | Atomically cancel non-winner `Execution` handles when one resolves. `Input` handles are left pending. | + +### `cancel(handles)` + +Atomically cancel one or more handles. `Execution` handles are cancelled recursively (descendants too); `Input` handles transition to a terminal _cancelled_ state (distinct from _dismissed_). Already-resolved handles are silently skipped. + +```python +cf.cancel([execution_a, execution_b, input_c]) +``` + ### Logging Structured logging functions that associate key-value pairs with a template string. Values are stored separately from the template, enabling filtering and search in Studio. See [logging](./logging.md). @@ -293,5 +407,7 @@ Creates and persists a collection of files as an asset, which can be inspected a | Exception | Description | |-----------|-------------| | `ExecutionError` | Child execution failed. When the original exception type can be resolved, the raised exception subclasses both `ExecutionError` and the original type, so you can catch either. | -| `ExecutionCancelled` | Child execution was cancelled. | -| `ExecutionTimeout` | Child execution exceeded its configured timeout. | +| `ExecutionCancelled` | Child execution (or input) was cancelled. | +| `ExecutionTimeout` | Child execution exceeded its configured `timeout`. | +| `InputDismissed` | A requested input was dismissed by the responder. | +| `TimeoutError` (built-in) | A `cf.suspense(timeout=...)` wait expired before a handle resolved. 
| diff --git a/docs/docs/select.md b/docs/docs/select.md new file mode 100644 index 00000000..83a2c58f --- /dev/null +++ b/docs/docs/select.md @@ -0,0 +1,72 @@ +# Select + +`cf.select` waits for the _first_ of multiple handles to resolve. The handles can be [executions](./executions.md), [inputs](./inputs.md), or a mix of both. It's the building block for first-wins coordination patterns: racing alternative implementations, prompting a user with a fallback timeout, multi-channel approvals, or any case where you want to react to whichever result arrives soonest. + +## Basic usage + +```python +import coflux as cf + +@cf.workflow() +def search(query: str): + a = source_a.submit(query) + b = source_b.submit(query) + c = source_c.submit(query) + + winner, remaining = cf.select([a, b, c]) + return winner.result() +``` + +`select` returns a tuple `(winner, remaining)` where: + +- `winner` is the handle that resolved first. Call `.result()` on it to get the value (or to raise the exception that caused it to resolve). +- `remaining` is the list of handles that did _not_ win, in the order they were passed in. They keep running and can be awaited later. + +## Cancelling the losers + +Pass `cancel_remaining=True` to atomically cancel the executions that didn't win as soon as one resolves. This is done in a single round-trip with the resolution itself, so there's no window where the losers continue to consume resources unnecessarily: + +```python +winner, _ = cf.select([fast.submit(), slow.submit()], cancel_remaining=True) +return winner.result() +``` + +`Input` handles in the list are left pending — only `Execution` handles are cancelled. 
+ +## Mixing executions and inputs + +The handles can be a mix of `Execution` and `Input`, so you can race a long-running task against a user prompt — for example, asking for input only if a default lookup takes too long: + +```python +auto = lookup_value.submit(key) +manual = cf.Prompt("Enter value for {key}").submit(key=key) + +winner, _ = cf.select([auto, manual], cancel_remaining=True) +value = winner.result() +``` + +Or wait for the first of several alternative approvers to respond: + +```python +inputs = [approve.with_requires({"user": user}).submit() for user in approvers] +winner, _ = cf.select(inputs) +``` + +## Timeouts + +`select` takes its timeout from any enclosing [`cf.suspense(timeout=...)`](./suspense.md) scope. If the wait expires before any handle resolves, `cf.select` raises `TimeoutError`: + +```python +try: + with cf.suspense(timeout=30): + winner, _ = cf.select([a, b]) +except TimeoutError: + # neither resolved within 30s + ... +``` + +Note that `TimeoutError` here means the _wait_ expired — distinct from `ExecutionTimeout`, which is raised when an individual execution exceeds its configured `timeout`. + +## Resolving the same handle later + +When a handle wins a `select`, its result is cached in the execution context. Calling `.result()` (or `.poll()`) on the winner afterwards returns immediately without another round-trip. Handles in `remaining` are unaffected — they can be awaited normally as they resolve. diff --git a/docs/docs/timeouts.md b/docs/docs/timeouts.md index a7040989..833474b6 100644 --- a/docs/docs/timeouts.md +++ b/docs/docs/timeouts.md @@ -48,3 +48,7 @@ Timeouts compose with [retries](./retries.md) — a timed-out execution counts a def unreliable_api_call(): ... ``` + +## Wait timeouts vs execution timeouts + +`ExecutionTimeout` is raised when a child execution exceeds its configured `timeout` (above). 
It's a different concept from a [`cf.suspense(timeout=...)`](./suspense.md) wait expiring — the latter raises the standard `TimeoutError` and affects only the caller, not the execution being waited on. diff --git a/docs/sidebars.ts b/docs/sidebars.ts index 296eabc8..412e9d39 100644 --- a/docs/sidebars.ts +++ b/docs/sidebars.ts @@ -24,6 +24,7 @@ const sidebars: SidebarsConfig = { "executions", "concurrency", "groups", + "inputs", ], }, { @@ -37,6 +38,7 @@ const sidebars: SidebarsConfig = { "memoizing", "deferring", "suspense", + "select", ], }, { diff --git a/server/CHANGELOG.md b/server/CHANGELOG.md index 1a1a9f84..47078438 100644 --- a/server/CHANGELOG.md +++ b/server/CHANGELOG.md @@ -1,6 +1,9 @@ ## 0.11.0 -No changes. +Enhancements: + +- Adds support for tasks requesting structured inputs from users. +- Adds a generalised `select` operation for waiting for results, with support for specifying multiple handles, and optional cancellation of the other items. ## 0.10.0