diff --git a/src/cognitive/cli.py b/src/cognitive/cli.py index 2f2443b..a1f79e7 100644 --- a/src/cognitive/cli.py +++ b/src/cognitive/cli.py @@ -36,81 +36,212 @@ def build_parser() -> argparse.ArgumentParser: return parser -async def run_shell(args: argparse.Namespace): - try: - import readline # Enable history and line editing - except ImportError: - pass +def colorize(text: str, color_code: str) -> str: + return f"\033[{color_code}m{text}\033[0m" + - service = MissionExecutorService(workspace_root=args.workspace_root, mode=args.mode) - print(f"JeanBot interactive shell ({args.mode} mode)") - print(f"Workspace: {args.workspace_root} ({args.workspace_id})") - print("Type 'exit' or 'quit' to end session. Type 'help' for commands.") +class InteractiveShell: + def __init__(self, workspace_root: str, workspace_id: str, mode: str): + self.workspace_root = workspace_root + self.workspace_id = workspace_id + self.mode = mode + self.service = MissionExecutorService( + workspace_root=workspace_root, + mode=mode, + on_step_update=self._on_step_update, + ) + self.last_result = None + self.history: list[str] = [] + self.mission_id = f"shell-{uuid.uuid4().hex[:8]}" - last_result = None - mission_id = f"shell-{uuid.uuid4().hex[:8]}" - history: list[str] = [] + def _on_step_update(self, step_id: str, status: str): + color = "34" if status == "running" else "32" + print(f" [{colorize(status.upper(), color)}] {step_id}") - while True: + async def run(self): try: - line = input("\njeanbot> ").strip() - if not line: - continue - if line.lower() in ("exit", "quit"): - break - - history.append(line) - - if line.lower() == "help": - print("Commands:") - print(" help Show this help") - print(" history Show command history") - print(" exit | quit Exit shell") - print(" Plan and execute a mission") - print(" refine Refine the last mission result with feedback") - continue - - if line.lower() == "history": - for i, cmd in enumerate(history, 1): - print(f" {i:3} {cmd}") - continue - - if line.lower().startswith("refine "): - if not last_result: - print("Nothing to refine. Run a mission first.") + import readline + except ImportError: + pass + + print(colorize(f"JeanBot interactive shell ({self.mode} mode)", "1;36")) + print(f"Workspace: {self.workspace_root} ({self.workspace_id})") + print("Type 'help' for commands.") + + while True: + try: + try: + line = input(colorize("\njeanbot> ", "1;32")).strip() + except EOFError: + print("\nExiting.") + break + + if not line: continue - feedback = line[7:].strip() - objective = ( - f"Refine previous mission results based on: {feedback}\n" - f"Previous summary: {last_result.verification_summary}" - ) - title = f"Refinement: {feedback[:30]}..." - else: - objective = line - title = f"Mission: {line[:30]}..." - - payload = { - "mission_id": mission_id, - "workspace_id": args.workspace_id, - "title": title, - "objective": objective, - "mode": args.mode, - } - - print(f"Executing: {title}") - last_result = await service.execute_payload(payload) - - print(f"\nStatus: {last_result.status}") - print(f"Summary: {last_result.verification_summary}") - if last_result.artifacts: - print(f"Artifacts: {len(last_result.artifacts)}") - for artifact in last_result.artifacts: - print(f" - {artifact.title}: {artifact.path}") - - except KeyboardInterrupt: - print("\nInterrupt received, type 'exit' to quit.") - except Exception as e: - print(f"\nError: {e}") + if line.lower() in ("exit", "quit"): + break + + self.history.append(line) + + if line.lower() == "help": + self._show_help() + continue + + if line.lower() == "history": + for i, cmd in enumerate(self.history, 1): + print(f" {i:3} {cmd}") + continue + + if line.lower() == "status": + await self._cmd_status() + continue + + if line.lower() == "plan": + self._cmd_plan() + continue + + if line.lower() == "artifacts": + self._cmd_artifacts() + continue + + if line.lower().startswith("view "): + self._cmd_view(line[5:].strip()) + continue + + if line.lower().startswith("refine "): + await self._handle_mission(line[7:].strip(), is_refinement=True) + else: + await self._handle_mission(line) + + except KeyboardInterrupt: + print("\nInterrupt received, type 'exit' to quit.") + except Exception as e: + print(colorize(f"\nError: {e}", "31")) + + def _show_help(self): + print("Commands:") + print(f" {colorize('help', '33'):18} Show this help") + print(f" {colorize('status', '33'):18} Show current/last mission status") + print(f" {colorize('plan', '33'):18} Show the current mission plan") + print(f" {colorize('artifacts', '33'):18} List produced artifacts") + print(f" {colorize('view ', '33'):18} View artifact content") + print(f" {colorize('history', '33'):18} Show command history") + print(f" {colorize('refine ', '33'):18} Refine last mission with feedback") + print(f" {colorize('exit | quit', '33'):18} Exit shell") + print(f" {colorize('', '33'):18} Plan and execute a new mission") + + async def _handle_mission(self, text: str, is_refinement: bool = False): + if is_refinement: + if not self.last_result: + print(colorize("Nothing to refine. Run a mission first.", "31")) + return + objective = ( + f"Refine previous mission results based on: {text}\n" + f"Previous summary: {self.last_result.verification_summary}" + ) + title = f"Refinement: {text[:30]}..." + else: + objective = text + title = f"Mission: {text[:30]}..." + + payload = { + "mission_id": self.mission_id, + "workspace_id": self.workspace_id, + "title": title, + "objective": objective, + "mode": self.mode, + } + + print(colorize(f"\nPlanning: {title}", "1;34")) + bundle = self.service.plan_mission(payload) + + print("\nProposed Plan:") + for step in bundle.record.plan.steps: + print(f" - [{step.capability}] {step.title}: {step.description}") + + confirm = input(colorize("\nExecute this plan? [Y/n] ", "1;33")).strip().lower() + if confirm and confirm != 'y': + print("Execution cancelled.") + return + + print(colorize("\nExecuting...", "1;34")) + self.last_result = await self.service.execute_bundle(bundle) + + print(f"\nStatus: {colorize(self.last_result.status.upper(), '1;32' if self.last_result.status == 'completed' else '1;31')}") + print(f"Summary: {self.last_result.verification_summary}") + if self.last_result.artifacts: + print(f"Artifacts: {len(self.last_result.artifacts)}") + + async def _cmd_status(self): + m_id = self.service.get_last_mission_id() + if not m_id: + print("No missions found.") + return + + state = self.service.get_mission_state(m_id) + if not state: + print(f"Could not retrieve state for mission {m_id}") + return + + res = state["result"] + print(f"\nMission: {state['mission']['title']} ({m_id})") + print(f"Status: {colorize(res['status'].upper(), '1;32' if res['status'] == 'completed' else '1;31')}") + print(f"Summary: {res['verification_summary']}") + + steps = res.get("step_reports", []) + print(f"\nSteps ({len(steps)}):") + for s in steps: + diag = s.get("diagnostics") + score = f"score={diag['overall_score']:.2f}" if diag else "" + print(f" - {s['step_id']}: {score}") + + def _cmd_plan(self): + if not self.last_result: + print("No active mission plan.") + return + # In a real shell, we might want to reload from disk or keep the last bundle + print(f"\nLast Mission: {self.mission_id}") + # Note: self.last_result doesn't have the full plan object directly in the same way bundle does + # but we can list step reports which reflect the plan + for report in self.last_result.step_reports: + print(f" - {report.step_id}: {report.summary[:100]}...") + + def _cmd_artifacts(self): + if not self.last_result or not self.last_result.artifacts: + print("No artifacts found for the last mission.") + return + + print(f"\nArtifacts ({len(self.last_result.artifacts)}):") + for a in self.last_result.artifacts: + print(f" - {colorize(a.title, '36')}: {a.path}") + + def _cmd_view(self, path: str): + p = Path(path) + if not p.exists(): + # Try relative to workspace + p = Path(self.workspace_root) / path + + if not p.exists(): + print(colorize(f"File not found: {path}", "31")) + return + + if p.is_dir(): + print(f"Contents of {path}:") + for item in p.iterdir(): + print(f" {'[DIR]' if item.is_dir() else ' '} {item.name}") + else: + print(colorize(f"\n--- {p.name} ---", "1")) + print(p.read_text(encoding="utf-8")) + print(colorize("--- End of file ---", "1")) + + +async def run_shell(args: argparse.Namespace): + shell = InteractiveShell( + workspace_root=args.workspace_root, + workspace_id=args.workspace_id, + mode=args.mode + ) + await shell.run() async def run_command(args: argparse.Namespace) -> dict: diff --git a/src/cognitive/executor.py b/src/cognitive/executor.py index 1343686..93f625a 100644 --- a/src/cognitive/executor.py +++ b/src/cognitive/executor.py @@ -467,6 +467,7 @@ def __init__( sub_agent_service: SubAgentService, file_service: FileService, policy_service: PolicyService, + on_step_update: Any | None = None, ): self.runtime = runtime self.memory_service = memory_service @@ -474,6 +475,7 @@ def __init__( self.sub_agent_service = sub_agent_service self.file_service = file_service self.policy_service = policy_service + self.on_step_update = on_step_update self.intelligence = MissionExecutionIntelligence() self.replanner = AdaptiveReplanner() @@ -869,7 +871,9 @@ async def _execute_step( ) -> StepOutcome: step_started_at = utc_now_iso() step.status = "running" - + if self.on_step_update: + self.on_step_update(step.id, "running") + await self.audit_service.record( "mission.step.started", step.id, @@ -913,9 +917,11 @@ async def _execute_step( context, policy_decision, ) - + step.status = "completed" - + if self.on_step_update: + self.on_step_update(step.id, "completed") + report = StepExecutionRecord( step_id=sub_agent_result.step_report.step_id, started_at=step_started_at, diff --git a/src/cognitive/service.py b/src/cognitive/service.py index d6ff93d..722678a 100644 --- a/src/cognitive/service.py +++ b/src/cognitive/service.py @@ -56,6 +56,7 @@ class MissionExecutorService: capability_risk: dict[str, str] = field(default_factory=dict) failure_policy: dict[str, int] = field(default_factory=dict) mode: str = "local" + on_step_update: Any | None = None def build_bundle(self, mission_payload: dict[str, Any]) -> MissionExecutionBundle: workspace_id = mission_payload.get("workspace_id") or mission_payload.get("workspaceId") @@ -112,6 +113,7 @@ def build_bundle(self, mission_payload: dict[str, Any]) -> MissionExecutionBundl sub_agent_service=subagent_service, file_service=file_service, policy_service=policy_service, + on_step_update=self.on_step_update, ) return MissionExecutionBundle( record=record, @@ -126,7 +128,13 @@ def build_bundle(self, mission_payload: dict[str, Any]) -> MissionExecutionBundl ) async def execute_payload(self, mission_payload: dict[str, Any]) -> MissionRunResult: - bundle = self.build_bundle(mission_payload) + bundle = self.plan_mission(mission_payload) + return await self.execute_bundle(bundle) + + def plan_mission(self, mission_payload: dict[str, Any]) -> MissionExecutionBundle: + return self.build_bundle(mission_payload) + + async def execute_bundle(self, bundle: MissionExecutionBundle) -> MissionRunResult: result = await bundle.executor.execute(bundle.record, bundle.context) self._persist_run_summary(bundle, result) return result @@ -138,6 +146,30 @@ async def finalize_distributed_payload(self, mission_payload: dict[str, Any]) -> self._persist_run_summary(bundle, result) return result + def get_mission_state(self, mission_id: str) -> dict[str, Any] | None: + mission_dir = self._mission_dir(mission_id) + path = mission_dir / "mission-run.json" + if not path.exists(): + return None + return json.loads(path.read_text(encoding="utf-8")) + + def list_missions(self) -> list[str]: + missions_dir = Path(self.workspace_root) / ".jeanbot" / "missions" + if not missions_dir.exists(): + return [] + return sorted([d.name for d in missions_dir.iterdir() if d.is_dir()]) + + def get_last_mission_id(self) -> str | None: + missions_dir = Path(self.workspace_root) / ".jeanbot" / "missions" + if not missions_dir.exists(): + return None + dirs = [d for d in missions_dir.iterdir() if d.is_dir()] + if not dirs: + return None + # Sort by mtime + dirs.sort(key=lambda d: d.stat().st_mtime, reverse=True) + return dirs[0].name + def load_payload(self, path: str) -> dict[str, Any]: return json.loads(Path(path).read_text(encoding="utf-8")) diff --git a/tests/python/test_service.py b/tests/python/test_service.py index bcfc4c2..6700e3b 100644 --- a/tests/python/test_service.py +++ b/tests/python/test_service.py @@ -98,6 +98,43 @@ async def test_finalize_distributed_payload_uses_active_execution(self): self.assertEqual(result.execution_mode, "distributed") self.assertEqual([report.step_id for report in result.step_reports], ["step-a", "step-b"]) + async def test_plan_and_execute_split_workflow(self): + with tempfile.TemporaryDirectory() as tmpdir: + service = MissionExecutorService(workspace_root=tmpdir) + payload = { + "workspace_id": "workspace-split", + "title": "Split Workflow Mission", + "objective": "Test planning then executing.", + "steps": [ + { + "title": "One", + "description": "Step one", + "capability": "research", + } + ], + } + + # 1. Plan + bundle = service.plan_mission(payload) + self.assertEqual(bundle.record.objective.title, "Split Workflow Mission") + self.assertEqual(len(bundle.record.plan.steps), 1) + + # 2. Execute + result = await service.execute_bundle(bundle) + self.assertEqual(result.status, "completed") + self.assertEqual(len(result.step_reports), 1) + + # 3. Helpers + last_id = service.get_last_mission_id() + self.assertEqual(last_id, bundle.record.objective.id) + + state = service.get_mission_state(last_id) + self.assertIsNotNone(state) + self.assertEqual(state["result"]["status"], "completed") + + missions = service.list_missions() + self.assertIn(last_id, missions) + class MissionExecutorCliTests(unittest.TestCase): def test_cli_write_template_and_execute(self):