diff --git a/src/cognitive/cli.py b/src/cognitive/cli.py index 2f2443b..68b6aba 100644 --- a/src/cognitive/cli.py +++ b/src/cognitive/cli.py @@ -36,81 +36,209 @@ def build_parser() -> argparse.ArgumentParser: return parser -async def run_shell(args: argparse.Namespace): - try: - import readline # Enable history and line editing - except ImportError: - pass - - service = MissionExecutorService(workspace_root=args.workspace_root, mode=args.mode) - print(f"JeanBot interactive shell ({args.mode} mode)") - print(f"Workspace: {args.workspace_root} ({args.workspace_id})") - print("Type 'exit' or 'quit' to end session. Type 'help' for commands.") +class InteractiveShell: + def __init__(self, args: argparse.Namespace): + self.args = args + self.service = MissionExecutorService( + workspace_root=args.workspace_root, + mode=args.mode, + on_step_update=self.on_step_update, + ) + self.mission_id: str | None = None + self.history: list[str] = [] - last_result = None - mission_id = f"shell-{uuid.uuid4().hex[:8]}" - history: list[str] = [] + def on_step_update(self, step_id: str, event: str): + color = "\033[94m" if event == "started" else "\033[92m" + reset = "\033[0m" + print(f" {color}[{event.upper()}]{reset} Step: {step_id}") - while True: + async def run(self): try: - line = input("\njeanbot> ").strip() - if not line: - continue - if line.lower() in ("exit", "quit"): - break + import readline + except ImportError: + pass + + print(f"JeanBot interactive shell ({self.args.mode} mode)") + print(f"Workspace: {self.args.workspace_root} ({self.args.workspace_id})") + print("Type 'exit' or 'quit' to end session. Type 'help' for commands.") - history.append(line) - - if line.lower() == "help": - print("Commands:") - print(" help Show this help") - print(" history Show command history") - print(" exit | quit Exit shell") - print(" Plan and execute a mission") - print(" refine Refine the last mission result with feedback") - continue - - if line.lower() == "history": - for i, cmd in enumerate(history, 1): - print(f" {i:3} {cmd}") - continue - - if line.lower().startswith("refine "): - if not last_result: - print("Nothing to refine. Run a mission first.") + while True: + try: + line = input("\njeanbot> ").strip() + if not line: continue - feedback = line[7:].strip() - objective = ( - f"Refine previous mission results based on: {feedback}\n" - f"Previous summary: {last_result.verification_summary}" - ) - title = f"Refinement: {feedback[:30]}..." - else: - objective = line - title = f"Mission: {line[:30]}..." - - payload = { - "mission_id": mission_id, - "workspace_id": args.workspace_id, - "title": title, - "objective": objective, - "mode": args.mode, - } - - print(f"Executing: {title}") - last_result = await service.execute_payload(payload) - - print(f"\nStatus: {last_result.status}") - print(f"Summary: {last_result.verification_summary}") - if last_result.artifacts: - print(f"Artifacts: {len(last_result.artifacts)}") - for artifact in last_result.artifacts: - print(f" - {artifact.title}: {artifact.path}") - - except KeyboardInterrupt: - print("\nInterrupt received, type 'exit' to quit.") - except Exception as e: - print(f"\nError: {e}") + if line.lower() in ("exit", "quit"): + break + + self.history.append(line) + await self.handle_command(line) + + except EOFError: + print("\nExiting") + break + except KeyboardInterrupt: + print("\nInterrupt received, type 'exit' to quit.") + except Exception as e: + print(f"\033[91m\nError: {e}\033[0m") + + async def handle_command(self, line: str): + cmd = line.lower() + if cmd == "help": + self.show_help() + elif cmd == "history": + self.show_history() + elif cmd == "status": + await self.show_status() + elif cmd == "plan": + await self.show_plan() + elif cmd == "artifacts": + await self.show_artifacts() + elif cmd.startswith("view "): + await self.view_artifact(line[5:].strip()) + elif cmd.startswith("refine "): + await self.refine_mission(line[7:].strip()) + else: + await self.start_mission(line) + + def show_help(self): + print("Commands:") + print(" help Show this help") + print(" history Show command history") + print(" status Show status of current or last mission") + print(" plan Show plan for current or last mission") + print(" artifacts List artifacts for current or last mission") + print(" view View content of an artifact") + print(" exit | quit Exit shell") + print(" Plan and execute a mission") + print(" refine Refine the last mission result with feedback") + + def show_history(self): + for i, cmd in enumerate(self.history, 1): + print(f" {i:3} {cmd}") + + async def show_status(self, mission_id: str | None = None): + mid = mission_id or self.mission_id or self.service.get_last_mission_id() + if not mid: + print("No mission found.") + return + summary = self.service.get_mission_run_summary(mid) + if not summary: + print(f"No summary found for mission {mid}") + return + + res = summary.get("result") or {} + status = res.get("status", "unknown") + color = "\033[92m" if status == "completed" else "\033[93m" + reset = "\033[0m" + + print(f"Mission: {summary['mission']['title']} ({mid})") + print(f"Status: {color}{status.upper()}{reset}") + if res: + print(f"Summary: {res.get('verification_summary', 'N/A')}") + metrics = res.get("metrics", {}) + print(f"Progress: {metrics.get('completed_steps', 0)}/{metrics.get('total_steps', 0)} steps") + print(f"Average Score: {metrics.get('average_score', 0.0)}") + + async def show_plan(self, mission_id: str | None = None): + mid = mission_id or self.mission_id or self.service.get_last_mission_id() + if not mid: + print("No mission found.") + return + payload = self.service.get_mission_payload(mid) + if not payload: + print(f"No payload found for mission {mid}") + return + + print(f"Plan for: {payload['title']}") + for step in payload.get("steps", []): + status = step.get("status", "pending") + color = "\033[92m" if status == "completed" else "\033[94m" if status == "running" else "" + reset = "\033[0m" + print(f" - [{color}{status.upper()}{reset}] {step['id']}: {step['title']}") + print(f" {step['description']}") + + async def show_artifacts(self, mission_id: str | None = None): + mid = mission_id or self.mission_id or self.service.get_last_mission_id() + if not mid: + print("No mission found.") + return + summary = self.service.get_mission_run_summary(mid) + if not summary or not summary.get("result"): + print(f"No results found for mission {mid}") + return + + artifacts = summary["result"].get("artifacts", []) + print(f"Artifacts for mission {mid}:") + for art in artifacts: + print(f" - {art['title']}: {art['path']}") + + async def view_artifact(self, path: str): + p = Path(path) + if not p.exists(): + # Try relative to workspace + p = Path(self.args.workspace_root) / path + + if not p.exists(): + print(f"Artifact not found: {path}") + return + + print(f"--- Content of {path} ---") + print(p.read_text(encoding="utf-8")) + print("--- End of Content ---") + + async def refine_mission(self, feedback: str): + mid = self.mission_id or self.service.get_last_mission_id() + if not mid: + print("Nothing to refine. Run a mission first.") + return + + summary = self.service.get_mission_run_summary(mid) + res = summary.get("result") if summary else None + + objective = ( + f"Refine previous mission results based on: {feedback}\n" + f"Previous summary: {res.get('verification_summary') if res else 'N/A'}" + ) + title = f"Refinement: {feedback[:30]}..." + await self.start_mission(objective, title) + + async def start_mission(self, objective: str, title: str | None = None): + self.mission_id = f"shell-{uuid.uuid4().hex[:8]}" + payload = { + "mission_id": self.mission_id, + "workspace_id": self.args.workspace_id, + "title": title or f"Mission: {objective[:30]}...", + "objective": objective, + "mode": self.args.mode, + } + + print(f"Planning mission: {payload['title']}") + bundle = await self.service.plan_mission(payload) + + print("\nProposed Plan:") + for step in bundle.record.plan.steps: + print(f" - {step.id}: {step.title}") + print(f" {step.description}") + + confirm = input("\nExecute this plan? (Y/n): ").strip().lower() + if confirm and confirm != "y": + print("Mission cancelled.") + return + + print(f"\nExecuting: {payload['title']}") + result = await self.service.execute_bundle(bundle) + + print(f"\nStatus: {result.status}") + print(f"Summary: {result.verification_summary}") + if result.artifacts: + print(f"Artifacts: {len(result.artifacts)}") + for artifact in result.artifacts: + print(f" - {artifact.title}: {artifact.path}") + + +async def run_shell(args: argparse.Namespace): + shell = InteractiveShell(args) + await shell.run() async def run_command(args: argparse.Namespace) -> dict: diff --git a/src/cognitive/executor.py b/src/cognitive/executor.py index 1343686..8a91ecf 100644 --- a/src/cognitive/executor.py +++ b/src/cognitive/executor.py @@ -5,7 +5,7 @@ import uuid from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any, Awaitable, Protocol +from typing import Any, Awaitable, Callable, Protocol def sleep_ms(ms: int) -> Awaitable[None]: @@ -467,6 +467,7 @@ def __init__( sub_agent_service: SubAgentService, file_service: FileService, policy_service: PolicyService, + on_step_update: Callable[[str, str], Any] | None = None, ): self.runtime = runtime self.memory_service = memory_service @@ -474,6 +475,7 @@ def __init__( self.sub_agent_service = sub_agent_service self.file_service = file_service self.policy_service = policy_service + self.on_step_update = on_step_update self.intelligence = MissionExecutionIntelligence() self.replanner = AdaptiveReplanner() @@ -869,7 +871,9 @@ async def _execute_step( ) -> StepOutcome: step_started_at = utc_now_iso() step.status = "running" - + if self.on_step_update: + self.on_step_update(step.id, "started") + await self.audit_service.record( "mission.step.started", step.id, @@ -915,7 +919,9 @@ async def _execute_step( ) step.status = "completed" - + if self.on_step_update: + self.on_step_update(step.id, "completed") + report = StepExecutionRecord( step_id=sub_agent_result.step_report.step_id, started_at=step_started_at, diff --git a/src/cognitive/service.py b/src/cognitive/service.py index d6ff93d..5e2ef27 100644 --- a/src/cognitive/service.py +++ b/src/cognitive/service.py @@ -4,7 +4,7 @@ import uuid from dataclasses import asdict, dataclass, field from pathlib import Path -from typing import Any +from typing import Any, Callable from .adapters import ( DeterministicRuntimeService, @@ -56,6 +56,7 @@ class MissionExecutorService: capability_risk: dict[str, str] = field(default_factory=dict) failure_policy: dict[str, int] = field(default_factory=dict) mode: str = "local" + on_step_update: Callable[[str, str], Any] | None = None def build_bundle(self, mission_payload: dict[str, Any]) -> MissionExecutionBundle: workspace_id = mission_payload.get("workspace_id") or mission_payload.get("workspaceId") @@ -112,6 +113,7 @@ def build_bundle(self, mission_payload: dict[str, Any]) -> MissionExecutionBundl sub_agent_service=subagent_service, file_service=file_service, policy_service=policy_service, + on_step_update=self.on_step_update, ) return MissionExecutionBundle( record=record, @@ -125,12 +127,20 @@ def build_bundle(self, mission_payload: dict[str, Any]) -> MissionExecutionBundl policy_service=policy_service, ) - async def execute_payload(self, mission_payload: dict[str, Any]) -> MissionRunResult: + async def plan_mission(self, mission_payload: dict[str, Any]) -> MissionExecutionBundle: bundle = self.build_bundle(mission_payload) + self._persist_run_summary(bundle, None) + return bundle + + async def execute_bundle(self, bundle: MissionExecutionBundle) -> MissionRunResult: result = await bundle.executor.execute(bundle.record, bundle.context) self._persist_run_summary(bundle, result) return result + async def execute_payload(self, mission_payload: dict[str, Any]) -> MissionRunResult: + bundle = self.build_bundle(mission_payload) + return await self.execute_bundle(bundle) + async def finalize_distributed_payload(self, mission_payload: dict[str, Any]) -> MissionRunResult: bundle = self.build_bundle(mission_payload) bundle.record.active_execution = self._build_active_execution(mission_payload) @@ -138,6 +148,37 @@ async def finalize_distributed_payload(self, mission_payload: dict[str, Any]) -> self._persist_run_summary(bundle, result) return result + def get_mission_run_summary(self, mission_id: str) -> dict[str, Any] | None: + path = self._mission_dir(mission_id) / "mission-run.json" + if not path.exists(): + return None + return json.loads(path.read_text(encoding="utf-8")) + + def get_mission_payload(self, mission_id: str) -> dict[str, Any] | None: + path = self._mission_dir(mission_id) / "mission-payload.json" + if not path.exists(): + return None + return json.loads(path.read_text(encoding="utf-8")) + + def get_mission_state(self, mission_id: str) -> dict[str, Any] | None: + path = Path(self.workspace_root) / ".jeanbot" / "state" / f"mission-{mission_id}.json" + if not path.exists(): + return None + return json.loads(path.read_text(encoding="utf-8")) + + def list_missions(self) -> list[str]: + base = Path(self.workspace_root) / ".jeanbot" / "missions" + if not base.exists(): + return [] + # Sort by modification time, most recent first + dirs = [d for d in base.iterdir() if d.is_dir()] + dirs.sort(key=lambda d: d.stat().st_mtime, reverse=True) + return [d.name for d in dirs] + + def get_last_mission_id(self) -> str | None: + missions = self.list_missions() + return missions[0] if missions else None + def load_payload(self, path: str) -> dict[str, Any]: return json.loads(Path(path).read_text(encoding="utf-8")) @@ -182,13 +223,13 @@ def write_payload_template(self, path: str) -> str: def _persist_run_summary( self, bundle: MissionExecutionBundle, - result: MissionRunResult, + result: MissionRunResult | None, ) -> None: mission_dir = self._mission_dir(bundle.record.objective.id) summary = { "mission": asdict(bundle.record.objective), "plan_version": bundle.record.plan_version, - "result": self._result_to_dict(result), + "result": self._result_to_dict(result) if result else None, "audit_summary": bundle.audit_service.summarize(), "memory_summary": bundle.memory_service.summarize(bundle.record.objective.workspace_id), "artifact_paths": bundle.file_service.artifact_paths(bundle.record.objective.id),