Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 121 additions & 19 deletions aai_cli/code_agent/tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@

from __future__ import annotations

import itertools
import threading
import time
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar

from rich.markup import escape
from textual.app import App, ComposeResult
from textual.containers import Horizontal
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Input, Label, RichLog, Static
from textual.worker import Worker
Expand All @@ -29,11 +31,26 @@
if TYPE_CHECKING:
from collections.abc import Mapping

from textual.timer import Timer

# Glyphs cycled by the working indicator's animation (purely cosmetic).
_SPIN_FRAMES = "✶✷✸✹✺" # pragma: no mutate


def _format_args(args: Mapping[str, object]) -> str:
return ", ".join(f"{key}={value!r}" for key, value in args.items())


def _spinner_text(elapsed_s: int, frame: str) -> str:
"""The working-indicator line: a spinner glyph and the elapsed seconds."""
return f"{frame} Working… ({elapsed_s}s)"


def _approval_decision(button_id: str | None) -> str:
"""Map a pressed approval button's id to a decision, defaulting to reject if unset."""
return button_id or "reject"


def _abbrev_home(path: Path) -> str:
"""Render ``path`` with the home directory collapsed to ``~``."""
try:
Expand Down Expand Up @@ -63,45 +80,77 @@ def _status_text(cwd: Path, *, auto_approve: bool) -> str:
return " ".join(parts)


class ApprovalScreen(ModalScreen[bool]):
"""A modal asking the user to approve or reject one pending tool call."""
class ApprovalScreen(ModalScreen[str]):
"""A compact, bottom-docked prompt to approve/auto-approve/reject one tool call.

BINDINGS: ClassVar = [("y", "approve", "Approve"), ("n", "reject", "Reject")]
The transparent screen background leaves the transcript visible above (no full-screen
takeover); the decision is one of ``"approve"``, ``"auto"``, or ``"reject"``.
"""

DEFAULT_CSS = """
ApprovalScreen { align: center bottom; background: transparent; }
ApprovalScreen #approvalbox {
dock: bottom; width: 1fr; height: auto;
border: round #f59e0b; background: #0b0e16; padding: 0 1; margin: 0 1 1 1;
}
ApprovalScreen #approvalbox Label { height: auto; }
ApprovalScreen #approvalbox Horizontal { height: auto; }
ApprovalScreen #approvalbox Button { margin: 0 1 0 0; }
"""
BINDINGS: ClassVar = [
("y", "approve", "Approve"),
("a", "auto", "Auto-approve"),
("n", "reject", "Reject"),
]

def __init__(self, name: str, args: Mapping[str, object]) -> None:
super().__init__()
self._tool_name = name # not _name: that shadows Textual Widget's str|None attr
self._args = args

def compose(self) -> ComposeResult:
yield Label(
f"Run tool [b]{escape(self._tool_name)}[/b]?\n{escape(_format_args(self._args))}"
)
with Horizontal():
yield Button("Approve (y)", id="approve", variant="success")
yield Button("Reject (n)", id="reject", variant="error")
with Vertical(id="approvalbox"):
yield Label(
f"Run tool [b]{escape(self._tool_name)}[/b]? "
f"[dim]{escape(_format_args(self._args))}[/dim]"
)
with Horizontal():
yield Button("Approve (y)", id="approve", variant="success")
yield Button("Auto-approve (a)", id="auto", variant="primary")
yield Button("Reject (n)", id="reject", variant="error")

def on_button_pressed(self, event: Button.Pressed) -> None:
approved = event.button.id == "approve"
self.dismiss(approved)
self.dismiss(_approval_decision(event.button.id))

def action_approve(self) -> None:
self.dismiss(result=True)
self.dismiss("approve")

def action_auto(self) -> None:
self.dismiss("auto")

def action_reject(self) -> None:
self.dismiss(result=False)
self.dismiss("reject")


class AskScreen(ModalScreen[str]):
"""A modal that asks the user a question from the agent and returns their answer."""
"""A bottom-docked prompt that relays a question from the agent and returns the answer."""

DEFAULT_CSS = """
AskScreen { align: center bottom; background: transparent; }
AskScreen #askbox {
dock: bottom; width: 1fr; height: auto;
border: round #3a3f55; background: #0b0e16; padding: 0 1; margin: 0 1 1 1;
}
"""

def __init__(self, question: str) -> None:
super().__init__()
self._question = question

def compose(self) -> ComposeResult:
yield Label(f"[b]The agent asks:[/b]\n{escape(self._question)}")
yield Input(id="answer", placeholder="Type your answer and press Enter…")
with Vertical(id="askbox"):
yield Label(f"[b]The agent asks:[/b] {escape(self._question)}")
yield Input(id="answer", placeholder="Type your answer and press Enter…")

def on_input_submitted(self, event: Input.Submitted) -> None:
self.dismiss(event.value)
Expand All @@ -121,6 +170,9 @@ class CodeAgentApp(App[None]):
#promptbar {{ dock: bottom; height: 3; background: #0b0e16; border: round #3a3f55; margin: 1 1; }}
#promptmark {{ width: 3; color: {banner.BRAND_HEX}; content-align: center middle; }}
#prompt {{ border: none; background: #0b0e16; padding: 0; }}
/* In normal flow below the 1fr log, so it sits just above the docked prompt bar. */
#spinner {{ height: 1; background: #0b0e16; padding: 0 2;
color: {banner.BRAND_HEX}; display: none; }}
#status {{ dock: bottom; height: 1; background: #0b0e16; padding: 0 1; }}
"""
TITLE = "AssemblyAI Code"
Expand Down Expand Up @@ -152,6 +204,9 @@ def __init__(
self._cwd = cwd if cwd is not None else Path.cwd()
self._web_note = web_note
self._last_reply = ""
self._spin_frames = itertools.cycle(_SPIN_FRAMES)
self._spin_timer: Timer | None = None
self._turn_started = 0.0 # pragma: no mutate — always reset by _start_spinner first
self._session = CodeSession(
agent=agent,
sink=self._emit_event,
Expand All @@ -164,6 +219,8 @@ def compose(self) -> ComposeResult:
# No Header/Footer chrome — the splash is the title and the bottom status line
# the only footer, so the screen stays a flat dark canvas.
yield RichLog(id="log", wrap=True, markup=True)
# Docked before the prompt bar, so the working indicator sits just above the input.
yield Static("", id="spinner")
with Horizontal(id="promptbar"):
yield Static(">", id="promptmark")
yield Input(id="prompt", placeholder="Ask the agent to build something…")
Expand Down Expand Up @@ -237,8 +294,30 @@ def _store(result: T | None) -> None:
return box["value"]

def _approve(self, name: str, args: dict[str, object]) -> bool:
"""Block the worker on a modal approval screen and return the decision."""
return self._modal_result(ApprovalScreen(name, args), default=False)
"""Decide whether to run a gated tool, prompting unless auto-approve is on.

Once the user picks "Auto-approve", later tool calls skip the modal entirely —
functionally the same as starting with ``--auto``.
"""
if self._auto_approve:
return True
decision = self._modal_result(ApprovalScreen(name, args), default="reject")
if decision == "auto":
self._enable_auto_approve()
return True
return decision == "approve"

def _enable_auto_approve(self) -> None:
"""Switch the session to auto-approve and refresh the mode badge."""
self._auto_approve = True
self._session.auto_approve = True
self.call_from_thread(self._refresh_status)

def _refresh_status(self) -> None:
"""Re-render the bottom status line (e.g. after the mode flips to auto)."""
self.query_one("#status", Static).update(
_status_text(self._cwd, auto_approve=self._auto_approve)
)

def _ask(self, question: str) -> str:
"""Block the worker on a modal input screen and return the user's answer."""
Expand All @@ -256,15 +335,38 @@ def _submit(self, text: str) -> None:
log = self.query_one("#log", RichLog)
log.write(f"[b cyan]» {escape(text)}[/b cyan]")
self.query_one("#prompt", Input).disabled = True
self._start_spinner()
self._run_turn(text)

def _run_turn(self, text: str) -> Worker[None]:
return self.run_worker(
lambda: self._session.send(text), thread=True, exclusive=True, name="agent-turn"
)

# --- working indicator (spinner + elapsed) --------------------------------

def _start_spinner(self) -> None:
"""Show the working indicator and animate it while the turn runs."""
self._turn_started = time.monotonic()
self.query_one("#spinner", Static).display = True
self._tick()
self._spin_timer = self.set_interval(0.25, self._tick) # pragma: no mutate

def _tick(self) -> None:
"""Advance the spinner one frame and refresh the elapsed-seconds readout."""
elapsed = int(time.monotonic() - self._turn_started)
self.query_one("#spinner", Static).update(_spinner_text(elapsed, next(self._spin_frames)))

def _stop_spinner(self) -> None:
"""Stop the animation and hide the working indicator."""
if self._spin_timer is not None:
self._spin_timer.stop()
self._spin_timer = None
self.query_one("#spinner", Static).display = False

def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
if event.worker.is_finished:
self._stop_spinner()
prompt = self.query_one("#prompt", Input)
prompt.disabled = False
prompt.focus()
98 changes: 95 additions & 3 deletions tests/test_code_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

import asyncio
import threading
import time
from pathlib import Path

import pytest
from langchain_core.messages import AIMessage, HumanMessage
from textual.widgets import Input, RichLog
from textual.widgets import Input, RichLog, Static

