diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index a2df8c2b97..113adbea84 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -394,15 +394,17 @@ def set_skills( self, resolved_crew_skills: list[SkillModel] | None = None, ) -> None: - """Resolve skill paths and activate skills to INSTRUCTIONS level. + """Resolve skill paths while preserving explicit disclosure levels. - Path entries trigger discovery and activation. Pre-loaded Skill objects - below INSTRUCTIONS level are activated. Crew-level skills are merged in - with event emission so observability is consistent regardless of origin. + Path entries trigger discovery and activation because directory-based + skills opt into eager loading. Pre-loaded Skill objects keep their + current disclosure level so callers can attach METADATA-only skills and + progressively activate them later. Crew-level skills are merged in with + event emission so observability is consistent regardless of origin. Args: - resolved_crew_skills: Pre-resolved crew skills (already discovered - and activated). When provided, avoids redundant discovery per agent. + resolved_crew_skills: Pre-resolved crew skills. When provided, + avoids redundant discovery per agent. """ from crewai.crew import Crew @@ -443,8 +445,7 @@ def set_skills( elif isinstance(item, SkillModel): if item.name not in seen: seen.add(item.name) - activated = activate_skill(item, source=self) - if activated is item and item.disclosure_level >= INSTRUCTIONS: + if item.disclosure_level >= INSTRUCTIONS: crewai_event_bus.emit( self, event=SkillActivatedEvent( @@ -454,7 +455,7 @@ def set_skills( disclosure_level=item.disclosure_level, ), ) - resolved.append(activated) + resolved.append(item) self.skills = resolved if resolved else None diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index a9c02a243c..0703654010 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -21,6 +21,7 @@ from crewai.events.event_bus import crewai_event_bus from crewai.events.handler_graph import CircularDependencyError + if TYPE_CHECKING: from crewai.events.types.agent_events import ( AgentEvaluationCompletedEvent, @@ -33,6 +34,20 @@ LiteAgentExecutionErrorEvent, LiteAgentExecutionStartedEvent, ) + from crewai.events.types.checkpoint_events import ( + CheckpointBaseEvent, + CheckpointCompletedEvent, + CheckpointFailedEvent, + CheckpointForkBaseEvent, + CheckpointForkCompletedEvent, + CheckpointForkStartedEvent, + CheckpointPrunedEvent, + CheckpointRestoreBaseEvent, + CheckpointRestoreCompletedEvent, + CheckpointRestoreFailedEvent, + CheckpointRestoreStartedEvent, + CheckpointStartedEvent, + ) from crewai.events.types.crew_events import ( CrewKickoffCompletedEvent, CrewKickoffFailedEvent, @@ -141,6 +156,19 @@ "LiteAgentExecutionCompletedEvent": "crewai.events.types.agent_events", "LiteAgentExecutionErrorEvent": "crewai.events.types.agent_events", "LiteAgentExecutionStartedEvent": "crewai.events.types.agent_events", + # checkpoint_events + "CheckpointBaseEvent": "crewai.events.types.checkpoint_events", + "CheckpointCompletedEvent": "crewai.events.types.checkpoint_events", + "CheckpointFailedEvent": "crewai.events.types.checkpoint_events", + "CheckpointForkBaseEvent": "crewai.events.types.checkpoint_events", + "CheckpointForkCompletedEvent": "crewai.events.types.checkpoint_events", + "CheckpointForkStartedEvent": "crewai.events.types.checkpoint_events", + "CheckpointPrunedEvent": "crewai.events.types.checkpoint_events", + "CheckpointRestoreBaseEvent": "crewai.events.types.checkpoint_events", + "CheckpointRestoreCompletedEvent": "crewai.events.types.checkpoint_events", + "CheckpointRestoreFailedEvent": "crewai.events.types.checkpoint_events", + "CheckpointRestoreStartedEvent": "crewai.events.types.checkpoint_events", + "CheckpointStartedEvent": "crewai.events.types.checkpoint_events", # crew_events "CrewKickoffCompletedEvent": "crewai.events.types.crew_events", "CrewKickoffFailedEvent": "crewai.events.types.crew_events", @@ -265,6 +293,18 @@ def __getattr__(name: str) -> Any: "AgentReasoningFailedEvent", "AgentReasoningStartedEvent", "BaseEventListener", + "CheckpointBaseEvent", + "CheckpointCompletedEvent", + "CheckpointFailedEvent", + "CheckpointForkBaseEvent", + "CheckpointForkCompletedEvent", + "CheckpointForkStartedEvent", + "CheckpointPrunedEvent", + "CheckpointRestoreBaseEvent", + "CheckpointRestoreCompletedEvent", + "CheckpointRestoreFailedEvent", + "CheckpointRestoreStartedEvent", + "CheckpointStartedEvent", "CircularDependencyError", "CrewKickoffCompletedEvent", "CrewKickoffFailedEvent", diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index 63b6cdfc84..f336ce75ad 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -30,6 +30,17 @@ AgentExecutionStartedEvent, LiteAgentExecutionCompletedEvent, ) +from crewai.events.types.checkpoint_events import ( + CheckpointCompletedEvent, + CheckpointFailedEvent, + CheckpointForkCompletedEvent, + CheckpointForkStartedEvent, + CheckpointPrunedEvent, + CheckpointRestoreCompletedEvent, + CheckpointRestoreFailedEvent, + CheckpointRestoreStartedEvent, + CheckpointStartedEvent, +) from crewai.events.types.crew_events import ( CrewKickoffCompletedEvent, CrewKickoffFailedEvent, @@ -183,4 +194,13 @@ | MCPToolExecutionCompletedEvent | MCPToolExecutionFailedEvent | MCPConfigFetchFailedEvent + | CheckpointStartedEvent + | CheckpointCompletedEvent + | CheckpointFailedEvent + | CheckpointForkStartedEvent + | CheckpointForkCompletedEvent + | CheckpointRestoreStartedEvent + | CheckpointRestoreCompletedEvent + | CheckpointRestoreFailedEvent + | CheckpointPrunedEvent ) diff --git a/lib/crewai/src/crewai/events/types/checkpoint_events.py b/lib/crewai/src/crewai/events/types/checkpoint_events.py new file mode 100644 index 0000000000..835ab49b5d --- /dev/null +++ b/lib/crewai/src/crewai/events/types/checkpoint_events.py @@ -0,0 +1,97 @@ +"""Event family for automatic state checkpointing and forking.""" + +from typing import Literal + +from crewai.events.base_events import BaseEvent + + +class CheckpointBaseEvent(BaseEvent): + """Base event for checkpoint lifecycle operations.""" + + type: str + location: str + provider: str + trigger: str | None = None + branch: str | None = None + parent_id: str | None = None + + +class CheckpointStartedEvent(CheckpointBaseEvent): + """Event emitted immediately before a checkpoint is written.""" + + type: Literal["checkpoint_started"] = "checkpoint_started" + + +class CheckpointCompletedEvent(CheckpointBaseEvent): + """Event emitted when a checkpoint has been written successfully.""" + + type: Literal["checkpoint_completed"] = "checkpoint_completed" + checkpoint_id: str + duration_ms: float + + +class CheckpointFailedEvent(CheckpointBaseEvent): + """Event emitted when a checkpoint write fails.""" + + type: Literal["checkpoint_failed"] = "checkpoint_failed" + error: str + + +class CheckpointPrunedEvent(CheckpointBaseEvent): + """Event emitted after pruning old checkpoints from a branch.""" + + type: Literal["checkpoint_pruned"] = "checkpoint_pruned" + removed_count: int + max_checkpoints: int + + +class CheckpointForkBaseEvent(BaseEvent): + """Base event for fork lifecycle operations on a RuntimeState.""" + + type: str + branch: str + parent_branch: str | None = None + parent_checkpoint_id: str | None = None + + +class CheckpointForkStartedEvent(CheckpointForkBaseEvent): + """Event emitted immediately before a fork relabels the branch.""" + + type: Literal["checkpoint_fork_started"] = "checkpoint_fork_started" + + +class CheckpointForkCompletedEvent(CheckpointForkBaseEvent): + """Event emitted after a fork has established the new branch.""" + + type: Literal["checkpoint_fork_completed"] = "checkpoint_fork_completed" + + +class CheckpointRestoreBaseEvent(BaseEvent): + """Base event for checkpoint restore lifecycle operations.""" + + type: str + location: str + provider: str | None = None + + +class CheckpointRestoreStartedEvent(CheckpointRestoreBaseEvent): + """Event emitted immediately before a checkpoint restore begins.""" + + type: Literal["checkpoint_restore_started"] = "checkpoint_restore_started" + + +class CheckpointRestoreCompletedEvent(CheckpointRestoreBaseEvent): + """Event emitted when a checkpoint has been restored successfully.""" + + type: Literal["checkpoint_restore_completed"] = "checkpoint_restore_completed" + checkpoint_id: str + branch: str | None = None + parent_id: str | None = None + duration_ms: float + + +class CheckpointRestoreFailedEvent(CheckpointRestoreBaseEvent): + """Event emitted when a checkpoint restore fails.""" + + type: Literal["checkpoint_restore_failed"] = "checkpoint_restore_failed" + error: str diff --git a/lib/crewai/src/crewai/state/checkpoint_listener.py b/lib/crewai/src/crewai/state/checkpoint_listener.py index 674a8436a6..0c2adc127d 100644 --- a/lib/crewai/src/crewai/state/checkpoint_listener.py +++ b/lib/crewai/src/crewai/state/checkpoint_listener.py @@ -10,12 +10,22 @@ import json import logging import threading +import time from typing import Any from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.crew import Crew from crewai.events.base_events import BaseEvent from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus +from crewai.events.types.checkpoint_events import ( + CheckpointBaseEvent, + CheckpointCompletedEvent, + CheckpointFailedEvent, + CheckpointForkBaseEvent, + CheckpointPrunedEvent, + CheckpointRestoreBaseEvent, + CheckpointStartedEvent, +) from crewai.flow.flow import Flow from crewai.state.checkpoint_config import CheckpointConfig from crewai.state.runtime import RuntimeState, _prepare_entities @@ -53,12 +63,26 @@ def _resolve(value: CheckpointConfig | bool | None) -> CheckpointConfig | None | if isinstance(value, CheckpointConfig): _ensure_handlers_registered() return value - if value is True: + if value: _ensure_handlers_registered() return CheckpointConfig() if value is False: return _SENTINEL - return None # None = inherit + return None + + +def _resolve_from_agent(agent: BaseAgent) -> CheckpointConfig | None: + """Resolve a checkpoint config starting from an agent, walking to its crew.""" + result = _resolve(agent.checkpoint) + if isinstance(result, CheckpointConfig): + return result + if result is _SENTINEL: + return None + crew = agent.crew + if isinstance(crew, Crew): + crew_result = _resolve(crew.checkpoint) + return crew_result if isinstance(crew_result, CheckpointConfig) else None + return None def _find_checkpoint(source: Any) -> CheckpointConfig | None: @@ -77,28 +101,11 @@ def _find_checkpoint(source: Any) -> CheckpointConfig | None: result = _resolve(source.checkpoint) return result if isinstance(result, CheckpointConfig) else None if isinstance(source, BaseAgent): - result = _resolve(source.checkpoint) - if isinstance(result, CheckpointConfig): - return result - if result is _SENTINEL: - return None - crew = source.crew - if isinstance(crew, Crew): - result = _resolve(crew.checkpoint) - return result if isinstance(result, CheckpointConfig) else None - return None + return _resolve_from_agent(source) if isinstance(source, Task): agent = source.agent if isinstance(agent, BaseAgent): - result = _resolve(agent.checkpoint) - if isinstance(result, CheckpointConfig): - return result - if result is _SENTINEL: - return None - crew = agent.crew - if isinstance(crew, Crew): - result = _resolve(crew.checkpoint) - return result if isinstance(result, CheckpointConfig) else None + return _resolve_from_agent(agent) return None return None @@ -107,27 +114,106 @@ def _do_checkpoint( state: RuntimeState, cfg: CheckpointConfig, event: BaseEvent | None = None ) -> None: """Write a checkpoint and prune old ones if configured.""" - _prepare_entities(state.root) - payload = state.model_dump(mode="json") - if event is not None: - payload["trigger"] = event.type - data = json.dumps(payload) - location = cfg.provider.checkpoint( - data, - cfg.location, - parent_id=state._parent_id, - branch=state._branch, + provider_name: str = type(cfg.provider).__name__ + trigger: str | None = event.type if event is not None else None + context: dict[str, Any] = { + "task_id": event.task_id if event is not None else None, + "task_name": event.task_name if event is not None else None, + "agent_id": event.agent_id if event is not None else None, + "agent_role": event.agent_role if event is not None else None, + } + + parent_id_snapshot: str | None = state._parent_id + branch_snapshot: str = state._branch + + crewai_event_bus.emit( + cfg, + CheckpointStartedEvent( + location=cfg.location, + provider=provider_name, + trigger=trigger, + branch=branch_snapshot, + parent_id=parent_id_snapshot, + **context, + ), ) - state._chain_lineage(cfg.provider, location) - checkpoint_id: str = cfg.provider.extract_id(location) + start: float = time.perf_counter() + try: + _prepare_entities(state.root) + payload = state.model_dump(mode="json") + if event is not None: + payload["trigger"] = event.type + data = json.dumps(payload) + location = cfg.provider.checkpoint( + data, + cfg.location, + parent_id=parent_id_snapshot, + branch=branch_snapshot, + ) + state._chain_lineage(cfg.provider, location) + checkpoint_id: str = cfg.provider.extract_id(location) + except Exception as exc: + crewai_event_bus.emit( + cfg, + CheckpointFailedEvent( + location=cfg.location, + provider=provider_name, + trigger=trigger, + branch=branch_snapshot, + parent_id=parent_id_snapshot, + error=str(exc), + **context, + ), + ) + raise + + duration_ms: float = (time.perf_counter() - start) * 1000.0 msg: str = ( f"Checkpoint saved. Resume with: crewai checkpoint resume {checkpoint_id}" ) logger.info(msg) + crewai_event_bus.emit( + cfg, + CheckpointCompletedEvent( + location=location, + provider=provider_name, + trigger=trigger, + branch=branch_snapshot, + parent_id=parent_id_snapshot, + checkpoint_id=checkpoint_id, + duration_ms=duration_ms, + **context, + ), + ) + if cfg.max_checkpoints is not None: - cfg.provider.prune(cfg.location, cfg.max_checkpoints, branch=state._branch) + try: + removed_count: int = cfg.provider.prune( + cfg.location, cfg.max_checkpoints, branch=branch_snapshot + ) + except Exception: + logger.warning( + "Checkpoint prune failed for %s (branch=%s)", + cfg.location, + branch_snapshot, + exc_info=True, + ) + return + crewai_event_bus.emit( + cfg, + CheckpointPrunedEvent( + location=cfg.location, + provider=provider_name, + trigger=trigger, + branch=branch_snapshot, + parent_id=parent_id_snapshot, + removed_count=removed_count, + max_checkpoints=cfg.max_checkpoints, + **context, + ), + ) def _should_checkpoint(source: Any, event: BaseEvent) -> CheckpointConfig | None: @@ -142,6 +228,11 @@ def _should_checkpoint(source: Any, event: BaseEvent) -> CheckpointConfig | None def _on_any_event(source: Any, event: BaseEvent, state: Any) -> None: """Sync handler registered on every event class.""" + if isinstance( + event, + (CheckpointBaseEvent, CheckpointForkBaseEvent, CheckpointRestoreBaseEvent), + ): + return cfg = _should_checkpoint(source, event) if cfg is None: return @@ -161,7 +252,8 @@ def _register_all_handlers(event_bus: CrewAIEventsBus) -> None: seen: set[type] = set() def _collect(cls: type[BaseEvent]) -> None: - for sub in cls.__subclasses__(): + subclasses: list[type[BaseEvent]] = cls.__subclasses__() + for sub in subclasses: if sub not in seen: seen.add(sub) type_field = sub.model_fields.get("type") diff --git a/lib/crewai/src/crewai/state/event_record.py b/lib/crewai/src/crewai/state/event_record.py index 7b8c20c5b3..866398e0ab 100644 --- a/lib/crewai/src/crewai/state/event_record.py +++ b/lib/crewai/src/crewai/state/event_record.py @@ -39,7 +39,8 @@ def _build_event_type_map() -> None: """Populate _event_type_map from all BaseEvent subclasses.""" def _collect(cls: type[BaseEvent]) -> None: - for sub in cls.__subclasses__(): + subclasses: list[type[BaseEvent]] = cls.__subclasses__() + for sub in subclasses: type_field = sub.model_fields.get("type") if type_field and type_field.default: _event_type_map[type_field.default] = sub diff --git a/lib/crewai/src/crewai/state/provider/core.py b/lib/crewai/src/crewai/state/provider/core.py index c386d519fc..fad06abe8d 100644 --- a/lib/crewai/src/crewai/state/provider/core.py +++ b/lib/crewai/src/crewai/state/provider/core.py @@ -61,13 +61,16 @@ async def acheckpoint( ... @abstractmethod - def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None: + def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int: """Remove old checkpoints, keeping at most *max_keep* per branch. Args: location: The storage destination passed to ``checkpoint``. max_keep: Maximum number of checkpoints to retain. branch: Only prune checkpoints on this branch. + + Returns: + The number of checkpoints removed. """ ... diff --git a/lib/crewai/src/crewai/state/provider/json_provider.py b/lib/crewai/src/crewai/state/provider/json_provider.py index 0f18a5901d..904526292d 100644 --- a/lib/crewai/src/crewai/state/provider/json_provider.py +++ b/lib/crewai/src/crewai/state/provider/json_provider.py @@ -95,17 +95,20 @@ async def acheckpoint( await f.write(data) return str(file_path) - def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None: + def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int: """Remove oldest checkpoint files beyond *max_keep* on a branch.""" _safe_branch(location, branch) branch_dir = os.path.join(location, branch) pattern = os.path.join(branch_dir, "*.json") files = sorted(glob.glob(pattern), key=os.path.getmtime) + removed = 0 for path in files if max_keep == 0 else files[:-max_keep]: try: os.remove(path) + removed += 1 except OSError: # noqa: PERF203 logger.debug("Failed to remove %s", path, exc_info=True) + return removed def extract_id(self, location: str) -> str: """Extract the checkpoint ID from a file path. diff --git a/lib/crewai/src/crewai/state/provider/sqlite_provider.py b/lib/crewai/src/crewai/state/provider/sqlite_provider.py index 5ee4dca26a..14fa3425d8 100644 --- a/lib/crewai/src/crewai/state/provider/sqlite_provider.py +++ b/lib/crewai/src/crewai/state/provider/sqlite_provider.py @@ -111,11 +111,13 @@ async def acheckpoint( await db.commit() return f"{location}#{checkpoint_id}" - def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None: + def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int: """Remove oldest checkpoint rows beyond *max_keep* on a branch.""" with sqlite3.connect(location) as conn: - conn.execute(_PRUNE, (branch, branch, max_keep)) + cursor = conn.execute(_PRUNE, (branch, branch, max_keep)) + removed: int = cursor.rowcount conn.commit() + return max(removed, 0) def extract_id(self, location: str) -> str: """Extract the checkpoint ID from a ``db_path#id`` string.""" diff --git a/lib/crewai/src/crewai/state/runtime.py b/lib/crewai/src/crewai/state/runtime.py index 3243d4c198..4711079970 100644 --- a/lib/crewai/src/crewai/state/runtime.py +++ b/lib/crewai/src/crewai/state/runtime.py @@ -10,6 +10,7 @@ from __future__ import annotations import logging +import time from typing import TYPE_CHECKING, Any import uuid @@ -23,6 +24,17 @@ ) from crewai.context import capture_execution_context +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.checkpoint_events import ( + CheckpointCompletedEvent, + CheckpointFailedEvent, + CheckpointForkCompletedEvent, + CheckpointForkStartedEvent, + CheckpointRestoreCompletedEvent, + CheckpointRestoreFailedEvent, + CheckpointRestoreStartedEvent, + CheckpointStartedEvent, +) from crewai.state.checkpoint_config import CheckpointConfig from crewai.state.event_record import EventRecord from crewai.state.provider.core import BaseProvider @@ -89,7 +101,7 @@ def _migrate(data: dict[str, Any]) -> dict[str, Any]: """ raw = data.get("crewai_version") current = Version(get_crewai_version()) - stored = Version(raw) if raw else Version("0.0.0") + stored = Version(raw) if isinstance(raw, str) and raw else Version("0.0.0") if raw is None: logger.warning("Checkpoint has no crewai_version — treating as 0.0.0") @@ -159,6 +171,63 @@ def _chain_lineage(self, provider: BaseProvider, location: str) -> None: self._checkpoint_id = provider.extract_id(location) self._parent_id = self._checkpoint_id + def _begin_checkpoint(self, location: str) -> tuple[str, str | None, str, float]: + """Emit the start event and return the invariant context for a checkpoint.""" + provider_name: str = type(self._provider).__name__ + parent_id_snapshot: str | None = self._parent_id + branch_snapshot: str = self._branch + crewai_event_bus.emit( + self, + CheckpointStartedEvent( + location=location, + provider=provider_name, + branch=branch_snapshot, + parent_id=parent_id_snapshot, + ), + ) + return provider_name, parent_id_snapshot, branch_snapshot, time.perf_counter() + + def _emit_checkpoint_failed( + self, + location: str, + provider_name: str, + branch_snapshot: str, + parent_id_snapshot: str | None, + exc: Exception, + ) -> None: + """Emit the failure event for a checkpoint write.""" + crewai_event_bus.emit( + self, + CheckpointFailedEvent( + location=location, + provider=provider_name, + branch=branch_snapshot, + parent_id=parent_id_snapshot, + error=str(exc), + ), + ) + + def _emit_checkpoint_completed( + self, + result: str, + provider_name: str, + branch_snapshot: str, + parent_id_snapshot: str | None, + start: float, + ) -> None: + """Emit the completion event for a successful checkpoint write.""" + crewai_event_bus.emit( + self, + CheckpointCompletedEvent( + location=result, + provider=provider_name, + branch=branch_snapshot, + parent_id=parent_id_snapshot, + checkpoint_id=self._provider.extract_id(result), + duration_ms=(time.perf_counter() - start) * 1000.0, + ), + ) + def checkpoint(self, location: str) -> str: """Write a checkpoint. @@ -169,14 +238,27 @@ def checkpoint(self, location: str) -> str: Returns: A location identifier for the saved checkpoint. """ - _prepare_entities(self.root) - result = self._provider.checkpoint( - self.model_dump_json(), - location, - parent_id=self._parent_id, - branch=self._branch, + provider_name, parent_id_snapshot, branch_snapshot, start = ( + self._begin_checkpoint(location) + ) + try: + _prepare_entities(self.root) + result = self._provider.checkpoint( + self.model_dump_json(), + location, + parent_id=parent_id_snapshot, + branch=branch_snapshot, + ) + self._chain_lineage(self._provider, result) + except Exception as exc: + self._emit_checkpoint_failed( + location, provider_name, branch_snapshot, parent_id_snapshot, exc + ) + raise + + self._emit_checkpoint_completed( + result, provider_name, branch_snapshot, parent_id_snapshot, start ) - self._chain_lineage(self._provider, result) return result async def acheckpoint(self, location: str) -> str: @@ -189,14 +271,27 @@ async def acheckpoint(self, location: str) -> str: Returns: A location identifier for the saved checkpoint. """ - _prepare_entities(self.root) - result = await self._provider.acheckpoint( - self.model_dump_json(), - location, - parent_id=self._parent_id, - branch=self._branch, + provider_name, parent_id_snapshot, branch_snapshot, start = ( + self._begin_checkpoint(location) + ) + try: + _prepare_entities(self.root) + result = await self._provider.acheckpoint( + self.model_dump_json(), + location, + parent_id=parent_id_snapshot, + branch=branch_snapshot, + ) + self._chain_lineage(self._provider, result) + except Exception as exc: + self._emit_checkpoint_failed( + location, provider_name, branch_snapshot, parent_id_snapshot, exc + ) + raise + + self._emit_checkpoint_completed( + result, provider_name, branch_snapshot, parent_id_snapshot, start ) - self._chain_lineage(self._provider, result) return result def fork(self, branch: str | None = None) -> None: @@ -211,11 +306,32 @@ def fork(self, branch: str | None = None) -> None: times without collisions. """ if branch: - self._branch = branch + new_branch = branch elif self._checkpoint_id: - self._branch = f"fork/{self._checkpoint_id}_{uuid.uuid4().hex[:6]}" + new_branch = f"fork/{self._checkpoint_id}_{uuid.uuid4().hex[:6]}" else: - self._branch = f"fork/{uuid.uuid4().hex[:8]}" + new_branch = f"fork/{uuid.uuid4().hex[:8]}" + + parent_branch: str | None = self._branch + parent_checkpoint_id: str | None = self._checkpoint_id + + crewai_event_bus.emit( + self, + CheckpointForkStartedEvent( + branch=new_branch, + parent_branch=parent_branch, + parent_checkpoint_id=parent_checkpoint_id, + ), + ) + self._branch = new_branch + crewai_event_bus.emit( + self, + CheckpointForkCompletedEvent( + branch=new_branch, + parent_branch=parent_branch, + parent_checkpoint_id=parent_checkpoint_id, + ), + ) @classmethod def from_checkpoint(cls, config: CheckpointConfig, **kwargs: Any) -> RuntimeState: @@ -233,13 +349,41 @@ def from_checkpoint(cls, config: CheckpointConfig, **kwargs: Any) -> RuntimeStat if config.restore_from is None: raise ValueError("CheckpointConfig.restore_from must be set") location = str(config.restore_from) - provider = detect_provider(location) - raw = provider.from_checkpoint(location) - state = cls.model_validate_json(raw, **kwargs) - state._provider = provider - checkpoint_id = provider.extract_id(location) - state._checkpoint_id = checkpoint_id - state._parent_id = checkpoint_id + + crewai_event_bus.emit(config, CheckpointRestoreStartedEvent(location=location)) + start: float = time.perf_counter() + provider_name: str | None = None + try: + provider = detect_provider(location) + provider_name = type(provider).__name__ + raw = provider.from_checkpoint(location) + state = cls.model_validate_json(raw, **kwargs) + state._provider = provider + checkpoint_id = provider.extract_id(location) + state._checkpoint_id = checkpoint_id + state._parent_id = checkpoint_id + except Exception as exc: + crewai_event_bus.emit( + config, + CheckpointRestoreFailedEvent( + location=location, + provider=provider_name, + error=str(exc), + ), + ) + raise + + crewai_event_bus.emit( + config, + CheckpointRestoreCompletedEvent( + location=location, + provider=provider_name, + checkpoint_id=checkpoint_id, + branch=state._branch, + parent_id=state._parent_id, + duration_ms=(time.perf_counter() - start) * 1000.0, + ), + ) return state @classmethod @@ -260,13 +404,41 @@ async def afrom_checkpoint( if config.restore_from is None: raise ValueError("CheckpointConfig.restore_from must be set") location = str(config.restore_from) - provider = detect_provider(location) - raw = await provider.afrom_checkpoint(location) - state = cls.model_validate_json(raw, **kwargs) - state._provider = provider - checkpoint_id = provider.extract_id(location) - state._checkpoint_id = checkpoint_id - state._parent_id = checkpoint_id + + crewai_event_bus.emit(config, CheckpointRestoreStartedEvent(location=location)) + start: float = time.perf_counter() + provider_name: str | None = None + try: + provider = detect_provider(location) + provider_name = type(provider).__name__ + raw = await provider.afrom_checkpoint(location) + state = cls.model_validate_json(raw, **kwargs) + state._provider = provider + checkpoint_id = provider.extract_id(location) + state._checkpoint_id = checkpoint_id + state._parent_id = checkpoint_id + except Exception as exc: + crewai_event_bus.emit( + config, + CheckpointRestoreFailedEvent( + location=location, + provider=provider_name, + error=str(exc), + ), + ) + raise + + crewai_event_bus.emit( + config, + CheckpointRestoreCompletedEvent( + location=location, + provider=provider_name, + checkpoint_id=checkpoint_id, + branch=state._branch, + parent_id=state._parent_id, + duration_ms=(time.perf_counter() - start) * 1000.0, + ), + ) return state diff --git a/lib/crewai/tests/skills/test_integration.py b/lib/crewai/tests/skills/test_integration.py index 23004d79e1..c13054e311 100644 --- a/lib/crewai/tests/skills/test_integration.py +++ b/lib/crewai/tests/skills/test_integration.py @@ -4,6 +4,8 @@ import pytest +from crewai import Agent +from crewai.agent.utils import append_skill_context from crewai.skills.loader import activate_skill, discover_skills, format_skill_context from crewai.skills.models import INSTRUCTIONS, METADATA @@ -76,3 +78,23 @@ def test_multiple_search_paths(self, tmp_path: Path) -> None: all_skills.extend(discover_skills(search_path)) names = {s.name for s in all_skills} assert names == {"skill-a", "skill-b"} + + def test_agent_preserves_metadata_for_discovered_skills(self, tmp_path: Path) -> None: + _create_skill_dir(tmp_path, "travel", body="Use this skill for travel planning.") + discovered = discover_skills(tmp_path) + + agent = Agent( + role="Travel Advisor", + goal="Provide personalized travel suggestions.", + backstory="An experienced travel consultant.", + skills=discovered, + ) + + assert agent.skills is not None + assert agent.skills[0].disclosure_level == METADATA + assert agent.skills[0].instructions is None + + prompt = append_skill_context(agent, "Plan a 10-day Japan itinerary.") + assert "## Skill: travel" in prompt + assert "Skill travel" in prompt + assert "Use this skill for travel planning." not in prompt