From f8799095268faeb41df7fde0f3bfb12ec2216a79 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Sat, 18 Apr 2026 04:19:31 +0800 Subject: [PATCH] fix: emit task_started on fork resume, redesign checkpoint TUI Redesign checkpoint TUI with tabbed detail panel, collapsible agent rosters, keybinding actions, and human-readable timestamps. --- lib/crewai/src/crewai/cli/checkpoint_cli.py | 41 +- lib/crewai/src/crewai/cli/checkpoint_tui.py | 590 +++++++++++++------- lib/crewai/src/crewai/crew.py | 24 +- 3 files changed, 441 insertions(+), 214 deletions(-) diff --git a/lib/crewai/src/crewai/cli/checkpoint_cli.py b/lib/crewai/src/crewai/cli/checkpoint_cli.py index 9db783e0e1..5e8572c628 100644 --- a/lib/crewai/src/crewai/cli/checkpoint_cli.py +++ b/lib/crewai/src/crewai/cli/checkpoint_cli.py @@ -106,17 +106,50 @@ def _parse_checkpoint_json(raw: str, source: str) -> dict[str, Any]: "name": entity.get("name"), "id": entity.get("id"), } + + raw_agents = entity.get("agents", []) + agents_by_id: dict[str, dict[str, Any]] = {} + parsed_agents: list[dict[str, Any]] = [] + for ag in raw_agents: + agent_info: dict[str, Any] = { + "id": ag.get("id", ""), + "role": ag.get("role", ""), + "goal": ag.get("goal", ""), + } + parsed_agents.append(agent_info) + if ag.get("id"): + agents_by_id[str(ag["id"])] = agent_info + if parsed_agents: + info["agents"] = parsed_agents + if tasks: info["tasks_completed"] = completed info["tasks_total"] = len(tasks) - info["tasks"] = [ - { + parsed_tasks: list[dict[str, Any]] = [] + for t in tasks: + task_info: dict[str, Any] = { "description": t.get("description", ""), "completed": t.get("output") is not None, "output": (t.get("output") or {}).get("raw", ""), } - for t in tasks - ] + task_agent = t.get("agent") + if isinstance(task_agent, dict): + task_info["agent_role"] = task_agent.get("role", "") + task_info["agent_id"] = task_agent.get("id", "") + elif isinstance(task_agent, str) and task_agent in agents_by_id: + task_info["agent_role"] = agents_by_id[task_agent].get("role", "") + task_info["agent_id"] = task_agent + parsed_tasks.append(task_info) + info["tasks"] = parsed_tasks + + if entity.get("entity_type") == "flow": + completed_methods = entity.get("checkpoint_completed_methods") + if completed_methods: + info["completed_methods"] = sorted(completed_methods) + state = entity.get("checkpoint_state") + if isinstance(state, dict): + info["flow_state"] = state + parsed_entities.append(info) inputs: dict[str, Any] = {} diff --git a/lib/crewai/src/crewai/cli/checkpoint_tui.py b/lib/crewai/src/crewai/cli/checkpoint_tui.py index 26791af237..0f945dd317 100644 --- a/lib/crewai/src/crewai/cli/checkpoint_tui.py +++ b/lib/crewai/src/crewai/cli/checkpoint_tui.py @@ -3,17 +3,20 @@ from __future__ import annotations from collections import defaultdict +from datetime import datetime from typing import Any, ClassVar, Literal from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Horizontal, Vertical, VerticalScroll from textual.widgets import ( - Button, + Collapsible, Footer, Header, Input, Static, + TabPane, + TabbedContent, TextArea, Tree, ) @@ -32,6 +35,22 @@ _DIM = "#888888" _BG_DARK = "#0d1117" _BG_PANEL = "#161b22" +_ACCENT = "#c9a227" +_SUCCESS = "#3fb950" +_PENDING = "#e3b341" + +_ENTITY_ICONS: dict[str, str] = { + "flow": "◆", + "crew": "●", + "agent": "◈", + "unknown": "○", +} +_ENTITY_COLORS: dict[str, str] = { + "flow": _ACCENT, + "crew": _SECONDARY, + "agent": _PRIMARY, + "unknown": _DIM, +} def _load_entries(location: str) -> list[dict[str, Any]]: @@ -40,8 +59,27 @@ def _load_entries(location: str) -> list[dict[str, Any]]: return _list_json(location) +def _human_ts(ts: str) -> str: + """Turn '2026-04-17 17:05:00' into a short relative label.""" + try: + dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") + except ValueError: + return ts + now = datetime.now() + delta = now.date() - dt.date() + hour = dt.hour % 12 or 12 + ampm = "am" if dt.hour < 12 else "pm" + time_str = f"{hour}:{dt.minute:02d}{ampm}" + if delta.days == 0: + return time_str + if delta.days == 1: + return f"yest {time_str}" + if delta.days < 7: + return f"{dt.strftime('%a').lower()} {time_str}" + return f"{dt.strftime('%b')} {dt.day}" + + def _short_id(name: str) -> str: - """Shorten a checkpoint name for tree display.""" if len(name) > 30: return name[:27] + "..." return name @@ -63,22 +101,22 @@ def _entry_id(entry: dict[str, Any]) -> str: return name -def _build_entity_header(ent: dict[str, Any]) -> str: - """Build rich text header for an entity (progress bar only).""" - lines: list[str] = [] - tasks = ent.get("tasks") - if isinstance(tasks, list): - completed = ent.get("tasks_completed", 0) - total = ent.get("tasks_total", 0) - pct = int(completed / total * 100) if total else 0 - bar_len = 20 - filled = int(bar_len * completed / total) if total else 0 - bar = f"[{_PRIMARY}]{'█' * filled}[/][{_DIM}]{'░' * (bar_len - filled)}[/]" - lines.append(f"{bar} {completed}/{total} tasks ({pct}%)") - return "\n".join(lines) +def _build_progress_bar(completed: int, total: int, width: int = 20) -> str: + if total == 0: + return f"[{_DIM}]{'░' * width}[/] 0/0" + pct = int(completed / total * 100) + filled = int(width * completed / total) + color = _SUCCESS if completed == total else _PRIMARY + bar = f"[{color}]{'█' * filled}[/][{_DIM}]{'░' * (width - filled)}[/]" + return f"{bar} {completed}/{total} ({pct}%)" + + +def _entity_icon(etype: str) -> str: + icon = _ENTITY_ICONS.get(etype, _ENTITY_ICONS["unknown"]) + color = _ENTITY_COLORS.get(etype, _DIM) + return f"[{color}]{icon}[/]" -# Return type: (location, action, inputs, task_output_overrides, entity_type) _TuiResult = ( tuple[ str, @@ -122,7 +160,7 @@ class CheckpointTUI(App[_TuiResult]): height: 1fr; }} #tree-panel {{ - width: 45%; + width: 40%; background: {_BG_PANEL}; border: round {_SECONDARY}; padding: 0 1; @@ -132,104 +170,98 @@ class CheckpointTUI(App[_TuiResult]): border: round {_PRIMARY}; }} #detail-container {{ - width: 55%; + width: 60%; height: 1fr; }} - #detail-scroll {{ - height: 1fr; - background: {_BG_PANEL}; - border: round {_SECONDARY}; - padding: 1 2; - scrollbar-color: {_PRIMARY}; - }} - #detail-scroll:focus-within {{ - border: round {_PRIMARY}; - }} - #detail-header {{ - margin-bottom: 1; - }} #status {{ height: 1; padding: 0 2; color: {_DIM}; }} - #inputs-section {{ - display: none; - height: auto; - max-height: 8; - padding: 0 1; - }} - #inputs-section.visible {{ - display: block; - }} - #inputs-label {{ - height: 1; - color: {_DIM}; - padding: 0 1; + #detail-tabs {{ + height: 1fr; }} - .input-row {{ - height: 3; - padding: 0 1; + TabbedContent > ContentSwitcher {{ + background: {_BG_PANEL}; + height: 1fr; }} - .input-row Static {{ - width: auto; - min-width: 12; - padding: 1 1 0 0; - color: {_TERTIARY}; + TabPane {{ + padding: 0; }} - .input-row Input {{ - width: 1fr; + Tabs {{ + background: {_BG_DARK}; }} - #no-inputs-label {{ - height: 1; + Tab {{ + background: {_BG_DARK}; color: {_DIM}; - padding: 0 1; - }} - #action-buttons {{ - height: 3; - align: right middle; - padding: 0 1; - display: none; - }} - #action-buttons.visible {{ - display: block; + padding: 0 2; }} - #action-buttons Button {{ - margin: 0 0 0 1; - min-width: 10; + Tab.-active {{ + background: {_BG_PANEL}; + color: {_PRIMARY}; }} - #btn-resume {{ - background: {_SECONDARY}; + Tab:hover {{ color: {_TERTIARY}; }} - #btn-resume:hover {{ - background: {_PRIMARY}; + Underline > .underline--bar {{ + color: {_SECONDARY}; + background: {_BG_DARK}; }} - #btn-fork {{ - background: {_PRIMARY}; - color: {_TERTIARY}; + .tab-scroll {{ + background: {_BG_PANEL}; + height: 1fr; + padding: 1 2; + scrollbar-color: {_PRIMARY}; }} - #btn-fork:hover {{ - background: {_SECONDARY}; + .section-header {{ + padding: 0 0 0 1; + margin: 1 0 0 0; }} - .entity-title {{ - padding: 1 1 0 1; + .detail-line {{ + padding: 0 0 0 1; }} - .entity-detail {{ + .task-label {{ padding: 0 1; }} .task-output-editor {{ height: auto; max-height: 10; - margin: 0 1 1 1; + margin: 0 1 1 3; border: round {_DIM}; }} .task-output-editor:focus {{ border: round {_PRIMARY}; }} - .task-label {{ + Collapsible {{ + background: {_BG_PANEL}; + padding: 0; + margin: 0 0 1 1; + }} + CollapsibleTitle {{ + background: {_BG_DARK}; + color: {_TERTIARY}; + padding: 0 1; + }} + CollapsibleTitle:hover {{ + background: {_SECONDARY}; + }} + .input-row {{ + height: 3; padding: 0 1; }} + .input-row Static {{ + width: auto; + min-width: 12; + padding: 1 1 0 0; + color: {_TERTIARY}; + }} + .input-row Input {{ + width: 1fr; + }} + .empty-state {{ + color: {_DIM}; + padding: 1; + }} Tree {{ background: {_BG_PANEL}; }} @@ -242,6 +274,8 @@ class CheckpointTUI(App[_TuiResult]): BINDINGS: ClassVar[list[Binding | tuple[str, str] | tuple[str, str, str]]] = [ ("q", "quit", "Quit"), ("r", "refresh", "Refresh"), + ("e", "resume", "Resume"), + ("f", "fork", "Fork"), ] def __init__(self, location: str = "./.checkpoints") -> None: @@ -256,27 +290,49 @@ def compose(self) -> ComposeResult: yield Header(show_clock=False) with Horizontal(id="main-layout"): tree: Tree[dict[str, Any]] = Tree("Checkpoints", id="tree-panel") - tree.show_root = True + tree.show_root = False tree.guide_depth = 3 yield tree with Vertical(id="detail-container"): yield Static("", id="status") - with VerticalScroll(id="detail-scroll"): - yield Static( - f"[{_DIM}]Select a checkpoint from the tree[/]", # noqa: S608 - id="detail-header", - ) - with Vertical(id="inputs-section"): - yield Static("Inputs", id="inputs-label") - with Horizontal(id="action-buttons"): - yield Button("Resume", id="btn-resume") - yield Button("Fork", id="btn-fork") + with TabbedContent(id="detail-tabs"): + with TabPane("Overview", id="tab-overview"): + with VerticalScroll(classes="tab-scroll"): + yield Static( + f"[{_DIM}]Select a checkpoint from the tree[/]", # noqa: S608 + id="overview-empty", + ) + with TabPane("Tasks", id="tab-tasks"): + with VerticalScroll(classes="tab-scroll"): + yield Static( + f"[{_DIM}]Select a checkpoint to view tasks[/]", + id="tasks-empty", + ) + with TabPane("Inputs", id="tab-inputs"): + with VerticalScroll(classes="tab-scroll"): + yield Static( + f"[{_DIM}]Select a checkpoint to view inputs[/]", + id="inputs-empty", + ) yield Footer() async def on_mount(self) -> None: self._refresh_tree() self.query_one("#tree-panel", Tree).root.expand() + # ── Tree building ────────────────────────────────────────────── + + @staticmethod + def _top_level_entity(entry: dict[str, Any]) -> tuple[str, str]: + etype, ename = "unknown", "" + for ent in entry.get("entities", []): + t = ent.get("type", "unknown") + if t == "flow": + return "flow", ent.get("name") or "" + if t == "crew" and etype != "crew": + etype, ename = "crew", ent.get("name") or "" + return etype, ename + def _refresh_tree(self) -> None: self._entries = _load_entries(self._location) self._selected_entry = None @@ -285,45 +341,57 @@ def _refresh_tree(self) -> None: tree.clear() if not self._entries: - self.query_one("#detail-header", Static).update( - f"[{_DIM}]No checkpoints in {self._location}[/]" - ) - self.query_one("#status", Static).update("") self.sub_title = self._location + self.query_one("#status", Static).update("") return - # Group by branch - branches: dict[str, list[dict[str, Any]]] = defaultdict(list) + grouped: dict[tuple[str, str], dict[str, list[dict[str, Any]]]] = defaultdict( + lambda: defaultdict(list) + ) for entry in self._entries: + key = self._top_level_entity(entry) branch = entry.get("branch", "main") - branches[branch].append(entry) - - # Index checkpoint names to tree nodes so forks can attach - node_by_name: dict[str, Any] = {} + grouped[key][branch].append(entry) def _make_label(e: dict[str, Any]) -> str: - name = e.get("name", "") ts = e.get("ts") or "" trigger = e.get("trigger") or "" - parts = [f"[bold]{_short_id(name)}[/]"] - if ts: - time_part = ts.split(" ")[-1] if " " in ts else ts + time_part = ts.split(" ")[-1] if " " in ts else ts + + total_c, total_t = 0, 0 + for ent in e.get("entities", []): + c = ent.get("tasks_completed") + t = ent.get("tasks_total") + if c is not None and t is not None: + total_c += c + total_t += t + + parts: list[str] = [] + if time_part: parts.append(f"[{_DIM}]{time_part}[/]") if trigger: parts.append(f"[{_PRIMARY}]{trigger}[/]") - return " ".join(parts) + if total_t: + display_c = total_c + if trigger == "task_started" and total_c < total_t: + display_c = total_c + 1 + color = _SUCCESS if total_c == total_t else _DIM + parts.append(f"[{color}]{display_c}/{total_t}[/]") + return " ".join(parts) if parts else _short_id(e.get("name", "")) fork_parents: set[str] = set() - for branch_name, entries in branches.items(): - if branch_name == "main" or not entries: - continue - oldest = min(entries, key=lambda e: str(e.get("name", ""))) - first_parent = oldest.get("parent_id") - if first_parent: - fork_parents.add(str(first_parent)) + for branches in grouped.values(): + for branch_name, entries in branches.items(): + if branch_name == "main" or not entries: + continue + oldest = min(entries, key=lambda e: str(e.get("name", ""))) + first_parent = oldest.get("parent_id") + if first_parent: + fork_parents.add(str(first_parent)) + + node_by_name: dict[str, Any] = {} def _add_checkpoint(parent_node: Any, e: dict[str, Any]) -> None: - """Add a checkpoint node — expandable only if a fork attaches to it.""" cp_id = _entry_id(e) if cp_id in fork_parents: node = parent_node.add( @@ -333,67 +401,97 @@ def _add_checkpoint(parent_node: Any, e: dict[str, Any]) -> None: node = parent_node.add_leaf(_make_label(e), data=e) node_by_name[cp_id] = node - if "main" in branches: - for entry in reversed(branches["main"]): - _add_checkpoint(tree.root, entry) + type_order = {"flow": 0, "crew": 1} + sorted_keys = sorted( + grouped.keys(), key=lambda k: (type_order.get(k[0], 9), k[1]) + ) + + for etype, ename in sorted_keys: + branches = grouped[(etype, ename)] + icon = _entity_icon(etype) + color = _ENTITY_COLORS.get(etype, _DIM) + total = sum(len(v) for v in branches.values()) + + label_parts = [f"{icon} [bold {color}]{etype.upper()}[/]"] + if ename: + label_parts.append(f"[bold]{ename}[/]") + label_parts.append(f"[{_DIM}]({total})[/]") + all_entries = [e for bl in branches.values() for e in bl] + timestamps = [str(e.get("ts", "")) for e in all_entries if e.get("ts")] + if timestamps: + latest = max(timestamps) + label_parts.append(f"[{_DIM}]{_human_ts(latest)}[/]") + entity_label = " ".join(label_parts) + entity_node = tree.root.add(entity_label, expand=True) + + if "main" in branches: + for entry in reversed(branches["main"]): + _add_checkpoint(entity_node, entry) + + fork_branches = [ + (name, sorted(entries, key=lambda e: str(e.get("name", "")))) + for name, entries in branches.items() + if name != "main" + ] + remaining = fork_branches + max_passes = len(remaining) + 1 + while remaining and max_passes > 0: + max_passes -= 1 + deferred = [] + made_progress = False + for branch_name, entries in remaining: + first_parent = entries[0].get("parent_id") if entries else None + if first_parent and str(first_parent) not in node_by_name: + deferred.append((branch_name, entries)) + continue + attach_to: Any = entity_node + if first_parent: + attach_to = node_by_name.get(str(first_parent), entity_node) + branch_label = ( + f"[bold {_SECONDARY}]{branch_name}[/] " + f"[{_DIM}]({len(entries)})[/]" + ) + branch_node = attach_to.add(branch_label, expand=False) + for entry in entries: + _add_checkpoint(branch_node, entry) + made_progress = True + remaining = deferred + if not made_progress: + break - fork_branches = [ - (name, sorted(entries, key=lambda e: str(e.get("name", "")))) - for name, entries in branches.items() - if name != "main" - ] - remaining = fork_branches - max_passes = len(remaining) + 1 - while remaining and max_passes > 0: - max_passes -= 1 - deferred = [] - made_progress = False for branch_name, entries in remaining: - first_parent = entries[0].get("parent_id") if entries else None - if first_parent and str(first_parent) not in node_by_name: - deferred.append((branch_name, entries)) - continue - attach_to: Any = tree.root - if first_parent: - attach_to = node_by_name.get(str(first_parent), tree.root) branch_label = ( - f"[bold {_SECONDARY}]{branch_name}[/] [{_DIM}]({len(entries)})[/]" + f"[bold {_SECONDARY}]{branch_name}[/] " + f"[{_DIM}]({len(entries)})[/] [{_DIM}](orphaned)[/]" ) - branch_node = attach_to.add(branch_label, expand=False) + branch_node = entity_node.add(branch_label, expand=False) for entry in entries: _add_checkpoint(branch_node, entry) - made_progress = True - remaining = deferred - if not made_progress: - break - - for branch_name, entries in remaining: - branch_label = ( - f"[bold {_SECONDARY}]{branch_name}[/] " - f"[{_DIM}]({len(entries)})[/] [{_DIM}](orphaned)[/]" - ) - branch_node = tree.root.add(branch_label, expand=False) - for entry in entries: - _add_checkpoint(branch_node, entry) count = len(self._entries) storage = "SQLite" if _is_sqlite(self._location) else "JSON" self.sub_title = self._location self.query_one("#status", Static).update(f" {count} checkpoint(s) | {storage}") + # ── Detail panel ─────────────────────────────────────────────── + + async def _clear_scroll(self, tab_id: str) -> VerticalScroll: + tab = self.query_one(f"#{tab_id}", TabPane) + scroll = tab.query_one(VerticalScroll) + for child in list(scroll.children): + await child.remove() + return scroll + async def _show_detail(self, entry: dict[str, Any]) -> None: - """Update the detail panel for a checkpoint entry.""" self._selected_entry = entry - self.query_one("#action-buttons").add_class("visible") - detail_scroll = self.query_one("#detail-scroll", VerticalScroll) + await self._render_overview(entry) + await self._render_tasks(entry) + await self._render_inputs(entry.get("inputs", {})) - # Remove all dynamic children except the header — await so IDs are freed - to_remove = [c for c in detail_scroll.children if c.id != "detail-header"] - for child in to_remove: - await child.remove() + async def _render_overview(self, entry: dict[str, Any]) -> None: + scroll = await self._clear_scroll("tab-overview") - # Header name = entry.get("name", "") ts = entry.get("ts") or "unknown" trigger = entry.get("trigger") or "" @@ -414,42 +512,115 @@ async def _show_detail(self, entry: dict[str, Any]) -> None: header_lines.append(f" [bold]Branch[/] [{_SECONDARY}]{branch}[/]") if parent_id: header_lines.append(f" [bold]Parent[/] [{_DIM}]{parent_id}[/]") - if "path" in entry: - header_lines.append(f" [bold]Path[/] [{_DIM}]{entry['path']}[/]") - if "db" in entry: - header_lines.append(f" [bold]Database[/] [{_DIM}]{entry['db']}[/]") - self.query_one("#detail-header", Static).update("\n".join(header_lines)) + await scroll.mount(Static("\n".join(header_lines))) + + for ent in entry.get("entities", []): + etype = ent.get("type", "unknown") + ename = ent.get("name", "unnamed") + icon = _entity_icon(etype) + color = _ENTITY_COLORS.get(etype, _DIM) + + eid = str(ent.get("id", ""))[:8] + entity_title = ( + f"\n{icon} [bold {color}]{etype.upper()}[/] [bold]{ename}[/]" + ) + if eid: + entity_title += f" [{_DIM}]{eid}…[/]" + await scroll.mount(Static(entity_title, classes="section-header")) + await scroll.mount(Static(f"[{_DIM}]{'─' * 46}[/]", classes="detail-line")) + + if etype == "flow": + methods = ent.get("completed_methods", []) + if methods: + method_list = ", ".join(f"[{_SUCCESS}]{m}[/]" for m in methods) + await scroll.mount( + Static( + f" [bold]Methods[/] {method_list}", + classes="detail-line", + ) + ) + flow_state = ent.get("flow_state") + if isinstance(flow_state, dict) and flow_state: + state_parts: list[str] = [] + for k, v in list(flow_state.items())[:5]: + sv = str(v) + if len(sv) > 40: + sv = sv[:37] + "..." + state_parts.append(f"[{_DIM}]{k}[/]={sv}") + await scroll.mount( + Static( + f" [bold]State[/] {', '.join(state_parts)}", + classes="detail-line", + ) + ) + + agents = ent.get("agents", []) + if agents: + agent_lines: list[Static] = [] + for ag in agents: + role = ag.get("role", "unnamed") + goal = ag.get("goal", "") + if len(goal) > 60: + goal = goal[:57] + "..." + agent_line = f" {_entity_icon('agent')} [bold]{role}[/]" + if goal: + agent_line += f"\n [{_DIM}]{goal}[/]" + agent_lines.append(Static(agent_line)) + + collapsible = Collapsible( + *agent_lines, + title=f"Agents ({len(agents)})", + collapsed=len(agents) > 3, + ) + await scroll.mount(collapsible) + + async def _render_tasks(self, entry: dict[str, Any]) -> None: + scroll = await self._clear_scroll("tab-tasks") - # Entity details and editable task outputs — mounted flat for scrolling self._task_output_ids = [] flat_task_idx = 0 + has_tasks = False + for ent_idx, ent in enumerate(entry.get("entities", [])): etype = ent.get("type", "unknown") ename = ent.get("name", "unnamed") - completed = ent.get("tasks_completed") - total = ent.get("tasks_total") - entity_title = f"[bold {_SECONDARY}]{etype}: {ename}[/]" - if completed is not None and total is not None: - entity_title += f" [{_DIM}]{completed}/{total} tasks[/]" - await detail_scroll.mount(Static(entity_title, classes="entity-title")) - await detail_scroll.mount( - Static(_build_entity_header(ent), classes="entity-detail") - ) + icon = _entity_icon(etype) + color = _ENTITY_COLORS.get(etype, _DIM) tasks = ent.get("tasks", []) + if not tasks: + continue + has_tasks = True + + completed = ent.get("tasks_completed", 0) + total = ent.get("tasks_total", 0) + + await scroll.mount( + Static( + f"{icon} [bold {color}]{ename}[/] " + f"{_build_progress_bar(completed, total, width=16)}", + classes="section-header", + ) + ) + for i, task in enumerate(tasks): desc = str(task.get("description", "")) - if len(desc) > 55: - desc = desc[:52] + "..." + if len(desc) > 50: + desc = desc[:47] + "..." + agent_role = task.get("agent_role", "") + if task.get("completed"): - icon = "[green]✓[/]" - await detail_scroll.mount( - Static(f" {icon} {i + 1}. {desc}", classes="task-label") - ) + status_icon = f"[{_SUCCESS}]✓[/]" + task_line = f" {status_icon} {i + 1}. {desc}" + if agent_role: + task_line += ( + f" [{_DIM}]→ {_entity_icon('agent')} {agent_role}[/]" + ) + await scroll.mount(Static(task_line, classes="task-label")) output_text = task.get("output", "") editor_id = f"task-output-{ent_idx}-{i}" - await detail_scroll.mount( + await scroll.mount( TextArea( str(output_text), classes="task-output-editor", @@ -460,28 +631,25 @@ async def _show_detail(self, entry: dict[str, Any]) -> None: (flat_task_idx, editor_id, str(output_text)) ) else: - icon = "[yellow]○[/]" - await detail_scroll.mount( - Static(f" {icon} {i + 1}. {desc}", classes="task-label") - ) + status_icon = f"[{_PENDING}]○[/]" + task_line = f" {status_icon} {i + 1}. {desc}" + if agent_role: + task_line += ( + f" [{_DIM}]→ {_entity_icon('agent')} {agent_role}[/]" + ) + await scroll.mount(Static(task_line, classes="task-label")) flat_task_idx += 1 - # Build input fields - await self._build_input_fields(entry.get("inputs", {})) + if not has_tasks: + await scroll.mount(Static(f"[{_DIM}]No tasks[/]", classes="empty-state")) - async def _build_input_fields(self, inputs: dict[str, Any]) -> None: - """Rebuild the inputs section with one field per input key.""" - section = self.query_one("#inputs-section") - - # Remove old dynamic children — await so IDs are freed - for widget in list(section.query(".input-row, .no-inputs")): - await widget.remove() + async def _render_inputs(self, inputs: dict[str, Any]) -> None: + scroll = await self._clear_scroll("tab-inputs") self._input_keys = [] if not inputs: - await section.mount(Static(f"[{_DIM}]No inputs[/]", classes="no-inputs")) - section.add_class("visible") + await scroll.mount(Static(f"[{_DIM}]No inputs[/]", classes="empty-state")) return for key, value in inputs.items(): @@ -491,12 +659,11 @@ async def _build_input_fields(self, inputs: dict[str, Any]) -> None: row.compose_add_child( Input(value=str(value), placeholder=key, id=f"input-{key}") ) - await section.mount(row) + await scroll.mount(row) - section.add_class("visible") + # ── Data collection ──────────────────────────────────────────── def _collect_inputs(self) -> dict[str, Any] | None: - """Collect current values from input fields.""" if not self._input_keys: return None result: dict[str, Any] = {} @@ -506,7 +673,6 @@ def _collect_inputs(self) -> dict[str, Any] | None: return result def _collect_task_overrides(self) -> dict[int, str] | None: - """Collect edited task outputs. Returns only changed values.""" if not self._task_output_ids or self._selected_entry is None: return None overrides: dict[int, str] = {} @@ -517,37 +683,43 @@ def _collect_task_overrides(self) -> dict[int, str] | None: return overrides or None def _detect_entity_type(self, entry: dict[str, Any]) -> Literal["crew", "flow"]: - """Infer the top-level entity type from checkpoint entities.""" for ent in entry.get("entities", []): if ent.get("type") == "flow": return "flow" return "crew" def _resolve_location(self, entry: dict[str, Any]) -> str: - """Get the restore location string for a checkpoint entry.""" if "path" in entry: return str(entry["path"]) if _is_sqlite(self._location): return f"{self._location}#{entry['name']}" return str(entry.get("name", "")) + # ── Events ───────────────────────────────────────────────────── + async def on_tree_node_highlighted( self, event: Tree.NodeHighlighted[dict[str, Any]] ) -> None: if event.node.data is not None: await self._show_detail(event.node.data) - def on_button_pressed(self, event: Button.Pressed) -> None: + def _exit_with_action(self, action: str) -> None: if self._selected_entry is None: + self.notify("No checkpoint selected", severity="warning") return inputs = self._collect_inputs() overrides = self._collect_task_overrides() loc = self._resolve_location(self._selected_entry) etype = self._detect_entity_type(self._selected_entry) - if event.button.id == "btn-resume": - self.exit((loc, "resume", inputs, overrides, etype)) - elif event.button.id == "btn-fork": - self.exit((loc, "fork", inputs, overrides, etype)) + name = self._selected_entry.get("name", "")[:30] + self.notify(f"{action.title()}: {name}") + self.exit((loc, action, inputs, overrides, etype)) + + def action_resume(self) -> None: + self._exit_with_action("resume") + + def action_fork(self) -> None: + self._exit_with_action("fork") def action_refresh(self) -> None: self._refresh_tree() diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index de9a8f73d4..7631a4c2be 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -419,10 +419,32 @@ def fork( def _restore_runtime(self) -> None: """Re-create runtime objects after restoring from a checkpoint.""" + from crewai.events.event_bus import crewai_event_bus + + started_task_ids: set[str] = set() + state = crewai_event_bus._runtime_state + if state is not None: + for node in state.event_record.nodes.values(): + if node.event.type == "task_started" and node.event.task_id: + started_task_ids.add(node.event.task_id) + + resuming_task_agent_roles: set[str] = set() + for task in self.tasks: + if ( + task.output is None + and task.agent is not None + and str(task.id) in started_task_ids + ): + resuming_task_agent_roles.add(task.agent.role) + for agent in self.agents: agent.crew = self executor = agent.agent_executor - if executor and executor.messages: + if ( + executor + and executor.messages + and agent.role in resuming_task_agent_roles + ): executor.crew = self executor.agent = agent executor._resuming = True