diff --git a/src/agentgrep/__init__.py b/src/agentgrep/__init__.py index 202624f..c296477 100644 --- a/src/agentgrep/__init__.py +++ b/src/agentgrep/__init__.py @@ -32,6 +32,7 @@ import argparse import asyncio +import collections import contextlib import dataclasses import datetime @@ -147,13 +148,15 @@ def build_description( """ Read-only search across Codex, Claude, and Cursor local stores. - ``search`` is the default subcommand. ``agentgrep bliss`` is - equivalent to ``agentgrep search bliss``. + Bare ``agentgrep`` launches the interactive Textual explorer + (``agentgrep ui``). ``agentgrep `` is shorthand for + ``agentgrep search ``. """, ( ( "quick", ( + "agentgrep", "agentgrep bliss", "agentgrep serene bliss --agent codex", ), @@ -209,6 +212,22 @@ def build_description( ), ), ) +UI_DESCRIPTION = build_description( + """ + Launch the interactive Textual explorer. Bare ``agentgrep`` is + equivalent to ``agentgrep ui``. + """, + ( + ( + None, + ( + "agentgrep", + "agentgrep ui", + "agentgrep ui bliss", + ), + ), + ), +) class PrivatePath(PrivatePathBase): @@ -1010,6 +1029,14 @@ class FindArgs: color_mode: ColorMode +@dataclasses.dataclass(slots=True) +class UIArgs: + """Typed arguments for ``agentgrep ui``.""" + + initial_query: str + color_mode: ColorMode + + @dataclasses.dataclass(slots=True) class SearchQuery: """Compiled search configuration.""" @@ -1096,6 +1123,10 @@ def answer_now_requested(self) -> bool: """Return whether search should stop and answer with partial results.""" return self._answer_now.is_set() + def reset(self) -> None: + """Clear any prior answer-now request so a new search starts fresh.""" + self._answer_now.clear() + class AnswerNowInputListener: """Listen for a blank Enter keypress and request a partial answer.""" @@ -1716,6 +1747,14 @@ class FilterRequestedPayload(pydantic.BaseModel): text: str +class SearchRequestedPayload(pydantic.BaseModel): + """Pydantic payload for a debounced search-bar-changed Textual message.""" + + model_config = pydantic.ConfigDict(frozen=True) + + text: str + + class FilterCompletedPayload(pydantic.BaseModel): """Pydantic payload for a worker-completed filter result Textual message.""" @@ -1988,19 +2027,20 @@ def normalize_color_mode(argv: cabc.Sequence[str] | None) -> ColorMode: return "auto" -SUBCOMMANDS: frozenset[str] = frozenset({"search", "find"}) +SUBCOMMANDS: frozenset[str] = frozenset({"search", "find", "ui"}) def inject_default_subcommand( argv: cabc.Sequence[str] | None, ) -> cabc.Sequence[str] | None: - """Prepend ``search`` to ``argv`` when no subcommand is supplied. + """Prepend a subcommand to ``argv`` when none is supplied. Walks ``argv`` skipping the global ``--color`` option and any help flag. - If the first remaining token is not a known subcommand, inserts - ``search`` at that position so ``agentgrep bliss`` parses identically - to ``agentgrep search bliss``. Returns the input unchanged when no - injection is needed. + Empty effective argv defaults to ``ui`` so ``agentgrep`` lands in the + Textual explorer. If the first remaining token is not a known + subcommand, inserts ``search`` at that position so ``agentgrep bliss`` + parses identically to ``agentgrep search bliss``. Returns the input + unchanged when no injection is needed. Examples -------- @@ -2010,12 +2050,16 @@ def inject_default_subcommand( ['search', 'bliss'] >>> inject_default_subcommand(["find", "codex"]) ['find', 'codex'] + >>> inject_default_subcommand(["ui"]) + ['ui'] >>> inject_default_subcommand(["--color", "never", "bliss"]) ['--color', 'never', 'search', 'bliss'] + >>> inject_default_subcommand(["--color", "never"]) + ['--color', 'never', 'ui'] >>> inject_default_subcommand(["--help"]) ['--help'] >>> inject_default_subcommand([]) - [] + ['ui'] """ effective = list(sys.argv[1:]) if argv is None else list(argv) index = 0 @@ -2033,7 +2077,8 @@ def inject_default_subcommand( return argv effective.insert(index, "search") return effective - return argv + effective.append("ui") + return effective @contextlib.contextmanager @@ -2135,6 +2180,20 @@ def create_parser( help="Limit the number of results", ) add_output_mode_options(find_parser, allow_ui=False) + + ui_parser = subparsers.add_parser( + "ui", + help="Launch the interactive Textual explorer", + description=UI_DESCRIPTION, + formatter_class=formatter_class, + color=color_mode != "never", + ) + _ = ui_parser.add_argument( + "initial_query", + nargs="?", + default="", + help="Optional initial search text to populate the search bar", + ) return ParserBundle(parser=parser, search_parser=search_parser, find_parser=find_parser) @@ -2152,7 +2211,7 @@ def build_docs_parser() -> argparse.ArgumentParser: def parse_args( argv: cabc.Sequence[str] | None = None, -) -> SearchArgs | FindArgs | None: +) -> SearchArgs | FindArgs | UIArgs | None: """Parse CLI arguments into typed dataclasses.""" color_mode = normalize_color_mode(argv) argv = inject_default_subcommand(argv) @@ -2163,6 +2222,14 @@ def parse_args( with configured_color_environment(color_mode): bundle.parser.print_help() return None + + command = t.cast("str", namespace.command) + if command == "ui": + return UIArgs( + initial_query=t.cast("str", namespace.initial_query), + color_mode=color_mode, + ) + agents = parse_agents(t.cast("list[str]", namespace.agent)) output_mode = parse_output_mode(namespace) limit = t.cast("int | None", namespace.limit) @@ -2170,10 +2237,9 @@ def parse_args( with configured_color_environment(color_mode): bundle.parser.error("--limit must be greater than 0") - command = t.cast("str", namespace.command) if command == "search": terms = tuple(t.cast("list[str]", namespace.terms)) - if not terms: + if not terms and output_mode != "ui": with configured_color_environment(color_mode): bundle.search_parser.print_help() return None @@ -2409,6 +2475,41 @@ def isoformat_from_mtime_ns(mtime_ns: int) -> str | None: ) +def format_timestamp_tig(value: str | None) -> str: + """Render an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM ±HHMM`` (tig style). + + Localizes to the system timezone before formatting so the displayed + time matches what the user expects to see — tig's main view does the + same. Returns ``""`` for ``None`` / empty input and a clipped raw + string for unparseable input so callers can pad consistently. + + Examples + -------- + >>> format_timestamp_tig(None) + '' + >>> format_timestamp_tig("") + '' + >>> # An ISO timestamp with explicit timezone — formatted result keeps + >>> # the offset for the system's local timezone (whose exact value + >>> # varies by host, so we just check shape here). + >>> sample = format_timestamp_tig("2026-05-17T11:59:12+00:00") + >>> len(sample) + 22 + >>> sample[4], sample[7], sample[10], sample[13], sample[16] + ('-', '-', ' ', ':', ' ') + >>> format_timestamp_tig("not-a-real-timestamp") + 'not-a-real-timestamp' + """ + if not value: + return "" + candidate = value.replace("Z", "+00:00") + try: + moment = datetime.datetime.fromisoformat(candidate) + except ValueError: + return value[:22] + return moment.astimezone().strftime("%Y-%m-%d %H:%M %z") + + def discover_from_catalog( home: pathlib.Path, agent: AgentName, @@ -3686,6 +3787,36 @@ def build_search_haystack(record: SearchRecord) -> str: return "\n".join(part for part in parts if part) +_HAYSTACK_CACHE: dict[int, str] = {} + + +def cached_haystack(record: SearchRecord) -> str: + """Return the casefolded haystack for ``record``, memoized by ``id``. + + The filter worker scans every loaded record on every keystroke; + recomputing ``build_search_haystack(...).casefold()`` per record per + pass dominates filter latency once the result set grows past a few + thousand records. Memoizing by ``id`` is safe because every record + instance is immutable (``frozen=True`` dataclass) and lives for the + duration of a single search session — populate on append, clear when + a new search starts. + + Callers that need to invalidate (because a new search will allocate + new records) should call :func:`clear_haystack_cache`. + """ + key = id(record) + cached = _HAYSTACK_CACHE.get(key) + if cached is None: + cached = build_search_haystack(record).casefold() + _HAYSTACK_CACHE[key] = cached + return cached + + +def clear_haystack_cache() -> None: + """Drop every memoized haystack — call before allocating a new record set.""" + _HAYSTACK_CACHE.clear() + + def compute_filter_matches( records: cabc.Sequence[SearchRecord], text: str, @@ -3711,9 +3842,7 @@ def compute_filter_matches( normalized = text.strip().casefold() if not normalized: return tuple(records) - return tuple( - record for record in records if normalized in build_search_haystack(record).casefold() - ) + return tuple(record for record in records if normalized in cached_haystack(record)) def matches_text(text: str, query: SearchQuery) -> bool: @@ -3986,22 +4115,13 @@ def run_ui( ) -> None: """Launch the streaming Textual explorer for ``query``. - Thin wrapper that builds the app via :func:`build_streaming_ui_app` and - calls ``app.run()``. The factory split lets tests construct the app for - a Textual ``Pilot`` smoke test without entering the blocking run loop. - - Parameters - ---------- - home : pathlib.Path - User home directory, passed through to :func:`run_search_query`. - query : SearchQuery - Search to run. Empty ``terms`` means "all records" (browse mode). - control : SearchControl - Shared cooperative-cancel flag; ``Esc`` / ``Ctrl-C`` call - ``request_answer_now`` to nudge the worker to wrap up. + Thin wrapper that imports the real implementation from + :mod:`agentgrep.ui.app` lazily so a bare ``import agentgrep`` never + pulls in Textual. """ - app = build_streaming_ui_app(home, query, control=control) - t.cast("RunnableAppLike", app).run() + from agentgrep.ui.app import run_ui as _run_ui + + _run_ui(home, query, control=control) def build_streaming_ui_app( @@ -4012,993 +4132,13 @@ def build_streaming_ui_app( ) -> object: """Construct the streaming Textual app without entering its run loop. - Returns the constructed ``AgentGrepApp`` instance (typed ``object`` because - the actual class is defined dynamically inside this factory). Callers can - invoke ``.run()`` for a real session or ``.run_test()`` for a Pilot smoke - test. The full app body — message subclasses, ``SpinnerWidget``, - ``ElapsedWidget``, ``FilterInput``, ``AgentGrepApp`` — lives here so the - Textual imports stay lazy. - - Parameters - ---------- - home : pathlib.Path - User home directory, passed through to :func:`run_search_query`. - query : SearchQuery - Search to run. Empty ``terms`` means "all records" (browse mode). - control : SearchControl - Shared cooperative-cancel flag; ``Esc`` / ``Ctrl-C`` call - ``request_answer_now`` to nudge the worker to wrap up. + Thin wrapper that imports the real factory from :mod:`agentgrep.ui.app` + lazily — Textual is only required at the moment the UI is actually + built, never at import time of the top-level package. """ - try: - textual_app = t.cast( - "TextualAppModule", - t.cast("object", importlib.import_module("textual.app")), - ) - textual_containers = t.cast( - "TextualContainersModule", - t.cast("object", importlib.import_module("textual.containers")), - ) - textual_widgets = t.cast( - "TextualWidgetsModule", - t.cast("object", importlib.import_module("textual.widgets")), - ) - textual_message = t.cast( - "TextualMessageModule", - t.cast("object", importlib.import_module("textual.message")), - ) - textual_option_list_internals = t.cast( - "TextualOptionListInternalsModule", - t.cast("object", importlib.import_module("textual.widgets.option_list")), - ) - textual_binding = t.cast( - "TextualBindingModule", - t.cast("object", importlib.import_module("textual.binding")), - ) - rich_text_module = t.cast( - "RichTextModule", - t.cast("object", importlib.import_module("rich.text")), - ) - except ImportError as error: - msg = "Textual is required for --ui. Install with `uv pip install --editable .`." - raise RuntimeError(msg) from error - - app_type = textual_app.App - message_type = textual_message.Message - option_list_type = textual_widgets.OptionList - option_type = textual_option_list_internals.Option - binding_type = textual_binding.Binding - rich_text = rich_text_module - horizontal = textual_containers.Horizontal - vertical_scroll = textual_containers.VerticalScroll - footer = textual_widgets.Footer - header = textual_widgets.Header - input_widget = textual_widgets.Input - static_type = textual_widgets.Static - - # FilterRequested / FilterCompleted stay on the Textual message bus — they - # fire at typing speed, not streaming speed, so the FIFO queue is fine for - # them. Records / progress / search-finished events bypass the message bus - # entirely (see ``make_emit`` below) so they never queue behind keystrokes. - - class FilterRequested(message_type): # ty: ignore[unsupported-base] - """Debounced filter-text-changed event from :class:`FilterInput`.""" - - def __init__(self, payload: FilterRequestedPayload) -> None: - super().__init__() - self.payload = payload - - class FilterCompleted(message_type): # ty: ignore[unsupported-base] - """Worker-completed filter result posted back to the main thread.""" - - def __init__(self, payload: FilterCompletedPayload) -> None: - super().__init__() - self.payload = payload - - def make_emit(app: StreamingAppLike) -> cabc.Callable[[object], None]: - """Build an ``emit`` callback that dispatches streaming events via ``call_from_thread``. - - ``call_from_thread`` schedules the callback directly on the event loop - rather than enqueuing a ``Message`` — so high-frequency record batches - don't compete with keystroke / timer events for FIFO message dispatch. - Vibe-tmux uses the same pattern (``call_from_thread(_rebuild_tree, snap)``) - and Textual's own ``Log`` widget mutates state directly without a per- - write message. This is the canonical Textual pattern for "many small - updates from a worker thread." - """ - typed_app = t.cast("t.Any", app) - - def emit(event: object) -> None: - if isinstance(event, StreamingRecordsBatch): - typed_app.call_from_thread( - typed_app._apply_records_batch, - event.records, - event.total, - ) - elif isinstance(event, ProgressSnapshot): - typed_app.call_from_thread(typed_app._apply_progress, event) - elif isinstance(event, StreamingSearchFinished): - typed_app.call_from_thread( - typed_app._apply_finished, - event.outcome, - event.total, - event.elapsed, - str(event.error) if event.error else None, - ) - - return emit - - class SpinnerWidget(static_type): # ty: ignore[unsupported-base] - """Self-driving Braille spinner that animates regardless of event-loop load. - - The widget pulls its frame index from ``time.monotonic()`` on every - ``render`` and lets Textual's per-widget ``auto_refresh`` reactor drive - the redraw. This decouples the spinner from any main-thread timer or - message handler — even if record-batch dispatch backs up, the spinner - keeps ticking. - """ - - _FRAMES: t.ClassVar[str] = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" - _FPS: t.ClassVar[float] = 10.0 - - def __init__(self, *, id: str | None = None) -> None: # noqa: A002 -- forwarded to Textual's ``id`` kwarg - super().__init__("", id=id) - self._final_glyph: str | None = None - self._started_at: float = time.monotonic() - - def on_mount(self) -> None: - """Arm the per-widget refresh timer (Textual reads this after mount).""" - self.auto_refresh = 1.0 / self._FPS - - def render(self) -> str: - """Return the current Braille frame from elapsed wall-clock time.""" - if self._final_glyph is not None: - return self._final_glyph - elapsed = time.monotonic() - self._started_at - frame_index = int(elapsed * self._FPS) % len(self._FRAMES) - return self._FRAMES[frame_index] + from agentgrep.ui.app import build_streaming_ui_app as _build - def freeze(self, glyph: str) -> None: - """Stop animating and lock the displayed glyph (called on terminal events).""" - self._final_glyph = glyph - self.auto_refresh = None - self.refresh() - - class ElapsedWidget(static_type): # ty: ignore[unsupported-base] - """Self-refreshing elapsed-time display that ticks once per second.""" - - def __init__( - self, - *, - start_provider: cabc.Callable[[], float | None], - id: str | None = None, # noqa: A002 -- forwarded to Textual's ``id`` kwarg - ) -> None: - super().__init__("", id=id) - self._start_provider = start_provider - self._frozen: float | None = None - - def on_mount(self) -> None: - """Arm the 1 Hz refresh; widget keeps ticking until ``freeze`` is called.""" - self.auto_refresh = 1.0 - - def render(self) -> str: - """Return ``Ts`` for the current elapsed value (or ``""`` until started).""" - if self._frozen is not None: - return f"{self._frozen:.1f}s" - started = self._start_provider() - if started is None: - return "" - return f"{time.monotonic() - started:.1f}s" - - def freeze(self, final_elapsed: float) -> None: - """Stop refreshing and lock the displayed elapsed value.""" - self._frozen = final_elapsed - self.auto_refresh = None - self.refresh() - - class SearchResultsList( - option_list_type, # ty: ignore[unsupported-base] - can_focus=True, - ): - """``OptionList`` subclass for streaming agentgrep search records. - - ``OptionList`` is Textual's proven cursor-navigable virtual list. It - ships with working Tab focus, a visible cursor highlight via the - ``option-list--option-highlighted`` CSS class, and posts an - ``OptionHighlighted`` message on cursor movement — all the things our - previous custom widget had to wire up manually and failed at in the - real terminal. - - Adding records via ``append_records`` / ``set_records`` runs on the - event-loop thread because the worker uses ``app.call_from_thread`` to - invoke these methods. That keeps the streaming transport off the - Textual message bus so keystroke + timer events never queue behind it. - """ - - BINDINGS: t.ClassVar[list[tuple[str, str, str]]] = [ - ("k", "cursor_up", "Up"), - ("j", "cursor_down", "Down"), - ("l", "focus_detail", "Detail"), - ("right", "focus_detail", ""), - ("g", "cursor_top", "Top"), - ("G", "cursor_bottom", "Bottom"), - ("ctrl+d", "cursor_half_page_down", "½ Down"), - ("ctrl+u", "cursor_half_page_up", "½ Up"), - ] - - def __init__( - self, - *, - id: str | None = None, # noqa: A002 -- forwarded to Textual's ``id`` kwarg - ) -> None: - super().__init__(id=id) - self._records: list[SearchRecord] = [] - - def append_records(self, records: cabc.Sequence[SearchRecord]) -> None: - """Append a batch of records — invoked via ``app.call_from_thread``.""" - if not records: - return - self._records.extend(records) - self.add_options( - [option_type(self._render_record(r), id=str(id(r))) for r in records], - ) - - def set_records(self, records: cabc.Sequence[SearchRecord]) -> None: - """Atomic swap of the backing list (used after a filter completes).""" - self._records = list(records) - self.clear_options() - if self._records: - self.add_options( - [option_type(self._render_record(r), id=str(id(r))) for r in self._records], - ) - - def clear(self) -> None: - """Empty the list.""" - self._records = [] - self.clear_options() - - _AGENT_COLORS: t.ClassVar[dict[str, str]] = { - "codex": "cyan", - "claude": "magenta", - "cursor": "yellow", - } - _KIND_COLORS: t.ClassVar[dict[str, str]] = { - "prompt": "green", - "history": "blue", - } - - def _render_record(self, record: SearchRecord) -> object: - agent_text = (record.agent or "").ljust(8)[:8] - kind_text = (record.kind or "").ljust(10)[:10] - timestamp_text = (record.timestamp or "").ljust(20)[:20] - title_text = (record.title or "").ljust(40)[:40] - path_text = format_compact_path(record.path, max_width=60) - text = rich_text.Text(no_wrap=True, overflow="ellipsis") - text.append(agent_text, style=self._AGENT_COLORS.get(record.agent or "", "")) - text.append(" ") - text.append(kind_text, style=self._KIND_COLORS.get(record.kind or "", "")) - text.append(" ") - text.append(timestamp_text, style="italic") - text.append(" ") - text.append(title_text, style="bold") - text.append(" ") - text.append(path_text, style="grey50") - return text - - def action_cursor_up(self) -> None: - """Release focus to the filter input when the cursor is at row 0.""" - if self.highlighted in (None, 0): - self.app.action_focus_previous() - else: - super().action_cursor_up() - - def action_focus_detail(self) -> None: - """Move focus rightward to the detail-scroll pane (vim-style ``l``).""" - detail = self.app.query_one("#detail-scroll") - t.cast("t.Any", detail).focus() - - def action_cursor_top(self) -> None: - """Jump the highlight to the first row (vim-style ``g``).""" - self.action_first() - - def action_cursor_bottom(self) -> None: - """Jump the highlight to the last row (vim-style ``G``).""" - self.action_last() - - def _cursor_jump(self, delta: int) -> None: - """Move the highlight by ``delta`` rows, clamped to list bounds.""" - row_count = len(self._records) - if row_count == 0: - return - current = self.highlighted if self.highlighted is not None else 0 - target = max(0, min(row_count - 1, current + delta)) - self.highlighted = target - - def action_cursor_half_page_down(self) -> None: - """Advance the highlight by half the visible viewport height (vim ``Ctrl-D``).""" - half = max(1, self.size.height // 2) - self._cursor_jump(half) - - def action_cursor_half_page_up(self) -> None: - """Move the highlight up by half the visible viewport height (vim ``Ctrl-U``).""" - half = max(1, self.size.height // 2) - self._cursor_jump(-half) - - vertical_scroll_base = t.cast("type[object]", vertical_scroll) - - class DetailScroll( - vertical_scroll_base, # ty: ignore[unsupported-base] - can_focus=True, - ): - """``VerticalScroll`` subclass for the right-side detail pane. - - Adds vim-style bindings: ``h`` / left-arrow releases focus back to the - results list, and ``j`` / ``k`` mirror the stock ``down`` / ``up`` - scroll bindings so navigation stays consistent with - :class:`SearchResultsList`. ``can_focus=True`` is set via the - class-keyword form — Textual reads it during ``__init_subclass__``, - so the plain class-attribute form silently fails to enroll the widget - in the focus chain. - """ - - BINDINGS: t.ClassVar[list[tuple[str, str, str]]] = [ - ("k", "scroll_up", "Up"), - ("j", "scroll_down", "Down"), - ("h", "focus_results", "Results"), - ("left", "focus_results", ""), - ("g", "scroll_home", "Top"), - ("G", "scroll_end", "Bottom"), - ("ctrl+d", "scroll_half_down", "½ Down"), - ("ctrl+u", "scroll_half_up", "½ Up"), - ("ctrl+f", "page_down", "Pg Down"), - ("ctrl+b", "page_up", "Pg Up"), - ] - - def action_focus_results(self) -> None: - """Move focus leftward back to the results list (vim-style ``h``).""" - results = self.app.query_one("#results") - t.cast("t.Any", results).focus() - - def action_scroll_up(self) -> None: - """Release focus to the filter input when already scrolled to the top. - - Mirrors :meth:`SearchResultsList.action_cursor_up` — when the - widget has nothing left to give in that direction, hand focus off - to the neighbor instead of swallowing the keystroke. Catches both - ``k`` (our binding) and ``up`` (inherited from - ``ScrollableContainer``). - """ - scroll_y = t.cast("float", getattr(self, "scroll_y", 0)) - if scroll_y <= 0: - self.app.query_one("#filter").focus() - else: - super().action_scroll_up() - - def action_scroll_half_down(self) -> None: - """Scroll down by half the visible viewport (vim ``Ctrl-D``).""" - half = max(1, self.size.height // 2) - self.scroll_relative(y=half, animate=True) - - def action_scroll_half_up(self) -> None: - """Scroll up by half the visible viewport (vim ``Ctrl-U``).""" - half = max(1, self.size.height // 2) - self.scroll_relative(y=-half, animate=True) - - class FilterInput(input_widget): # ty: ignore[unsupported-base] - """``Input`` subclass with debounced filter + cursor-or-focus arrows. - - The base ``Input.Changed`` event still fires immediately on each - keystroke so the cursor, selection, and validation feedback stay - instant. The expensive filter operation is deferred onto a - :class:`FilterRequested` message which is only posted after 150 ms of - typing inactivity, letting a worker run the actual filter without - blocking the input itself. - - Up / down arrows are dual-purpose: when there's text in the input - they jump the cursor to the start / end; when the input is empty (or - the cursor is already at the relevant edge) they release focus to - the previous / next widget so the user can navigate into the results - table without reaching for Tab. - """ - - _DEBOUNCE_SECONDS: t.ClassVar[float] = 0.15 - - BINDINGS: t.ClassVar[list[tuple[str, str, str]]] = [ - ("down", "release_down", "Results"), - ] - - def __init__( - self, - *, - placeholder: str = "", - id: str | None = None, # noqa: A002 -- forwarded to Textual's ``id`` kwarg - ) -> None: - super().__init__(placeholder=placeholder, id=id) - self._debounce_timer: object | None = None - - def _watch_value(self, value: str) -> None: - """Post normal ``Input.Changed`` and arm a debounced ``FilterRequested``.""" - super()._watch_value(value) - if self._debounce_timer is not None: - self._debounce_timer.stop() - self._debounce_timer = self.set_timer( - self._DEBOUNCE_SECONDS, - lambda: self.post_message( - FilterRequested(payload=FilterRequestedPayload(text=value)), - ), - ) - - async def _on_key(self, event: object) -> None: - """Down/up route between cursor-jump and focus-release per spec.""" - key = str(getattr(event, "key", "")) - cursor = int(getattr(self, "cursor_position", 0)) - value = str(getattr(self, "value", "")) - stop = getattr(event, "stop", None) - if key == "down": - if value and cursor < len(value): - self.cursor_position = len(value) - if callable(stop): - stop() - return - # Empty or at end — release focus to next widget (DataTable) - if callable(stop): - stop() - self.app.action_focus_next() - return - if key == "up": - if value and cursor > 0: - self.cursor_position = 0 - if callable(stop): - stop() - return - # Empty or at start — no widget meaningfully above; eat the key - if callable(stop): - stop() - return - await super()._on_key(event) - - def action_release_down(self) -> None: - """Footer-binding fallback (``_on_key`` handles the real release).""" - self.app.action_focus_next() - - class AgentGrepApp(app_type): # ty: ignore[unsupported-base] - """Streaming read-only explorer for normalized search records.""" - - CSS: t.ClassVar[str] = """ - Screen { - layout: vertical; - } - #chrome { - height: 1; - padding: 0 1; - layout: horizontal; - } - #chrome-spinner { - width: 2; - color: $accent; - } - #chrome-status { - width: 1fr; - color: ansi_bright_cyan; - text-style: bold; - } - #chrome-matches { - width: auto; - color: $warning; - text-style: bold; - padding: 0 1; - } - #chrome-elapsed { - width: auto; - color: #d8d8d8; - } - #body { - height: 1fr; - } - #detail-scroll { - overflow-y: auto; - overflow-x: hidden; - /* Reserve the border cell up-front (transparent) so toggling - focus only repaints the perimeter — no layout shift, no - extra padding when the border appears. Mirrors the - OptionList default CSS pattern. */ - border: tall transparent; - } - #detail-scroll:focus { - border: tall $border; - } - #detail { - padding: 0 1 0 0; - } - #results { - height: 1fr; - overflow-x: hidden; - } - /* Keep Textual's OptionList default of "border appears only on focus" - (textual/widgets/_option_list.py:154 — ``border: tall $border``). - We only cancel the two parts of that focus rule that fight our - per-span semantic colors: the ``$foreground 5%`` background-tint - and the bright ``$block-cursor-*`` cursor-row recolor. */ - #results:focus { - background-tint: $foreground 0%; - } - #results:focus > .option-list--option-highlighted { - color: $block-cursor-blurred-foreground; - background: $block-cursor-blurred-background; - text-style: $block-cursor-blurred-text-style; - } - """ - # ``priority=True`` on the directional ``ctrl+hjkl`` bindings pushes - # them into Textual's priority dispatch lane so they win over any - # widget binding for the same key (e.g. ``Input``'s readline - # ``ctrl+k`` = kill-to-end-of-line). Trade-off accepted per user - # request: filter loses ``ctrl+k``; ``ctrl+u`` and ``ctrl+w`` are - # untouched and remain readline-compatible. - BINDINGS: t.ClassVar[list[t.Any]] = [ - ("tab", "focus_next", "Switch focus"), - ("q", "quit", "Quit"), - ("escape", "stop_search", "Stop search"), - ("ctrl+c", "smart_quit", "Stop / Quit"), - binding_type("ctrl+h", "focus_pane_left", "← Pane", priority=True), - binding_type("ctrl+j", "focus_pane_down", "↓ Pane", priority=True), - binding_type("ctrl+k", "focus_pane_up", "↑ Pane", priority=True), - binding_type("ctrl+l", "focus_pane_right", "→ Pane", priority=True), - # Terminal-alias fallback: many terminals (and tmux without - # ``xterm-keys on``) send 0x08 for both Backspace and Ctrl-H, so - # Textual sees ``key="backspace"``, never ``ctrl+h``. NO priority - # here — the filter input's own backspace handler (delete prev - # char) must keep winning inside the input. In panes nothing - # else binds backspace, so this fires. - binding_type("backspace", "focus_pane_left", "", show=False), - ] - all_records: list[SearchRecord] - filtered_records: list[SearchRecord] - - def __init__( - self, - *, - home: pathlib.Path, - query: SearchQuery, - control: SearchControl, - ) -> None: - super().__init__() - self.home = home - self.query = query - self.control = control - self.all_records = [] - self.filtered_records = [] - self._filter_text = "" - self._progress: StreamingSearchProgress | None = None - self._search_done = False - self._started_at: float | None = None - self._last_snapshot: ProgressSnapshot | None = None - self._results: SearchResultsList | None = None - self._detail: StaticLike | None = None - self._status_widget: StaticLike | None = None - self._matches_widget: StaticLike | None = None - self._spinner_widget: SpinnerWidget | None = None - self._elapsed_widget: ElapsedWidget | None = None - self._filter_input: FilterInput | None = None - self._resize_debounce_timer: object | None = None - self._current_detail_record: SearchRecord | None = None - self._detail_scroll: t.Any = None - - def _get_start_time(self) -> float | None: - return self._started_at - - def compose(self) -> cabc.Iterator[object]: - """Build the widget tree (header → chrome row → filter → body → footer).""" - yield header() - with horizontal(id="chrome"): - yield SpinnerWidget(id="chrome-spinner") - yield static_type("", id="chrome-status") - yield static_type("", id="chrome-matches") - yield ElapsedWidget( - start_provider=self._get_start_time, - id="chrome-elapsed", - ) - yield FilterInput(placeholder="Filter by keyword", id="filter") - with horizontal(id="body"): - yield SearchResultsList(id="results") - with DetailScroll(id="detail-scroll"): - yield static_type("", id="detail") - yield footer() - - def on_mount(self) -> None: - """Cache widget references, start the worker, and seed the chrome.""" - streaming = t.cast("StreamingAppLike", t.cast("object", self)) - self._results = t.cast( - "SearchResultsList", - streaming.query_one("#results"), - ) - self._detail = t.cast( - "StaticLike", - streaming.query_one("#detail", static_type), - ) - self._detail_scroll = streaming.query_one("#detail-scroll") - self._status_widget = t.cast( - "StaticLike", - streaming.query_one("#chrome-status", static_type), - ) - self._matches_widget = t.cast( - "StaticLike", - streaming.query_one("#chrome-matches", static_type), - ) - self._spinner_widget = t.cast( - "SpinnerWidget", - streaming.query_one("#chrome-spinner"), - ) - self._elapsed_widget = t.cast( - "ElapsedWidget", - streaming.query_one("#chrome-elapsed"), - ) - self._filter_input = t.cast( - "FilterInput", - streaming.query_one("#filter"), - ) - self._status_widget.update( - f"Searching {' '.join(self.query.terms) if self.query.terms else 'all records'}", - ) - self._progress = StreamingSearchProgress(emit=make_emit(streaming)) - streaming.run_worker( - self._run_search, - name="search", - thread=True, - exclusive=True, - ) - - def _run_search(self) -> None: - progress = self._progress - if progress is None: - return - try: - run_search_query( - self.home, - self.query, - progress=progress, - control=self.control, - ) - except BaseException as exc: - streaming = t.cast("StreamingAppLike", t.cast("object", self)) - streaming.call_from_thread( - self._apply_finished, - "error", - len(self.all_records), - 0.0, - str(exc), - ) - - def _apply_records_batch( - self, - records: cabc.Sequence[SearchRecord], - total: int, - ) -> None: - """Append a streaming records batch — invoked via ``call_from_thread``.""" - self.all_records.extend(records) - matching = [r for r in records if self._matches_filter(r)] - if matching and self._results is not None: - self._results.append_records(matching) - self.filtered_records.extend(matching) - if self._matches_widget is not None: - self._matches_widget.update(format_match_count(total)) - - def _apply_progress(self, snapshot: ProgressSnapshot) -> None: - """Update the status widget — invoked via ``call_from_thread``.""" - self._last_snapshot = snapshot - if self._started_at is None: - self._started_at = time.monotonic() - label = snapshot.query_label - if snapshot.current is not None and snapshot.total is not None: - status = ( - f"Searching {label} | " - f"{snapshot.phase} {snapshot.current}/{snapshot.total} sources" - ) - elif snapshot.detail: - status = f"Searching {label} | {snapshot.phase} {snapshot.detail}" - else: - status = f"Searching {label} | {snapshot.phase}" - if self._status_widget is not None: - self._status_widget.update(status) - - def _apply_finished( - self, - outcome: str, - total: int, - elapsed: float, - error_message: str | None, - ) -> None: - """Freeze chrome widgets — invoked via ``call_from_thread``.""" - self._search_done = True - glyphs = {"complete": "✓", "interrupted": "■", "error": "✗"} - if self._spinner_widget is not None: - self._spinner_widget.freeze(glyphs.get(outcome, "·")) - if self._elapsed_widget is not None: - self._elapsed_widget.freeze(elapsed) - if self._status_widget is not None: - if outcome == "error": - self._status_widget.update(f"Search failed: {error_message}") - elif outcome == "interrupted": - self._status_widget.update( - f"Stopped at {format_match_count(total)} " - f"across {self._sources_label()} sources", - ) - else: - self._status_widget.update( - f"Search complete: {format_match_count(total)}", - ) - - def _sources_label(self) -> str: - snap = self._last_snapshot - if snap is None or snap.current is None or snap.total is None: - return "?" - return f"{snap.current}/{snap.total}" - - def on_filter_requested(self, message: FilterRequested) -> None: - """Spawn a worker to recompute the filter; exclusive cancels any in-flight one.""" - text = message.payload.text - self._filter_text = text.strip().casefold() - streaming = t.cast("StreamingAppLike", t.cast("object", self)) - streaming.run_worker( - lambda captured_text=text: self._run_filter_worker(captured_text), - name="filter", - group="filter", - thread=True, - exclusive=True, - ) - - def _run_filter_worker(self, text: str) -> None: - """Compute the filtered list on a background thread; post a ``FilterCompleted``. - - Runs in a worker thread; safe to scan ``self.all_records`` since - list reads under CPython are GIL-protected. The main thread guards - against stale results by comparing the captured text against the - current input value in :meth:`on_filter_completed`. - """ - matching = compute_filter_matches(self.all_records, text) - streaming = t.cast("StreamingAppLike", t.cast("object", self)) - streaming.post_message( - FilterCompleted( - payload=FilterCompletedPayload(text=text, matching=matching), - ), - ) - - def on_filter_completed(self, message: FilterCompleted) -> None: - """Apply the worker's filter result if it matches the current input.""" - payload = message.payload - if self._filter_input is not None and payload.text != self._filter_input.value: - return - self.filtered_records = list(payload.matching) - if self._results is not None: - self._results.set_records(payload.matching) - if self._detail is not None: - if self.filtered_records: - self.show_detail(self.filtered_records[0]) - else: - self._detail.update( - "No results." if self._search_done else "No matches yet.", - ) - - def on_option_list_option_highlighted(self, event: object) -> None: - """Update the detail pane when the OptionList cursor moves.""" - option_index = getattr(event, "option_index", None) - if option_index is None: - return - row_index = int(option_index) - if 0 <= row_index < len(self.filtered_records): - self.show_detail(self.filtered_records[row_index]) - - # Constant — keep in sync with the label list in ``show_detail`` below. - # 7 label rows (Agent / Kind / Store / Adapter / Timestamp / Model / Path) - # plus 1 blank separator = 8 lines of header before the body starts. - _DETAIL_HEADER_LINES: t.ClassVar[int] = 8 - - def show_detail(self, record: SearchRecord) -> None: - """Render ``record`` with colored labels + format-aware body + scroll-to-match. - - The body is truncated to :data:`DETAIL_BODY_MAX_LINES` lines (the - ``VerticalScroll`` wrapper handles letting the user scroll within - the visible window). The body renderable is chosen by - :func:`detect_content_format`: - - * JSON bodies are pretty-printed and rendered via - :class:`rich.syntax.Syntax` with ``ansi_dark`` theming. - * Markdown bodies render via :class:`rich.markdown.Markdown`. - * Everything else keeps the existing ``Text`` + ``highlight_regex`` - flow so search-term matches stay bold-yellow. - - If any current query term occurs in the body the pane is scrolled - so that line lands vertically centered in the viewport (line index - is recomputed against the formatted body for JSON so the jump is - still accurate). - """ - if self._detail is None: - return - self._current_detail_record = record - width = max(20, self._detail.size.width or 80) - agent_color = SearchResultsList._AGENT_COLORS.get(record.agent or "", "") - kind_color = SearchResultsList._KIND_COLORS.get(record.kind or "", "") - header = rich_text.Text(no_wrap=False) - for label, value, value_style in ( - ("Agent:", record.agent or "", agent_color), - ("Kind:", record.kind or "", kind_color), - ("Store:", record.store or "", "dim"), - ("Adapter:", record.adapter_id or "", "dim"), - ("Timestamp:", record.timestamp or "unknown", "dim"), - ("Model:", record.model or "unknown", "magenta"), - ( - "Path:", - format_compact_path(record.path, max_width=width - 8), - "grey50", - ), - ): - header.append(f"{label} ", style="bold") - header.append(f"{value}\n", style=value_style) - header.append("\n") - body_truncated = truncate_lines(record.text, DETAIL_BODY_MAX_LINES) - query_terms = list(self.query.terms) - body_renderable, body_for_scroll = self._build_detail_body( - body_truncated, - query_terms, - ) - self._detail.update( - _RichGroup(header, t.cast("t.Any", body_renderable)), - ) - self._scroll_detail_to_first_match(body_for_scroll, query_terms) - - def _build_detail_body( - self, - body_text: str, - query_terms: cabc.Sequence[str], - ) -> tuple[object, str]: - """Return ``(renderable, body_text_for_match_search)`` for ``body_text``. - - The second tuple element is whatever text the caller's - ``find_first_match_line`` should scan. For JSON we pretty-print - and return the formatted text so the line index lines up with - what the user actually sees rendered. - """ - fmt = detect_content_format(body_text) - if fmt == "json": - try: - formatted = json.dumps( - json.loads(body_text), - indent=2, - ensure_ascii=False, - ) - except json.JSONDecodeError, ValueError: - formatted = body_text - match_line = find_first_match_line( - formatted, - query_terms, - case_sensitive=self.query.case_sensitive, - regex=self.query.regex, - ) - highlight_lines = {match_line + 1} if match_line is not None else None - syntax = _RichSyntax( - formatted, - "json", - theme="ansi_dark", - word_wrap=True, - highlight_lines=highlight_lines, - ) - return syntax, formatted - if fmt == "markdown": - return _RichMarkdown(body_text, code_theme="ansi_dark"), body_text - return ( - highlight_matches( - body_text, - query_terms, - case_sensitive=self.query.case_sensitive, - regex=self.query.regex, - ), - body_text, - ) - - def _scroll_detail_to_first_match( - self, - body_text: str, - query_terms: cabc.Sequence[str], - ) -> None: - """Jump ``_detail_scroll`` so the first match lands at the viewport center.""" - if self._detail_scroll is None: - return - scroll: t.Any = self._detail_scroll - match_line = find_first_match_line( - body_text, - query_terms, - case_sensitive=self.query.case_sensitive, - regex=self.query.regex, - ) - if match_line is None: - scroll.scroll_to(y=0, animate=False) - return - target_line = self._DETAIL_HEADER_LINES + match_line - viewport_h = int(getattr(scroll.size, "height", 0) or 0) - center_offset = max(0, target_line - viewport_h // 2) - scroll.scroll_to(y=center_offset, animate=False) - - def on_resize(self, event: object) -> None: - """Debounce rapid resize bursts (e.g. tiling-WM live drag).""" - del event - if self._resize_debounce_timer is not None: - timer = t.cast("t.Any", self._resize_debounce_timer) - timer.stop() - self._resize_debounce_timer = self.set_timer(0.05, self._after_resize) - - def _after_resize(self) -> None: - """Refresh chrome; the detail pane scroll wrapper handles its own reflow.""" - if self._matches_widget is not None: - self._matches_widget.refresh() - - def action_stop_search(self) -> None: - """``Esc``: cooperative early-exit of the worker (no-op when finished).""" - self._cancel_active_action() - - def action_smart_quit(self) -> None: - """``Ctrl-C``: cancel the topmost in-flight action; quit if there are none.""" - if self._has_active_actions(): - self._cancel_active_action() - else: - self.exit() - - # Directional pane focus (tmux-style ``ctrl+hjkl``). Edge moves (e.g. - # ``ctrl+j`` from the results pane — nothing below it) are no-ops. - # The three focusable regions are #filter (top), #results (bottom- - # left), and #detail-scroll (bottom-right). - - def _focus_widget_by_id(self, widget_id: str) -> None: - try: - target = self.query_one(f"#{widget_id}") - except Exception: - return - t.cast("t.Any", target).focus() - - def action_focus_pane_left(self) -> None: - """``Ctrl-H``: focus the pane to the left of the current one.""" - if self.focused is not None and self.focused.id == "detail-scroll": - self._focus_widget_by_id("results") - - def action_focus_pane_right(self) -> None: - """``Ctrl-L``: focus the pane to the right of the current one.""" - if self.focused is not None and self.focused.id in ("results", "filter"): - self._focus_widget_by_id("detail-scroll") - - def action_focus_pane_up(self) -> None: - """``Ctrl-K``: focus the pane above the current one (filter row).""" - if self.focused is not None and self.focused.id in ( - "results", - "detail-scroll", - ): - self._focus_widget_by_id("filter") - - def action_focus_pane_down(self) -> None: - """``Ctrl-J``: focus the pane below the current one (results).""" - if self.focused is not None and self.focused.id == "filter": - self._focus_widget_by_id("results") - - def _has_active_actions(self) -> bool: - """Return True if any cancellable in-flight action exists. - - Extension point: when a second cancellable action lands (async - detail-fetch, debounced refilter, etc.), add its state here. - """ - return not self._search_done - - def _cancel_active_action(self) -> None: - """Cancel the topmost in-flight cancellable action. - - Extension point: extend with future cancellable actions in - most-recently-started order so ``Ctrl-C`` peels them off one at a - time before exiting. - """ - if not self._search_done: - self.control.request_answer_now() - - def _matches_filter(self, record: SearchRecord) -> bool: - if not self._filter_text: - return True - return self._filter_text in build_search_haystack(record).casefold() - - return AgentGrepApp(home=home, query=query, control=control) + return _build(home, query, control=control) def run_search_command(args: SearchArgs) -> int: @@ -5050,6 +4190,32 @@ def run_find_command(args: FindArgs) -> int: return 1 +def run_ui_command(args: UIArgs) -> int: + """Execute ``agentgrep ui``.""" + initial_terms = tuple(args.initial_query.split()) if args.initial_query else () + query = SearchQuery( + terms=initial_terms, + search_type="prompts", + any_term=False, + regex=False, + case_sensitive=False, + agents=AGENT_CHOICES, + limit=None, + ) + run_ui(pathlib.Path.home(), query, control=SearchControl()) + return 0 + + +def _is_interactive_terminal() -> bool: + """Return ``True`` when both stdin and stdout are TTYs. + + Bare ``agentgrep`` defaults to the TUI, but pipelines (``agentgrep | + cat``), redirected output, and CI subprocesses have no TTY — in those + cases the caller almost certainly wants ``--help`` instead. + """ + return sys.stdin.isatty() and sys.stdout.isatty() + + def _exit_on_sigint() -> t.NoReturn: """Terminate with Ctrl-C signal semantics where the platform supports them.""" if sys.platform == "win32": @@ -5068,12 +4234,20 @@ def _write_interrupt_notice() -> None: def main(argv: cabc.Sequence[str] | None = None) -> int: """Run the CLI.""" + raw = list(sys.argv[1:]) if argv is None else list(argv) + if not raw and not _is_interactive_terminal(): + color_mode = normalize_color_mode(argv) + with configured_color_environment(color_mode): + create_parser(color_mode).parser.print_help() + return 0 try: parsed = parse_args(argv) if parsed is None: return 0 if isinstance(parsed, SearchArgs): return run_search_command(parsed) + if isinstance(parsed, UIArgs): + return run_ui_command(parsed) return run_find_command(parsed) except KeyboardInterrupt: _write_interrupt_notice() diff --git a/src/agentgrep/ui/__init__.py b/src/agentgrep/ui/__init__.py new file mode 100644 index 0000000..7432de5 --- /dev/null +++ b/src/agentgrep/ui/__init__.py @@ -0,0 +1,14 @@ +"""Textual TUI subpackage for agentgrep. + +This subpackage holds the streaming Textual explorer ``run_ui`` and the +:func:`build_streaming_ui_app` factory. It is imported lazily by the +top-level ``agentgrep`` package — bare ``import agentgrep`` does not +load Textual. Anyone who imports ``agentgrep.ui`` (or calls +``agentgrep.run_ui()``) requires Textual to be installed. +""" + +from __future__ import annotations + +from agentgrep.ui.app import build_streaming_ui_app, run_ui + +__all__ = ["build_streaming_ui_app", "run_ui"] diff --git a/src/agentgrep/ui/app.py b/src/agentgrep/ui/app.py new file mode 100644 index 0000000..3fe8c51 --- /dev/null +++ b/src/agentgrep/ui/app.py @@ -0,0 +1,1624 @@ +"""Streaming Textual app — ``run_ui`` and the app factory. + +This module owns the entire TUI surface that used to live inside the +``build_streaming_ui_app`` closure in ``agentgrep.__init__``. Splitting +it out keeps the top-level package focused on the CLI / search engine +and gives the widget classes their own file for the perf-fix LRU caches +to land cleanly. + +Textual is imported lazily inside :func:`build_streaming_ui_app` (via +``importlib.import_module``) so importing this module by itself does +not require Textual at import time — the import error is deferred to +the moment a UI is actually built. +""" + +from __future__ import annotations + +import asyncio +import collections +import contextlib +import importlib +import json +import pathlib +import time +import typing as t +from collections import abc as cabc + +from rich.console import Group as _RichGroup +from rich.markdown import Markdown as _RichMarkdown +from rich.syntax import Syntax as _RichSyntax + +from agentgrep import ( + DETAIL_BODY_MAX_LINES, + FilterCompletedPayload, + FilterRequestedPayload, + ProgressSnapshot, + RichTextModule, + RunnableAppLike, + SearchControl, + SearchQuery, + SearchRecord, + SearchRequestedPayload, + StaticLike, + StreamingAppLike, + StreamingRecordsBatch, + StreamingSearchFinished, + StreamingSearchProgress, + TextualAppModule, + TextualBindingModule, + TextualContainersModule, + TextualMessageModule, + TextualOptionListInternalsModule, + TextualWidgetsModule, + build_search_haystack, + cached_haystack, + clear_haystack_cache, + compute_filter_matches, + detect_content_format, + find_first_match_line, + format_compact_path, + format_match_count, + format_timestamp_tig, + highlight_matches, + run_search_query, + truncate_lines, +) + + +def scroll_percent(scroll_y: float, max_scroll_y: float) -> int: + """Return an integer scroll percent clamped to ``[0, 100]``. + + Returns ``100`` when there is no scrollable region (everything fits) + and ``0`` when scrolled to the very top. Mirrors tig's bottom-status + convention where a fully visible view reads as ``100%``. + """ + if max_scroll_y <= 0: + return 100 if scroll_y <= 0 else 0 + return min(100, max(0, round((scroll_y / max_scroll_y) * 100))) + + +def run_ui( + home: pathlib.Path, + query: SearchQuery, + *, + control: SearchControl, +) -> None: + """Launch the streaming Textual explorer for ``query``. + + Thin wrapper that builds the app via :func:`build_streaming_ui_app` and + calls ``app.run()``. The factory split lets tests construct the app for + a Textual ``Pilot`` smoke test without entering the blocking run loop. + + Parameters + ---------- + home : pathlib.Path + User home directory, passed through to :func:`run_search_query`. + query : SearchQuery + Search to run. Empty ``terms`` means "all records" (browse mode). + control : SearchControl + Shared cooperative-cancel flag; ``Esc`` / ``Ctrl-C`` call + ``request_answer_now`` to nudge the worker to wrap up. + """ + app = build_streaming_ui_app(home, query, control=control) + t.cast("RunnableAppLike", app).run() + + +def build_streaming_ui_app( + home: pathlib.Path, + query: SearchQuery, + *, + control: SearchControl, +) -> object: + """Construct the streaming Textual app without entering its run loop. + + Returns the constructed ``AgentGrepApp`` instance (typed ``object`` because + the actual class is defined dynamically inside this factory). Callers can + invoke ``.run()`` for a real session or ``.run_test()`` for a Pilot smoke + test. The full app body — message subclasses, ``SpinnerWidget``, + ``FilterInput``, ``AgentGrepApp`` — lives here so the + Textual imports stay lazy. + + Parameters + ---------- + home : pathlib.Path + User home directory, passed through to :func:`run_search_query`. + query : SearchQuery + Search to run. Empty ``terms`` means "all records" (browse mode). + control : SearchControl + Shared cooperative-cancel flag; ``Esc`` / ``Ctrl-C`` call + ``request_answer_now`` to nudge the worker to wrap up. + """ + try: + textual_app = t.cast( + "TextualAppModule", + t.cast("object", importlib.import_module("textual.app")), + ) + textual_containers = t.cast( + "TextualContainersModule", + t.cast("object", importlib.import_module("textual.containers")), + ) + textual_widgets = t.cast( + "TextualWidgetsModule", + t.cast("object", importlib.import_module("textual.widgets")), + ) + textual_message = t.cast( + "TextualMessageModule", + t.cast("object", importlib.import_module("textual.message")), + ) + textual_option_list_internals = t.cast( + "TextualOptionListInternalsModule", + t.cast("object", importlib.import_module("textual.widgets.option_list")), + ) + textual_binding = t.cast( + "TextualBindingModule", + t.cast("object", importlib.import_module("textual.binding")), + ) + rich_text_module = t.cast( + "RichTextModule", + t.cast("object", importlib.import_module("rich.text")), + ) + except ImportError as error: + msg = "Textual is required for --ui. Install with `uv pip install --editable .`." + raise RuntimeError(msg) from error + + app_type = textual_app.App + message_type = textual_message.Message + option_list_type = textual_widgets.OptionList + option_type = textual_option_list_internals.Option + binding_type = textual_binding.Binding + rich_text = rich_text_module + horizontal = textual_containers.Horizontal + vertical = textual_containers.Vertical + vertical_scroll = textual_containers.VerticalScroll + footer = textual_widgets.Footer + header = textual_widgets.Header + input_widget = textual_widgets.Input + static_type = textual_widgets.Static + + # FilterRequested / FilterCompleted stay on the Textual message bus — they + # fire at typing speed, not streaming speed, so the FIFO queue is fine for + # them. Records / progress / search-finished events bypass the message bus + # entirely (see ``make_emit`` below) so they never queue behind keystrokes. + + class FilterRequested(message_type): # ty: ignore[unsupported-base] + """Debounced filter-text-changed event from :class:`FilterInput`.""" + + def __init__(self, payload: FilterRequestedPayload) -> None: + super().__init__() + self.payload = payload + + class FilterCompleted(message_type): # ty: ignore[unsupported-base] + """Worker-completed filter result posted back to the main thread.""" + + def __init__(self, payload: FilterCompletedPayload) -> None: + super().__init__() + self.payload = payload + + class SearchRequested(message_type): # ty: ignore[unsupported-base] + """Debounced search-text-changed event from :class:`SearchInput`.""" + + def __init__(self, payload: SearchRequestedPayload) -> None: + super().__init__() + self.payload = payload + + class ResultsScrollChanged(message_type): # ty: ignore[unsupported-base] + """Posted by :class:`SearchResultsList` when scroll or cursor moves. + + The app handler renders the right side of the results status line + from this snapshot — cursor position out of total, plus the scroll + percent. Pre-shaped here so the widget never reaches into the app + directly. + """ + + def __init__(self, cursor: int | None, total: int, percent: int) -> None: + super().__init__() + self.cursor = cursor + self.total = total + self.percent = percent + + class DetailScrollChanged(message_type): # ty: ignore[unsupported-base] + """Posted by :class:`DetailScroll` when the detail-pane scrolls.""" + + def __init__(self, percent: int) -> None: + super().__init__() + self.percent = percent + + def make_emit(app: StreamingAppLike) -> cabc.Callable[[object], None]: + """Build an ``emit`` callback that dispatches streaming events via ``call_from_thread``. + + ``call_from_thread`` schedules the callback directly on the event loop + rather than enqueuing a ``Message`` — so high-frequency record batches + don't compete with keystroke / timer events for FIFO message dispatch. + Vibe-tmux uses the same pattern (``call_from_thread(_rebuild_tree, snap)``) + and Textual's own ``Log`` widget mutates state directly without a per- + write message. This is the canonical Textual pattern for "many small + updates from a worker thread." + """ + typed_app = t.cast("t.Any", app) + + def emit(event: object) -> None: + if isinstance(event, StreamingRecordsBatch): + typed_app.call_from_thread( + typed_app._apply_records_batch, + event.records, + event.total, + ) + elif isinstance(event, ProgressSnapshot): + typed_app.call_from_thread(typed_app._apply_progress, event) + elif isinstance(event, StreamingSearchFinished): + typed_app.call_from_thread( + typed_app._apply_finished, + event.outcome, + event.total, + event.elapsed, + str(event.error) if event.error else None, + ) + + return emit + + class SpinnerWidget(static_type): # ty: ignore[unsupported-base] + """Self-driving Braille spinner that animates regardless of event-loop load. + + The widget pulls its frame index from ``time.monotonic()`` on every + ``render`` and lets Textual's per-widget ``auto_refresh`` reactor drive + the redraw. This decouples the spinner from any main-thread timer or + message handler — even if record-batch dispatch backs up, the spinner + keeps ticking. + """ + + _FRAMES: t.ClassVar[str] = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + _FPS: t.ClassVar[float] = 10.0 + + def __init__(self, *, id: str | None = None) -> None: # noqa: A002 -- forwarded to Textual's ``id`` kwarg + super().__init__("", id=id) + self._final_glyph: str | None = None + self._started_at: float = time.monotonic() + + def on_mount(self) -> None: + """Arm the per-widget refresh timer (Textual reads this after mount).""" + self.auto_refresh = 1.0 / self._FPS + + def render(self) -> str: + """Return the current Braille frame from elapsed wall-clock time.""" + if self._final_glyph is not None: + return self._final_glyph + elapsed = time.monotonic() - self._started_at + frame_index = int(elapsed * self._FPS) % len(self._FRAMES) + return self._FRAMES[frame_index] + + def freeze(self, glyph: str) -> None: + """Stop animating and lock the displayed glyph (called on terminal events).""" + self._final_glyph = glyph + self.auto_refresh = None + self.refresh() + + def unfreeze(self) -> None: + """Resume animation (called when a fresh search restarts).""" + self._final_glyph = None + self._started_at = time.monotonic() + self.auto_refresh = 1.0 / self._FPS + self.refresh() + + class SearchResultsList( + option_list_type, # ty: ignore[unsupported-base] + can_focus=True, + ): + """``OptionList`` subclass for streaming agentgrep search records. + + ``OptionList`` is Textual's proven cursor-navigable virtual list. It + ships with working Tab focus, a visible cursor highlight via the + ``option-list--option-highlighted`` CSS class, and posts an + ``OptionHighlighted`` message on cursor movement — all the things our + previous custom widget had to wire up manually and failed at in the + real terminal. + + Adding records via ``append_records`` / ``set_records`` runs on the + event-loop thread because the worker uses ``app.call_from_thread`` to + invoke these methods. That keeps the streaming transport off the + Textual message bus so keystroke + timer events never queue behind it. + """ + + BINDINGS: t.ClassVar[list[tuple[str, str, str]]] = [ + ("k", "cursor_up", "Up"), + ("j", "cursor_down", "Down"), + ("l", "focus_detail", "Detail"), + ("right", "focus_detail", ""), + ("g", "cursor_top", "Top"), + ("G", "cursor_bottom", "Bottom"), + ("ctrl+d", "cursor_half_page_down", "½ Down"), + ("ctrl+u", "cursor_half_page_up", "½ Up"), + ] + + def __init__( + self, + *, + id: str | None = None, # noqa: A002 -- forwarded to Textual's ``id`` kwarg + ) -> None: + super().__init__(id=id) + self._records: list[SearchRecord] = [] + + def append_records(self, records: cabc.Sequence[SearchRecord]) -> None: + """Append a batch of records — invoked via ``app.call_from_thread``. + + Eagerly warms :func:`cached_haystack` for each new record so the + cost is paid during streaming (when the user is already watching + the spinner) rather than during the next filter keystroke. + """ + if not records: + return + self._records.extend(records) + for record in records: + cached_haystack(record) + self.add_options( + [option_type(self._render_record(r), id=str(id(r))) for r in records], + ) + + def set_records(self, records: cabc.Sequence[SearchRecord]) -> None: + """Apply a new filter result by patching the existing options. + + For the common "user typed another character" narrowing case the + method removes the now-unmatched options without rebuilding the + list — keeps rendering O(removed) instead of O(total) and never + touches the haystack cache. Falls back to a full rebuild when + the new set introduces records not currently shown (widening) or + when more than half of the current options would be removed + (where ``remove_option_at_index`` would do worse than a single + ``clear_options`` + ``add_options`` pair). + """ + new_records = list(records) + new_ids: set[int] = {id(record) for record in new_records} + current_records = self._records + if not current_records: + self._rebuild_options(new_records) + return + current_index_by_id: dict[int, int] = { + id(record): idx for idx, record in enumerate(current_records) + } + additions = [record for record in new_records if id(record) not in current_index_by_id] + if additions: + self._rebuild_options(new_records) + return + to_remove_indices = sorted( + ( + current_index_by_id[id(record)] + for record in current_records + if id(record) not in new_ids + ), + reverse=True, + ) + if len(to_remove_indices) > len(current_records) // 2: + # More than half goes — a single clear+rebuild is cheaper + # than N ``remove_option_at_index`` calls (each shifts the + # internal options list). + self._rebuild_options(new_records) + return + for idx in to_remove_indices: + self.remove_option_at_index(idx) + self._records = new_records + + def _rebuild_options(self, records: cabc.Sequence[SearchRecord]) -> None: + """Full clear + rebuild path. Used when delta-apply isn't safe.""" + self._records = list(records) + self.clear_options() + if self._records: + for record in self._records: + cached_haystack(record) + self.add_options( + [option_type(self._render_record(r), id=str(id(r))) for r in self._records], + ) + + def clear(self) -> None: + """Empty the list.""" + self._records = [] + self.clear_options() + + def _scroll_percent(self) -> int: + """Compute the current scroll percent, clamped to ``[0, 100]``.""" + return scroll_percent( + float(getattr(self, "scroll_y", 0) or 0), + float(getattr(self, "max_scroll_y", 0) or 0), + ) + + def _post_scroll_changed(self, cursor: int | None = None) -> None: + """Post a :class:`ResultsScrollChanged` snapshot to the app. + + ``cursor`` defaults to the widget's current ``highlighted`` + reactive but accepts an explicit override so watchers can pass + the freshly-set value through without racing the reactive + dispatch. + """ + if cursor is None: + cursor = t.cast("int | None", getattr(self, "highlighted", None)) + self.post_message( + ResultsScrollChanged( + cursor=cursor, + total=len(self._records), + percent=self._scroll_percent(), + ), + ) + + def watch_scroll_y(self, old: float, new: float) -> None: + """Re-render the status line on scroll. Inherited base does the actual scroll.""" + base = getattr(super(), "watch_scroll_y", None) + if callable(base): + base(old, new) + self._post_scroll_changed() + + def watch_highlighted(self, highlighted: int | None) -> None: + """Re-render the status line on cursor move.""" + base = getattr(super(), "watch_highlighted", None) + if callable(base): + base(highlighted) + self._post_scroll_changed(cursor=highlighted) + + _AGENT_COLORS: t.ClassVar[dict[str, str]] = { + "codex": "cyan", + "claude": "magenta", + "cursor": "yellow", + } + _KIND_COLORS: t.ClassVar[dict[str, str]] = { + "prompt": "green", + "history": "blue", + } + + def _render_record(self, record: SearchRecord) -> object: + agent_text = (record.agent or "").ljust(8)[:8] + kind_text = (record.kind or "").ljust(10)[:10] + timestamp_text = format_timestamp_tig(record.timestamp).ljust(22)[:22] + title_text = (record.title or "").ljust(40)[:40] + path_text = format_compact_path(record.path, max_width=60) + text = rich_text.Text(no_wrap=True, overflow="ellipsis") + text.append(agent_text, style=self._AGENT_COLORS.get(record.agent or "", "")) + text.append(" ") + text.append(kind_text, style=self._KIND_COLORS.get(record.kind or "", "")) + text.append(" ") + text.append(timestamp_text, style="italic") + text.append(" ") + text.append(title_text, style="bold") + text.append(" ") + text.append(path_text, style="grey50") + return text + + def action_cursor_up(self) -> None: + """Release focus to the filter input when the cursor is at row 0.""" + if self.highlighted in (None, 0): + self.app.action_focus_previous() + else: + super().action_cursor_up() + + def action_focus_detail(self) -> None: + """Move focus rightward to the detail-scroll pane (vim-style ``l``).""" + detail = self.app.query_one("#detail-scroll") + t.cast("t.Any", detail).focus() + + def action_cursor_top(self) -> None: + """Jump the highlight to the first row (vim-style ``g``).""" + self.action_first() + + def action_cursor_bottom(self) -> None: + """Jump the highlight to the last row (vim-style ``G``).""" + self.action_last() + + def _cursor_jump(self, delta: int) -> None: + """Move the highlight by ``delta`` rows, clamped to list bounds.""" + row_count = len(self._records) + if row_count == 0: + return + current = self.highlighted if self.highlighted is not None else 0 + target = max(0, min(row_count - 1, current + delta)) + self.highlighted = target + + def action_cursor_half_page_down(self) -> None: + """Advance the highlight by half the visible viewport height (vim ``Ctrl-D``).""" + half = max(1, self.size.height // 2) + self._cursor_jump(half) + + def action_cursor_half_page_up(self) -> None: + """Move the highlight up by half the visible viewport height (vim ``Ctrl-U``).""" + half = max(1, self.size.height // 2) + self._cursor_jump(-half) + + vertical_scroll_base = t.cast("type[object]", vertical_scroll) + + class DetailScroll( + vertical_scroll_base, # ty: ignore[unsupported-base] + can_focus=True, + ): + """``VerticalScroll`` subclass for the right-side detail pane. + + Adds vim-style bindings: ``h`` / left-arrow releases focus back to the + results list, and ``j`` / ``k`` mirror the stock ``down`` / ``up`` + scroll bindings so navigation stays consistent with + :class:`SearchResultsList`. ``can_focus=True`` is set via the + class-keyword form — Textual reads it during ``__init_subclass__``, + so the plain class-attribute form silently fails to enroll the widget + in the focus chain. + """ + + BINDINGS: t.ClassVar[list[tuple[str, str, str]]] = [ + ("k", "scroll_up", "Up"), + ("j", "scroll_down", "Down"), + ("h", "focus_results", "Results"), + ("left", "focus_results", ""), + ("g", "scroll_home", "Top"), + ("G", "scroll_end", "Bottom"), + ("ctrl+d", "scroll_half_down", "½ Down"), + ("ctrl+u", "scroll_half_up", "½ Up"), + ("ctrl+f", "page_down", "Pg Down"), + ("ctrl+b", "page_up", "Pg Up"), + ] + + def action_focus_results(self) -> None: + """Move focus leftward back to the results list (vim-style ``h``).""" + results = self.app.query_one("#results") + t.cast("t.Any", results).focus() + + def action_scroll_up(self) -> None: + """Release focus to the filter input when already scrolled to the top. + + Mirrors :meth:`SearchResultsList.action_cursor_up` — when the + widget has nothing left to give in that direction, hand focus off + to the neighbor instead of swallowing the keystroke. Catches both + ``k`` (our binding) and ``up`` (inherited from + ``ScrollableContainer``). + """ + scroll_y = t.cast("float", getattr(self, "scroll_y", 0)) + if scroll_y <= 0: + self.app.query_one("#filter").focus() + else: + super().action_scroll_up() + + def action_scroll_half_down(self) -> None: + """Scroll down by half the visible viewport (vim ``Ctrl-D``).""" + half = max(1, self.size.height // 2) + self.scroll_relative(y=half, animate=True) + + def action_scroll_half_up(self) -> None: + """Scroll up by half the visible viewport (vim ``Ctrl-U``).""" + half = max(1, self.size.height // 2) + self.scroll_relative(y=-half, animate=True) + + def watch_scroll_y(self, old: float, new: float) -> None: + """Re-render the detail status line on scroll.""" + base = getattr(super(), "watch_scroll_y", None) + if callable(base): + base(old, new) + self.post_message( + DetailScrollChanged( + percent=scroll_percent( + float(new or 0), + float(getattr(self, "max_scroll_y", 0) or 0), + ), + ), + ) + + class FilterInput(input_widget): # ty: ignore[unsupported-base] + """``Input`` subclass with debounced filter + cursor-or-focus arrows. + + The base ``Input.Changed`` event still fires immediately on each + keystroke so the cursor, selection, and validation feedback stay + instant. The expensive filter operation is deferred onto a + :class:`FilterRequested` message which is only posted after 150 ms of + typing inactivity, letting a worker run the actual filter without + blocking the input itself. + + Up / down arrows are dual-purpose: when there's text in the input + they jump the cursor to the start / end; when the input is empty (or + the cursor is already at the relevant edge) they release focus to + the previous / next widget so the user can navigate into the results + table without reaching for Tab. + """ + + _DEBOUNCE_SECONDS: t.ClassVar[float] = 0.15 + + BINDINGS: t.ClassVar[list[tuple[str, str, str]]] = [ + ("down", "release_down", "Results"), + ] + + def __init__( + self, + *, + placeholder: str = "", + id: str | None = None, # noqa: A002 -- forwarded to Textual's ``id`` kwarg + ) -> None: + super().__init__(placeholder=placeholder, id=id) + self._debounce_timer: object | None = None + + def _watch_value(self, value: str) -> None: + """Post normal ``Input.Changed`` and arm a debounced ``FilterRequested``.""" + super()._watch_value(value) + if self._debounce_timer is not None: + self._debounce_timer.stop() + self._debounce_timer = self.set_timer( + self._DEBOUNCE_SECONDS, + lambda: self.post_message( + FilterRequested(payload=FilterRequestedPayload(text=value)), + ), + ) + + async def _on_key(self, event: object) -> None: + """Down/up route between cursor-jump and focus-release per spec.""" + key = str(getattr(event, "key", "")) + cursor = int(getattr(self, "cursor_position", 0)) + value = str(getattr(self, "value", "")) + stop = getattr(event, "stop", None) + if key == "down": + if value and cursor < len(value): + self.cursor_position = len(value) + if callable(stop): + stop() + return + # Empty or at end — release focus to next widget (DataTable) + if callable(stop): + stop() + self.app.action_focus_next() + return + if key == "up": + if value and cursor > 0: + self.cursor_position = 0 + if callable(stop): + stop() + return + # Empty or at start — release focus up to the top search bar + # so plain ``up`` navigates filter → search without reaching + # for Ctrl-K. Mirrors the symmetric ``down`` → results path. + if callable(stop): + stop() + with contextlib.suppress(Exception): + self.app.query_one("#search").focus() + return + if key == "right" and not value: + # Empty filter → release focus rightward to the detail pane. + # When the filter has text, fall through so the cursor can + # walk through it character-by-character. + if callable(stop): + stop() + with contextlib.suppress(Exception): + self.app.query_one("#detail-scroll").focus() + return + await super()._on_key(event) + + def action_release_down(self) -> None: + """Footer-binding fallback (``_on_key`` handles the real release).""" + self.app.action_focus_next() + + class SearchInput(input_widget): # ty: ignore[unsupported-base] + """``Input`` subclass that fires :class:`SearchRequested` on Enter. + + Keystrokes update the input text immediately so the cursor stays + instant, but no backend search runs until the user presses + Enter. This makes the search explicit (no surprise dispatches + while typing) and gives the cancel-existing-search logic a + clean trigger to hang off of — every Enter cancels the prior + worker before spawning a fresh one. + """ + + BINDINGS: t.ClassVar[list[tuple[str, str, str]]] = [ + ("down", "release_down", "Filter"), + ] + + def __init__( + self, + *, + value: str = "", + placeholder: str = "", + id: str | None = None, # noqa: A002 -- forwarded to Textual's ``id`` kwarg + ) -> None: + super().__init__(value=value, placeholder=placeholder, id=id) + + def on_input_submitted(self, event: object) -> None: + """Enter pressed — dispatch a :class:`SearchRequested` for the current value.""" + stop = getattr(event, "stop", None) + if callable(stop): + stop() + value = str(getattr(self, "value", "")) + self.post_message( + SearchRequested(payload=SearchRequestedPayload(text=value)), + ) + + async def _on_key(self, event: object) -> None: + """``down`` releases focus to the filter; ``up`` is a no-op (top widget).""" + key = str(getattr(event, "key", "")) + cursor = int(getattr(self, "cursor_position", 0)) + value = str(getattr(self, "value", "")) + stop = getattr(event, "stop", None) + if key == "down": + if value and cursor < len(value): + self.cursor_position = len(value) + if callable(stop): + stop() + return + if callable(stop): + stop() + self.app.action_focus_next() + return + if key == "up": + if value and cursor > 0: + self.cursor_position = 0 + if callable(stop): + stop() + return + if callable(stop): + stop() + return + await super()._on_key(event) + + def action_release_down(self) -> None: + """Footer-binding fallback (``_on_key`` handles the real release).""" + self.app.action_focus_next() + + class AgentGrepApp(app_type): # ty: ignore[unsupported-base] + """Streaming read-only explorer for normalized search records.""" + + CSS: t.ClassVar[str] = """ + Screen { + layout: vertical; + } + #search { + height: 3; + } + #body { + height: 1fr; + } + #results-column { + width: 1fr; + layout: vertical; + } + #detail-column { + width: 1fr; + layout: vertical; + } + #filter { + height: 3; + } + #detail-scroll { + height: 1fr; + overflow-y: auto; + overflow-x: hidden; + /* Reserve the border cell up-front (transparent) so toggling + focus only repaints the perimeter — no layout shift, no + extra padding when the border appears. Mirrors the + OptionList default CSS pattern. */ + border: tall transparent; + } + #detail-scroll:focus { + border: tall $border; + } + #detail { + padding: 0 1 0 0; + } + #results { + height: 1fr; + overflow-x: hidden; + } + #results-statusline { + height: 1; + padding: 0; + layout: horizontal; + } + #status-spinner { + width: 2; + color: $accent; + } + #status-text { + width: 1fr; + color: ansi_bright_cyan; + text-style: bold; + } + #status-right { + width: auto; + color: $warning; + text-style: bold; + } + #detail-statusline { + height: 1; + padding: 0; + color: #d8d8d8; + } + /* Keep Textual's OptionList default of "border appears only on focus" + (textual/widgets/_option_list.py:154 — ``border: tall $border``). + We only cancel the two parts of that focus rule that fight our + per-span semantic colors: the ``$foreground 5%`` background-tint + and the bright ``$block-cursor-*`` cursor-row recolor. */ + #results:focus { + background-tint: $foreground 0%; + } + #results:focus > .option-list--option-highlighted { + color: $block-cursor-blurred-foreground; + background: $block-cursor-blurred-background; + text-style: $block-cursor-blurred-text-style; + } + """ + # ``priority=True`` on the directional ``ctrl+hjkl`` bindings pushes + # them into Textual's priority dispatch lane so they win over any + # widget binding for the same key (e.g. ``Input``'s readline + # ``ctrl+k`` = kill-to-end-of-line). Trade-off accepted per user + # request: filter loses ``ctrl+k``; ``ctrl+u`` and ``ctrl+w`` are + # untouched and remain readline-compatible. + BINDINGS: t.ClassVar[list[t.Any]] = [ + ("tab", "focus_next", "Switch focus"), + ("q", "quit", "Quit"), + ("escape", "stop_search", "Stop search"), + ("ctrl+c", "smart_quit", "Stop / Quit"), + binding_type("ctrl+h", "focus_pane_left", "← Pane", priority=True), + binding_type("ctrl+j", "focus_pane_down", "↓ Pane", priority=True), + binding_type("ctrl+k", "focus_pane_up", "↑ Pane", priority=True), + binding_type("ctrl+l", "focus_pane_right", "→ Pane", priority=True), + # Terminal-alias fallback: many terminals (and tmux without + # ``xterm-keys on``) send 0x08 for both Backspace and Ctrl-H, so + # Textual sees ``key="backspace"``, never ``ctrl+h``. NO priority + # here — the filter input's own backspace handler (delete prev + # char) must keep winning inside the input. In panes nothing + # else binds backspace, so this fires. + binding_type("backspace", "focus_pane_left", "", show=False), + ] + all_records: list[SearchRecord] + filtered_records: list[SearchRecord] + + _DETAIL_CACHE_MAX: t.ClassVar[int] = 1024 + + def __init__( + self, + *, + home: pathlib.Path, + query: SearchQuery, + control: SearchControl, + ) -> None: + super().__init__() + self.home = home + self.query = query + self.control = control + self.all_records = [] + self.filtered_records = [] + self._filter_text = "" + self._progress: StreamingSearchProgress | None = None + self._search_done = False + self._started_at: float | None = None + self._last_snapshot: ProgressSnapshot | None = None + self._results: SearchResultsList | None = None + self._detail: StaticLike | None = None + self._status_widget: StaticLike | None = None + self._matches_widget: StaticLike | None = None + self._spinner_widget: SpinnerWidget | None = None + self._detail_statusline: StaticLike | None = None + self._filter_input: FilterInput | None = None + self._search_input: SearchInput | None = None + self._resize_debounce_timer: object | None = None + self._current_detail_record: SearchRecord | None = None + self._detail_scroll: t.Any = None + # LRU caches for detail-pane work. Keyed by + # ``(id(record), query.terms, case_sensitive, regex)`` — the + # tuple of attributes that determines the rendered body and + # the highlighted match line. Bounded so a long browsing + # session can't grow them without limit. + self._detail_body_cache: collections.OrderedDict[ + tuple[int, tuple[str, ...], bool, bool], + tuple[object, str], + ] = collections.OrderedDict() + self._first_match_cache: collections.OrderedDict[ + tuple[int, tuple[str, ...], bool, bool], + int | None, + ] = collections.OrderedDict() + + def _get_start_time(self) -> float | None: + return self._started_at + + def compose(self) -> cabc.Iterator[object]: + """Build the widget tree (header → search → body[results-col, detail-col] → footer). + + The results column carries its live chrome (spinner + status + + match count + scroll %) as a header above the filter and + list, so the running search state sits next to the search + input that drives it. The detail column keeps its status + line at the bottom — record path + scroll % is contextual to + whatever's currently being read, so the natural place to + glance is the foot of the pane. + """ + yield header() + initial_search = " ".join(self.query.terms) if self.query.terms else "" + yield SearchInput( + value=initial_search, + placeholder="Search prompts and history", + id="search", + ) + with horizontal(id="body"): + with vertical(id="results-column"): + with horizontal(id="results-statusline"): + yield SpinnerWidget(id="status-spinner") + yield static_type("", id="status-text") + yield static_type("", id="status-right") + yield FilterInput(placeholder="Filter loaded results", id="filter") + yield SearchResultsList(id="results") + with vertical(id="detail-column"): + with DetailScroll(id="detail-scroll"): + yield static_type("", id="detail") + yield static_type("", id="detail-statusline") + yield footer() + + def on_mount(self) -> None: + """Cache widget references, start the worker, and seed the chrome.""" + streaming = t.cast("StreamingAppLike", t.cast("object", self)) + self._results = t.cast( + "SearchResultsList", + streaming.query_one("#results"), + ) + self._detail = t.cast( + "StaticLike", + streaming.query_one("#detail", static_type), + ) + self._detail_scroll = streaming.query_one("#detail-scroll") + self._status_widget = t.cast( + "StaticLike", + streaming.query_one("#status-text", static_type), + ) + self._matches_widget = t.cast( + "StaticLike", + streaming.query_one("#status-right", static_type), + ) + self._spinner_widget = t.cast( + "SpinnerWidget", + streaming.query_one("#status-spinner"), + ) + self._detail_statusline = t.cast( + "StaticLike", + streaming.query_one("#detail-statusline", static_type), + ) + self._filter_input = t.cast( + "FilterInput", + streaming.query_one("#filter"), + ) + self._search_input = t.cast( + "SearchInput", + streaming.query_one("#search"), + ) + self._progress = StreamingSearchProgress(emit=make_emit(streaming)) + if self.query.terms: + self._start_search_worker(self.query) + self._filter_input.focus() + else: + # No initial query — leave the chrome idle and land focus on + # the search bar so the user can start typing immediately. + self._search_done = True + if self._status_widget is not None: + self._status_widget.update("Press Enter to search") + if self._spinner_widget is not None: + self._spinner_widget.freeze(" ") + self._search_input.focus() + + def _start_search_worker(self, query: SearchQuery) -> None: + """Reset chrome and spawn a new search worker for ``query``. + + ``exclusive=True`` with ``group="search"`` makes Textual cancel + any prior in-flight search worker before this one runs, which + is the canonical Textual pattern for "fire a backend search on + every debounced keystroke without piling up cancellations." + """ + self.query = query + self._reset_search_chrome() + streaming = t.cast("StreamingAppLike", t.cast("object", self)) + streaming.run_worker( + self._run_search, + name="search", + group="search", + thread=True, + exclusive=True, + ) + + def _reset_search_chrome(self) -> None: + """Wipe per-search state and chrome before a fresh search starts. + + Swap ``self.control`` for a fresh :class:`SearchControl` + instead of resetting the existing one — any worker thread + still holding the previous reference will continue to see + its cancel flag set (signaled by ``on_search_requested`` + before this call) and bail out cooperatively, while the + new worker starts with a clean slate. + """ + self.control = SearchControl() + clear_haystack_cache() + self._detail_body_cache.clear() + self._first_match_cache.clear() + self.all_records = [] + self.filtered_records = [] + self._search_done = False + self._started_at = None + self._last_snapshot = None + self._current_detail_record = None + if self._results is not None: + self._results.set_records([]) + if self._detail is not None: + self._detail.update("") + if self._matches_widget is not None: + self._matches_widget.update("") + if self._detail_statusline is not None: + self._detail_statusline.update("") + if self._status_widget is not None: + terms = " ".join(self.query.terms) if self.query.terms else "all records" + self._status_widget.update(f"Searching {terms}") + if self._spinner_widget is not None: + self._spinner_widget.unfreeze() + self._progress = StreamingSearchProgress( + emit=make_emit( + t.cast("StreamingAppLike", t.cast("object", self)), + ), + ) + + def _run_search(self) -> None: + progress = self._progress + if progress is None: + return + try: + run_search_query( + self.home, + self.query, + progress=progress, + control=self.control, + ) + except BaseException as exc: + streaming = t.cast("StreamingAppLike", t.cast("object", self)) + streaming.call_from_thread( + self._apply_finished, + "error", + len(self.all_records), + 0.0, + str(exc), + ) + + def on_search_requested(self, message: SearchRequested) -> None: + """User changed the top search input; relaunch the backend search. + + Treats whitespace-only / empty input as "no search" and just + resets the UI to an idle state without spawning a worker. + """ + text = message.payload.text.strip() + new_query = self._build_search_query(text) + self.control.request_answer_now() + if not text: + self._reset_search_chrome() + self._search_done = True + if self._status_widget is not None: + self._status_widget.update("Press Enter to search") + if self._spinner_widget is not None: + self._spinner_widget.freeze(" ") + self.query = new_query + return + self._start_search_worker(new_query) + + def _build_search_query(self, text: str) -> SearchQuery: + """Build a fresh :class:`SearchQuery` from the search-bar text. + + Preserves the agent and search-type filters from the current + query so the search bar lives on top of the existing filter + scope rather than resetting it. + """ + terms = tuple(text.split()) if text else () + return SearchQuery( + terms=terms, + search_type=self.query.search_type, + any_term=self.query.any_term, + regex=self.query.regex, + case_sensitive=self.query.case_sensitive, + agents=self.query.agents, + limit=self.query.limit, + ) + + _APPLY_CHUNK_SIZE: t.ClassVar[int] = 200 + + async def _apply_records_batch( + self, + records: cabc.Sequence[SearchRecord], + total: int, + ) -> None: + """Append a streaming records batch — invoked via ``call_from_thread``. + + Runs as a coroutine so the chunked loop can yield to the event + loop between each ``_APPLY_CHUNK_SIZE`` slice. ``call_from_thread`` + blocks the worker for the full duration of this coroutine, which + gives natural backpressure (the worker can't queue up batches + faster than the UI can apply them) while ``await asyncio.sleep(0)`` + gives the event loop a chance to process keystrokes, timers, and + renders between chunks — so a 5000-record batch can't freeze the + UI for the duration of a single apply. + """ + self.all_records.extend(records) + matching = [record for record in records if self._matches_filter(record)] + if matching and self._results is not None: + results = self._results + chunk_size = self._APPLY_CHUNK_SIZE + for start in range(0, len(matching), chunk_size): + chunk = matching[start : start + chunk_size] + results.append_records(chunk) + self.filtered_records.extend(chunk) + if start + chunk_size < len(matching): + await asyncio.sleep(0) + self._refresh_results_status_right() + + def _apply_progress(self, snapshot: ProgressSnapshot) -> None: + """Update the status widget — invoked via ``call_from_thread``.""" + self._last_snapshot = snapshot + if self._started_at is None: + self._started_at = time.monotonic() + label = snapshot.query_label + if snapshot.current is not None and snapshot.total is not None: + status = ( + f"Searching {label} | " + f"{snapshot.phase} {snapshot.current}/{snapshot.total} sources" + ) + elif snapshot.detail: + status = f"Searching {label} | {snapshot.phase} {snapshot.detail}" + else: + status = f"Searching {label} | {snapshot.phase}" + if self._status_widget is not None: + self._status_widget.update(status) + + def _apply_finished( + self, + outcome: str, + total: int, + elapsed: float, + error_message: str | None, + ) -> None: + """Freeze chrome widgets — invoked via ``call_from_thread``. + + Elapsed time is folded into the final status string rather than + shown as a live-ticking sibling widget. The status line no + longer claims animation budget once a search is done. + """ + self._search_done = True + glyphs = {"complete": "✓", "interrupted": "■", "error": "✗"} + if self._spinner_widget is not None: + self._spinner_widget.freeze(glyphs.get(outcome, "·")) + if self._status_widget is not None: + if outcome == "error": + self._status_widget.update(f"Search failed: {error_message}") + elif outcome == "interrupted": + self._status_widget.update( + f"Stopped at {format_match_count(total)} " + f"across {self._sources_label()} sources in {elapsed:.1f}s", + ) + else: + self._status_widget.update( + f"Search complete: {format_match_count(total)} in {elapsed:.1f}s", + ) + + def _sources_label(self) -> str: + snap = self._last_snapshot + if snap is None or snap.current is None or snap.total is None: + return "?" + return f"{snap.current}/{snap.total}" + + def on_filter_requested(self, message: FilterRequested) -> None: + """Spawn a worker to recompute the filter; exclusive cancels any in-flight one.""" + text = message.payload.text + self._filter_text = text.strip().casefold() + streaming = t.cast("StreamingAppLike", t.cast("object", self)) + streaming.run_worker( + lambda captured_text=text: self._run_filter_worker(captured_text), + name="filter", + group="filter", + thread=True, + exclusive=True, + ) + + def _run_filter_worker(self, text: str) -> None: + """Compute the filtered list on a background thread; post a ``FilterCompleted``. + + Runs in a worker thread; safe to scan ``self.all_records`` since + list reads under CPython are GIL-protected. The main thread guards + against stale results by comparing the captured text against the + current input value in :meth:`on_filter_completed`. + """ + matching = compute_filter_matches(self.all_records, text) + streaming = t.cast("StreamingAppLike", t.cast("object", self)) + streaming.post_message( + FilterCompleted( + payload=FilterCompletedPayload(text=text, matching=matching), + ), + ) + + def on_filter_completed(self, message: FilterCompleted) -> None: + """Apply the worker's filter result if it matches the current input. + + Skips :meth:`show_detail` when the top filtered record is already + the one being displayed — ``set_records`` re-emits an + ``OptionHighlighted`` event during its rebuild which triggers a + detail re-render anyway, and detail rendering (Rich Text header, + JSON/Markdown body, scroll-to-match) is one of the heavier + main-thread units per filter pass. + """ + payload = message.payload + if self._filter_input is not None and payload.text != self._filter_input.value: + return + self.filtered_records = list(payload.matching) + if self._results is not None: + self._results.set_records(payload.matching) + if self._detail is not None: + if self.filtered_records: + top = self.filtered_records[0] + if top is not self._current_detail_record: + self.show_detail(top) + else: + self._detail.update( + "No results." if self._search_done else "No matches yet.", + ) + + def on_option_list_option_highlighted(self, event: object) -> None: + """Update the detail pane and footer on OptionList cursor move. + + Guards against the redundant re-render that fires when + ``set_records`` rebuilds the list and Textual re-emits the + highlight for the same row that's already in the detail pane. + """ + option_index = getattr(event, "option_index", None) + if option_index is None: + self._refresh_results_status_right() + return + row_index = int(option_index) + if 0 <= row_index < len(self.filtered_records): + record = self.filtered_records[row_index] + if record is not self._current_detail_record: + self.show_detail(record) + self._refresh_results_status_right( + cursor=row_index, + visible=len(self.filtered_records), + percent=self._results._scroll_percent() if self._results is not None else None, + ) + + def on_results_scroll_changed(self, message: ResultsScrollChanged) -> None: + """Re-render the right side of the results status line.""" + self._refresh_results_status_right( + cursor=message.cursor, + visible=message.total, + percent=message.percent, + ) + + def on_detail_scroll_changed(self, message: DetailScrollChanged) -> None: + """Re-render the detail status line on detail-pane scroll.""" + self._refresh_detail_statusline(message.percent) + + def _refresh_results_status_right( + self, + *, + cursor: int | None = None, + visible: int | None = None, + percent: int | None = None, + ) -> None: + """Compose the results-status right slot from the most recent state. + + Combines the streaming match count (from ``self.all_records``) + with the current cursor / scroll percent so the right slot is + always shaped ``{N} matches [{cursor+1}/{visible}] {pct}%``. + Each segment is omitted when its inputs are unknown. + """ + if self._matches_widget is None: + return + if cursor is None and visible is None and percent is None and self._results is not None: + cursor = t.cast("int | None", getattr(self._results, "highlighted", None)) + visible = len(self._results._records) + percent = self._results._scroll_percent() + self._matches_widget.update( + self._format_results_right(cursor, visible, percent), + ) + + def _format_results_right( + self, + cursor: int | None, + visible: int | None, + percent: int | None, + ) -> str: + """Render the right slot: ``{N} matches {cursor+1}/{visible} {pct}%`` (tig style).""" + total_matches = len(self.all_records) + parts: list[str] = [] + if total_matches > 0: + parts.append(format_match_count(total_matches)) + if visible and visible > 0 and cursor is not None: + parts.append(f"{cursor + 1}/{visible}") + if percent is not None and total_matches > 0: + parts.append(f"{percent}%") + return " ".join(parts) + + def _refresh_detail_statusline(self, percent: int | None = None) -> None: + """Update the detail status line with the current record path and scroll %.""" + if self._detail_statusline is None: + return + record = self._current_detail_record + if record is None: + self._detail_statusline.update("") + return + pct = percent if percent is not None else self._current_detail_scroll_percent() + width = max(20, int(getattr(self._detail_statusline.size, "width", 80))) + path_text = format_compact_path(record.path, max_width=max(10, width - 6)) + pad = max(1, width - len(path_text) - len(f"{pct}%")) + self._detail_statusline.update(f"{path_text}{' ' * pad}{pct}%") + + def _current_detail_scroll_percent(self) -> int: + """Compute the detail pane's scroll percent on demand.""" + if self._detail_scroll is None: + return 100 + scroll = self._detail_scroll + return scroll_percent( + float(getattr(scroll, "scroll_y", 0) or 0), + float(getattr(scroll, "max_scroll_y", 0) or 0), + ) + + # Constant — keep in sync with the label list in ``show_detail`` below. + # 7 label rows (Agent / Kind / Store / Adapter / Timestamp / Model / Path) + # plus 1 blank separator = 8 lines of header before the body starts. + _DETAIL_HEADER_LINES: t.ClassVar[int] = 8 + + def show_detail(self, record: SearchRecord) -> None: + """Render ``record`` with colored labels + format-aware body + scroll-to-match. + + The body is truncated to :data:`DETAIL_BODY_MAX_LINES` lines (the + ``VerticalScroll`` wrapper handles letting the user scroll within + the visible window). The body renderable is chosen by + :func:`detect_content_format`: + + * JSON bodies are pretty-printed and rendered via + :class:`rich.syntax.Syntax` with ``ansi_dark`` theming. + * Markdown bodies render via :class:`rich.markdown.Markdown`. + * Everything else keeps the existing ``Text`` + ``highlight_regex`` + flow so search-term matches stay bold-yellow. + + If any current query term occurs in the body the pane is scrolled + so that line lands vertically centered in the viewport (line index + is recomputed against the formatted body for JSON so the jump is + still accurate). + """ + if self._detail is None: + return + self._current_detail_record = record + width = max(20, self._detail.size.width or 80) + agent_color = SearchResultsList._AGENT_COLORS.get(record.agent or "", "") + kind_color = SearchResultsList._KIND_COLORS.get(record.kind or "", "") + header = rich_text.Text(no_wrap=False) + for label, value, value_style in ( + ("Agent:", record.agent or "", agent_color), + ("Kind:", record.kind or "", kind_color), + ("Store:", record.store or "", "dim"), + ("Adapter:", record.adapter_id or "", "dim"), + ("Timestamp:", record.timestamp or "unknown", "dim"), + ("Model:", record.model or "unknown", "magenta"), + ( + "Path:", + format_compact_path(record.path, max_width=width - 8), + "grey50", + ), + ): + header.append(f"{label} ", style="bold") + header.append(f"{value}\n", style=value_style) + header.append("\n") + body_truncated = truncate_lines(record.text, DETAIL_BODY_MAX_LINES) + query_terms = list(self.query.terms) + body_renderable, body_for_scroll = self._build_detail_body( + body_truncated, + query_terms, + ) + self._detail.update( + _RichGroup(header, t.cast("t.Any", body_renderable)), + ) + self._scroll_detail_to_first_match(body_for_scroll, query_terms) + self._refresh_detail_statusline() + + def _detail_cache_key( + self, + query_terms: cabc.Sequence[str], + ) -> tuple[int, tuple[str, ...], bool, bool] | None: + """Compose the LRU key for the current record + query. + + Returns ``None`` when there is no current record (e.g. detail + pane invoked before a record is highlighted) so callers know + to skip the cache entirely. + """ + record = self._current_detail_record + if record is None: + return None + return ( + id(record), + tuple(query_terms), + self.query.case_sensitive, + self.query.regex, + ) + + def _build_detail_body( + self, + body_text: str, + query_terms: cabc.Sequence[str], + ) -> tuple[object, str]: + """Return ``(renderable, body_text_for_match_search)`` for ``body_text``. + + The second tuple element is whatever text the caller's + ``find_first_match_line`` should scan. For JSON we pretty-print + and return the formatted text so the line index lines up with + what the user actually sees rendered. Result is memoized per + ``(record, query)`` so scrolling back to a previously-viewed + record never re-parses the JSON body. + """ + cache_key = self._detail_cache_key(query_terms) + if cache_key is not None: + cached = self._detail_body_cache.get(cache_key) + if cached is not None: + self._detail_body_cache.move_to_end(cache_key) + return cached + fmt = detect_content_format(body_text) + result: tuple[object, str] + if fmt == "json": + try: + formatted = json.dumps( + json.loads(body_text), + indent=2, + ensure_ascii=False, + ) + except json.JSONDecodeError, ValueError: + formatted = body_text + match_line = find_first_match_line( + formatted, + query_terms, + case_sensitive=self.query.case_sensitive, + regex=self.query.regex, + ) + highlight_lines = {match_line + 1} if match_line is not None else None + syntax = _RichSyntax( + formatted, + "json", + theme="ansi_dark", + word_wrap=True, + highlight_lines=highlight_lines, + ) + result = (syntax, formatted) + elif fmt == "markdown": + result = ( + _RichMarkdown(body_text, code_theme="ansi_dark"), + body_text, + ) + else: + result = ( + highlight_matches( + body_text, + query_terms, + case_sensitive=self.query.case_sensitive, + regex=self.query.regex, + ), + body_text, + ) + if cache_key is not None: + self._detail_body_cache[cache_key] = result + self._detail_body_cache.move_to_end(cache_key) + if len(self._detail_body_cache) > self._DETAIL_CACHE_MAX: + self._detail_body_cache.popitem(last=False) + return result + + def _scroll_detail_to_first_match( + self, + body_text: str, + query_terms: cabc.Sequence[str], + ) -> None: + """Jump ``_detail_scroll`` so the first match lands at the viewport center. + + Memoizes ``find_first_match_line`` per ``(record, query)`` so a + cursor parked on the same record across viewport refreshes does + not rescan the body each time. + """ + if self._detail_scroll is None: + return + scroll: t.Any = self._detail_scroll + cache_key = self._detail_cache_key(query_terms) + if cache_key is not None and cache_key in self._first_match_cache: + match_line = self._first_match_cache[cache_key] + self._first_match_cache.move_to_end(cache_key) + else: + match_line = find_first_match_line( + body_text, + query_terms, + case_sensitive=self.query.case_sensitive, + regex=self.query.regex, + ) + if cache_key is not None: + self._first_match_cache[cache_key] = match_line + self._first_match_cache.move_to_end(cache_key) + if len(self._first_match_cache) > self._DETAIL_CACHE_MAX: + self._first_match_cache.popitem(last=False) + if match_line is None: + scroll.scroll_to(y=0, animate=False) + return + target_line = self._DETAIL_HEADER_LINES + match_line + viewport_h = int(getattr(scroll.size, "height", 0) or 0) + center_offset = max(0, target_line - viewport_h // 2) + scroll.scroll_to(y=center_offset, animate=False) + + def on_resize(self, event: object) -> None: + """Debounce rapid resize bursts (e.g. tiling-WM live drag).""" + del event + if self._resize_debounce_timer is not None: + timer = t.cast("t.Any", self._resize_debounce_timer) + timer.stop() + self._resize_debounce_timer = self.set_timer(0.05, self._after_resize) + + def _after_resize(self) -> None: + """Refresh chrome; the detail pane scroll wrapper handles its own reflow.""" + if self._matches_widget is not None: + self._matches_widget.refresh() + + def action_stop_search(self) -> None: + """``Esc``: cooperative early-exit of the worker (no-op when finished).""" + self._cancel_active_action() + + def action_smart_quit(self) -> None: + """``Ctrl-C``: cancel the topmost in-flight action; quit if there are none.""" + if self._has_active_actions(): + self._cancel_active_action() + else: + self.exit() + + # Directional pane focus (tmux-style ``ctrl+hjkl``). Edge moves (e.g. + # ``ctrl+j`` from the detail pane — nothing below it) are no-ops. + # The focusable regions, top-to-bottom: #search (top), then in the + # body: #filter and #results (left column, sticky filter above the + # list) and #detail-scroll (right column). + + def _focus_widget_by_id(self, widget_id: str) -> None: + try: + target = self.query_one(f"#{widget_id}") + except Exception: + return + t.cast("t.Any", target).focus() + + def action_focus_pane_left(self) -> None: + """``Ctrl-H``: focus the pane to the left of the current one.""" + if self.focused is not None and self.focused.id == "detail-scroll": + self._focus_widget_by_id("results") + + def action_focus_pane_right(self) -> None: + """``Ctrl-L``: focus the pane to the right of the current one.""" + if self.focused is not None and self.focused.id in ( + "results", + "filter", + "search", + ): + self._focus_widget_by_id("detail-scroll") + + def action_focus_pane_up(self) -> None: + """``Ctrl-K``: focus the pane above the current one. + + Inside the body, ``up`` lands on the body's top row (``#filter``). + From the body's top row, ``up`` leaves the body and lands on the + top-level search bar. + """ + focused_id = self.focused.id if self.focused is not None else None + if focused_id in ("results", "detail-scroll"): + self._focus_widget_by_id("filter") + elif focused_id == "filter": + self._focus_widget_by_id("search") + + def action_focus_pane_down(self) -> None: + """``Ctrl-J``: focus the pane below the current one.""" + focused_id = self.focused.id if self.focused is not None else None + if focused_id == "search": + self._focus_widget_by_id("filter") + elif focused_id == "filter": + self._focus_widget_by_id("results") + + def _has_active_actions(self) -> bool: + """Return True if any cancellable in-flight action exists. + + Extension point: when a second cancellable action lands (async + detail-fetch, debounced refilter, etc.), add its state here. + """ + return not self._search_done + + def _cancel_active_action(self) -> None: + """Cancel the topmost in-flight cancellable action. + + Extension point: extend with future cancellable actions in + most-recently-started order so ``Ctrl-C`` peels them off one at a + time before exiting. + """ + if not self._search_done: + self.control.request_answer_now() + + def _matches_filter(self, record: SearchRecord) -> bool: + if not self._filter_text: + return True + return self._filter_text in build_search_haystack(record).casefold() + + return AgentGrepApp(home=home, query=query, control=control) diff --git a/tests/test_agentgrep.py b/tests/test_agentgrep.py index a4328b0..c69188e 100644 --- a/tests/test_agentgrep.py +++ b/tests/test_agentgrep.py @@ -4,6 +4,7 @@ from __future__ import annotations import argparse +import asyncio import contextlib import dataclasses import importlib @@ -337,6 +338,54 @@ def test_default_verb_works_after_global_color_flag(tmp_path: pathlib.Path) -> N assert completed.returncode == 1 +def test_inject_default_subcommand_empty_returns_ui() -> None: + """Bare ``agentgrep`` should default to the ``ui`` subcommand.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + + assert list(agentgrep.inject_default_subcommand([])) == ["ui"] + + +def test_inject_default_subcommand_color_only_returns_ui() -> None: + """``agentgrep --color never`` should also default to ``ui``.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + + assert list(agentgrep.inject_default_subcommand(["--color", "never"])) == [ + "--color", + "never", + "ui", + ] + + +def test_parse_args_ui_subcommand_returns_ui_args() -> None: + """``agentgrep ui`` parses to a ``UIArgs`` with empty initial query.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + + args = agentgrep.parse_args(["ui"]) + + assert isinstance(args, agentgrep.UIArgs) + assert args.initial_query == "" + + +def test_parse_args_ui_subcommand_with_initial_query() -> None: + """``agentgrep ui bliss`` populates ``initial_query``.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + + args = agentgrep.parse_args(["ui", "bliss"]) + + assert isinstance(args, agentgrep.UIArgs) + assert args.initial_query == "bliss" + + +def test_parse_args_empty_argv_returns_ui_args() -> None: + """``parse_args([])`` returns a ``UIArgs`` via the default subcommand.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + + args = agentgrep.parse_args([]) + + assert isinstance(args, agentgrep.UIArgs) + assert args.initial_query == "" + + def test_search_progress_mode_parses_default_and_explicit() -> None: agentgrep = load_agentgrep_module() @@ -841,6 +890,72 @@ def test_compute_filter_matches_empty_text_returns_all(tmp_path: pathlib.Path) - assert agentgrep.compute_filter_matches([record], " ") == (record,) +def test_cached_haystack_memoizes_per_record( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``cached_haystack`` calls ``build_search_haystack`` once per record.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + agentgrep.clear_haystack_cache() + record = agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / "a.jsonl", + text="serene bliss", + ) + call_count = 0 + real_build_search_haystack = agentgrep.build_search_haystack + + def counting_build(rec: object) -> str: + nonlocal call_count + call_count += 1 + return t.cast("str", real_build_search_haystack(rec)) + + monkeypatch.setattr(agentgrep, "build_search_haystack", counting_build) + first = agentgrep.cached_haystack(record) + second = agentgrep.cached_haystack(record) + assert first == second + assert first == "serene bliss\n" + str(record.path) + assert call_count == 1 + agentgrep.clear_haystack_cache() + # Cache cleared — next call rebuilds. + _ = agentgrep.cached_haystack(record) + assert call_count == 2 + + +def test_compute_filter_matches_uses_cached_haystack( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The filter uses the cache: ``build_search_haystack`` not called once cached.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + agentgrep.clear_haystack_cache() + records = [ + agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / f"r{idx}.jsonl", + text=f"row {idx} alpha", + ) + for idx in range(3) + ] + # Warm the cache. + for record in records: + agentgrep.cached_haystack(record) + + def raise_if_called(_record: object) -> str: + msg = "build_search_haystack must not run after cache is warm" + raise RuntimeError(msg) + + monkeypatch.setattr(agentgrep, "build_search_haystack", raise_if_called) + matches = agentgrep.compute_filter_matches(records, "alpha") + assert len(matches) == 3 + + def _build_empty_ui_app( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, @@ -882,6 +997,118 @@ async def test_streaming_ui_app_mounts_cleanly( await pilot.pause() focus_chain_ids = {getattr(w, "id", None) for w in app.screen.focus_chain} assert "results" in focus_chain_ids, f"#results not in focus chain; chain={focus_chain_ids}" + # Both inputs and the detail pane should be focusable too. + assert {"search", "filter", "detail-scroll"}.issubset(focus_chain_ids) + + +async def test_empty_query_focuses_search_input_and_marks_search_done( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """With no initial query, the search bar takes focus and chrome is idle.""" + app = _build_empty_ui_app(tmp_path, monkeypatch) + async with app.run_test() as pilot: + await pilot.pause() + assert app.focused is not None + assert app.focused.id == "search" + assert app._search_done is True + + +async def test_search_input_posts_search_requested_only_on_enter( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Typing alone posts nothing; pressing Enter posts one ``SearchRequested``. + + The ``SearchRequested`` class lives inside the streaming-app factory + closure, so the test sniffs every posted message and filters to ones + whose payload type matches :class:`SearchRequestedPayload`. + """ + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + posts: list[str] = [] + + async with app.run_test() as pilot: + await pilot.pause() + app._search_input.focus() + await pilot.pause() + original_post_message = app._search_input.post_message + + def capture(message: object) -> bool: + payload = getattr(message, "payload", None) + if isinstance(payload, agentgrep.SearchRequestedPayload): + posts.append(payload.text) + return original_post_message(message) + + monkeypatch.setattr(app._search_input, "post_message", capture) + await pilot.press("b") + await pilot.press("l") + await pilot.press("i") + await pilot.pause(0.4) + assert posts == [], f"keystrokes should not auto-post; got {posts}" + await pilot.press("enter") + await pilot.pause(0.1) + assert posts == ["bli"], f"expected one post on Enter, got {posts}" + + +async def test_search_input_dispatch_spawns_search_group_worker( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Pressing Enter on a non-empty search bar spawns a ``search`` worker.""" + app = _build_empty_ui_app(tmp_path, monkeypatch) + spawned: list[dict[str, object]] = [] + + def fake_worker(*args: object, **kwargs: object) -> None: + spawned.append({"args": args, "kwargs": kwargs}) + + async with app.run_test() as pilot: + await pilot.pause() + monkeypatch.setattr(app, "run_worker", fake_worker) + app._search_input.focus() + await pilot.pause() + app._search_input.value = "bliss" + await pilot.pause(0.1) + assert spawned == [], f"value change alone should not spawn; got {spawned}" + await pilot.press("enter") + await pilot.pause(0.1) + groups = [t.cast("dict[str, object]", entry["kwargs"]).get("group") for entry in spawned] + assert "search" in groups, f"expected a search-group worker, got {spawned}" + + +async def test_search_input_enter_replaces_control_to_cancel_prior_search( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Each new search signals the prior control and installs a fresh one. + + The cooperative cancel contract is: the old worker thread keeps its + (now-signaled) ``SearchControl`` reference and bails out; the new + worker gets a fresh, un-signaled control. + """ + app = _build_empty_ui_app(tmp_path, monkeypatch) + + async with app.run_test() as pilot: + await pilot.pause() + # Stub run_worker so the app's worker bookkeeping doesn't fight us. + monkeypatch.setattr(app, "run_worker", lambda *a, **kw: None) + app._search_input.focus() + await pilot.pause() + app._search_input.value = "first" + await pilot.press("enter") + await pilot.pause(0.1) + first_control = app.control + assert first_control.answer_now_requested() is False + app._search_input.value = "second" + await pilot.press("enter") + await pilot.pause(0.1) + assert app.control is not first_control, "control should be replaced on new search" + assert first_control.answer_now_requested() is True, ( + "prior control should be signaled to cancel" + ) + assert app.control.answer_now_requested() is False, ( + "fresh control should not carry over the cancel flag" + ) async def test_tab_moves_focus_from_filter_to_results( @@ -892,7 +1119,10 @@ async def test_tab_moves_focus_from_filter_to_results( app = _build_empty_ui_app(tmp_path, monkeypatch) async with app.run_test() as pilot: await pilot.pause() - # Filter starts focused (first focusable in the chain). + # On empty initial query the search bar takes initial focus, so + # manually move focus to the filter input for this test. + app._filter_input.focus() + await pilot.pause() assert app.focused is not None assert app.focused.id == "filter" await pilot.press("tab") @@ -908,6 +1138,8 @@ async def test_down_at_empty_filter_releases_focus_to_results( """``down`` arrow on an empty filter moves focus to the results table.""" app = _build_empty_ui_app(tmp_path, monkeypatch) async with app.run_test() as pilot: + await pilot.pause() + app._filter_input.focus() await pilot.pause() assert app.focused is not None and app.focused.id == "filter" await pilot.press("down") @@ -937,7 +1169,9 @@ async def test_up_at_results_top_row_releases_focus_to_filter( app.filtered_records.append(record) app._results.append_records([record]) await pilot.pause() - # Move focus to the results list and confirm cursor is at row 0. + # Land focus on the filter and tab to the results. + app._filter_input.focus() + await pilot.pause() await pilot.press("tab") await pilot.pause() assert app.focused is not None and app.focused.id == "results" @@ -969,6 +1203,8 @@ async def test_l_from_results_focuses_detail_pane( app.filtered_records.append(record) app._results.append_records([record]) await pilot.pause() + app._filter_input.focus() + await pilot.pause() await pilot.press("tab") await pilot.pause() assert app.focused is not None and app.focused.id == "results" @@ -1071,6 +1307,8 @@ async def test_g_on_results_jumps_to_top( app.filtered_records.extend(records) app._results.append_records(records) await pilot.pause() + app._filter_input.focus() + await pilot.pause() await pilot.press("tab") await pilot.pause() app._results.highlighted = 3 @@ -1095,6 +1333,8 @@ async def test_G_on_results_jumps_to_bottom( app.filtered_records.extend(records) app._results.append_records(records) await pilot.pause() + app._filter_input.focus() + await pilot.pause() await pilot.press("tab") await pilot.pause() await pilot.press("G") @@ -1116,6 +1356,8 @@ async def test_ctrl_d_on_results_advances_half_page( app.filtered_records.extend(records) app._results.append_records(records) await pilot.pause() + app._filter_input.focus() + await pilot.pause() await pilot.press("tab") await pilot.pause() app._results.highlighted = 0 @@ -1243,6 +1485,8 @@ async def test_ctrl_j_from_filter_focuses_results( app.filtered_records.extend(records) app._results.append_records(records) await pilot.pause() + app._filter_input.focus() + await pilot.pause() assert app.focused is not None and app.focused.id == "filter" await pilot.press("ctrl+j") await pilot.pause() @@ -1263,6 +1507,8 @@ async def test_ctrl_l_from_results_focuses_detail( app.filtered_records.extend(records) app._results.append_records(records) await pilot.pause() + app._filter_input.focus() + await pilot.pause() await pilot.press("tab") await pilot.pause() assert app.focused is not None and app.focused.id == "results" @@ -1307,6 +1553,8 @@ async def test_ctrl_k_from_results_focuses_filter( app.filtered_records.extend(records) app._results.append_records(records) await pilot.pause() + app._filter_input.focus() + await pilot.pause() await pilot.press("tab") await pilot.pause() assert app.focused is not None and app.focused.id == "results" @@ -1364,6 +1612,8 @@ async def test_backspace_in_filter_still_deletes_a_character( """The backspace alias must NOT steal backspace from the filter input.""" app = _build_empty_ui_app(tmp_path, monkeypatch) async with app.run_test() as pilot: + await pilot.pause() + app._filter_input.focus() await pilot.pause() assert app.focused is not None and app.focused.id == "filter" await pilot.press("a") @@ -1392,12 +1642,88 @@ async def test_ctrl_h_from_filter_is_a_noop( app.filtered_records.extend(records) app._results.append_records(records) await pilot.pause() + app._filter_input.focus() + await pilot.pause() assert app.focused is not None and app.focused.id == "filter" await pilot.press("ctrl+h") await pilot.pause() assert app.focused is not None and app.focused.id == "filter" +async def test_up_on_empty_filter_releases_focus_to_search( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Plain ``up`` on an empty filter input lifts focus to the top search bar.""" + app = _build_empty_ui_app(tmp_path, monkeypatch) + async with app.run_test() as pilot: + await pilot.pause() + app._filter_input.focus() + await pilot.pause() + assert app.focused is not None and app.focused.id == "filter" + await pilot.press("up") + await pilot.pause() + assert app.focused is not None + assert app.focused.id == "search" + + +async def test_up_on_filter_with_cursor_at_start_releases_focus_to_search( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``up`` on a non-empty filter whose cursor is at position 0 still escapes upward.""" + app = _build_empty_ui_app(tmp_path, monkeypatch) + async with app.run_test() as pilot: + await pilot.pause() + app._filter_input.focus() + await pilot.pause() + # Type something, then move cursor back to start. + app._filter_input.value = "abc" + app._filter_input.cursor_position = 0 + await pilot.pause() + await pilot.press("up") + await pilot.pause() + assert app.focused is not None + assert app.focused.id == "search" + + +async def test_right_on_empty_filter_releases_focus_to_detail( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``right`` on an empty filter hands focus across to the detail pane.""" + app = _build_empty_ui_app(tmp_path, monkeypatch) + async with app.run_test() as pilot: + await pilot.pause() + app._filter_input.focus() + await pilot.pause() + assert app._filter_input.value == "" + await pilot.press("right") + await pilot.pause() + assert app.focused is not None + assert app.focused.id == "detail-scroll" + + +async def test_right_on_non_empty_filter_moves_cursor( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``right`` on a non-empty filter walks the cursor — does not release focus.""" + app = _build_empty_ui_app(tmp_path, monkeypatch) + async with app.run_test() as pilot: + await pilot.pause() + app._filter_input.focus() + await pilot.pause() + app._filter_input.value = "abc" + app._filter_input.cursor_position = 0 + await pilot.pause() + await pilot.press("right") + await pilot.pause() + # Focus stays on the filter; cursor advances by one. + assert app.focused is not None and app.focused.id == "filter" + assert app._filter_input.cursor_position == 1 + + async def test_search_results_list_append_under_load( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, @@ -1433,6 +1759,269 @@ async def test_search_results_list_append_under_load( assert elapsed < 2.0, f"append_records(1000) took {elapsed:.3f}s; expected < 2.0s" +async def test_set_records_narrowing_avoids_clear_options( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A narrowing filter (subset of current records) must not full-rebuild the list.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + records = [ + agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / f"r{idx}.jsonl", + text=f"row {idx}", + ) + for idx in range(10) + ] + async with app.run_test() as pilot: + await pilot.pause() + app._results.append_records(records) + await pilot.pause() + clear_count = 0 + original_clear = app._results.clear_options + + def counting_clear() -> object: + nonlocal clear_count + clear_count += 1 + return original_clear() + + monkeypatch.setattr(app._results, "clear_options", counting_clear) + # Narrow to the first 7 records (drop 3). 3 / 10 <= 50% → delta path. + app._results.set_records(records[:7]) + await pilot.pause() + assert clear_count == 0 + assert len(app._results._records) == 7 + assert [id(r) for r in app._results._records] == [id(r) for r in records[:7]] + + +async def test_set_records_widening_triggers_full_rebuild( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Widening (introducing records not currently shown) rebuilds for order correctness.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + records = [ + agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / f"r{idx}.jsonl", + text=f"row {idx}", + ) + for idx in range(5) + ] + async with app.run_test() as pilot: + await pilot.pause() + app._results.append_records(records[:3]) + await pilot.pause() + clear_count = 0 + original_clear = app._results.clear_options + + def counting_clear() -> object: + nonlocal clear_count + clear_count += 1 + return original_clear() + + monkeypatch.setattr(app._results, "clear_options", counting_clear) + # Widen to all 5 records — two of them weren't shown before. + app._results.set_records(records) + await pilot.pause() + assert clear_count == 1, "widening must rebuild to preserve record order" + assert len(app._results._records) == 5 + + +async def test_apply_records_batch_yields_between_chunks( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Applying a large batch yields to the event loop every chunk_size records.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + chunk = app._APPLY_CHUNK_SIZE + # Three chunks worth — should yield twice (between chunk 0/1 and 1/2). + record_count = chunk * 3 + records = [ + agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / f"r{idx}.jsonl", + text=f"row {idx}", + ) + for idx in range(record_count) + ] + async with app.run_test() as pilot: + await pilot.pause() + sleep_calls = 0 + real_sleep = asyncio.sleep + + async def counting_sleep(delay: float) -> None: + nonlocal sleep_calls + if delay == 0: + sleep_calls += 1 + await real_sleep(delay) + + monkeypatch.setattr(asyncio, "sleep", counting_sleep) + await app._apply_records_batch(records, record_count) + assert sleep_calls >= 2, ( + f"expected >= 2 yields for {record_count} records in chunks of {chunk}, " + f"got {sleep_calls}" + ) + assert len(app._results._records) == record_count + + +async def test_set_records_majority_removal_falls_back_to_rebuild( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Removing more than half of the current records uses the chunked rebuild path.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + records = [ + agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / f"r{idx}.jsonl", + text=f"row {idx}", + ) + for idx in range(10) + ] + async with app.run_test() as pilot: + await pilot.pause() + app._results.append_records(records) + await pilot.pause() + clear_count = 0 + original_clear = app._results.clear_options + + def counting_clear() -> object: + nonlocal clear_count + clear_count += 1 + return original_clear() + + monkeypatch.setattr(app._results, "clear_options", counting_clear) + # Drop 8 of 10 — well over the 50% threshold. + app._results.set_records(records[:2]) + await pilot.pause() + assert clear_count == 1, "majority-removal must take the rebuild path" + assert len(app._results._records) == 2 + + +def test_scroll_percent_returns_full_when_nothing_scrolls() -> None: + """A pane that fits its viewport reports ``100%`` (tig convention).""" + from agentgrep.ui.app import scroll_percent + + assert scroll_percent(0.0, 0.0) == 100 + + +def test_scroll_percent_clamps_to_bounds() -> None: + """Scroll percent is clamped to ``[0, 100]`` even for nonsense inputs.""" + from agentgrep.ui.app import scroll_percent + + assert scroll_percent(0.0, 100.0) == 0 + assert scroll_percent(50.0, 100.0) == 50 + assert scroll_percent(100.0, 100.0) == 100 + # Overshoot past max — clamped to 100. + assert scroll_percent(500.0, 100.0) == 100 + # Negative scroll — clamped to 0. + assert scroll_percent(-10.0, 100.0) == 0 + + +async def test_results_status_right_shows_match_count_cursor_and_percent( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``_format_results_right`` renders ``{N} matches {cursor+1}/{visible} {pct}%``.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + async with app.run_test() as pilot: + await pilot.pause() + # No streaming results yet — empty right slot regardless of args. + assert app._format_results_right(cursor=None, visible=None, percent=None) == "" + # Seed streaming totals so the match count segment renders. + app.all_records.extend(_seed_records(agentgrep, tmp_path, 10)) + # No cursor yet — match count + percent. + assert app._format_results_right(cursor=None, visible=10, percent=50) == "10 matches 50%" + # Cursor at row 0 of all 10 — full triple. + assert app._format_results_right(cursor=0, visible=10, percent=0) == "10 matches 1/10 0%" + + +async def test_detail_statusline_shows_path_and_scroll_percent( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``show_detail`` populates the detail status line with path + scroll %.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + record = agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / "session.jsonl", + text="hello", + ) + async with app.run_test() as pilot: + await pilot.pause() + updates: list[str] = [] + real_update = app._detail_statusline.update + + def spy(content: t.Any = "", *args: t.Any, **kwargs: t.Any) -> None: + updates.append(str(content)) + real_update(content, *args, **kwargs) + + monkeypatch.setattr(app._detail_statusline, "update", spy) + app.show_detail(record) + await pilot.pause() + # Latest update should carry both the path's basename and a trailing ``%``. + rendered = updates[-1] if updates else "" + assert "session.jsonl" in rendered + assert rendered.rstrip().endswith("%") + + +async def test_results_scroll_changed_updates_status_right( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The app handler updates ``#status-right`` when the OptionList scrolls.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + records = _seed_records(agentgrep, tmp_path, 5) + async with app.run_test() as pilot: + await pilot.pause() + updates: list[str] = [] + real_update = app._matches_widget.update + + def spy(content: t.Any = "", *args: t.Any, **kwargs: t.Any) -> None: + updates.append(str(content)) + real_update(content, *args, **kwargs) + + monkeypatch.setattr(app._matches_widget, "update", spy) + # Pre-seed streaming records so the match count is non-zero. + app.all_records.extend(records) + app._results.append_records(records) + await pilot.pause() + # Explicitly land focus and move cursor to row 0 — the reactive + # ``highlighted`` watcher fires on change, so set it directly. + app._results.focus() + await pilot.pause() + app._results.highlighted = 0 + await pilot.pause() + # The ``highlighted`` watcher posts ``ResultsScrollChanged`` which + # the app handler renders as ``5 matches 1/5 N%``. + assert any("1/5" in u and "matches" in u for u in updates), ( + f"expected '5 matches 1/5 N%' in {updates!r}" + ) + + def test_format_compact_path_passes_short_paths_through( monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path, @@ -1524,6 +2113,42 @@ async def test_show_detail_caps_body_at_max_lines( assert body_text.plain.count("body line") == cap +def test_format_timestamp_tig_renders_iso_with_offset_in_local_tz() -> None: + """ISO inputs with explicit offsets are localized to the system timezone.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + result = agentgrep.format_timestamp_tig("2026-05-17T11:59:12+00:00") + # Shape: ``YYYY-MM-DD HH:MM ±HHMM`` (22 chars) + assert len(result) == 22 + assert result[4] == "-" and result[7] == "-" + assert result[10] == " " + assert result[13] == ":" + assert result[16] == " " + assert result[17] in {"+", "-"} + + +def test_format_timestamp_tig_renders_zulu_input() -> None: + """``Z`` suffix is treated as ``+00:00`` (Python's ``fromisoformat`` requires the swap).""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + result = agentgrep.format_timestamp_tig("2026-05-17T11:59:12Z") + assert len(result) == 22 + + +def test_format_timestamp_tig_returns_empty_string_for_missing_input() -> None: + """``None`` / empty inputs render as the empty string so callers can pad.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + assert agentgrep.format_timestamp_tig(None) == "" + assert agentgrep.format_timestamp_tig("") == "" + + +def test_format_timestamp_tig_falls_back_to_raw_on_parse_error() -> None: + """Unparseable inputs return the original string clipped to 22 chars.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + assert agentgrep.format_timestamp_tig("not-an-iso-timestamp") == "not-an-iso-timestamp" + # Long unparseable input is clipped. + long_input = "this-is-not-a-timestamp-but-it-is-too-long-anyway" + assert agentgrep.format_timestamp_tig(long_input) == long_input[:22] + + def test_find_first_match_line_returns_index_of_first_match() -> None: """Returns the line index of the first matching line; case-insensitive by default.""" agentgrep = t.cast("t.Any", load_agentgrep_module()) @@ -1561,6 +2186,116 @@ def test_highlight_matches_combines_terms() -> None: assert len(styled) == 3 # 2 alpha + 1 gamma +async def test_show_detail_memoizes_body_formatting( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Re-rendering the same record + query reuses the cached body renderable.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + json_body = '{"alpha": 1, "beta": 2, "gamma": 3}' + record = agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / "j.jsonl", + text=json_body, + ) + async with app.run_test() as pilot: + await pilot.pause() + app.show_detail(record) + await pilot.pause() + # Replace json.loads so a real cache miss would explode loudly. + load_calls = 0 + real_loads = json.loads + + def counting_loads(*args: t.Any, **kwargs: t.Any) -> t.Any: + nonlocal load_calls + load_calls += 1 + return real_loads(*args, **kwargs) + + monkeypatch.setattr(json, "loads", counting_loads) + app.show_detail(record) + await pilot.pause() + assert load_calls == 0, "JSON should not be re-parsed for the same record + query" + + +async def test_show_detail_memoizes_first_match_line( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """``find_first_match_line`` is not called twice for the same record + query.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + home = tmp_path / "home" + home.mkdir(parents=True, exist_ok=True) + monkeypatch.setattr( + agentgrep, + "run_search_query", + lambda *args, **kwargs: [], + ) + query = agentgrep.SearchQuery( + terms=("needle",), + search_type="prompts", + any_term=False, + regex=False, + case_sensitive=False, + agents=("codex",), + limit=None, + ) + app = agentgrep.build_streaming_ui_app(home, query, control=agentgrep.SearchControl()) + body = "\n".join(["padding"] * 5 + ["needle here"] + ["padding"] * 5) + record = agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / "n.jsonl", + text=body, + ) + async with app.run_test() as pilot: + await pilot.pause() + app.show_detail(record) + await pilot.pause() + match_calls = 0 + real_match = agentgrep.find_first_match_line + + def counting_match(*args: t.Any, **kwargs: t.Any) -> t.Any: + nonlocal match_calls + match_calls += 1 + return real_match(*args, **kwargs) + + monkeypatch.setattr(agentgrep, "find_first_match_line", counting_match) + app.show_detail(record) + await pilot.pause() + assert match_calls == 0, "first_match_line should be cached for repeat views" + + +async def test_reset_search_chrome_invalidates_detail_caches( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Starting a new search clears any stale detail-pane caches.""" + agentgrep = t.cast("t.Any", load_agentgrep_module()) + app = _build_empty_ui_app(tmp_path, monkeypatch) + record = agentgrep.SearchRecord( + kind="prompt", + agent="codex", + store="codex.sessions", + adapter_id="codex.sessions_jsonl.v1", + path=tmp_path / "x.jsonl", + text='{"x": 1}', + ) + async with app.run_test() as pilot: + await pilot.pause() + app.show_detail(record) + await pilot.pause() + assert len(app._detail_body_cache) >= 1 + app._reset_search_chrome() + assert len(app._detail_body_cache) == 0 + assert len(app._first_match_cache) == 0 + + async def test_show_detail_scrolls_to_first_match( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch,