Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions amplifier_core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
APPROVAL_GRANTED = "approval:granted"
APPROVAL_DENIED = "approval:denied"

# Module lifecycle
MODULE_LOAD_ERROR = "module:load_error" # Module failed to load (provider, tool, hook)

# Cancellation lifecycle
CANCEL_REQUESTED = "cancel:requested" # Cancellation initiated (graceful or immediate)
CANCEL_COMPLETED = "cancel:completed" # Cancellation finalized, session stopping
Expand Down Expand Up @@ -108,4 +111,5 @@
APPROVAL_DENIED,
CANCEL_REQUESTED,
CANCEL_COMPLETED,
MODULE_LOAD_ERROR,
]
138 changes: 110 additions & 28 deletions amplifier_core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def _safe_exception_str(e: Exception) -> str:
return repr(e)



class AmplifierSession:
"""
A single Amplifier session tying everything together.
Expand Down Expand Up @@ -88,17 +87,25 @@ def __init__(
)

# Set default fields for all events (infrastructure propagation)
self.coordinator.hooks.set_default_fields(session_id=self.session_id, parent_id=self.parent_id)
self.coordinator.hooks.set_default_fields(
session_id=self.session_id, parent_id=self.parent_id
)

# Create loader with coordinator (for resolver injection)
self.loader = loader or ModuleLoader(coordinator=self.coordinator)

def _merge_configs(self, base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]:
def _merge_configs(
self, base: dict[str, Any], overlay: dict[str, Any]
) -> dict[str, Any]:
"""Deep merge two config dicts."""
result = base.copy()

for key, value in overlay.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
if (
key in result
and isinstance(result[key], dict)
and isinstance(value, dict)
):
result[key] = self._merge_configs(result[key], value)
else:
result[key] = value
Expand All @@ -119,33 +126,47 @@ async def initialize(self) -> None:
try:
# Load orchestrator (required)
# Handle both dict (ModuleConfig) and string formats
orchestrator_spec = self.config.get("session", {}).get("orchestrator", "loop-basic")
orchestrator_spec = self.config.get("session", {}).get(
"orchestrator", "loop-basic"
)
if isinstance(orchestrator_spec, dict):
orchestrator_id = orchestrator_spec.get("module", "loop-basic")
orchestrator_source = orchestrator_spec.get("source")
orchestrator_config = orchestrator_spec.get("config", {})
else:
orchestrator_id = orchestrator_spec
orchestrator_source = self.config.get("session", {}).get("orchestrator_source")
orchestrator_config = self.config.get("orchestrator", {}).get("config", {})
orchestrator_source = self.config.get("session", {}).get(
"orchestrator_source"
)
orchestrator_config = self.config.get("orchestrator", {}).get(
"config", {}
)

logger.info(f"Loading orchestrator: {orchestrator_id}")

try:
orchestrator_mount = await self.loader.load(
orchestrator_id, orchestrator_config, source_hint=orchestrator_source
orchestrator_id,
orchestrator_config,
source_hint=orchestrator_source,
)
# Note: config is already embedded in orchestrator_mount by the loader
cleanup = await orchestrator_mount(self.coordinator)
if cleanup:
self.coordinator.register_cleanup(cleanup)
except Exception as e:
logger.error(f"Failed to load orchestrator '{orchestrator_id}': {_safe_exception_str(e)}")
raise RuntimeError(f"Cannot initialize without orchestrator: {_safe_exception_str(e)}")
logger.error(
f"Failed to load orchestrator '{orchestrator_id}': {_safe_exception_str(e)}"
)
raise RuntimeError(
f"Cannot initialize without orchestrator: {_safe_exception_str(e)}"
)

# Load context manager (required)
# Handle both dict (ModuleConfig) and string formats
context_spec = self.config.get("session", {}).get("context", "context-simple")
context_spec = self.config.get("session", {}).get(
"context", "context-simple"
)
if isinstance(context_spec, dict):
context_id = context_spec.get("module", "context-simple")
context_source = context_spec.get("source")
Expand All @@ -158,13 +179,19 @@ async def initialize(self) -> None:
logger.info(f"Loading context manager: {context_id}")

try:
context_mount = await self.loader.load(context_id, context_config, source_hint=context_source)
context_mount = await self.loader.load(
context_id, context_config, source_hint=context_source
)
cleanup = await context_mount(self.coordinator)
if cleanup:
self.coordinator.register_cleanup(cleanup)
except Exception as e:
logger.error(f"Failed to load context manager '{context_id}': {_safe_exception_str(e)}")
raise RuntimeError(f"Cannot initialize without context manager: {_safe_exception_str(e)}")
logger.error(
f"Failed to load context manager '{context_id}': {_safe_exception_str(e)}"
)
raise RuntimeError(
f"Cannot initialize without context manager: {_safe_exception_str(e)}"
)

# Load providers
for provider_config in self.config.get("providers", []):
Expand All @@ -175,13 +202,28 @@ async def initialize(self) -> None:
try:
logger.info(f"Loading provider: {module_id}")
provider_mount = await self.loader.load(
module_id, provider_config.get("config", {}), source_hint=provider_config.get("source")
module_id,
provider_config.get("config", {}),
source_hint=provider_config.get("source"),
)
cleanup = await provider_mount(self.coordinator)
if cleanup:
self.coordinator.register_cleanup(cleanup)
except Exception as e:
logger.warning(f"Failed to load provider '{module_id}': {_safe_exception_str(e)}", exc_info=True)
logger.warning(
f"Failed to load provider '{module_id}': {_safe_exception_str(e)}",
exc_info=True,
)
from .events import MODULE_LOAD_ERROR

await self.coordinator.hooks.emit(
MODULE_LOAD_ERROR,
{
"module_id": module_id,
"module_type": "provider",
"error": _safe_exception_str(e),
},
)

# Load tools
for tool_config in self.config.get("tools", []):
Expand All @@ -192,13 +234,28 @@ async def initialize(self) -> None:
try:
logger.info(f"Loading tool: {module_id}")
tool_mount = await self.loader.load(
module_id, tool_config.get("config", {}), source_hint=tool_config.get("source")
module_id,
tool_config.get("config", {}),
source_hint=tool_config.get("source"),
)
cleanup = await tool_mount(self.coordinator)
if cleanup:
self.coordinator.register_cleanup(cleanup)
except Exception as e:
logger.warning(f"Failed to load tool '{module_id}': {_safe_exception_str(e)}", exc_info=True)
logger.warning(
f"Failed to load tool '{module_id}': {_safe_exception_str(e)}",
exc_info=True,
)
from .events import MODULE_LOAD_ERROR

await self.coordinator.hooks.emit(
MODULE_LOAD_ERROR,
{
"module_id": module_id,
"module_type": "tool",
"error": _safe_exception_str(e),
},
)

# Note: agents section is app-layer data (config overlays), not modules to mount
# The kernel passes agents through in the mount plan without interpretation
Expand All @@ -212,21 +269,38 @@ async def initialize(self) -> None:
try:
logger.info(f"Loading hook: {module_id}")
hook_mount = await self.loader.load(
module_id, hook_config.get("config", {}), source_hint=hook_config.get("source")
module_id,
hook_config.get("config", {}),
source_hint=hook_config.get("source"),
)
cleanup = await hook_mount(self.coordinator)
if cleanup:
self.coordinator.register_cleanup(cleanup)
except Exception as e:
logger.warning(f"Failed to load hook '{module_id}': {_safe_exception_str(e)}", exc_info=True)
logger.warning(
f"Failed to load hook '{module_id}': {_safe_exception_str(e)}",
exc_info=True,
)
from .events import MODULE_LOAD_ERROR

await self.coordinator.hooks.emit(
MODULE_LOAD_ERROR,
{
"module_id": module_id,
"module_type": "hook",
"error": _safe_exception_str(e),
},
)

self._initialized = True

# Emit session:fork event if this is a child session
if self.parent_id:
from .events import SESSION_FORK

await self.coordinator.hooks.emit(SESSION_FORK, {"parent": self.parent_id})
await self.coordinator.hooks.emit(
SESSION_FORK, {"parent": self.parent_id}
)

logger.info(f"Session {self.session_id} initialized successfully")

Expand Down Expand Up @@ -284,9 +358,13 @@ async def execute(self, prompt: str) -> str:
self.status.status = "cancelled"
# Emit cancel:completed event
from .events import CANCEL_COMPLETED
await self.coordinator.hooks.emit(CANCEL_COMPLETED, {
"was_immediate": self.coordinator.cancellation.is_immediate,
})

await self.coordinator.hooks.emit(
CANCEL_COMPLETED,
{
"was_immediate": self.coordinator.cancellation.is_immediate,
},
)
else:
self.status.status = "completed"
return result
Expand All @@ -296,10 +374,14 @@ async def execute(self, prompt: str) -> str:
if self.coordinator.cancellation.is_cancelled:
self.status.status = "cancelled"
from .events import CANCEL_COMPLETED
await self.coordinator.hooks.emit(CANCEL_COMPLETED, {
"was_immediate": self.coordinator.cancellation.is_immediate,
"error": _safe_exception_str(e),
})

await self.coordinator.hooks.emit(
CANCEL_COMPLETED,
{
"was_immediate": self.coordinator.cancellation.is_immediate,
"error": _safe_exception_str(e),
},
)
logger.info(f"Execution cancelled: {_safe_exception_str(e)}")
raise
else:
Expand Down