from aai_cli.code_agent import tui
from aai_cli.code_agent.events import AssistantText, ErrorText, ToolCall, ToolResult
Expand Down Expand Up @@ -50,6 +51,14 @@ def test_format_args_and_abbrev_home() -> None:
assert tui._abbrev_home(outside) == str(outside)


def test_approval_decision_defaults_to_reject() -> None:
assert tui._approval_decision("approve") == "approve"
assert tui._approval_decision("auto") == "auto"
# A button with no id (Textual allows None) is treated as a rejection, not approval.
assert tui._approval_decision(None) == "reject"
assert tui._approval_decision("") == "reject"


def test_git_branch_and_status(tmp_path: Path) -> None:
assert tui._git_branch(tmp_path) is None # no .git
(tmp_path / ".git").mkdir()
Expand Down Expand Up @@ -201,7 +210,7 @@ def test_approval_button_press_dismisses() -> None:
# Covers ApprovalScreen.on_button_pressed (the click path; key paths are covered
# by the approve/reject modal tests above). The bracketed name/args also guard the
# compose() escape() — without it, Label markup parsing would raise on mount.
results: list[bool | None] = []
results: list[str | None] = []

async def go() -> None:
app = CodeAgentApp(agent=FakeAgent([]))
Expand All @@ -213,7 +222,90 @@ async def go() -> None:
await pilot.pause()

_run(go())
assert results == [False]
assert results == ["reject"]


def test_approval_box_is_compact_and_bottom_docked() -> None:
# Regression guard: the approval prompt must not take over the whole screen — it
# docks a short box at the bottom so the transcript stays visible above it.
async def go() -> None:
app = CodeAgentApp(agent=FakeAgent([]))
async with app.run_test(size=(100, 30)) as pilot:
await pilot.pause()
app.push_screen(ApprovalScreen("write_file", {"file_path": "x.py"}))
await pilot.pause()
box = app.screen.query_one("#approvalbox")
assert box.region.height <= 8 # a handful of rows, not the full 30
assert box.region.bottom <= 30 # anchored within the bottom of the screen
assert box.region.y >= 15 # sits in the lower half, transcript visible above

_run(go())


def test_approval_auto_approve_flips_mode_and_skips_later_prompts() -> None:
# Picking "Auto-approve (a)" approves this call, flips the badge manual→auto, and
# makes every later _approve return True without ever pushing a modal.
app = CodeAgentApp(agent=FakeAgent([]))
assert _drive_modal(app, lambda: app._approve("execute", {"cmd": "ls"}), ["a"]) is True
assert app._auto_approve is True
assert app._session.auto_approve is True
# A second decision short-circuits: it returns True even though no modal can be driven.
assert app._approve("write_file", {"file_path": "x"}) is True


def test_refresh_status_rerenders_badge() -> None:
# _enable_auto_approve (worker thread) marshals a _refresh_status onto the UI thread;
# this drives that re-render directly, asserting the badge tracks the mode flip.
async def go() -> None:
app = CodeAgentApp(agent=FakeAgent([]))
async with app.run_test(size=(100, 30)) as pilot:
await pilot.pause()
assert "manual" in str(app.query_one("#status", Static).render())
app._auto_approve = True
app._refresh_status()
await pilot.pause()
assert "auto" in str(app.query_one("#status", Static).render())

_run(go())


def test_spinner_text_formats_frame_and_elapsed() -> None:
assert tui._spinner_text(46, "✶") == "✶ Working… (46s)"
assert tui._spinner_text(0, "✷") == "✷ Working… (0s)"


def test_spinner_starts_ticks_and_stops(monkeypatch: pytest.MonkeyPatch) -> None:
async def go() -> None:
app = CodeAgentApp(agent=FakeAgent([]))
async with app.run_test(size=(100, 30)) as pilot:
await pilot.pause()
# Re-query for each display check: a stored `spinner.display` would let mypy
# narrow the bool across the start/stop calls and flag the next assert dead.
assert app.query_one("#spinner", Static).display is False # hidden at rest
app._start_spinner()
await pilot.pause()
assert app.query_one("#spinner", Static).display is True
# _tick wires the elapsed seconds off the start time; pin "now" to assert it.
monkeypatch.setattr(time, "monotonic", lambda: app._turn_started + 7.0)
app._tick()
assert "Working… (7s)" in str(app.query_one("#spinner", Static).render())
app._stop_spinner()
assert app.query_one("#spinner", Static).display is False
assert app._spin_timer is None

_run(go())


def test_stop_spinner_is_a_noop_when_not_started() -> None:
# The timer-None branch of _stop_spinner: stopping before any turn just hides.
async def go() -> None:
app = CodeAgentApp(agent=FakeAgent([]))
async with app.run_test(size=(100, 30)) as pilot:
await pilot.pause()
app._stop_spinner()
assert app.query_one("#spinner", Static).display is False

_run(go())


def test_ask_screen_compose_escapes_markup() -> None:
Expand Down
Loading