diff --git a/src/vulcanai/console/console.py b/src/vulcanai/console/console.py index ca638ee..c0136c5 100644 --- a/src/vulcanai/console/console.py +++ b/src/vulcanai/console/console.py @@ -16,8 +16,12 @@ import argparse import asyncio +import json import sys import threading +from datetime import datetime +from pathlib import Path +from typing import Optional import pyperclip # To paste the clipboard into the terminal from textual import events, work @@ -33,6 +37,7 @@ from vulcanai.console.utils import SpinnerHook, StreamToTextual, attach_ros_logger_to_console, common_prefix from vulcanai.console.widget_custom_log_text_area import CustomLogTextArea from vulcanai.console.widget_spinner import SpinnerStatus +from vulcanai.core.plan_types import GlobalPlan class TextualLogSink: @@ -169,6 +174,9 @@ def __init__( # Suggestion index for RadioListModal self.suggestion_index = -1 self.suggestion_index_changed = threading.Event() + # Log creation files + now = datetime.now() + self.execution_time = now.strftime("%Y-%m-%d-%H-%M") async def on_mouse_down(self, event: MouseEvent) -> None: """ @@ -266,6 +274,8 @@ def worker() -> None: "/plan": self.cmd_plan, "/rerun": self.cmd_rerun, "/bb": self.cmd_blackboard_state, + "/save_plans": self.cmd_save_plans, + "/load_plan": self.cmd_load_plan, "/clear": self.cmd_clear, "/exit": self.cmd_quit, } @@ -288,8 +298,8 @@ def worker() -> None: self.manager.register_tools_from_file(tool_file_path) # Entry points tools - for ep in self.tools_from_entrypoints: - self.manager.register_tools_from_entry_points(ep) + if self.tools_from_entrypoints != "": + self.manager.register_tools_from_entry_points(self.tools_from_entrypoints) # Add user context self.manager.add_user_context(self.user_context) @@ -367,6 +377,153 @@ def worker(user_input: str = "") -> None: # region Utilities + def save_history_log(self, output_dir: Path | str = None) -> Optional[Path]: + """ + Save the queries history to a '.log' file. + """ + if output_dir is None: + # Default path + output_dir = Path("./saved_plans/") + else: + output_dir = Path(output_dir) + + # Create the output directory if it doesn't exist + output_dir.mkdir(parents=True, exist_ok=True) + + try: + # Create a filename with timestamp + filename = f"{self.execution_time}_history.log" + filepath = output_dir / filename + + # 186║ + # 187╗ + # 188╝ + # 200╚ + # 201╔ + # 202╩ + # 203╦ + # 204╠ + # 205═ + + # Log tittle + log_lines = [ + "╔" + "═" * 78 + "╗", + "║ VulcanAI Console Session History".ljust(79) + "║", + f"║ Execution Time: {self.execution_time}".ljust(79) + "║", + "╚" + "═" * 78 + "╝", + "", + ] + + # Add queries from manager history + if self.manager and hasattr(self.manager, "history") and self.manager.history: + log_lines.append(f"Total Queries: {len(self.manager.history)}") + log_lines.append("-" * 48) + log_lines.append("") + + for i, (user_text, plan_summary) in enumerate(self.manager.history, 1): + log_lines.append(f"Query #{i}") + log_lines.append( + f" User Input: {user_text.split(chr(10))[-1] if chr(10) in user_text else user_text}" + ) + log_lines.append(f" Plan Summary: {plan_summary}") + log_lines.append("") + else: + log_lines.append("No queries recorded.") + log_lines.append("") + + # Save to file + filepath.write_text("\n".join(log_lines), encoding="utf-8") + + self.logger.log_console(f"History log saved to: {filepath}") + return filepath + + except Exception as e: + self.logger.log_console(f"Error saving history log: {e}") + return None + + def save_plans_to_json(self, output_dir: Path | str = None) -> list[Path]: + """ + Save all plans from 'self.plans_list' to JSON files. + """ + if output_dir is None: + # Default path + output_dir = Path("./saved_plans/") + else: + output_dir = Path(output_dir) + + # Create the output directory if it doesn't exist + output_dir.mkdir(parents=True, exist_ok=True) + + saved_files = [] + + for idx, plan in enumerate(self.plans_list): + if plan is None: + self.logger.log_console(f"Skipping plan {idx}: plan is None") + continue + + try: + # Create a filename based on index and timestamp + filename = f"{self.execution_time}_plan_{idx}.json" + filepath = output_dir / filename + + # Convert plan to JSON dictionary + plan_dict = plan.model_dump() + + # Save to file + filepath.write_text(json.dumps(plan_dict, indent=2), encoding="utf-8") + + saved_files.append(filepath) + self.logger.log_console(f"Saved plan {idx} to: {filepath}") + + except Exception as e: + self.logger.log_console(f"Error saving plan {idx}: {e}") + + if saved_files: + self.logger.log_console(f"Successfully saved {len(saved_files)} plan(s)") + else: + self.logger.log_console("No plans were saved") + + return saved_files + + def load_and_execute_plan(self, plan_file: Path | str) -> Optional[dict]: + """ + Load a plan from a JSON file and execute it. + """ + try: + plan_file = Path(plan_file) + + if not plan_file.exists(): + self.logger.log_console(f"Error: Plan file not found: {plan_file}") + return None + + # Load the plan from JSON + plan_data = json.loads(plan_file.read_text(encoding="utf-8")) + + # Convert JSON to GlobalPlan object + plan = GlobalPlan.model_validate(plan_data) + + self.logger.log_console(f"Loaded plan from: {plan_file}") + self.logger.log_console("Executing plan...") + + # Execute the plan using the manager's executor and blackboard + result = self.manager.executor.run(plan, self.manager.bb) + + # Update the last blackboard state + self.last_bb = result.get("blackboard", None) + + # Log the result + bb_parsed = str(self.last_bb).replace("<", "'").replace(">", "'") + self.logger.log_console(f"Plan execution completed. Output: {bb_parsed}") + + return result + + except json.JSONDecodeError as e: + self.logger.log_console(f"Error: Invalid JSON in plan file: {e}") + return None + except Exception as e: + self.logger.log_console(f"Error executing plan: {e}") + return None + def _apply_history_to_input(self) -> None: """ Function used to apply the current history index to the input box. @@ -482,20 +639,22 @@ def cmd_help(self, _) -> None: "___________________\n" "Available commands:\n" "‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\n" - "/help - Show this help message\n" - "/tools - List available tools\n" - "/edit_tools - Edit the list of available tools\n" - "/change_k 'int' - Change the 'k' value for the top_k algorithm selection" + "/help - Show this help message\n" + "/tools - List available tools\n" + "/edit_tools - Edit the list of available tools\n" + "/change_k 'int' - Change the 'k' value for the top_k algorithm selection" " or show the current value if no 'int' is provided\n" - "/history 'int' - Change the history depth or show the current value if no" + "/history 'int' - Change the history depth or show the current value if no" " 'int' is provided\n" - "/show_history - Show the current history\n" - "/clear_history - Clear the history\n" - "/plan - Show the last generated plan\n" - "/rerun - Rerun the last plan\n" - "/bb - Show the last blackboard state\n" - "/clear - Clears the console screen\n" - "/exit - Exit the console\n" + "/show_history - Show the current history\n" + "/clear_history - Clear the history\n" + "/plan - Show the last generated plan\n" + "/rerun - Rerun the last plan\n" + "/save_plans 'dir' - Save all generated plans to JSON files (default: ./saved_plans/)\n" + "/load_plan 'path' - Load and execute a plan from a JSON file\n" + "/bb - Show the last blackboard state\n" + "/clear - Clears the console screen\n" + "/exit - Exit the console\n" "Query any other text to process it with the LLM and execute the plan generated.\n\n" "Add --image='path' to include images in the query. It can be used multiple times to add" " more images.\n" @@ -664,6 +823,55 @@ def cmd_blackboard_state(self, _) -> None: else: self.logger.log_console("No blackboard available.") + def cmd_save_plans(self, args) -> None: + """Save all generated plans to JSON files and history log.""" + output_dir = "./saved_plans/" + if len(args) > 0: + output_dir = args[0] + + # Save history log first + history_log_path = self.save_history_log(output_dir) + + if not self.plans_list: + self.logger.log_console("No plans to save") + if history_log_path: + self.logger.log_console(f"History log saved successfully to: {Path(output_dir).absolute()}") + return + + saved_files = self.save_plans_to_json(output_dir) + if saved_files: + self.logger.log_console(f"All plans saved to: {Path(output_dir).absolute()}") + else: + self.logger.log_console("No plans were saved") + + def cmd_load_plan(self, args) -> None: + self._load_plan_worker(args) # start worker (dont await) + + @work(thread=True) + async def _load_plan_worker(self, args) -> None: + """ + Worker function used to run the command "rerun". + It has to be a worker(thead=True) because the call 'self.manager.executor.run' + might have a "call_from_thread" in the tool executed, + and it is only valid in non Textual app Threads (separated Thread). + + @work runs on the app's event loop (app thread) and is for async, non-blocking code. + @work(thread=True) runs in a separate OS thread and is for blocking. + + e.g.: + 'move_turtle' tool contains a 'call_from_thread' + 'ros2_topic' tool does not contains a 'call_from_thread' + """ + + if len(args) == 0: + self.logger.log_console("Usage: /load_plan 'path/to/plan.json'") + return + + plan_file = args[0] + result = self.load_and_execute_plan(plan_file) + if result is None: + self.logger.log_console(f"Failed to load and execute plan from: {plan_file}") + def cmd_clear(self, _) -> None: self.left_pannel.clear_console()