From b41b61193d9c4de5888c4ff9f854351cb8b46a3c Mon Sep 17 00:00:00 2001 From: madina1203 Date: Fri, 24 Apr 2026 17:42:46 +0200 Subject: [PATCH 1/2] add CLI file input support for FILE_ANALYZER tool --- app/core/main.py | 117 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 93 insertions(+), 24 deletions(-) diff --git a/app/core/main.py b/app/core/main.py index 33a163d..691928c 100644 --- a/app/core/main.py +++ b/app/core/main.py @@ -1,13 +1,17 @@ import os +import shutil import argparse -from typing import Optional +import configparser +from typing import Optional, List +from pathlib import Path + from dotenv import load_dotenv from langsmith import Client -from pathlib import Path +from langchain_community.chat_models import ChatOpenAI, ChatLiteLLM + from app.core.workflow.langraph_workflow import create_workflow, process_workflow +from app.core.session import create_user_session, initialize_session_context from app.core.utils import IntRange, setup_logger -import configparser -from langchain_community.chat_models import ChatOpenAI, ChatLiteLLM from app.core.questions import standard_questions @@ -198,52 +202,117 @@ def langsmith_setup() -> Optional[Client]: return None +def _prepare_session_files(session_id: str, file_paths: List[str]) -> Path: + """ + Copy user-supplied local files into the session's input directory so that + the FILE_ANALYZER tool can discover them at runtime. + + Args: + session_id: Active session identifier. + file_paths: List of local file paths provided via the CLI. + + Returns: + Path to the session input directory. + + Raises: + FileNotFoundError: If any of the supplied paths do not exist. + """ + input_dir = create_user_session(session_id, input_dir=True) + + for raw_path in file_paths: + src = Path(raw_path).resolve() + if not src.exists(): + raise FileNotFoundError(f"File not found: {src}") + dest = input_dir / src.name + shutil.copy2(str(src), str(dest)) + logger.info(f"Copied '{src}' -> '{dest}'") + + return input_dir + + def main(): - """Main function to run the workflow.""" - # Define command line arguments - - parser = argparse.ArgumentParser(description="Process a workflow with a predefined question number.") - parser.add_argument('-q', '--question', type=int, choices=IntRange(1, len(standard_questions)), - help=f"Choose a standard question number from 1 to {len(standard_questions)}.") - parser.add_argument('-c', '--custom', type=str, - help="Provide a custom question.") - parser.add_argument('-e', '--evaluation', action='store_true', - help="Enable evaluation mode") - parser.add_argument('--api-key', type=str, - help="OpenAI API key (optional, defaults to environment variable)") - parser.add_argument('--endpoint', type=str, - help="Knowledge graph endpoint URL (optional)") + """ + CLI entry-point for running the MetaboT workflow. + + Usage examples: + python -m app.core.main -q 1 + python -m app.core.main -c "Describe my dataset" -f data.csv + python -m app.core.main -c "Compare files" -f file1.csv file2.tsv + """ + parser = argparse.ArgumentParser( + description="Process a workflow with a predefined question number." + ) + parser.add_argument( + '-q', '--question', type=int, + choices=IntRange(1, len(standard_questions)), + help=f"Choose a standard question number from 1 to {len(standard_questions)}.", + ) + parser.add_argument( + '-c', '--custom', type=str, + help="Provide a custom question.", + ) + parser.add_argument( + '-f', '--file', type=str, nargs='+', + help="One or more local file paths to make available for the FILE_ANALYZER tool.", + ) + parser.add_argument( + '-e', '--evaluation', action='store_true', + help="Enable evaluation mode.", + ) + parser.add_argument( + '--api-key', type=str, + help="OpenAI API key (optional, defaults to environment variable).", + ) + parser.add_argument( + '--endpoint', type=str, + help="Knowledge graph endpoint URL (optional).", + ) args = parser.parse_args() + # Resolve the question if args.question: question = standard_questions[args.question - 1] elif args.custom: question = args.custom else: - print("You must provide either a standard question number or a custom question.") + print("You must provide either a standard question number (-q) or a custom question (-c).") return # Initialize LangSmith if available - langsmith_client = langsmith_setup() + langsmith_setup() - # Get endpoint URL from arguments or environment + # Resolve endpoint URL endpoint_url = ( args.endpoint or os.environ.get("KG_ENDPOINT_URL") or "https://enpkg.commons-lab.org/graphdb/repositories/ENPKG" ) + + # Initialize language models models = llm_creation() + # Create a user session (mirrors the Streamlit session lifecycle) + session_id = create_user_session() + initialize_session_context(session_id) + + # Stage user-provided files into the session's input directory + if args.file: + try: + _prepare_session_files(session_id, args.file) + except FileNotFoundError as exc: + logger.error(str(exc)) + print(f"Error: {exc}") + return + try: - # Create and process workflow workflow = create_workflow( models=models, + session_id=session_id, endpoint_url=endpoint_url, evaluation=False, - api_key=args.api_key + api_key=args.api_key, ) - process_workflow(workflow, question) except Exception as e: From 1515fec90b52ec47948bc2e624dba7562bf90f2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Louis-F=C3=A9lix=20Nothias?= Date: Sat, 25 Apr 2026 00:27:12 +0200 Subject: [PATCH 2/2] Add session file preparation error handling and corresponding tests - Introduced SessionFilePreparationError for better error management during file staging. - Enhanced _prepare_session_files function to handle various file-related errors. - Added unit tests for session file preparation, covering valid file copying and error scenarios. - Documented changes in the agentic engineering log. --- app/core/main.py | 70 ++++++++++++--- app/core/tests/test_main.py | 172 ++++++++++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+), 10 deletions(-) create mode 100644 app/core/tests/test_main.py diff --git a/app/core/main.py b/app/core/main.py index 691928c..db2c3e1 100644 --- a/app/core/main.py +++ b/app/core/main.py @@ -36,6 +36,14 @@ } +class SessionFilePreparationError(ValueError): + """Raised when CLI input files cannot be staged into the session directory.""" + + def __init__(self, source_path: Path, message: str): + super().__init__(message) + self.source_path = source_path + + def get_api_key(provider: str) -> Optional[str]: """ Get API key for specified provider from environment variables. @@ -215,16 +223,55 @@ def _prepare_session_files(session_id: str, file_paths: List[str]) -> Path: Path to the session input directory. Raises: - FileNotFoundError: If any of the supplied paths do not exist. + SessionFilePreparationError: If any supplied path cannot be staged safely. """ input_dir = create_user_session(session_id, input_dir=True) + staged_destinations: dict[Path, Path] = {} for raw_path in file_paths: - src = Path(raw_path).resolve() + src = Path(raw_path).expanduser().resolve(strict=False) if not src.exists(): - raise FileNotFoundError(f"File not found: {src}") + raise SessionFilePreparationError(src, f"File not found: {src}") + if not src.is_file(): + raise SessionFilePreparationError(src, f"Input path is not a file: {src}") + dest = input_dir / src.name - shutil.copy2(str(src), str(dest)) + previous_src = staged_destinations.get(dest) + if previous_src is not None: + if previous_src == src: + raise SessionFilePreparationError( + src, + f"Input file was provided more than once: {src}", + ) + raise SessionFilePreparationError( + src, + ( + f"Cannot stage '{src}' because it would overwrite '{previous_src}' in " + f"the session input directory. Rename one of the files or choose a different path." + ), + ) + + if src == dest.resolve(strict=False): + raise SessionFilePreparationError( + src, + f"Input file is already staged in the session directory: {src}", + ) + + if dest.exists(): + raise SessionFilePreparationError( + src, + f"Cannot stage '{src}' because destination '{dest}' already exists.", + ) + + try: + shutil.copy2(str(src), str(dest)) + except (shutil.SameFileError, OSError) as exc: + raise SessionFilePreparationError( + src, + f"Failed to stage '{src}' into '{dest}': {exc}", + ) from exc + + staged_destinations[dest] = src logger.info(f"Copied '{src}' -> '{dest}'") return input_dir @@ -279,6 +326,13 @@ def main(): print("You must provide either a standard question number (-q) or a custom question (-c).") return + # Create a user session (mirrors the Streamlit session lifecycle) and + # reconfigure the logger so subsequent CLI logs land in the session file. + session_id = create_user_session() + initialize_session_context(session_id) + global logger + logger = setup_logger(__name__) + # Initialize LangSmith if available langsmith_setup() @@ -290,17 +344,13 @@ def main(): ) # Initialize language models - models = llm_creation() - - # Create a user session (mirrors the Streamlit session lifecycle) - session_id = create_user_session() - initialize_session_context(session_id) + models = llm_creation(api_key=args.api_key) # Stage user-provided files into the session's input directory if args.file: try: _prepare_session_files(session_id, args.file) - except FileNotFoundError as exc: + except SessionFilePreparationError as exc: logger.error(str(exc)) print(f"Error: {exc}") return diff --git a/app/core/tests/test_main.py b/app/core/tests/test_main.py new file mode 100644 index 0000000..fbdd20b --- /dev/null +++ b/app/core/tests/test_main.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +import app.core.main as main_module + + +class DummyLogger: + def __init__(self): + self.info_messages = [] + self.error_messages = [] + + def info(self, message, *args): + if args: + message = message % args + self.info_messages.append(message) + + def error(self, message, *args): + if args: + message = message % args + self.error_messages.append(message) + + +def test_prepare_session_files_copies_valid_file(tmp_path, monkeypatch): + input_dir = tmp_path / "input_files" + input_dir.mkdir() + source_file = tmp_path / "source.csv" + source_file.write_text("a,b\n1,2\n", encoding="utf-8") + dummy_logger = DummyLogger() + input_dir_path = input_dir + + monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path) + monkeypatch.setattr(main_module, "logger", dummy_logger) + + staged_dir = main_module._prepare_session_files("session-id", [str(source_file)]) + + copied_file = input_dir / source_file.name + assert staged_dir == input_dir + assert copied_file.read_text(encoding="utf-8") == source_file.read_text(encoding="utf-8") + + +def test_prepare_session_files_rejects_directory(tmp_path, monkeypatch): + input_dir = tmp_path / "input_files" + input_dir.mkdir() + source_dir = tmp_path / "source_dir" + source_dir.mkdir() + input_dir_path = input_dir + + monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path) + + try: + main_module._prepare_session_files("session-id", [str(source_dir)]) + assert False, "Expected SessionFilePreparationError" + except main_module.SessionFilePreparationError as exc: + assert "not a file" in str(exc) + + +def test_prepare_session_files_rejects_colliding_basenames(tmp_path, monkeypatch): + input_dir = tmp_path / "input_files" + input_dir.mkdir() + first_dir = tmp_path / "first" + second_dir = tmp_path / "second" + first_dir.mkdir() + second_dir.mkdir() + first_file = first_dir / "results.csv" + second_file = second_dir / "results.csv" + first_file.write_text("a,b\n1,2\n", encoding="utf-8") + second_file.write_text("a,b\n3,4\n", encoding="utf-8") + dummy_logger = DummyLogger() + input_dir_path = input_dir + + monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path) + monkeypatch.setattr(main_module, "logger", dummy_logger) + + try: + main_module._prepare_session_files("session-id", [str(first_file), str(second_file)]) + assert False, "Expected SessionFilePreparationError" + except main_module.SessionFilePreparationError as exc: + assert "would overwrite" in str(exc) + + +def test_prepare_session_files_rejects_same_file_destination(tmp_path, monkeypatch): + input_dir = tmp_path / "input_files" + input_dir.mkdir() + staged_file = input_dir / "already_staged.csv" + staged_file.write_text("a,b\n1,2\n", encoding="utf-8") + input_dir_path = input_dir + + monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path) + + try: + main_module._prepare_session_files("session-id", [str(staged_file)]) + assert False, "Expected SessionFilePreparationError" + except main_module.SessionFilePreparationError as exc: + assert "already staged" in str(exc) + + +def test_main_passes_cli_api_key_and_reconfigures_logger(monkeypatch): + original_argv = sys.argv[:] + state = {"session_initialized": False, "setup_logger_states": []} + captured = {} + + def fake_setup_logger(name): + state["setup_logger_states"].append(state["session_initialized"]) + return DummyLogger() + + def fake_initialize_session_context(session_id): + state["session_initialized"] = True + + def fake_llm_creation(api_key=None, params_file=None): + captured["llm_api_key"] = api_key + return {"llm_o": object()} + + def fake_create_workflow(models, session_id=None, endpoint_url=None, evaluation=False, api_key=None): + captured["workflow_api_key"] = api_key + captured["session_id"] = session_id + return "workflow" + + def fake_process_workflow(workflow, question): + captured["workflow"] = workflow + captured["question"] = question + + monkeypatch.setattr(sys, "argv", ["prog", "-c", "hello", "--api-key", "cli-key"]) + monkeypatch.setattr(main_module, "logger", DummyLogger()) + monkeypatch.setattr(main_module, "setup_logger", fake_setup_logger) + monkeypatch.setattr(main_module, "initialize_session_context", fake_initialize_session_context) + monkeypatch.setattr(main_module, "create_user_session", lambda session_id=None, user_session_dir=False, input_dir=False: "session-123") + monkeypatch.setattr(main_module, "langsmith_setup", lambda: None) + monkeypatch.setattr(main_module, "llm_creation", fake_llm_creation) + monkeypatch.setattr(main_module, "create_workflow", fake_create_workflow) + monkeypatch.setattr(main_module, "process_workflow", fake_process_workflow) + + try: + main_module.main() + finally: + monkeypatch.setattr(sys, "argv", original_argv) + + assert captured["llm_api_key"] == "cli-key" + assert captured["workflow_api_key"] == "cli-key" + assert state["setup_logger_states"] == [True] + assert captured["session_id"] == "session-123" + assert captured["workflow"] == "workflow" + assert captured["question"] == "hello" + + +def test_main_prints_user_friendly_error_for_bad_staged_file(monkeypatch, capsys): + original_argv = sys.argv[:] + + monkeypatch.setattr(sys, "argv", ["prog", "-c", "hello", "-f", "/missing/file.csv"]) + monkeypatch.setattr(main_module, "logger", DummyLogger()) + monkeypatch.setattr(main_module, "setup_logger", lambda name: DummyLogger()) + monkeypatch.setattr(main_module, "initialize_session_context", lambda session_id: None) + monkeypatch.setattr(main_module, "create_user_session", lambda session_id=None, user_session_dir=False, input_dir=False: "session-123") + monkeypatch.setattr(main_module, "langsmith_setup", lambda: None) + monkeypatch.setattr(main_module, "llm_creation", lambda api_key=None, params_file=None: {"llm_o": object()}) + monkeypatch.setattr( + main_module, + "_prepare_session_files", + lambda session_id, file_paths: (_ for _ in ()).throw( + main_module.SessionFilePreparationError(Path(file_paths[0]), f"File not found: {file_paths[0]}") + ), + ) + monkeypatch.setattr(main_module, "create_workflow", lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("workflow should not run"))) + + try: + main_module.main() + finally: + monkeypatch.setattr(sys, "argv", original_argv) + + captured = capsys.readouterr() + assert "Error: File not found: /missing/file.csv" in captured.out