16 changes: 16 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -163,6 +163,22 @@ Tests mirror source structure in `tests/`:

Use `pytest.mark.performance` for performance tests (exclude with `-m "not performance"`).

### Test Fixture Patterns

When writing integration tests that construct `WorkflowConfig` programmatically, follow these conventions (see `tests/test_engine/test_limits.py` for canonical examples):

- `AgentDef` uses `prompt=` (not `instructions=`), `output={"key": OutputField(type="string")}` (dict, not list), and `routes=[RouteDef(...)]` (not raw dicts).
- `WorkflowDef` requires `entry_point=` and holds `limits=`; it is passed as `workflow=` on `WorkflowConfig`, where `agents=` and `output=` are top-level.
- The engine entry point is `await engine.run({})` (not `execute`).
- To test with controlled token/cost data, patch `provider.execute` to return a custom `AgentOutput` with explicit `input_tokens`, `output_tokens`, and `model` fields.
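
The last convention can be sketched as follows. This is a minimal illustration, not the project's API: `AgentOutput`, `FakeProvider`, and `run_once` are hypothetical stand-ins defined inline so the snippet runs on its own, and `"example-model"` is a made-up model name. In a real test you would import the project's own classes and patch the provider the engine actually uses.

```python
import asyncio
from dataclasses import dataclass
from typing import Any
from unittest.mock import AsyncMock, patch


@dataclass
class AgentOutput:
    """Hypothetical stand-in for the project's AgentOutput."""

    content: dict[str, Any]
    input_tokens: int
    output_tokens: int
    model: str


class FakeProvider:
    """Hypothetical stand-in for a provider with an async execute()."""

    async def execute(self, prompt: str) -> AgentOutput:
        raise RuntimeError("the real provider must not be called in tests")


async def run_once(provider: FakeProvider) -> AgentOutput:
    # Stand-in for the engine calling the provider during a run.
    return await provider.execute("hello")


def controlled_run() -> AgentOutput:
    provider = FakeProvider()
    canned = AgentOutput(
        content={"answer": "ok"},
        input_tokens=1_000,
        output_tokens=250,
        model="example-model",  # assumed name, for illustration only
    )
    # Replace execute() with an AsyncMock that returns the canned output,
    # so token and cost calculations see known values.
    with patch.object(provider, "execute", new=AsyncMock(return_value=canned)):
        return asyncio.run(run_once(provider))
```

Because the token counts and model are fixed, any cost the engine derives from them is deterministic, which is what makes budget assertions stable.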

### Resume / Checkpoint Parity

When adding new fields to `LimitEnforcer`:

- **Transient fields** (reset each run): add to `from_dict()` as parameters sourced from the current workflow config, like `timeout_seconds`, `budget_usd`, `budget_mode`. Update the call site in `cli/run.py` → `resume_workflow_async()`.
- **Persistent fields** (survive across resume): serialize them in `to_dict()` and restore them in `from_dict()`, like `max_iterations`, `current_iteration`, `execution_history`.
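
The split between the two kinds of field can be sketched with a simplified stand-in. `MiniEnforcer` below is hypothetical and much smaller than `LimitEnforcer`; it also assumes that transient fields are left out of the serialized dict, which the real `to_dict()` may or may not do.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiniEnforcer:
    """Simplified, hypothetical stand-in for LimitEnforcer."""

    max_iterations: int = 10            # persistent
    timeout_seconds: int | None = None  # transient
    budget_usd: float | None = None     # transient
    current_iteration: int = 0          # persistent
    execution_history: list[str] = field(default_factory=list)  # persistent

    def to_dict(self) -> dict[str, Any]:
        # Only persistent fields go into the checkpoint (assumption).
        return {
            "max_iterations": self.max_iterations,
            "current_iteration": self.current_iteration,
            "execution_history": list(self.execution_history),
        }

    @classmethod
    def from_dict(
        cls,
        data: dict[str, Any],
        timeout_seconds: int | None = None,
        budget_usd: float | None = None,
    ) -> MiniEnforcer:
        # Transient fields arrive as parameters from the current config;
        # persistent fields are read back from the checkpoint data.
        enforcer = cls(
            max_iterations=data.get("max_iterations", 10),
            timeout_seconds=timeout_seconds,
            budget_usd=budget_usd,
        )
        enforcer.current_iteration = data.get("current_iteration", 0)
        enforcer.execution_history = list(data.get("execution_history", []))
        return enforcer
```

A field that is added to the class but forgotten at the resume call site silently falls back to its default on resume, which is why the call site has to be updated together with `from_dict()`.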

## Code Style

- Python 3.12+
24 changes: 21 additions & 3 deletions docs/configuration.md
@@ -266,8 +266,10 @@ Safety limits prevent runaway execution:
```yaml
 workflow:
   limits:
-    max_iterations: 10      # Default: 10, max: 100
-    timeout_seconds: 600    # Default: 600, max: 3600
+    max_iterations: 10      # Default: 10, max: 500
+    timeout_seconds: 600    # Default: None (unlimited)
+    budget_usd: 5.00        # Default: None (no budget tracking)
+    budget_mode: audit      # Default: audit. Options: audit, enforce
```

**max_iterations**:
@@ -278,6 +280,21 @@ workflow:
- Total workflow timeout
- Includes all agent executions

**budget_usd** and **budget_mode**:
- Tracks cumulative cost and acts when the budget is exceeded
- `audit` mode (default): emits a `budget_exceeded` event and logs a warning,
but the workflow continues — use this to discover cost profiles
- `enforce` mode: emits a `budget_exceeded` event, saves a checkpoint,
and stops the workflow with a `BudgetExceededError` (resumable via
`conductor resume` after increasing the budget)
- When `budget_usd` is not set, no budget tracking occurs

**Recommended graduation path**:

1. Run workflows without a budget to see costs in the summary
2. Add `budget_usd` in `audit` mode to track overshoots without breaking workflows
3. Switch to `enforce` mode once you know your cost profile
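
The difference between the two modes can be sketched in a few lines. This is an illustrative simulation, not the engine itself: `run_with_budget`, `BudgetExceeded`, and the per-step costs are hypothetical names and numbers chosen for the example.

```python
class BudgetExceeded(Exception):
    """Hypothetical stand-in for the engine's BudgetExceededError."""


def run_with_budget(step_costs, budget_usd=None, budget_mode="audit"):
    """Simulate a run and return (steps_completed, events_emitted)."""
    spent = 0.0
    events = []
    completed = 0
    for cost in step_costs:
        spent += cost
        completed += 1
        # No budget configured, or still within budget: keep going.
        if budget_usd is None or spent <= budget_usd:
            continue
        # The event is emitted once, on the first overshoot.
        if "budget_exceeded" not in events:
            events.append("budget_exceeded")
        # Only enforce mode stops the run.
        if budget_mode == "enforce":
            raise BudgetExceeded(f"spent ${spent:.2f} of ${budget_usd:.2f}")
    return completed, events


costs = [2.0, 2.0, 2.0, 2.0]  # made-up per-agent costs in USD

print(run_with_budget(costs))                  # no budget set
print(run_with_budget(costs, budget_usd=5.0))  # audit mode

try:
    run_with_budget(costs, budget_usd=5.0, budget_mode="enforce")
except BudgetExceeded as exc:
    print("stopped:", exc)
```

With these numbers, the audit run completes all four steps and records a single `budget_exceeded` event, while the enforce run stops after the third step, when cumulative spend reaches $6.00 against a $5.00 budget.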

## Complete Examples

### Claude Configuration
@@ -394,7 +411,8 @@ export CONDUCTOR_LOG_LEVEL=DEBUG # INFO, DEBUG, WARNING, ERROR

1. **Set conservative limits** initially (`max_iterations: 10`)
2. **Use timeout** to prevent long-running workflows
-3. **Test with dry-run** before production
+3. **Set a cost budget**: start with `budget_usd` in `audit` mode to learn your cost profile, then switch to `enforce`
+4. **Test with dry-run** before production

## Troubleshooting

2 changes: 2 additions & 0 deletions src/conductor/cli/run.py
@@ -1704,6 +1704,8 @@ async def resume_workflow_async(
restored_limits = LimitEnforcer.from_dict(
cp.limits,
timeout_seconds=config.workflow.limits.timeout_seconds,
budget_usd=config.workflow.limits.budget_usd,
budget_mode=config.workflow.limits.budget_mode,
)

# Construct the web dashboard early (subscribes to the emitter on
20 changes: 20 additions & 0 deletions src/conductor/config/schema.py
@@ -309,6 +309,26 @@ class LimitsConfig(BaseModel):
a hard time limit.
"""

    budget_usd: float | None = Field(default=None, ge=0.0)
    """Maximum cost budget for the workflow in USD.

    When set, the engine tracks cumulative cost and acts according to
    ``budget_mode`` when the budget is exceeded. Default is None
    (no budget tracking).
    """

    budget_mode: Literal["audit", "enforce"] = "audit"
    """How the engine responds when ``budget_usd`` is exceeded.

    - ``audit``: emit a ``budget_exceeded`` event and log a warning,
      but allow the workflow to continue. Use this to discover cost
      profiles before applying hard limits.
    - ``enforce``: emit a ``budget_exceeded`` event, save a checkpoint,
      and stop the workflow with a ``BudgetExceededError``.

    Only takes effect when ``budget_usd`` is set. Default is ``audit``.
    """


class PricingOverride(BaseModel):
"""Custom pricing for a specific model.
48 changes: 47 additions & 1 deletion src/conductor/engine/limits.py
@@ -14,6 +14,7 @@
from typing import Any

from conductor.exceptions import (
BudgetExceededError,
MaxIterationsError,
)
from conductor.exceptions import (
@@ -52,6 +53,12 @@ class LimitEnforcer:
timeout_seconds: int | None = None
"""Maximum wall-clock time for entire workflow. None means unlimited."""

budget_usd: float | None = None
"""Maximum cost budget in USD. None means no budget tracking."""

budget_mode: str = "audit"
"""Budget enforcement mode: 'audit' (warn only) or 'enforce' (stop)."""

current_iteration: int = 0
"""Current iteration count."""

@@ -64,6 +71,9 @@ class LimitEnforcer:
current_agent: str | None = None
"""Currently executing agent name."""

_budget_exceeded_emitted: bool = field(default=False, repr=False)
"""Internal flag to emit budget_exceeded event only once."""

def to_dict(self) -> dict[str, Any]:
"""Serialize limit state to a JSON-compatible dict.

@@ -85,17 +95,22 @@ def from_dict(
cls,
data: dict[str, Any],
timeout_seconds: int | None = None,
budget_usd: float | None = None,
budget_mode: str = "audit",
) -> LimitEnforcer:
"""Reconstruct a LimitEnforcer from a serialized dict.

Uses ``max_iterations`` from the checkpoint (it may have been
increased by the user) and ``timeout_seconds`` from the current
workflow config so that the resumed run gets a fresh timeout
-window.
+window. ``budget_usd`` and ``budget_mode`` come from the current
+config so that the resumed run gets a fresh budget window.

Args:
data: Dict previously produced by ``to_dict()``.
timeout_seconds: Timeout from the workflow config (fresh window).
budget_usd: Budget from the workflow config (fresh window).
budget_mode: Budget mode from the workflow config.

Returns:
A new LimitEnforcer with restored iteration state and a fresh
@@ -104,6 +119,8 @@ def from_dict(
enforcer = cls(
max_iterations=data.get("max_iterations", 10),
timeout_seconds=timeout_seconds,
budget_usd=budget_usd,
budget_mode=budget_mode,
)
enforcer.current_iteration = data.get("current_iteration", 0)
enforcer.execution_history = list(data.get("execution_history", []))
@@ -239,6 +256,35 @@ def check_timeout(self) -> None:
current_agent=self.current_agent,
)

    def check_budget(self, spent_usd: float) -> tuple[bool, bool]:
        """Check if the workflow cost budget has been exceeded.

        Returns a tuple indicating whether the budget was exceeded and
        whether this is the first time the overshoot was detected (so
        the caller can emit a one-time event).

        In ``enforce`` mode the caller should raise ``BudgetExceededError``
        after emitting the event. In ``audit`` mode the caller should
        log a warning and continue.

        Args:
            spent_usd: Current cumulative cost from UsageTracker.

        Returns:
            Tuple of (exceeded, first_time). ``exceeded`` is True when
            ``spent_usd > budget_usd``. ``first_time`` is True only on
            the first call that detects the overshoot.
        """
        if self.budget_usd is None:
            return (False, False)

        if spent_usd > self.budget_usd:
            first_time = not self._budget_exceeded_emitted
            self._budget_exceeded_emitted = True
            return (True, first_time)

        return (False, False)
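
The boundary behaviour of this check is easy to misread, so here is a small sketch of it. `BudgetCheck` is a hypothetical stand-in that copies only the logic shown above; it is not the real `LimitEnforcer`.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class BudgetCheck:
    """Hypothetical stand-in mirroring the check_budget() logic above."""

    budget_usd: float | None = None
    _emitted: bool = field(default=False, repr=False)

    def check_budget(self, spent_usd: float) -> tuple[bool, bool]:
        if self.budget_usd is None:
            return (False, False)
        if spent_usd > self.budget_usd:
            first_time = not self._emitted
            self._emitted = True
            return (True, first_time)
        return (False, False)


check = BudgetCheck(budget_usd=5.0)
at_limit = check.check_budget(5.0)     # (False, False): comparison is strict
first_over = check.check_budget(5.01)  # (True, True): first overshoot
later_over = check.check_budget(6.0)   # (True, False): already reported

# With no budget configured, any spend passes.
untracked = BudgetCheck().check_budget(1_000_000.0)  # (False, False)
```

Two consequences follow from the strict comparison: spending exactly the budget does not count as exceeding it, and a budget of `0.0` (allowed by `ge=0.0` in the schema) is exceeded by any positive spend but not by a run that costs nothing.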

def get_elapsed_time(self) -> float:
"""Get the elapsed time since workflow start.

70 changes: 67 additions & 3 deletions src/conductor/engine/workflow.py
@@ -27,6 +27,7 @@
from conductor.events import WorkflowEvent, WorkflowEventEmitter
from conductor.exceptions import (
AgentTimeoutError,
BudgetExceededError,
ConductorError,
ExecutionError,
InterruptError,
@@ -356,6 +357,8 @@ def __init__(
self.limits = LimitEnforcer(
max_iterations=config.workflow.limits.max_iterations,
timeout_seconds=config.workflow.limits.timeout_seconds,
budget_usd=config.workflow.limits.budget_usd,
budget_mode=config.workflow.limits.budget_mode,
)
self.gate_handler = HumanGateHandler(skip_gates=skip_gates)
self.max_iterations_handler = MaxIterationsHandler(skip_gates=skip_gates)
@@ -479,6 +482,58 @@ def _emit(self, event_type: str, data: dict[str, Any]) -> None:
event = WorkflowEvent(type=event_type, timestamp=_time.time(), data=data)
self._event_emitter.emit(event)

    def _check_budget(self) -> None:
        """Check whether the workflow cost budget has been exceeded.

        Reads current spend from ``usage_tracker``, delegates the
        threshold check to ``LimitEnforcer.check_budget()``, and acts
        according to ``budget_mode``:

        - On first overshoot: emit a ``budget_exceeded`` event.
        - ``enforce`` mode: raise ``BudgetExceededError`` (triggers
          checkpoint + workflow stop).
        - ``audit`` mode: log a warning and continue.

        Subsequent overshoots in ``audit`` mode are silent (no repeated
        events or warnings).
        """
        summary = self.usage_tracker.get_summary()
        spent = summary.total_cost_usd or 0.0
        exceeded, first_time = self.limits.check_budget(spent)

        if not exceeded:
            return

        budget = self.limits.budget_usd  # guaranteed non-None when exceeded
        assert budget is not None  # for type narrowing

        if first_time:
            self._emit(
                "budget_exceeded",
                {
                    "budget_usd": budget,
                    "spent_usd": spent,
                    "budget_mode": self.limits.budget_mode,
                    "current_agent": self.limits.current_agent,
                },
            )

        if self.limits.budget_mode == "enforce":
            raise BudgetExceededError(
                f"Workflow exceeded cost budget (${budget:.2f}): "
                f"spent ${spent:.2f}",
                budget_usd=budget,
                spent_usd=spent,
                current_agent=self.limits.current_agent,
            )

        if first_time:
            logger.warning(
                "Budget exceeded (audit mode): spent $%.4f of $%.2f budget",
                spent,
                budget,
            )

def _yaml_source_field(self) -> dict[str, str]:
"""Return ``{"yaml_source": <text>}`` if the workflow file is readable."""
if self.workflow_path is None:
@@ -2038,8 +2093,9 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
for_each_group.name, count=for_each_output.count
)

-# Check timeout after for-each group
+# Check timeout and budget after for-each group
self.limits.check_timeout()
self._check_budget()

# Evaluate routes from for-each group
route_result = self._evaluate_for_each_routes(
@@ -2109,8 +2165,9 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
agent_count = len(parallel_group.agents)
self.limits.record_execution(parallel_group.name, count=agent_count)

-# Check timeout after parallel group
+# Check timeout and budget after parallel group
self.limits.check_timeout()
self._check_budget()

# Evaluate routes from parallel group
route_result = self._evaluate_parallel_routes(
@@ -2355,6 +2412,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
self.context.store(agent.name, output_content)
self.limits.record_execution(agent.name)
self.limits.check_timeout()
self._check_budget()

route_result = self._evaluate_routes(agent, output_content)

@@ -2446,6 +2504,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
self.context.store(agent.name, sub_output)
self.limits.record_execution(agent.name)
self.limits.check_timeout()
self._check_budget()

route_result = self._evaluate_routes(agent, sub_output)

@@ -2576,8 +2635,9 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
# Record successful execution
self.limits.record_execution(agent.name)

-# Check timeout after each agent
+# Check timeout and budget after each agent
self.limits.check_timeout()
self._check_budget()

# Evaluate routes using the Router
route_result = self._evaluate_routes(agent, output.content)
@@ -2624,6 +2684,10 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
fail_data["elapsed_seconds"] = e.elapsed_seconds
fail_data["timeout_seconds"] = e.timeout_seconds
fail_data["current_agent"] = e.current_agent
elif isinstance(e, BudgetExceededError):
fail_data["budget_usd"] = e.budget_usd
fail_data["spent_usd"] = e.spent_usd
fail_data["current_agent"] = e.current_agent
self._emit("workflow_failed", fail_data)
# Execute on_error hook with error information
self._execute_hook("on_error", error=e)
38 changes: 38 additions & 0 deletions src/conductor/exceptions.py
@@ -484,6 +484,44 @@ def __init__(
self.agent_name = agent_name


class BudgetExceededError(ExecutionError):
"""Raised when a workflow exceeds its cost budget in enforce mode.

This is a safety mechanism to prevent runaway spending in agentic
workflows. Only raised when ``budget_mode`` is ``enforce``.

Attributes:
budget_usd: The configured budget limit.
spent_usd: The actual amount spent when the limit was exceeded.
current_agent: The agent that was executing when the budget was exceeded.
"""

def __init__(
self,
message: str,
*,
budget_usd: float,
spent_usd: float,
current_agent: str | None = None,
suggestion: str | None = None,
file_path: str | None = None,
line_number: int | None = None,
) -> None:
self.budget_usd = budget_usd
self.spent_usd = spent_usd
self.current_agent = current_agent

if suggestion is None:
suggestion = (
f"Increase limits.budget_usd (currently ${budget_usd:.2f}) "
"or switch to budget_mode: audit to continue without enforcement"
)
if current_agent:
suggestion += f". Budget exceeded after agent '{current_agent}'"

super().__init__(message, suggestion, file_path, line_number)


class HumanGateError(ExecutionError):
"""Raised when a human gate encounters an error.
