diff --git a/anton/chat.py b/anton/chat.py index 25acdd9c..058ed03d 100644 --- a/anton/chat.py +++ b/anton/chat.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import json as _json import os import urllib.error import re as _re diff --git a/anton/cli.py b/anton/cli.py index aa3e1f38..6dacbeff 100644 --- a/anton/cli.py +++ b/anton/cli.py @@ -278,6 +278,15 @@ def main( resume: bool = typer.Option( False, "--resume", "-r", help="Resume a previous chat session" ), + prompt: str | None = typer.Option( + None, "--prompt", "-p", help="Run a single prompt in headless mode and exit" + ), + output_format: str = typer.Option( + "text", "--output-format", help="Output format: text or json (headless mode only)" + ), + stdin: bool = typer.Option( + False, "--stdin", help="Read prompt from stdin (headless mode)" + ), ) -> None: """Anton — a self-evolving autonomous system.""" _ensure_dependencies(console) @@ -287,6 +296,35 @@ def main( settings = AntonSettings() settings.resolve_workspace(folder) + # Headless mode: --prompt or --stdin (mutually exclusive) + if prompt and stdin: + import sys as _sys + print("Error: --prompt and --stdin are mutually exclusive", file=_sys.stderr) + raise typer.Exit(1) + + headless_prompt = prompt + if stdin: + import sys as _sys + headless_prompt = _sys.stdin.read().strip() + if not headless_prompt: + print("Error: no input received from stdin", file=_sys.stderr) + raise typer.Exit(1) + + if headless_prompt: + # Headless mode — skip terms consent, banner, update check + if not _has_api_key(settings): + print("Error: no API key configured. Run `anton setup` first.", file=sys.stderr) + raise typer.Exit(1) + + ctx.ensure_object(dict) + ctx.obj["settings"] = settings + + from anton.commands.headless import run_headless + + _ensure_workspace(settings) + run_headless(console, settings, prompt=headless_prompt, output_format=output_format) + raise typer.Exit(0) + if not settings.terms_consent: _ensure_terms_consent(console, settings) diff --git a/anton/commands/headless.py b/anton/commands/headless.py new file mode 100644 index 00000000..285ff389 --- /dev/null +++ b/anton/commands/headless.py @@ -0,0 +1,154 @@ +"""Headless single-shot prompt execution.""" +from __future__ import annotations + +import asyncio +import json as _json +import sys +from pathlib import Path +from typing import TYPE_CHECKING + +from anton.chat_session import build_runtime_context +from anton.data_vault import DataVault +from anton.datasource_registry import DatasourceRegistry +from anton.llm.provider import ( + StreamComplete, + StreamTextDelta, + StreamToolUseDelta, + StreamToolUseEnd, + StreamToolUseStart, +) +from anton.utils.datasources import register_secret_vars + +if TYPE_CHECKING: + from rich.console import Console + + from anton.config.settings import AntonSettings + + +def run_headless( + console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text" +) -> None: + """Run a single prompt in headless mode and exit.""" + if not prompt: + print("Error: headless mode requires a prompt via --prompt or --stdin", file=sys.stderr) + raise SystemExit(1) + + asyncio.run(_headless(console, settings, prompt=prompt, output_format=output_format)) + + +async def _headless( + console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text" +) -> None: + """Execute a single prompt without interactive elements.""" + try: + from anton.context.self_awareness import SelfAwarenessContext + from anton.llm.client import LLMClient + from anton.memory.cortex import Cortex + from anton.workspace import Workspace + + llm_client = LLMClient.from_settings(settings) + + self_awareness = SelfAwarenessContext(Path(settings.context_dir)) + workspace = Workspace(settings.workspace_path) + workspace.apply_env_to_process() + + # Inject datasource env vars + dv = DataVault() + dreg = DatasourceRegistry() + for conn in dv.list_connections(): + dv.inject_env(conn["engine"], conn["name"]) + edef = dreg.get(conn["engine"]) + if edef is not None: + register_secret_vars(edef, engine=conn["engine"], name=conn["name"]) + del dv, dreg + + global_memory_dir = Path.home() / ".anton" / "memory" + project_memory_dir = settings.workspace_path / ".anton" / "memory" + + cortex = Cortex( + global_dir=global_memory_dir, + project_dir=project_memory_dir, + mode=settings.memory_mode, + llm_client=llm_client, + ) + + from anton.memory.episodes import EpisodicMemory + + episodes_dir = settings.workspace_path / ".anton" / "episodes" + episodic = EpisodicMemory(episodes_dir, enabled=settings.episodic_memory) + if episodic.enabled: + episodic.start_session() + + from anton.memory.history_store import HistoryStore + + history_store = HistoryStore(episodes_dir) + current_session_id = episodic._session_id if episodic.enabled else None + + from anton.chat import ChatSession + + runtime_context = build_runtime_context(settings) + coding_api_key = ( + settings.anthropic_api_key + if settings.coding_provider == "anthropic" + else settings.openai_api_key + ) or "" + + session = ChatSession( + llm_client, + self_awareness=self_awareness, + cortex=cortex, + episodic=episodic, + runtime_context=runtime_context, + workspace=workspace, + console=None, + coding_provider=settings.coding_provider, + coding_api_key=coding_api_key, + coding_base_url=settings.openai_base_url or "", + history_store=history_store, + session_id=current_session_id, + proactive_dashboards=False, + ) + + # Execute single turn + response_text = "" + tool_calls: list[dict] = [] + usage_data: dict = {} + + async for event in session.turn_stream(prompt): + if isinstance(event, StreamTextDelta): + response_text += event.text + elif isinstance(event, StreamToolUseStart): + tool_calls.append({"name": event.name, "id": event.id, "input": {}}) + elif isinstance(event, StreamToolUseDelta): + if tool_calls: + last = tool_calls[-1] + last.setdefault("_raw_input", "") + last["_raw_input"] += event.json_delta + elif isinstance(event, StreamToolUseEnd): + if tool_calls: + last = tool_calls[-1] + raw = last.pop("_raw_input", "{}") + try: + last["input"] = _json.loads(raw) + except _json.JSONDecodeError: + last["input"] = raw + elif isinstance(event, StreamComplete): + usage_data = { + "input_tokens": event.response.usage.input_tokens, + "output_tokens": event.response.usage.output_tokens, + } + + # Output + if output_format == "json": + result = { + "response": response_text, + "tool_calls": [{"name": tc["name"], "input": tc.get("input", {})} for tc in tool_calls], + "usage": usage_data, + } + print(_json.dumps(result)) + else: + print(response_text) + + except Exception as exc: + print(f"Error: {exc}", file=sys.stderr) + raise SystemExit(1) diff --git a/tests/test_headless.py b/tests/test_headless.py new file mode 100644 index 00000000..c96573ee --- /dev/null +++ b/tests/test_headless.py @@ -0,0 +1,166 @@ +"""Tests for headless mode (--prompt flag).""" +from __future__ import annotations + +import json +from io import StringIO +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from anton.commands.headless import _headless +from anton.llm.provider import ( + LLMResponse, + StreamComplete, + StreamTextDelta, + StreamToolUseEnd, + StreamToolUseStart, + ToolCall, + Usage, +) + + +def _text_response(text: str) -> LLMResponse: + return LLMResponse( + content=text, + tool_calls=[], + usage=Usage(input_tokens=10, output_tokens=20), + stop_reason="end_turn", + ) + + +def _mock_settings(): + settings = MagicMock() + settings.workspace_path = MagicMock() + settings.workspace_path.__truediv__ = lambda self, other: MagicMock() + settings.context_dir = "/tmp/test-anton-context" + settings.memory_mode = "off" + settings.episodic_memory = False + settings.coding_provider = "anthropic" + settings.anthropic_api_key = "test-key" + settings.openai_api_key = None + settings.openai_base_url = None + settings.proactive_dashboards = False + return settings + + +def _patches(mock_session): + """Common patches for headless tests. Returns context manager stack.""" + mock_dv = MagicMock() + mock_dv.return_value.list_connections.return_value = [] + mock_ep = MagicMock() + mock_ep.return_value.enabled = False + mock_ep.return_value._session_id = None + + return ( + patch("anton.llm.client.LLMClient.from_settings", return_value=MagicMock()), + patch("anton.context.self_awareness.SelfAwarenessContext"), + patch("anton.workspace.Workspace"), + patch("anton.data_vault.DataVault", mock_dv), + patch("anton.datasource_registry.DatasourceRegistry"), + patch("anton.memory.cortex.Cortex"), + patch("anton.memory.episodes.EpisodicMemory", mock_ep), + patch("anton.memory.history_store.HistoryStore"), + patch("anton.chat_session.build_runtime_context", return_value=""), + patch("anton.chat.ChatSession", return_value=mock_session), + ) + + +class TestHeadlessTextOutput: + @pytest.mark.asyncio + async def test_basic_text_response(self, capsys): + mock_session = AsyncMock() + + async def fake_stream(prompt): + yield StreamTextDelta("The answer is 42.") + yield StreamComplete(_text_response("The answer is 42.")) + + mock_session.turn_stream = fake_stream + + patches = _patches(mock_session) + with patches[0], patches[1], patches[2], patches[3], patches[4], \ + patches[5], patches[6], patches[7], patches[8], patches[9]: + from rich.console import Console + console = Console(file=StringIO()) + await _headless(console, _mock_settings(), prompt="question", output_format="text") + + captured = capsys.readouterr() + assert "The answer is 42." in captured.out + + +class TestHeadlessJsonOutput: + @pytest.mark.asyncio + async def test_json_response(self, capsys): + mock_session = AsyncMock() + + async def fake_stream(prompt): + yield StreamTextDelta("Hello world") + yield StreamComplete(_text_response("Hello world")) + + mock_session.turn_stream = fake_stream + + patches = _patches(mock_session) + with patches[0], patches[1], patches[2], patches[3], patches[4], \ + patches[5], patches[6], patches[7], patches[8], patches[9]: + from rich.console import Console + console = Console(file=StringIO()) + await _headless(console, _mock_settings(), prompt="say hello", output_format="json") + + captured = capsys.readouterr() + result = json.loads(captured.out) + assert result["response"] == "Hello world" + assert isinstance(result["tool_calls"], list) + assert result["usage"]["input_tokens"] == 10 + assert result["usage"]["output_tokens"] == 20 + + +class TestHeadlessToolCalls: + @pytest.mark.asyncio + async def test_tool_calls_in_json_output(self, capsys): + mock_session = AsyncMock() + + async def fake_stream(prompt): + yield StreamToolUseStart(id="tc_1", name="scratchpad") + yield StreamToolUseEnd(id="tc_1") + yield StreamTextDelta("Result: 55") + yield StreamComplete(LLMResponse( + content="Result: 55", + tool_calls=[ToolCall(id="tc_1", name="scratchpad", input={"action": "exec"})], + usage=Usage(input_tokens=50, output_tokens=100), + stop_reason="end_turn", + )) + + mock_session.turn_stream = fake_stream + + patches = _patches(mock_session) + with patches[0], patches[1], patches[2], patches[3], patches[4], \ + patches[5], patches[6], patches[7], patches[8], patches[9]: + from rich.console import Console + console = Console(file=StringIO()) + await _headless(console, _mock_settings(), prompt="fibonacci", output_format="json") + + captured = capsys.readouterr() + result = json.loads(captured.out) + assert len(result["tool_calls"]) == 1 + assert result["tool_calls"][0]["name"] == "scratchpad" + assert "55" in result["response"] + + +class TestHeadlessNoInteractive: + @pytest.mark.asyncio + async def test_completes_without_interactive_input(self): + """Headless mode completes without prompt_toolkit or interactive console.""" + mock_session = AsyncMock() + + async def fake_stream(prompt): + yield StreamTextDelta("ok") + yield StreamComplete(_text_response("ok")) + + mock_session.turn_stream = fake_stream + + patches = _patches(mock_session) + with patches[0], patches[1], patches[2], patches[3], patches[4], \ + patches[5], patches[6], patches[7], patches[8], patches[9]: + from rich.console import Console + console = Console(file=StringIO()) + # Should complete without hanging on interactive input + await _headless(console, _mock_settings(), prompt="test", output_format="text")