From fe0f2bc90bdb39a3608c5a7be8b5f266289909ad Mon Sep 17 00:00:00 2001 From: gszep Date: Tue, 20 Jan 2026 15:55:02 +0900 Subject: [PATCH] feat(events): add module:load_error event for silent module failures Emit module:load_error event when providers, tools, or hooks fail to load (e.g., missing dependency). Previously these failures were silently swallowed with only a log warning, making debugging difficult. Event payload includes module_id, module_type (provider|tool|hook), and error message. Failures now appear in events.jsonl for observability while maintaining existing graceful degradation behavior. --- amplifier_core/events.py | 4 ++ amplifier_core/session.py | 138 ++++++++++++++++++++++++++++++-------- 2 files changed, 114 insertions(+), 28 deletions(-) diff --git a/amplifier_core/events.py b/amplifier_core/events.py index 89e7e6f..c78c009 100644 --- a/amplifier_core/events.py +++ b/amplifier_core/events.py @@ -52,6 +52,9 @@ APPROVAL_GRANTED = "approval:granted" APPROVAL_DENIED = "approval:denied" +# Module lifecycle +MODULE_LOAD_ERROR = "module:load_error" # Module failed to load (provider, tool, hook) + # Cancellation lifecycle CANCEL_REQUESTED = "cancel:requested" # Cancellation initiated (graceful or immediate) CANCEL_COMPLETED = "cancel:completed" # Cancellation finalized, session stopping @@ -108,4 +111,5 @@ APPROVAL_DENIED, CANCEL_REQUESTED, CANCEL_COMPLETED, + MODULE_LOAD_ERROR, ] diff --git a/amplifier_core/session.py b/amplifier_core/session.py index 9c18bc3..47cb74e 100644 --- a/amplifier_core/session.py +++ b/amplifier_core/session.py @@ -31,7 +31,6 @@ def _safe_exception_str(e: Exception) -> str: return repr(e) - class AmplifierSession: """ A single Amplifier session tying everything together. @@ -88,17 +87,25 @@ def __init__( ) # Set default fields for all events (infrastructure propagation) - self.coordinator.hooks.set_default_fields(session_id=self.session_id, parent_id=self.parent_id) + self.coordinator.hooks.set_default_fields( + session_id=self.session_id, parent_id=self.parent_id + ) # Create loader with coordinator (for resolver injection) self.loader = loader or ModuleLoader(coordinator=self.coordinator) - def _merge_configs(self, base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]: + def _merge_configs( + self, base: dict[str, Any], overlay: dict[str, Any] + ) -> dict[str, Any]: """Deep merge two config dicts.""" result = base.copy() for key, value in overlay.items(): - if key in result and isinstance(result[key], dict) and isinstance(value, dict): + if ( + key in result + and isinstance(result[key], dict) + and isinstance(value, dict) + ): result[key] = self._merge_configs(result[key], value) else: result[key] = value @@ -119,33 +126,47 @@ async def initialize(self) -> None: try: # Load orchestrator (required) # Handle both dict (ModuleConfig) and string formats - orchestrator_spec = self.config.get("session", {}).get("orchestrator", "loop-basic") + orchestrator_spec = self.config.get("session", {}).get( + "orchestrator", "loop-basic" + ) if isinstance(orchestrator_spec, dict): orchestrator_id = orchestrator_spec.get("module", "loop-basic") orchestrator_source = orchestrator_spec.get("source") orchestrator_config = orchestrator_spec.get("config", {}) else: orchestrator_id = orchestrator_spec - orchestrator_source = self.config.get("session", {}).get("orchestrator_source") - orchestrator_config = self.config.get("orchestrator", {}).get("config", {}) + orchestrator_source = self.config.get("session", {}).get( + "orchestrator_source" + ) + orchestrator_config = self.config.get("orchestrator", {}).get( + "config", {} + ) logger.info(f"Loading orchestrator: {orchestrator_id}") try: orchestrator_mount = await self.loader.load( - orchestrator_id, orchestrator_config, source_hint=orchestrator_source + orchestrator_id, + orchestrator_config, + source_hint=orchestrator_source, ) # Note: config is already embedded in orchestrator_mount by the loader cleanup = await orchestrator_mount(self.coordinator) if cleanup: self.coordinator.register_cleanup(cleanup) except Exception as e: - logger.error(f"Failed to load orchestrator '{orchestrator_id}': {_safe_exception_str(e)}") - raise RuntimeError(f"Cannot initialize without orchestrator: {_safe_exception_str(e)}") + logger.error( + f"Failed to load orchestrator '{orchestrator_id}': {_safe_exception_str(e)}" + ) + raise RuntimeError( + f"Cannot initialize without orchestrator: {_safe_exception_str(e)}" + ) # Load context manager (required) # Handle both dict (ModuleConfig) and string formats - context_spec = self.config.get("session", {}).get("context", "context-simple") + context_spec = self.config.get("session", {}).get( + "context", "context-simple" + ) if isinstance(context_spec, dict): context_id = context_spec.get("module", "context-simple") context_source = context_spec.get("source") @@ -158,13 +179,19 @@ async def initialize(self) -> None: logger.info(f"Loading context manager: {context_id}") try: - context_mount = await self.loader.load(context_id, context_config, source_hint=context_source) + context_mount = await self.loader.load( + context_id, context_config, source_hint=context_source + ) cleanup = await context_mount(self.coordinator) if cleanup: self.coordinator.register_cleanup(cleanup) except Exception as e: - logger.error(f"Failed to load context manager '{context_id}': {_safe_exception_str(e)}") - raise RuntimeError(f"Cannot initialize without context manager: {_safe_exception_str(e)}") + logger.error( + f"Failed to load context manager '{context_id}': {_safe_exception_str(e)}" + ) + raise RuntimeError( + f"Cannot initialize without context manager: {_safe_exception_str(e)}" + ) # Load providers for provider_config in self.config.get("providers", []): @@ -175,13 +202,28 @@ async def initialize(self) -> None: try: logger.info(f"Loading provider: {module_id}") provider_mount = await self.loader.load( - module_id, provider_config.get("config", {}), source_hint=provider_config.get("source") + module_id, + provider_config.get("config", {}), + source_hint=provider_config.get("source"), ) cleanup = await provider_mount(self.coordinator) if cleanup: self.coordinator.register_cleanup(cleanup) except Exception as e: - logger.warning(f"Failed to load provider '{module_id}': {_safe_exception_str(e)}", exc_info=True) + logger.warning( + f"Failed to load provider '{module_id}': {_safe_exception_str(e)}", + exc_info=True, + ) + from .events import MODULE_LOAD_ERROR + + await self.coordinator.hooks.emit( + MODULE_LOAD_ERROR, + { + "module_id": module_id, + "module_type": "provider", + "error": _safe_exception_str(e), + }, + ) # Load tools for tool_config in self.config.get("tools", []): @@ -192,13 +234,28 @@ async def initialize(self) -> None: try: logger.info(f"Loading tool: {module_id}") tool_mount = await self.loader.load( - module_id, tool_config.get("config", {}), source_hint=tool_config.get("source") + module_id, + tool_config.get("config", {}), + source_hint=tool_config.get("source"), ) cleanup = await tool_mount(self.coordinator) if cleanup: self.coordinator.register_cleanup(cleanup) except Exception as e: - logger.warning(f"Failed to load tool '{module_id}': {_safe_exception_str(e)}", exc_info=True) + logger.warning( + f"Failed to load tool '{module_id}': {_safe_exception_str(e)}", + exc_info=True, + ) + from .events import MODULE_LOAD_ERROR + + await self.coordinator.hooks.emit( + MODULE_LOAD_ERROR, + { + "module_id": module_id, + "module_type": "tool", + "error": _safe_exception_str(e), + }, + ) # Note: agents section is app-layer data (config overlays), not modules to mount # The kernel passes agents through in the mount plan without interpretation @@ -212,13 +269,28 @@ async def initialize(self) -> None: try: logger.info(f"Loading hook: {module_id}") hook_mount = await self.loader.load( - module_id, hook_config.get("config", {}), source_hint=hook_config.get("source") + module_id, + hook_config.get("config", {}), + source_hint=hook_config.get("source"), ) cleanup = await hook_mount(self.coordinator) if cleanup: self.coordinator.register_cleanup(cleanup) except Exception as e: - logger.warning(f"Failed to load hook '{module_id}': {_safe_exception_str(e)}", exc_info=True) + logger.warning( + f"Failed to load hook '{module_id}': {_safe_exception_str(e)}", + exc_info=True, + ) + from .events import MODULE_LOAD_ERROR + + await self.coordinator.hooks.emit( + MODULE_LOAD_ERROR, + { + "module_id": module_id, + "module_type": "hook", + "error": _safe_exception_str(e), + }, + ) self._initialized = True @@ -226,7 +298,9 @@ async def initialize(self) -> None: if self.parent_id: from .events import SESSION_FORK - await self.coordinator.hooks.emit(SESSION_FORK, {"parent": self.parent_id}) + await self.coordinator.hooks.emit( + SESSION_FORK, {"parent": self.parent_id} + ) logger.info(f"Session {self.session_id} initialized successfully") @@ -284,9 +358,13 @@ async def execute(self, prompt: str) -> str: self.status.status = "cancelled" # Emit cancel:completed event from .events import CANCEL_COMPLETED - await self.coordinator.hooks.emit(CANCEL_COMPLETED, { - "was_immediate": self.coordinator.cancellation.is_immediate, - }) + + await self.coordinator.hooks.emit( + CANCEL_COMPLETED, + { + "was_immediate": self.coordinator.cancellation.is_immediate, + }, + ) else: self.status.status = "completed" return result @@ -296,10 +374,14 @@ async def execute(self, prompt: str) -> str: if self.coordinator.cancellation.is_cancelled: self.status.status = "cancelled" from .events import CANCEL_COMPLETED - await self.coordinator.hooks.emit(CANCEL_COMPLETED, { - "was_immediate": self.coordinator.cancellation.is_immediate, - "error": _safe_exception_str(e), - }) + + await self.coordinator.hooks.emit( + CANCEL_COMPLETED, + { + "was_immediate": self.coordinator.cancellation.is_immediate, + "error": _safe_exception_str(e), + }, + ) logger.info(f"Execution cancelled: {_safe_exception_str(e)}") raise else: