From 542f03a419faba6d52bca33eef2d3500e1698c0d Mon Sep 17 00:00:00 2001 From: velocitybolt Date: Tue, 7 Apr 2026 14:22:06 -0500 Subject: [PATCH 1/2] feat: add --prompt flag for headless single-shot execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enable programmatic usage of Anton via CLI flags: --prompt/-p Run a single query and exit --output-format Return text (default) or structured JSON --stdin Read prompt from stdin for piping Headless mode reuses the full ChatSession pipeline (memory, tools, scratchpad, datasources) without any interactive elements — no prompt_toolkit, no spinner, no escape watcher. JSON output includes response text, tool calls, and token usage for easy integration with orchestration platforms and CI pipelines. --- anton/chat.py | 120 +++++++++++++++++++++++++++++ anton/cli.py | 33 ++++++++ tests/test_headless.py | 166 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 319 insertions(+) create mode 100644 tests/test_headless.py diff --git a/anton/chat.py b/anton/chat.py index 25acdd9c..094b42a8 100644 --- a/anton/chat.py +++ b/anton/chat.py @@ -1980,6 +1980,126 @@ def run_chat( asyncio.run(_chat_loop(console, settings, resume=resume, first_run=first_run, desktop_first_run=desktop_first_run)) +def run_headless( + console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text" +) -> None: + """Run a single prompt in headless mode and exit.""" + asyncio.run(_headless(console, settings, prompt=prompt, output_format=output_format)) + + +async def _headless( + console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text" +) -> None: + """Execute a single prompt without interactive elements.""" + from anton.context.self_awareness import SelfAwarenessContext + from anton.llm.client import LLMClient + from anton.memory.cortex import Cortex + from anton.workspace import Workspace + + llm_client = LLMClient.from_settings(settings) + + self_awareness = SelfAwarenessContext(Path(settings.context_dir)) + workspace = Workspace(settings.workspace_path) + workspace.apply_env_to_process() + + # Inject datasource env vars + dv = DataVault() + dreg = DatasourceRegistry() + for conn in dv.list_connections(): + dv.inject_env(conn["engine"], conn["name"]) + edef = dreg.get(conn["engine"]) + if edef is not None: + register_secret_vars(edef, engine=conn["engine"], name=conn["name"]) + del dv, dreg + + global_memory_dir = Path.home() / ".anton" / "memory" + project_memory_dir = settings.workspace_path / ".anton" / "memory" + + cortex = Cortex( + global_dir=global_memory_dir, + project_dir=project_memory_dir, + mode=settings.memory_mode, + llm_client=llm_client, + ) + + from anton.memory.episodes import EpisodicMemory + + episodes_dir = settings.workspace_path / ".anton" / "episodes" + episodic = EpisodicMemory(episodes_dir, enabled=settings.episodic_memory) + if episodic.enabled: + episodic.start_session() + + from anton.memory.history_store import HistoryStore + + history_store = HistoryStore(episodes_dir) + current_session_id = episodic._session_id if episodic.enabled else None + + runtime_context = build_runtime_context(settings) + coding_api_key = ( + settings.anthropic_api_key + if settings.coding_provider == "anthropic" + else settings.openai_api_key + ) or "" + + session = ChatSession( + llm_client, + self_awareness=self_awareness, + cortex=cortex, + episodic=episodic, + runtime_context=runtime_context, + workspace=workspace, + console=None, # No interactive console in headless mode + coding_provider=settings.coding_provider, + coding_api_key=coding_api_key, + coding_base_url=settings.openai_base_url or "", + history_store=history_store, + session_id=current_session_id, + proactive_dashboards=False, # No interactive dashboards in headless + ) + + # Execute single turn + response_text = "" + tool_calls: list[dict] = [] + usage_data: dict = {} + + async for event in session.turn_stream(prompt): + if isinstance(event, StreamTextDelta): + response_text += event.text + elif isinstance(event, StreamToolUseStart): + tool_calls.append({"name": event.name, "id": event.id, "input": {}}) + elif isinstance(event, StreamToolUseDelta): + # Accumulate tool input JSON + if tool_calls: + last = tool_calls[-1] + last.setdefault("_raw_input", "") + last["_raw_input"] += event.json_delta + elif isinstance(event, StreamToolUseEnd): + # Parse accumulated JSON input + if tool_calls: + last = tool_calls[-1] + raw = last.pop("_raw_input", "{}") + try: + last["input"] = _json.loads(raw) + except _json.JSONDecodeError: + last["input"] = raw + elif isinstance(event, StreamComplete): + usage_data = { + "input_tokens": event.response.usage.input_tokens, + "output_tokens": event.response.usage.output_tokens, + } + + # Output + if output_format == "json": + result = { + "response": response_text, + "tool_calls": [{"name": tc["name"], "input": tc.get("input", {})} for tc in tool_calls], + "usage": usage_data, + } + print(_json.dumps(result)) + else: + print(response_text) + + async def _chat_loop( console: Console, settings: AntonSettings, *, resume: bool = False, first_run: bool = False, desktop_first_run: bool = False ) -> None: diff --git a/anton/cli.py b/anton/cli.py index aa3e1f38..af70552b 100644 --- a/anton/cli.py +++ b/anton/cli.py @@ -278,6 +278,15 @@ def main( resume: bool = typer.Option( False, "--resume", "-r", help="Resume a previous chat session" ), + prompt: str | None = typer.Option( + None, "--prompt", "-p", help="Run a single prompt in headless mode and exit" + ), + output_format: str = typer.Option( + "text", "--output-format", help="Output format: text or json (headless mode only)" + ), + stdin: bool = typer.Option( + False, "--stdin", help="Read prompt from stdin (headless mode)" + ), ) -> None: """Anton — a self-evolving autonomous system.""" _ensure_dependencies(console) @@ -287,6 +296,30 @@ def main( settings = AntonSettings() settings.resolve_workspace(folder) + # Headless mode: --prompt or --stdin + headless_prompt = prompt + if stdin: + import sys as _sys + headless_prompt = _sys.stdin.read().strip() + if not headless_prompt: + console.print("[red]Error: no input received from stdin[/]") + raise typer.Exit(1) + + if headless_prompt: + # Headless mode — skip terms consent, banner, update check + if not _has_api_key(settings): + console.print("[red]Error: no API key configured. Run `anton setup` first.[/]") + raise typer.Exit(1) + + ctx.ensure_object(dict) + ctx.obj["settings"] = settings + + from anton.chat import run_headless + + _ensure_workspace(settings) + run_headless(console, settings, prompt=headless_prompt, output_format=output_format) + raise typer.Exit(0) + if not settings.terms_consent: _ensure_terms_consent(console, settings) diff --git a/tests/test_headless.py b/tests/test_headless.py new file mode 100644 index 00000000..24512ab1 --- /dev/null +++ b/tests/test_headless.py @@ -0,0 +1,166 @@ +"""Tests for headless mode (--prompt flag).""" +from __future__ import annotations + +import json +from io import StringIO +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from anton.chat import _headless +from anton.llm.provider import ( + LLMResponse, + StreamComplete, + StreamTextDelta, + StreamToolUseEnd, + StreamToolUseStart, + ToolCall, + Usage, +) + + +def _text_response(text: str) -> LLMResponse: + return LLMResponse( + content=text, + tool_calls=[], + usage=Usage(input_tokens=10, output_tokens=20), + stop_reason="end_turn", + ) + + +def _mock_settings(): + settings = MagicMock() + settings.workspace_path = MagicMock() + settings.workspace_path.__truediv__ = lambda self, other: MagicMock() + settings.context_dir = "/tmp/test-anton-context" + settings.memory_mode = "off" + settings.episodic_memory = False + settings.coding_provider = "anthropic" + settings.anthropic_api_key = "test-key" + settings.openai_api_key = None + settings.openai_base_url = None + settings.proactive_dashboards = False + return settings + + +def _patches(mock_session): + """Common patches for headless tests. Returns context manager stack.""" + mock_dv = MagicMock() + mock_dv.return_value.list_connections.return_value = [] + mock_ep = MagicMock() + mock_ep.return_value.enabled = False + mock_ep.return_value._session_id = None + + return ( + patch("anton.llm.client.LLMClient.from_settings", return_value=MagicMock()), + patch("anton.context.self_awareness.SelfAwarenessContext"), + patch("anton.workspace.Workspace"), + patch("anton.data_vault.DataVault", mock_dv), + patch("anton.datasource_registry.DatasourceRegistry"), + patch("anton.memory.cortex.Cortex"), + patch("anton.memory.episodes.EpisodicMemory", mock_ep), + patch("anton.memory.history_store.HistoryStore"), + patch("anton.chat_session.build_runtime_context", return_value=""), + patch("anton.chat.ChatSession", return_value=mock_session), + ) + + +class TestHeadlessTextOutput: + @pytest.mark.asyncio + async def test_basic_text_response(self, capsys): + mock_session = AsyncMock() + + async def fake_stream(prompt): + yield StreamTextDelta("The answer is 42.") + yield StreamComplete(_text_response("The answer is 42.")) + + mock_session.turn_stream = fake_stream + + patches = _patches(mock_session) + with patches[0], patches[1], patches[2], patches[3], patches[4], \ + patches[5], patches[6], patches[7], patches[8], patches[9]: + from rich.console import Console + console = Console(file=StringIO()) + await _headless(console, _mock_settings(), prompt="question", output_format="text") + + captured = capsys.readouterr() + assert "The answer is 42." in captured.out + + +class TestHeadlessJsonOutput: + @pytest.mark.asyncio + async def test_json_response(self, capsys): + mock_session = AsyncMock() + + async def fake_stream(prompt): + yield StreamTextDelta("Hello world") + yield StreamComplete(_text_response("Hello world")) + + mock_session.turn_stream = fake_stream + + patches = _patches(mock_session) + with patches[0], patches[1], patches[2], patches[3], patches[4], \ + patches[5], patches[6], patches[7], patches[8], patches[9]: + from rich.console import Console + console = Console(file=StringIO()) + await _headless(console, _mock_settings(), prompt="say hello", output_format="json") + + captured = capsys.readouterr() + result = json.loads(captured.out) + assert result["response"] == "Hello world" + assert isinstance(result["tool_calls"], list) + assert result["usage"]["input_tokens"] == 10 + assert result["usage"]["output_tokens"] == 20 + + +class TestHeadlessToolCalls: + @pytest.mark.asyncio + async def test_tool_calls_in_json_output(self, capsys): + mock_session = AsyncMock() + + async def fake_stream(prompt): + yield StreamToolUseStart(id="tc_1", name="scratchpad") + yield StreamToolUseEnd(id="tc_1") + yield StreamTextDelta("Result: 55") + yield StreamComplete(LLMResponse( + content="Result: 55", + tool_calls=[ToolCall(id="tc_1", name="scratchpad", input={"action": "exec"})], + usage=Usage(input_tokens=50, output_tokens=100), + stop_reason="end_turn", + )) + + mock_session.turn_stream = fake_stream + + patches = _patches(mock_session) + with patches[0], patches[1], patches[2], patches[3], patches[4], \ + patches[5], patches[6], patches[7], patches[8], patches[9]: + from rich.console import Console + console = Console(file=StringIO()) + await _headless(console, _mock_settings(), prompt="fibonacci", output_format="json") + + captured = capsys.readouterr() + result = json.loads(captured.out) + assert len(result["tool_calls"]) == 1 + assert result["tool_calls"][0]["name"] == "scratchpad" + assert "55" in result["response"] + + +class TestHeadlessNoInteractive: + @pytest.mark.asyncio + async def test_completes_without_interactive_input(self): + """Headless mode completes without prompt_toolkit or interactive console.""" + mock_session = AsyncMock() + + async def fake_stream(prompt): + yield StreamTextDelta("ok") + yield StreamComplete(_text_response("ok")) + + mock_session.turn_stream = fake_stream + + patches = _patches(mock_session) + with patches[0], patches[1], patches[2], patches[3], patches[4], \ + patches[5], patches[6], patches[7], patches[8], patches[9]: + from rich.console import Console + console = Console(file=StringIO()) + # Should complete without hanging on interactive input + await _headless(console, _mock_settings(), prompt="test", output_format="text") From e92724787c8f595dcb617b1b656133f35f9aee14 Mon Sep 17 00:00:00 2001 From: velocitybolt Date: Wed, 8 Apr 2026 17:31:53 -0500 Subject: [PATCH 2/2] address PR comments --- anton/chat.py | 121 ----------------------------- anton/cli.py | 13 +++- anton/commands/headless.py | 154 +++++++++++++++++++++++++++++++++++++ tests/test_headless.py | 2 +- 4 files changed, 164 insertions(+), 126 deletions(-) create mode 100644 anton/commands/headless.py diff --git a/anton/chat.py b/anton/chat.py index 094b42a8..058ed03d 100644 --- a/anton/chat.py +++ b/anton/chat.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import json as _json import os import urllib.error import re as _re @@ -1980,126 +1979,6 @@ def run_chat( asyncio.run(_chat_loop(console, settings, resume=resume, first_run=first_run, desktop_first_run=desktop_first_run)) -def run_headless( - console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text" -) -> None: - """Run a single prompt in headless mode and exit.""" - asyncio.run(_headless(console, settings, prompt=prompt, output_format=output_format)) - - -async def _headless( - console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text" -) -> None: - """Execute a single prompt without interactive elements.""" - from anton.context.self_awareness import SelfAwarenessContext - from anton.llm.client import LLMClient - from anton.memory.cortex import Cortex - from anton.workspace import Workspace - - llm_client = LLMClient.from_settings(settings) - - self_awareness = SelfAwarenessContext(Path(settings.context_dir)) - workspace = Workspace(settings.workspace_path) - workspace.apply_env_to_process() - - # Inject datasource env vars - dv = DataVault() - dreg = DatasourceRegistry() - for conn in dv.list_connections(): - dv.inject_env(conn["engine"], conn["name"]) - edef = dreg.get(conn["engine"]) - if edef is not None: - register_secret_vars(edef, engine=conn["engine"], name=conn["name"]) - del dv, dreg - - global_memory_dir = Path.home() / ".anton" / "memory" - project_memory_dir = settings.workspace_path / ".anton" / "memory" - - cortex = Cortex( - global_dir=global_memory_dir, - project_dir=project_memory_dir, - mode=settings.memory_mode, - llm_client=llm_client, - ) - - from anton.memory.episodes import EpisodicMemory - - episodes_dir = settings.workspace_path / ".anton" / "episodes" - episodic = EpisodicMemory(episodes_dir, enabled=settings.episodic_memory) - if episodic.enabled: - episodic.start_session() - - from anton.memory.history_store import HistoryStore - - history_store = HistoryStore(episodes_dir) - current_session_id = episodic._session_id if episodic.enabled else None - - runtime_context = build_runtime_context(settings) - coding_api_key = ( - settings.anthropic_api_key - if settings.coding_provider == "anthropic" - else settings.openai_api_key - ) or "" - - session = ChatSession( - llm_client, - self_awareness=self_awareness, - cortex=cortex, - episodic=episodic, - runtime_context=runtime_context, - workspace=workspace, - console=None, # No interactive console in headless mode - coding_provider=settings.coding_provider, - coding_api_key=coding_api_key, - coding_base_url=settings.openai_base_url or "", - history_store=history_store, - session_id=current_session_id, - proactive_dashboards=False, # No interactive dashboards in headless - ) - - # Execute single turn - response_text = "" - tool_calls: list[dict] = [] - usage_data: dict = {} - - async for event in session.turn_stream(prompt): - if isinstance(event, StreamTextDelta): - response_text += event.text - elif isinstance(event, StreamToolUseStart): - tool_calls.append({"name": event.name, "id": event.id, "input": {}}) - elif isinstance(event, StreamToolUseDelta): - # Accumulate tool input JSON - if tool_calls: - last = tool_calls[-1] - last.setdefault("_raw_input", "") - last["_raw_input"] += event.json_delta - elif isinstance(event, StreamToolUseEnd): - # Parse accumulated JSON input - if tool_calls: - last = tool_calls[-1] - raw = last.pop("_raw_input", "{}") - try: - last["input"] = _json.loads(raw) - except _json.JSONDecodeError: - last["input"] = raw - elif isinstance(event, StreamComplete): - usage_data = { - "input_tokens": event.response.usage.input_tokens, - "output_tokens": event.response.usage.output_tokens, - } - - # Output - if output_format == "json": - result = { - "response": response_text, - "tool_calls": [{"name": tc["name"], "input": tc.get("input", {})} for tc in tool_calls], - "usage": usage_data, - } - print(_json.dumps(result)) - else: - print(response_text) - - async def _chat_loop( console: Console, settings: AntonSettings, *, resume: bool = False, first_run: bool = False, desktop_first_run: bool = False ) -> None: diff --git a/anton/cli.py b/anton/cli.py index af70552b..6dacbeff 100644 --- a/anton/cli.py +++ b/anton/cli.py @@ -296,25 +296,30 @@ def main( settings = AntonSettings() settings.resolve_workspace(folder) - # Headless mode: --prompt or --stdin + # Headless mode: --prompt or --stdin (mutually exclusive) + if prompt and stdin: + import sys as _sys + print("Error: --prompt and --stdin are mutually exclusive", file=_sys.stderr) + raise typer.Exit(1) + headless_prompt = prompt if stdin: import sys as _sys headless_prompt = _sys.stdin.read().strip() if not headless_prompt: - console.print("[red]Error: no input received from stdin[/]") + print("Error: no input received from stdin", file=_sys.stderr) raise typer.Exit(1) if headless_prompt: # Headless mode — skip terms consent, banner, update check if not _has_api_key(settings): - console.print("[red]Error: no API key configured. Run `anton setup` first.[/]") + print("Error: no API key configured. Run `anton setup` first.", file=sys.stderr) raise typer.Exit(1) ctx.ensure_object(dict) ctx.obj["settings"] = settings - from anton.chat import run_headless + from anton.commands.headless import run_headless _ensure_workspace(settings) run_headless(console, settings, prompt=headless_prompt, output_format=output_format) diff --git a/anton/commands/headless.py b/anton/commands/headless.py new file mode 100644 index 00000000..285ff389 --- /dev/null +++ b/anton/commands/headless.py @@ -0,0 +1,154 @@ +"""Headless single-shot prompt execution.""" +from __future__ import annotations + +import asyncio +import json as _json +import sys +from pathlib import Path +from typing import TYPE_CHECKING + +from anton.chat_session import build_runtime_context +from anton.data_vault import DataVault +from anton.datasource_registry import DatasourceRegistry +from anton.llm.provider import ( + StreamComplete, + StreamTextDelta, + StreamToolUseDelta, + StreamToolUseEnd, + StreamToolUseStart, +) +from anton.utils.datasources import register_secret_vars + +if TYPE_CHECKING: + from rich.console import Console + + from anton.config.settings import AntonSettings + + +def run_headless( + console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text" +) -> None: + """Run a single prompt in headless mode and exit.""" + if not prompt: + print("Error: headless mode requires a prompt via --prompt or --stdin", file=sys.stderr) + raise SystemExit(1) + + asyncio.run(_headless(console, settings, prompt=prompt, output_format=output_format)) + + +async def _headless( + console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text" +) -> None: + """Execute a single prompt without interactive elements.""" + try: + from anton.context.self_awareness import SelfAwarenessContext + from anton.llm.client import LLMClient + from anton.memory.cortex import Cortex + from anton.workspace import Workspace + + llm_client = LLMClient.from_settings(settings) + + self_awareness = SelfAwarenessContext(Path(settings.context_dir)) + workspace = Workspace(settings.workspace_path) + workspace.apply_env_to_process() + + # Inject datasource env vars + dv = DataVault() + dreg = DatasourceRegistry() + for conn in dv.list_connections(): + dv.inject_env(conn["engine"], conn["name"]) + edef = dreg.get(conn["engine"]) + if edef is not None: + register_secret_vars(edef, engine=conn["engine"], name=conn["name"]) + del dv, dreg + + global_memory_dir = Path.home() / ".anton" / "memory" + project_memory_dir = settings.workspace_path / ".anton" / "memory" + + cortex = Cortex( + global_dir=global_memory_dir, + project_dir=project_memory_dir, + mode=settings.memory_mode, + llm_client=llm_client, + ) + + from anton.memory.episodes import EpisodicMemory + + episodes_dir = settings.workspace_path / ".anton" / "episodes" + episodic = EpisodicMemory(episodes_dir, enabled=settings.episodic_memory) + if episodic.enabled: + episodic.start_session() + + from anton.memory.history_store import HistoryStore + + history_store = HistoryStore(episodes_dir) + current_session_id = episodic._session_id if episodic.enabled else None + + from anton.chat import ChatSession + + runtime_context = build_runtime_context(settings) + coding_api_key = ( + settings.anthropic_api_key + if settings.coding_provider == "anthropic" + else settings.openai_api_key + ) or "" + + session = ChatSession( + llm_client, + self_awareness=self_awareness, + cortex=cortex, + episodic=episodic, + runtime_context=runtime_context, + workspace=workspace, + console=None, + coding_provider=settings.coding_provider, + coding_api_key=coding_api_key, + coding_base_url=settings.openai_base_url or "", + history_store=history_store, + session_id=current_session_id, + proactive_dashboards=False, + ) + + # Execute single turn + response_text = "" + tool_calls: list[dict] = [] + usage_data: dict = {} + + async for event in session.turn_stream(prompt): + if isinstance(event, StreamTextDelta): + response_text += event.text + elif isinstance(event, StreamToolUseStart): + tool_calls.append({"name": event.name, "id": event.id, "input": {}}) + elif isinstance(event, StreamToolUseDelta): + if tool_calls: + last = tool_calls[-1] + last.setdefault("_raw_input", "") + last["_raw_input"] += event.json_delta + elif isinstance(event, StreamToolUseEnd): + if tool_calls: + last = tool_calls[-1] + raw = last.pop("_raw_input", "{}") + try: + last["input"] = _json.loads(raw) + except _json.JSONDecodeError: + last["input"] = raw + elif isinstance(event, StreamComplete): + usage_data = { + "input_tokens": event.response.usage.input_tokens, + "output_tokens": event.response.usage.output_tokens, + } + + # Output + if output_format == "json": + result = { + "response": response_text, + "tool_calls": [{"name": tc["name"], "input": tc.get("input", {})} for tc in tool_calls], + "usage": usage_data, + } + print(_json.dumps(result)) + else: + print(response_text) + + except Exception as exc: + print(f"Error: {exc}", file=sys.stderr) + raise SystemExit(1) diff --git a/tests/test_headless.py b/tests/test_headless.py index 24512ab1..c96573ee 100644 --- a/tests/test_headless.py +++ b/tests/test_headless.py @@ -7,7 +7,7 @@ import pytest -from anton.chat import _headless +from anton.commands.headless import _headless from anton.llm.provider import ( LLMResponse, StreamComplete,