Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions file-budget.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
},
"tests/unit/test_session_lifecycle_cli.py": {
"purpose": "new e2e coverage for the status/resume/handoff verbs"
},
"tests/unit/test_run_command.py": {
"purpose": "unit coverage for the run verb (prompt to plan to engine driver)"
}
},
"packages": {
Expand Down
20 changes: 20 additions & 0 deletions src/bonfire/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,26 @@ def scan(
_lazy_run("bonfire.cli.commands.scan", "scan")(**scan_kwargs)


@app.command("run")
def run(
    prompt: str = typer.Argument(..., help="The task for the build pipeline to perform."),
    budget: float | None = typer.Option(
        None,
        "--budget",
        "-b",
        help="Maximum spend in USD for this run. Defaults to the plan's budget.",
    ),
    workflow: str = typer.Option(
        "standard_build",
        "--workflow",
        "-w",
        help="Workflow plan to run. Use a name from the built-in registry.",
    ),
) -> None:
    """Drive a prompt through a workflow plan and the pipeline engine."""
    # NOTE: Typer uses the docstring above as this command's help text, so it
    # is user-facing output rather than internal documentation.
    #
    # The implementation is looked up by dotted module path and attribute name
    # instead of being imported at the top of this module; presumably
    # ``_lazy_run`` defers that import until the command actually runs —
    # TODO confirm against ``_lazy_run``.
    handler = _lazy_run("bonfire.cli.commands.run", "run")
    # Every parameter is forwarded by keyword, so the target function's own
    # parameter defaults are never consulted on this path.
    handler(prompt=prompt, budget=budget, workflow=workflow)


@app.command("status")
def status() -> None:
"""Show the most recent persisted Bonfire session: workflow, stage, cost."""
Expand Down
150 changes: 150 additions & 0 deletions src/bonfire/cli/commands/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 BonfireAI

"""Run command — drive a prompt through a workflow plan and the engine.

The minimal build driver: a prompt becomes a workflow plan, the plan runs
through the live :class:`~bonfire.engine.pipeline.PipelineEngine`, and the
result is rendered cleanly (success + cost, or the typed failure). The
process exits non-zero on failure so the verb composes in scripts and CI.

Dependency-injection seam
-------------------------
``_run`` accepts a ``build_engine`` factory that returns a wired
``PipelineEngine``. The public Typer command passes the real default
(:func:`_default_engine`, which wires the Claude Agent SDK backend). Unit
tests pass a factory returning an engine wired to a fake backend, so the
driver's plan-selection / rendering / exit-code logic is exercised with
zero network.
"""

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Protocol

import typer

from bonfire.workflow.registry import get_default_registry

if TYPE_CHECKING:
from bonfire.engine.pipeline import PipelineEngine, PipelineResult
from bonfire.models.plan import WorkflowPlan

#: Default workflow selected when the caller does not pass ``--workflow``.
#: Serves as the default for both ``_run`` and this module's ``run`` command.
#: NOTE: ``src/bonfire/cli/app.py`` repeats the same literal for its own
#: ``--workflow`` option default; keep the two values in sync.
_DEFAULT_WORKFLOW = "standard_build"


class _EngineFactory(Protocol):
    """Structural type of the ``build_engine`` seam accepted by ``_run``.

    Any callable that takes the selected workflow plan and returns a wired
    :class:`PipelineEngine` satisfies this protocol.

    The plan is part of the signature so that a factory *may* tailor the
    backend or settings to it (for example to the plan's budget). The default
    factory, ``_default_engine``, does not use it; keeping the parameter
    anyway means the seam does not have to change if wiring later needs it.
    """

    def __call__(self, plan: WorkflowPlan) -> PipelineEngine:
        ...


def _default_engine(plan: WorkflowPlan) -> PipelineEngine:
    """Build the production :class:`PipelineEngine` (live SDK backend).

    This is the default ``build_engine`` factory used by ``_run``: it loads
    settings, creates one new event bus, hands that bus to both a
    ``ClaudeSDKBackend`` and the engine, and takes the engine config from
    ``settings.bonfire``. Unit tests are expected to inject their own factory
    instead of calling this one.

    Args:
        plan: The selected workflow plan. Accepted to satisfy the
            ``_EngineFactory`` signature; not used by this factory.

    Returns:
        A ``PipelineEngine`` wired to the SDK backend.
    """
    # Imported inside the function rather than at module level, so these
    # modules are only loaded when the real engine is actually built.
    from bonfire.dispatch.sdk_backend import ClaudeSDKBackend
    from bonfire.engine.factory import load_settings_or_default
    from bonfire.engine.pipeline import PipelineEngine
    from bonfire.events.bus import EventBus

    loaded_settings = load_settings_or_default()
    # The backend and the engine share this single bus instance.
    event_bus = EventBus()
    sdk_backend = ClaudeSDKBackend(bus=event_bus)
    return PipelineEngine(
        backend=sdk_backend,
        bus=event_bus,
        config=loaded_settings.bonfire,
        settings=loaded_settings,
    )


def _select_plan(prompt: str, *, budget: float | None, workflow: str) -> WorkflowPlan:
    """Instantiate the named workflow plan and stamp the prompt (and budget) on it.

    Args:
        prompt: Stored on the returned plan as ``task_description``.
        budget: When not ``None``, stored as ``budget_usd``; when ``None`` the
            plan keeps whatever budget its factory produced.
        workflow: Name looked up in the default workflow registry.

    Returns:
        A copy of the freshly built plan with the overrides applied.

    Raises:
        KeyError: Propagated from the registry lookup when *workflow* is not
            a registered workflow name.
    """
    plan_factory = get_default_registry().get(workflow)
    base_plan = plan_factory()
    # Only override the budget when one was explicitly supplied.
    budget_override: dict[str, object] = {} if budget is None else {"budget_usd": budget}
    # NOTE(review): if the plan is a pydantic model, ``model_copy(update=...)``
    # does not re-validate the updated fields — confirm the budget value is
    # validated elsewhere if that matters.
    return base_plan.model_copy(update={"task_description": prompt, **budget_override})


def _render(result: PipelineResult) -> None:
    """Print a summary of *result*, then terminate with the matching exit code.

    On success the session id and total cost are written to stdout. On
    failure the summary is written to stderr and includes, when present, the
    failed stage, the gate failure (name and message) and the error, followed
    by the total cost.

    Raises:
        typer.Exit: Always — code 0 when ``result.success`` is truthy,
            code 1 otherwise.
    """

    def report(line: str) -> None:
        # All failure output goes to stderr.
        typer.echo(line, err=True)

    if not result.success:
        report(f"Run failed (session {result.session_id}).")
        stage = result.failed_stage
        if stage:
            report(f" Stage: {stage}")
        gate_failure = result.gate_failure
        if gate_failure is not None:
            report(f" Gate: {gate_failure.gate_name} — {gate_failure.message}")
        error = result.error
        if error:
            report(f" Error: {error}")
        report(f" Cost: ${result.total_cost_usd:.2f}")
        raise typer.Exit(1)

    typer.echo(f"Run succeeded (session {result.session_id}).")
    typer.echo(f" Cost: ${result.total_cost_usd:.2f}")
    raise typer.Exit(0)


def _run(
    prompt: str,
    *,
    budget: float | None = None,
    workflow: str = _DEFAULT_WORKFLOW,
    build_engine: _EngineFactory = _default_engine,
) -> None:
    """Core driver — selects a plan, runs the engine, renders the result.

    The ``build_engine`` seam is the unit-test injection point: pass a
    factory returning an engine wired to a fake backend to run with no
    network.

    Args:
        prompt: The task text; stamped onto the selected plan.
        budget: Optional budget override in USD; ``None`` keeps the plan's own.
        workflow: Registered workflow name to run.
        build_engine: Factory called with the selected plan to obtain the
            engine. It is only called once plan selection has succeeded.

    Raises:
        typer.Exit: Always — code 0 on success, 1 on failure, 2 on an
            unknown ``--workflow`` name.
    """
    try:
        plan = _select_plan(prompt, budget=budget, workflow=workflow)
    except KeyError as exc:
        # ``str(KeyError(msg))`` is ``repr(msg)``: it adds surrounding quotes
        # and escapes characters such as newlines. Stripping quote characters
        # from that repr also removes quotes that legitimately begin or end
        # the message (e.g. "... 'name'"). Read the original message from
        # ``args`` instead so it is printed exactly as the registry wrote it.
        if len(exc.args) == 1:
            detail = str(exc.args[0])
        else:
            # No single message available; never print an empty error line.
            detail = str(exc) or f"Unknown workflow: {workflow!r}"
        typer.echo(detail, err=True)
        raise typer.Exit(2) from None

    engine = build_engine(plan)
    result = asyncio.run(engine.run(plan))
    # ``_render`` raises ``typer.Exit`` itself (0 or 1); nothing runs after it.
    _render(result)


def run(
    prompt: str = typer.Argument(..., help="The task for the build pipeline to perform."),
    budget: float | None = typer.Option(
        None,
        "--budget",
        "-b",
        help="Maximum spend in USD for this run. Defaults to the plan's budget.",
    ),
    workflow: str = typer.Option(
        _DEFAULT_WORKFLOW,
        "--workflow",
        "-w",
        help="Workflow plan to run. Use a name from the built-in registry.",
    ),
) -> None:
    """Drive a prompt through a workflow plan and the pipeline engine."""
    # Thin CLI shim over ``_run``. ``build_engine`` is deliberately not passed,
    # so ``_run`` falls back to its default factory (the real engine wiring).
    _run(prompt=prompt, budget=budget, workflow=workflow)
151 changes: 151 additions & 0 deletions tests/unit/test_run_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 BonfireAI

"""Unit tests for the ``bonfire run`` driver — NETWORK-FREE.

The driver wires a prompt into a workflow plan and runs it through the
live ``PipelineEngine``. These tests exercise plan selection, result
rendering, exit codes, and the prompt-to-plan handoff using a fake
backend injected via the ``build_engine`` seam — no SDK, no network.

The fake backend mirrors ``tests/unit/test_engine_pipeline.py``'s
``_MockBackend``: it returns COMPLETED envelopes by default and FAILED
envelopes for a configured set of agents.
"""

from __future__ import annotations

import pytest
import typer

from bonfire.cli.commands.run import _run, _select_plan
from bonfire.engine.pipeline import PipelineEngine
from bonfire.events.bus import EventBus
from bonfire.models.config import PipelineConfig
from bonfire.models.envelope import Envelope, ErrorDetail
from bonfire.protocols import DispatchOptions

# ``debug`` is a gate-free, handler-free workflow (scout -> warrior); every
# stage routes straight through the backend, so a bare fake backend with no
# gate/handler registries drives it to completion with no network.
# Passed as the ``workflow`` argument to ``_select_plan`` / ``_run`` by the
# tests in this module.
_GATELESS_WORKFLOW = "debug"


class _FakeBackend:
    """In-memory stand-in for a dispatch backend — no SDK, no network.

    Every envelope handed to :meth:`execute` is appended to ``calls`` so a
    test can inspect what the engine dispatched (for instance, that the
    prompt reached the dispatched task/context). Agents named in
    ``fail_agents`` get an error envelope back; every other agent gets a
    result envelope with a fixed cost of 0.01.
    """

    def __init__(self, *, fail_agents: set[str] | None = None) -> None:
        # ``None`` and an empty set both mean "no agent fails".
        self.fail_agents = fail_agents if fail_agents else set()
        # Envelopes received by ``execute``, in call order.
        self.calls: list[Envelope] = []

    async def execute(self, envelope: Envelope, *, options: DispatchOptions) -> Envelope:
        """Record *envelope* and answer with a result or an error envelope.

        ``options`` is accepted for interface compatibility and ignored.
        """
        self.calls.append(envelope)
        agent = envelope.agent_name
        if agent not in self.fail_agents:
            return envelope.with_result(f"{agent} done", cost_usd=0.01)
        failure = ErrorDetail(error_type="agent", message=f"{agent} failed")
        return envelope.with_error(failure)

    async def health_check(self) -> bool:
        """Report the fake backend as healthy, unconditionally."""
        return True


def _engine_factory(backend: _FakeBackend):
    """Return a ``build_engine`` callable whose engines all use *backend*.

    The returned callable ignores the plan it is given and builds a
    ``PipelineEngine`` around the supplied fake backend, a new ``EventBus``
    and a default ``PipelineConfig`` on every call.
    """

    def _make_engine(plan: object) -> PipelineEngine:
        fresh_bus = EventBus()
        default_config = PipelineConfig()
        return PipelineEngine(
            backend=backend,  # type: ignore[arg-type]
            bus=fresh_bus,
            config=default_config,
        )

    return _make_engine


# ---------------------------------------------------------------------------
# Plan selection — the prompt-to-plan handoff
# ---------------------------------------------------------------------------


class TestSelectPlan:
    """``_select_plan`` copies the prompt and optional budget onto the plan."""

    def test_prompt_reaches_task_description(self) -> None:
        prompt = "ship the widget"
        selected = _select_plan(prompt, budget=None, workflow=_GATELESS_WORKFLOW)
        assert selected.task_description == prompt

    def test_budget_override_applied(self) -> None:
        override = 3.5
        selected = _select_plan("x", budget=override, workflow=_GATELESS_WORKFLOW)
        assert selected.budget_usd == override

    def test_budget_none_keeps_plan_default(self) -> None:
        from bonfire.workflow.registry import get_default_registry

        # Build the plan straight from the registry to learn its own default.
        untouched = get_default_registry().get(_GATELESS_WORKFLOW)()
        expected_budget = untouched.budget_usd
        selected = _select_plan("x", budget=None, workflow=_GATELESS_WORKFLOW)
        # With no budget passed, the factory's default must survive the copy.
        assert selected.budget_usd == expected_budget

    def test_unknown_workflow_raises_keyerror(self) -> None:
        with pytest.raises(KeyError):
            _select_plan("x", budget=None, workflow="no_such_workflow")


# ---------------------------------------------------------------------------
# Driver — happy path, failure path, exit codes
# ---------------------------------------------------------------------------


class TestRunDriver:
    """``_run`` drives the engine and always terminates via ``typer.Exit``."""

    @staticmethod
    def _exit_code_of(
        prompt: str,
        backend: _FakeBackend,
        *,
        workflow: str = _GATELESS_WORKFLOW,
    ) -> int:
        """Run the driver against *backend* and return the ``typer.Exit`` code."""
        with pytest.raises(typer.Exit) as caught:
            _run(
                prompt,
                workflow=workflow,
                build_engine=_engine_factory(backend),
            )
        return caught.value.exit_code

    def test_happy_path_exits_zero(self) -> None:
        assert self._exit_code_of("build a thing", _FakeBackend()) == 0

    def test_failure_path_exits_nonzero(self) -> None:
        # Fail the first stage of the debug workflow so the run halts.
        failing = _FakeBackend(fail_agents={"scout"})
        assert self._exit_code_of("build a thing", failing) == 1

    def test_prompt_reaches_dispatched_task(self) -> None:
        """The prompt must flow through the plan into the dispatched work."""
        marker = "REACH-THE-BACKEND-MARKER"
        recorder = _FakeBackend()
        # Only the dispatched envelopes matter here; the exit code is ignored.
        self._exit_code_of(marker, recorder)
        assert recorder.calls, "expected the engine to dispatch at least one stage"
        first_call = recorder.calls[0]
        assert marker in first_call.task or marker in first_call.context

    def test_unknown_workflow_exits_code_two(self) -> None:
        code = self._exit_code_of(
            "x",
            _FakeBackend(),
            workflow="definitely_not_registered",
        )
        assert code == 2
Loading