diff --git a/CHANGELOG.md b/CHANGELOG.md index 86fa6a7..2cbe262 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,35 @@ All notable changes to the OpenIntent SDK will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.17.0] - 2026-03-24 + +### Added + +- **RFC-0026: Suspension Propagation & Retry** — Closes three gaps left by RFC-0025. + + - **Container propagation rules (5 normative rules)** — Intent graph: suspended child blocks dependents; parent aggregate status becomes `suspended_awaiting_input` when any child is suspended. Portfolio: aggregate gains `has_suspended_members`/`suspended_member_count`; `portfolio.member_suspended` / `portfolio.member_resumed` events. Plan/Task: bidirectional mirror — when an intent suspends, its task transitions to `blocked` with `blocked_reason: "intent_suspended"` and `suspended_intent_id`; plan progress gains `suspended_tasks`. Workflow: downstream phases receive `UpstreamIntentSuspendedError` at claim time; workflow progress gains `suspended_phases`. Deadline: suspension deadline governs expiry and MUST NOT exceed portfolio `due_before` constraint. + - **`HumanRetryPolicy` dataclass** — Re-notification and escalation policy with `max_attempts`, `interval_seconds`, `strategy` (`"fixed"` / `"linear"` / `"exponential"`), `escalation_ladder` (list of `EscalationStep`), and `final_fallback_policy`. Serialises to/from dict via `to_dict()` / `from_dict()`. + - **`EscalationStep` dataclass** — A single escalation step with `attempt`, `channel_hint`, and `notify_to`. Triggers `intent.suspension_escalated` event. Backwards-compatible aliases: `after_attempt`, `channel`, `notify`. + - **`UpstreamIntentSuspendedError`** — Raised by `WorkflowSpec.validate_claim_inputs()` when a declared input references an upstream phase whose intent is currently `suspended_awaiting_input`. 
Carries `task_id`, `phase_name`, `suspended_intent_id`, `expected_resume_at`. Subclass of `WorkflowError`. + - **`BaseAgent.default_human_retry_policy`** — Class-level or instance-level attribute (`None` by default). Applied to all `request_input()` calls that do not supply an explicit `retry_policy` argument. + - **`request_input(retry_policy=…)` parameter** — Accepts a `HumanRetryPolicy`. When supplied, the SDK re-fires `@on_input_requested` hooks on each attempt, emits `intent.suspension_renotified` per attempt, emits `intent.suspension_escalated` for escalation steps, and applies `final_fallback_policy` after all attempts are exhausted. + - **Three-level policy cascade** — call-site policy → `default_human_retry_policy` on the agent → server-configured platform default (`GET /api/v1/server/config` → `suspension.default_retry_policy`). + - **Four new `EventType` constants** — `intent.suspension_renotified`, `intent.suspension_escalated`, `portfolio.member_suspended`, `portfolio.member_resumed`. + - **`retry_policy` field on `SuspensionRecord`** — Optional; additive; existing single-attempt behaviour preserved when absent. + - **RFC-0026 protocol document** — `docs/rfcs/0026-suspension-container-interaction.md` with five container rules, `HumanRetryPolicy` schema, three-level cascade, coordinator policy extension, RFC-0010 relationship note, end-to-end example, and cross-RFC patch summary. + - **Cross-RFC patches** — RFC-0002 (aggregate `suspended_awaiting_input` status counter and completion-gate clarification), RFC-0007 (portfolio suspension-aware aggregate and events), RFC-0010 (RFC-0026 relationship note), RFC-0012 (task/intent suspension mirror, `suspended_tasks` plan progress field), RFC-0024 (`UpstreamIntentSuspendedError`, `suspended_phases` workflow progress field), RFC-0025 (`retry_policy` on `SuspensionRecord`, per-attempt `timeout_seconds` semantics, `fallback_policy` alias note, extended cross-RFC table). 
+ - **41 new tests** — `tests/test_hitl.py` (`HumanRetryPolicy`, `EscalationStep`, `SuspensionRecord.retry_policy`, new `EventType` constants, `request_input` signature, `BaseAgent.default_human_retry_policy`, package exports) and `tests/test_workflow_io.py` (`UpstreamIntentSuspendedError` construction, attributes, hierarchy, package export). + - **Package exports** — `HumanRetryPolicy`, `EscalationStep`, `UpstreamIntentSuspendedError` exported from `openintent` top-level. + +### Updated + +- `SuspensionRecord.fallback_policy` documented as alias for `retry_policy.final_fallback_policy` when `retry_policy` is set. +- `SuspensionRecord.timeout_seconds` documented as per-attempt window when `retry_policy` is set. +- `BaseAgent.request_input()` docstring updated to describe `retry_policy` and `default_human_retry_policy`. +- All version references updated to 0.17.0 across Python SDK, MCP server, documentation, and changelog. + +--- + ## [0.16.0] - 2026-03-23 ### Added diff --git a/docs/changelog.md b/docs/changelog.md index 93f90c1..84e85ae 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -5,6 +5,82 @@ All notable changes to the OpenIntent SDK will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.17.0] - 2026-03-24 + +### Added + +- **RFC-0026: Suspension Propagation & Retry** — Closes three gaps left by RFC-0025. + + - **Container propagation rules (5 normative rules)** — Intent graph: suspended child blocks dependents; parent aggregate status becomes `suspended_awaiting_input` when any child is suspended. Portfolio: aggregate gains `has_suspended_members`/`suspended_member_count`; `portfolio.member_suspended` / `portfolio.member_resumed` events. 
Plan/Task: bidirectional mirror — when an intent suspends, its task transitions to `blocked` with `blocked_reason: "intent_suspended"` and `suspended_intent_id`; plan progress gains `suspended_tasks`. Workflow: downstream phases receive `UpstreamIntentSuspendedError` at claim time; workflow progress gains `suspended_phases`. Deadline: suspension deadline governs expiry and MUST NOT exceed portfolio `due_before` constraint. + - **`HumanRetryPolicy` dataclass** — Re-notification and escalation policy with `max_attempts`, `interval_seconds`, `strategy` (`"fixed"` / `"linear"` / `"exponential"`), `escalation_ladder` (list of `EscalationStep`), and `final_fallback_policy`. Serialises to/from dict via `to_dict()` / `from_dict()`. + - **`EscalationStep` dataclass** — A single escalation step: `after_attempt`, `channel`, `notify`. Triggers `intent.suspension_escalated` event. + - **`UpstreamIntentSuspendedError`** — Raised by `WorkflowSpec.validate_claim_inputs()` when a declared input references an upstream phase whose intent is currently `suspended_awaiting_input`. Carries `task_id`, `phase_name`, `suspended_intent_id`, `expected_resume_at`. Subclass of `WorkflowError`. + - **`BaseAgent.default_human_retry_policy`** — Class-level or instance-level attribute (`None` by default). Applied to all `request_input()` calls that do not supply an explicit `retry_policy` argument. + - **`request_input(retry_policy=…)` parameter** — Accepts a `HumanRetryPolicy`. When supplied, the SDK re-fires `@on_input_requested` hooks on each attempt, emits `intent.suspension_renotified` per attempt, emits `intent.suspension_escalated` for escalation steps, and applies `final_fallback_policy` after all attempts are exhausted. + - **Three-level policy cascade** — call-site policy → `default_human_retry_policy` on the agent → server-configured platform default. 
+ - **Four new `EventType` constants** — `intent.suspension_renotified`, `intent.suspension_escalated`, `portfolio.member_suspended`, `portfolio.member_resumed`. + - **`retry_policy` field on `SuspensionRecord`** — Optional; additive; existing single-attempt behaviour preserved when absent. + - **RFC-0026 protocol document** — `docs/rfcs/0026-suspension-container-interaction.md` with five container rules, `HumanRetryPolicy` schema, three-level cascade, coordinator policy, RFC-0010 relationship, end-to-end example, and cross-RFC patch summary. + - **Cross-RFC patches** — RFC-0002 (aggregate `suspended_awaiting_input` status), RFC-0007 (portfolio suspension-aware aggregate), RFC-0010 (RFC-0026 relationship note), RFC-0012 (task/intent suspension mirror, `suspended_tasks`), RFC-0024 (`UpstreamIntentSuspendedError`, `suspended_phases`), RFC-0025 (`retry_policy` on `SuspensionRecord`, `timeout_seconds` per-attempt semantics, `fallback_policy` alias note, cross-RFC table). + - **41 new tests** — `tests/test_hitl.py` (`HumanRetryPolicy`, `EscalationStep`, `SuspensionRecord.retry_policy`, new `EventType` constants, `request_input` signature, `BaseAgent.default_human_retry_policy`, package exports) and `tests/test_workflow_io.py` (`UpstreamIntentSuspendedError` construction, attributes, hierarchy, package export). + - **Package exports** — `HumanRetryPolicy`, `EscalationStep`, `UpstreamIntentSuspendedError` exported from `openintent` top-level. + +### Updated + +- `SuspensionRecord.fallback_policy` documented as alias for `retry_policy.final_fallback_policy` when `retry_policy` is set. +- `SuspensionRecord.timeout_seconds` documented as per-attempt window when `retry_policy` is set. +- `BaseAgent.request_input()` docstring updated to describe `retry_policy` and `default_human_retry_policy`. + +--- + +## [0.16.0] - 2026-03-23 + +### Added + +- **RFC-0024: Workflow I/O Contracts** — Typed input/output contracts at the task and phase level, with executor-owned wiring (resolves RFC-0012 Open Question #4). 
+ + - **`outputs` schema on `PhaseConfig`** — Each phase can declare `outputs` as a mapping from key name to type (`string`, `number`, `boolean`, `object`, `array`, a named type from the `types` block, or an inline `{type, required}` dict for optional fields). Legacy list-of-strings form is normalised automatically. + - **`inputs` wiring on `PhaseConfig`** — Each phase can declare `inputs` as a mapping from local key name to a mapping expression: `phase_name.key`, `$trigger.key`, or `$initial_state.key`. The executor resolves these before invoking the agent handler and places the resolved dict in `ctx.input`. + - **Parse-time validation** — `WorkflowSpec._validate_io_wiring()` is called on every `from_yaml()` / `from_string()`. Checks that every `phase_name.key` reference names a phase in `depends_on`, that the phase exists, and that the key appears in the upstream phase's declared outputs (if it has any). Raises `InputWiringError` on failure. + - **`WorkflowSpec.resolve_task_inputs()`** — Executor pre-handoff step. Pre-populates `ctx.input` from upstream phase outputs, trigger payload, or initial state. Raises `UnresolvableInputError` if any declared input cannot be resolved. + - **`WorkflowSpec.validate_claim_inputs()`** — Executor claim-time gate. Rejects a task claim early if declared inputs are not yet resolvable from completed upstream phases. + - **`WorkflowSpec.validate_task_outputs()`** — Executor completion-time gate. Validates the agent's return dict against declared `outputs`. Raises `MissingOutputError` for absent required keys; raises `OutputTypeMismatchError` for type mismatches. Supports primitive types, named struct types (top-level key presence), and enum types (`{enum: [...]}`). + - **`MissingOutputError`** — Raised when one or more required output keys are absent. Carries `task_id`, `phase_name`, `missing_keys`. + - **`OutputTypeMismatchError`** — Raised when an output value does not match its declared type. 
Carries `task_id`, `phase_name`, `key`, `expected_type`, `actual_type`. + - **`UnresolvableInputError`** — Raised when one or more declared inputs cannot be resolved from available upstream outputs. Carries `task_id`, `phase_name`, `unresolvable_refs`. + - **`InputWiringError`** — Raised at parse time when an input mapping expression is structurally invalid. Subclass of `WorkflowValidationError`. Carries `phase_name`, `invalid_refs`, `suggestion`. + - **`types` block in YAML** — Top-level `types:` map defines named struct and enum types reusable across output schemas. Persisted into `intent.state._io_types` so agent-side validation works without a `WorkflowSpec` reference at runtime. + - **Incremental adoption** — Phases without `outputs` or `inputs` are fully unaffected. Parse-time and runtime validation only applies to phases that declare contracts. + - **Package exports** — `MissingOutputError`, `OutputTypeMismatchError`, `UnresolvableInputError`, `InputWiringError` all exported from `openintent` top-level. + - **RFC-0024 protocol document** — `docs/rfcs/0024-workflow-io-contracts.md` covering output schema declaration, input wiring, executor semantics, named error types, `TaskContext` API, parse-time validation, incremental adoption, and a complete example. + +- **RFC-0025: Human-in-the-Loop Intent Suspension** — First-class protocol primitive for suspending an intent mid-execution and awaiting operator input before proceeding. + + - **`suspended_awaiting_input` lifecycle state** — New `IntentStatus` value. Reaper and lease-expiry workers skip intents in this state; lease renewal succeeds for suspended intents so agents retain ownership across the suspension period. + - **Four new `EventType` constants** — `intent.suspended`, `intent.resumed`, `intent.suspension_expired`, `engagement.decision`. All events are stored in the intent event log and emitted via the SSE bus. 
+ - **`ResponseType` enum** — `choice`, `confirm`, `text`, `form` — specifies what kind of response is expected from the operator. Agents set this when calling `request_input()`. + - **`SuspensionChoice` dataclass** — A single operator-facing option with `value` (machine-readable), `label` (human-readable), optional `description`, `style` hint (`"primary"` / `"danger"` / `"default"`), and arbitrary `metadata`. Channels render these as buttons, dropdowns, or radio options. + - **`SuspensionRecord` dataclass** — Captures the full context of a suspension: question, `response_type`, list of `SuspensionChoice` objects, structured context, channel hint, timeout, fallback policy, confidence at suspension time, and response metadata. Stored in `intent.state._suspension`. Includes `valid_values()` helper that returns the allowed values for `choice`/`confirm` types. + - **`EngagementSignals` dataclass** — Carries `confidence`, `risk`, and `reversibility` scores (all float [0, 1]) used by the engagement-decision engine. + - **`EngagementDecision` dataclass** — Output of `should_request_input()`. Has `mode` (one of `autonomous`, `request_input`, `require_input`, `defer`), `should_ask` bool, `rationale` string, and embedded `EngagementSignals`. + - **`InputResponse` dataclass** — Represents an operator's response: `suspension_id`, `value`, `responded_by`, `responded_at`, and optional `metadata`. + - **`InputTimeoutError` exception** — Raised when `fallback_policy="fail"` and the suspension expires. Carries `suspension_id` and `fallback_policy` attributes. + - **`InputCancelledError` exception** — Raised when a suspension is explicitly cancelled. Carries `suspension_id`. + - **`BaseAgent.request_input()`** — Suspends the intent, fires `@on_input_requested` hooks, polls `intent.state._suspension.resolution` every 2 seconds, and returns the operator's response value. 
Accepts `response_type` and `choices` parameters; for `confirm` type auto-populates yes/no choices if none are supplied. Also supports `timeout_seconds`, `fallback_policy`, `fallback_value`, `channel_hint`, and `confidence`. + - **`BaseAgent.should_request_input()`** — Implements the default rule-based engagement-decision logic (high-confidence/low-risk → autonomous; moderate uncertainty → request_input; low confidence or high risk → require_input; extreme risk/irreversibility → defer). Emits `engagement.decision` event and fires `@on_engagement_decision` hooks. + - **Four new lifecycle decorators** — `@on_input_requested`, `@on_input_received`, `@on_suspension_expired`, `@on_engagement_decision`. Auto-discovered by `_discover_handlers()` and routed via `_on_generic_event()`. + - **`POST /api/v1/intents/{id}/suspend/respond`** — REST endpoint for operators (or bots) to submit a response. Validates `suspension_id` matches the active suspension, validates response value against defined choices (for `choice`/`confirm` types — returns 422 with `valid_choices` on mismatch), patches `state._suspension` with the response, transitions the intent to `active`, emits `intent.resumed`, and broadcasts via SSE. Response includes `choice_label` and `choice_description` for matched choices. + - **RFC-0025 protocol document** — `docs/rfcs/0025-human-in-the-loop.md` covering lifecycle state, event types, SuspensionRecord schema, fallback policies, engagement decision modes, REST endpoint, and security considerations. + - **HITL user guide** — `docs/guide/human-in-the-loop.md` with quick-start, `request_input()` reference, fallback policies, engagement decisions, decorator reference, operator response example, exception reference, and a full refund-agent example. + - **62 new tests** — `tests/test_hitl.py` covers all new models, exceptions, decorators, engagement-decision modes, and the suspend/respond endpoint. 
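The rule-based engagement decision described for `BaseAgent.should_request_input()` can be sketched roughly as follows. This is a standalone illustration, not the SDK's implementation: the function name `decide_engagement`, every numeric threshold, the reading of `reversibility` as "1.0 means fully reversible", and the rule that `should_ask` is true only for the two input modes are all assumptions made for the example. Only the four mode names and the three signal names come from the changelog.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EngagementSignals:
    confidence: float     # 0.0 to 1.0
    risk: float           # 0.0 to 1.0
    reversibility: float  # 0.0 to 1.0; assumed here: 1.0 = fully reversible


@dataclass(frozen=True)
class EngagementDecision:
    mode: str             # "autonomous" | "request_input" | "require_input" | "defer"
    should_ask: bool
    rationale: str
    signals: EngagementSignals


def decide_engagement(signals: EngagementSignals) -> EngagementDecision:
    """Hypothetical rule table; all thresholds are illustrative assumptions."""
    c, r, rev = signals.confidence, signals.risk, signals.reversibility
    if r >= 0.9 or rev <= 0.1:
        # Extreme risk or near-irreversible action: do not act at all.
        mode, why = "defer", "extreme risk or irreversibility"
    elif c < 0.5 or r >= 0.7:
        # Low confidence or high risk: operator input is mandatory.
        mode, why = "require_input", "low confidence or high risk"
    elif c < 0.8 or r >= 0.3:
        # Moderate uncertainty: ask, but a fallback may apply on timeout.
        mode, why = "request_input", "moderate uncertainty"
    else:
        mode, why = "autonomous", "high confidence and low risk"
    # Assumption: only the two input modes ask the operator a question.
    should_ask = mode in ("request_input", "require_input")
    return EngagementDecision(mode, should_ask, why, signals)
```

Checking the most severe condition first keeps the rules mutually exclusive without needing range bounds on every branch.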
+ +### Updated + +- `mkdocs.yml` — RFC-0024 and RFC-0025 entries added to the RFCs nav; Human-in-the-Loop guide added to the User Guide nav; announcement bar updated to v0.16.0. +- All version references updated to 0.16.0 across Python SDK and changelog. + +--- + ## [0.15.1] - 2026-03-06 ### Changed @@ -26,13 +102,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- **RFC-0010 Retry Policy MCP Tools** — 4 new MCP tools for retry policy management and failure tracking: - - `set_retry_policy` — Set or update retry policy on an intent (admin tier). - - `get_retry_policy` — Retrieve the current retry policy for an intent (reader tier). - - `record_failure` — Record a failure event against an intent for retry tracking (operator tier). - - `get_failures` — List recorded failures for an intent (reader tier). -- **`build_retry_failure_tools()`** — New helper in the Python MCP bridge (`openintent.mcp`) that builds the 4 retry/failure tool definitions for use with `MCPToolProvider` and `MCPToolExporter`. -- **MCP Tool Surface Expansion** — MCP tool surface expanded from 66 to 70 tools; RBAC counts: reader=25, operator=43, admin=70. +- **RFC-0010 Retry Policy MCP Tools** — 4 new MCP tools: `set_retry_policy` (admin tier), `get_retry_policy` (reader tier), `record_failure` (operator tier), `get_failures` (reader tier). MCP tool surface expanded from 66 to 70 tools; RBAC counts: reader=25, operator=43, admin=70. +- **`build_retry_failure_tools()`** — New helper in the Python MCP bridge (`openintent.mcp`) that constructs retry policy and failure-tracking tool definitions for MCP integration, enabling agents to manage retry policies and record/query failures through MCP. ### Changed @@ -243,29 +314,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- **RFC-0021: Agent-to-Agent Messaging** — Structured channels for direct agent-to-agent communication within intent scope. 
- - `Channel`, `ChannelMessage`, `MessageType`, `ChannelStatus`, `MemberPolicy`, `MessageStatus` data models. - - 11 server endpoints (10 REST + 1 SSE) under `/api/v1/intents/{id}/channels/`. - - `@on_message` lifecycle decorator for reactive message handling. - - `_ChannelsProxy` / `_ChannelHandle` agent abstractions with `ask()`, `notify()`, `broadcast()`. -- **YAML `channels:` block** — Declarative channel definitions in workflow specifications. +- **Declarative Messaging (RFC-0021)** + - YAML `channels:` block for declarative agent-to-agent messaging configuration. + - `@on_message` decorator for zero-boilerplate, reactive message handling with auto-reply. + - Channel proxy (`self.channels`) with `ask()`, `notify()`, and `broadcast()` convenience methods. + - Three messaging patterns: request/response, fire-and-forget, and broadcast. --- -## [0.10.1] - 2026-02-12 +## [0.10.1] - 2026-02-13 ### Added -- **Tool Execution Adapters** — Pluggable adapter system for real external API execution through `POST /api/v1/tools/invoke`. Three built-in adapters: `RestToolAdapter` (API key, Bearer, Basic Auth), `OAuth2ToolAdapter` (automatic token refresh on 401), `WebhookToolAdapter` (HMAC-SHA256 signed dispatch). -- **Adapter Registry** — Resolves adapters from credential metadata via explicit `adapter` key, `auth_type` mapping, or placeholder fallback. -- **Security Controls** — URL validation (blocks private IPs, metadata endpoints, non-HTTP schemes), timeout bounds (1–120s), response size limits (1 MB), secret sanitization, request fingerprinting, redirect blocking. -- **Custom Adapter Registration** — `register_adapter(name, adapter)` for non-standard protocols. -- **OAuth2 Integration Guide** — Documentation for integrating OAuth2 services: platform handles authorization code flow, stores tokens in vault, SDK manages refresh and execution. Templates for Salesforce, Google APIs, Microsoft Graph, HubSpot. 
+- **Tool Execution Adapters** — Pluggable adapter system for real external API execution through the Tool Proxy (`POST /api/v1/tools/invoke`). Three built-in adapters: + - `RestToolAdapter` — API key (header/query), Bearer token, and Basic Auth for standard REST APIs. + - `OAuth2ToolAdapter` — OAuth2 with automatic token refresh on 401 responses using `refresh_token` + `token_url`. + - `WebhookToolAdapter` — HMAC-SHA256 signed dispatch for webhook receivers. +- **Adapter Registry** — `AdapterRegistry` resolves the correct adapter from credential metadata: explicit `adapter` key, `auth_type` mapping, or placeholder fallback for backward compatibility. +- **Security Controls** — All outbound tool execution requests enforce: + - URL validation blocking private IPs (RFC-1918, loopback, link-local), cloud metadata endpoints (`169.254.169.254`), and non-HTTP schemes. + - Timeout bounds clamped to 1–120 seconds (default 30s). + - Response size limit of 1 MB. + - Secret sanitization replacing API keys, tokens, and passwords with `[REDACTED]` in all outputs. + - Request fingerprinting via SHA-256 hash stored per invocation for audit correlation. + - HTTP redirect blocking to prevent SSRF via redirect chains. +- **Custom Adapter Registration** — `register_adapter(name, adapter)` to add adapters for services with non-standard protocols (e.g., GraphQL). +- **OAuth2 Integration Guide** — Comprehensive documentation for integrating OAuth2 services: platform handles the authorization code flow, stores tokens in the vault, SDK manages refresh and execution. Includes ready-to-use metadata templates for Salesforce, Google APIs, Microsoft Graph, and HubSpot. ### Changed -- Credential `metadata` supports execution config (`base_url`, `endpoints`, `auth`) for real API calls. Backward compatible — credentials without execution config return placeholder responses. -- 57 new tests covering security utilities, all three adapters, and the registry. 
+- Credential `metadata` field now supports execution config (`base_url`, `endpoints`, `auth`) to enable real API calls. Credentials without execution config continue to return placeholder responses (backward compatible). +- 57 new tests covering security utilities, all three adapters, and the adapter registry. - Documentation updated across guide, RFC-0014, examples, API reference, and website. --- @@ -274,20 +353,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- **RFC-0018: Cryptographic Agent Identity** — Ed25519 key pairs, `did:key` decentralized identifiers, challenge-response registration, signed events with non-repudiation, key rotation, and portable identity across servers. -- **RFC-0019: Verifiable Event Logs** — SHA-256 hash chains linking every event to its predecessor, Merkle tree checkpoints with compact inclusion proofs, consistency verification between checkpoints, and optional external timestamp anchoring. -- **RFC-0020: Distributed Tracing** — `trace_id` and `parent_event_id` fields on IntentEvent, `TracingContext` dataclass for automatic propagation through agent-tool-agent call chains, W3C-aligned 128-bit trace identifiers. -- **`@Identity` decorator** — Declarative cryptographic identity with `auto_sign=True` and `auto_register=True`. -- **`TracingContext`** — New dataclass with `new_root()`, `child()`, `to_dict()`, `from_dict()` for trace propagation. -- **11 new client methods** — `register_identity()`, `complete_identity_challenge()`, `verify_signature()`, `rotate_key()`, `get_agent_keys()`, `revoke_key()`, `resolve_did()`, `verify_event_chain()`, `list_checkpoints()`, `get_merkle_proof()`, `verify_consistency()`. -- **13 new server endpoint stubs** — Identity key management, challenge-response, DID resolution, hash chain verification, checkpoint management, Merkle proofs, consistency verification. 
-- **Automatic tracing in `_emit_tool_event`** — Tool invocation events include `trace_id` and `parent_event_id` from the agent's active `TracingContext`. -- **Tracing injection in `_execute_tool`** — Tool handlers that accept a `tracing` keyword argument receive the current `TracingContext` automatically. +- **RFC-0018: Cryptographic Agent Identity** + - `AgentIdentity`, `IdentityChallenge`, `IdentityVerification` data models for key-based agent identity. + - Ed25519 key pairs with `did:key:z6Mk...` decentralized identifiers. + - Challenge-response identity registration via `register_identity()` and `complete_identity_challenge()`. + - Key rotation with `rotate_key()` preserving previous key history. + - Signature verification with `verify_signature()`. + - `@Identity` decorator for zero-boilerplate identity setup on agents. + - `@on_identity_registered` lifecycle hook. + - `IdentityConfig` for YAML workflow configuration. + - `AgentRecord` extended with `public_key`, `did`, `key_algorithm`, `key_registered_at`, `key_expires_at`, `previous_keys` fields (all optional for backward compatibility). + - 5 new server endpoints: `POST /api/v1/agents/{id}/identity`, `POST .../identity/challenge`, `GET .../identity`, `POST .../identity/verify`, `POST .../identity/rotate`. + +- **RFC-0019: Verifiable Event Logs** + - `LogCheckpoint`, `MerkleProof`, `MerkleProofEntry`, `ChainVerification`, `ConsistencyProof`, `TimestampAnchor` data models. + - SHA-256 hash chains linking each event to its predecessor. + - Merkle tree checkpoints with `MerkleProof.verify()` for client-side inclusion verification. + - `verify_event_chain()` to validate an intent's full hash chain integrity. + - `list_checkpoints()`, `get_checkpoint()`, `get_merkle_proof()`, `verify_consistency()` client methods. + - `VerificationConfig` for YAML workflow configuration. + - `IntentEvent` extended with optional `proof`, `event_hash`, `previous_event_hash`, `sequence` fields. 
+ - 8 new server endpoints for checkpoints, Merkle proofs, chain verification, and optional external anchoring. + +- **RFC-0020: Distributed Tracing** + - `TracingContext` dataclass for propagating trace state through agent → tool → agent call chains. + - `IntentEvent` extended with optional `trace_id` (128-bit hex) and `parent_event_id` fields. + - `log_event()` on both sync and async clients accepts `trace_id` and `parent_event_id` parameters. + - `_emit_tool_event()` automatically includes tracing context in tool invocation events. + - `_execute_tool()` passes `tracing` keyword argument to local tool handlers that accept it. + - `TracingContext.new_root()` generates fresh 128-bit trace IDs (W3C-aligned format). + - `TracingContext.child()` creates child contexts with updated parent references. + - Cross-intent tracing via `trace_id` in event payloads. + +- **Sync & Async Client Parity** — All new RFC-0018/0019/0020 methods available on both `OpenIntentClient` and `AsyncOpenIntentClient`. ### Changed -- All documentation, READMEs, and examples updated from 17 to 20 RFCs. -- `log_event()` on both sync and async clients now accepts optional `trace_id` and `parent_event_id` parameters. +- Version bumped to 0.10.0. +- `__all__` exports updated with all new public symbols including `TracingContext`. - 690+ tests passing across all 20 RFCs (104 model tests + 26 server tests for RFC-0018/0019/0020). --- @@ -296,11 +399,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- **Streaming token usage capture** — All 7 LLM provider adapters (OpenAI, DeepSeek, Gemini, Anthropic, Azure OpenAI, OpenRouter, Grok) now capture actual `prompt_tokens`, `completion_tokens`, and `total_tokens` during streaming responses. -- **OpenAI-compatible adapters** — OpenAI, DeepSeek, Azure OpenAI, OpenRouter, and Grok adapters inject `stream_options={"include_usage": True}` to receive usage data in the final stream chunk. 
-- **Gemini adapter** — Captures `usage_metadata` from stream chunks and maps to standard token count fields. -- **Anthropic adapter** — Extracts usage from the stream's internal message snapshot automatically. -- **`tokens_streamed` field** — Reports actual completion token counts, falling back to character count only when unavailable. +- **Streaming token usage capture** — All 7 LLM provider adapters (OpenAI, DeepSeek, Gemini, Anthropic, Azure OpenAI, OpenRouter, Grok) now capture actual `prompt_tokens`, `completion_tokens`, and `total_tokens` during streaming responses. Previously, `tokens_streamed` used character count instead of real token counts. +- **OpenAI-compatible adapters** — OpenAI, DeepSeek, Azure OpenAI, OpenRouter, and Grok adapters now inject `stream_options={"include_usage": True}` to receive usage data in the final stream chunk. Token counts are extracted and passed through to `complete_stream()` and `log_llm_request_completed()`. +- **Gemini adapter** — Captures `usage_metadata` from stream chunks (`prompt_token_count`, `candidates_token_count`, `total_token_count`) and maps to standard fields. +- **Anthropic adapter** — Extracts usage from the stream's internal message snapshot in `__exit__`, removing the need for a manual `get_final_message()` call. +- **`tokens_streamed` field** — Now reports actual completion token counts from provider APIs, falling back to character count only when usage data is unavailable. --- @@ -309,27 +412,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - **Server-Side Tool Invocation** — `POST /api/v1/tools/invoke` endpoint enables agents to invoke tools through the server proxy without ever accessing raw credentials. The server resolves the appropriate grant, injects credentials from the vault, enforces rate limits, and records the invocation for audit. 
-- **3-Tier Grant Resolution** — Tool invocations are matched to grants using a three-tier resolution strategy: (1) `grant.scopes` contains the tool name, (2) `grant.context["tools"]` contains the tool name, (3) `credential.service` matches the tool name. +- **3-Tier Grant Resolution** — Tool invocations are matched to grants using a three-tier resolution strategy: (1) `grant.scopes` contains the tool name, (2) `grant.context["tools"]` contains the tool name, (3) `credential.service` matches the tool name. This resolves the common mismatch where tool names differ from credential service names. - **Client `invoke_tool()` Methods** — `OpenIntentClient.invoke_tool(tool_name, agent_id, parameters)` (sync) and `AsyncOpenIntentClient.invoke_tool(tool_name, agent_id, parameters)` (async) for programmatic server-side tool invocation. -- **Agent `self.tools.invoke()` via Server Proxy** — `_ToolsProxy` on agents delegates string tool names to `client.invoke_tool()`, completing the server-side invocation chain. -- **Invocation Audit Trail** — Every server-side tool invocation is recorded with agent ID, tool name, parameters, result, duration, and timestamp. +- **Agent `self.tools.invoke()` via Server Proxy** — `_ToolsProxy` on agents delegates string tool names to `client.invoke_tool()`, completing the chain: `self.tools.invoke("web_search", {...})` → server resolves grant → injects credentials → executes → returns result. +- **Invocation Audit Trail** — Every server-side tool invocation is recorded with agent ID, tool name, parameters, result, duration, and timestamp for compliance and debugging. -- **`@on_handoff` Decorator** — Lifecycle hook for delegated assignments. Handler receives intent and delegating agent's ID. -- **`@on_retry` Decorator** — Lifecycle hook for retry assignments (RFC-0010). Handler receives intent, attempt number, and last error. 
-- **`@input_guardrail` / `@output_guardrail` Decorators** — Validation pipeline: input guardrails reject before processing, output guardrails validate before commit. Raise `GuardrailError` to reject. -- **Built-in Coordinator Guardrails** — `guardrails=` on `@Coordinator` is now active: `"require_approval"`, `"budget_limit"`, `"agent_allowlist"`. +- **`@on_handoff` Decorator** — Lifecycle hook that fires when an agent receives work delegated from another agent. The handler receives the intent and the delegating agent's ID, allowing context-aware handoff processing distinct from fresh assignments. +- **`@on_retry` Decorator** — Lifecycle hook that fires when an intent is reassigned after a previous failure (RFC-0010). The handler receives the intent, attempt number, and last error, allowing agents to adapt retry strategy. +- **`@input_guardrail` / `@output_guardrail` Decorators** — Validation pipeline for agent processing. Input guardrails run before `@on_assignment` handlers and can reject intents. Output guardrails validate handler results before they are committed to state. Both support `GuardrailError` for rejection. +- **Built-in Coordinator Guardrails** — The `guardrails=` parameter on `@Coordinator` is now active. Supported policies: `"require_approval"` (logs decision records before assignment), `"budget_limit"` (rejects intents exceeding cost constraints), `"agent_allowlist"` (rejects delegation to agents outside the managed list). ### Fixed -- **`_ToolsProxy` duplicate class** — Removed duplicate `_ToolsProxy` definition that caused agent tool proxy to silently fail. -- **Dead proxy code** — Removed shadowed `_MemoryProxy` and `_TasksProxy` duplicate definitions. -- **Grant matching for mismatched tool/service names** — `find_agent_grant_for_tool()` now correctly resolves grants where tool name differs from credential service name. -- **Inert `guardrails=` parameter** — `guardrails=` on `@Coordinator` was accepted but unused. 
Now wires into guardrail pipeline. +- **`_ToolsProxy` duplicate class** — Removed duplicate `_ToolsProxy` definition that caused agent tool proxy to silently fail. Single definition at top of `agents.py`, constructor takes `agent` only. +- **Dead proxy code** — Removed shadowed `_MemoryProxy` and `_TasksProxy` definitions (originally at lines 47-79, shadowed by full implementations later in the file). +- **Grant matching for mismatched tool/service names** — `find_agent_grant_for_tool()` in Database class now correctly resolves grants where the tool name (e.g. `"web_search"`) differs from the credential service name (e.g. `"serpapi"`). +- **Inert `guardrails=` parameter** — The `guardrails=` parameter on `@Coordinator` was accepted but completely unused. Now wires into the input/output guardrail pipeline. ### Changed -- Tool execution priority enforced: protocol tools > local `ToolDef` handlers > remote RFC-0014 server grants. -- 556+ tests passing across all 17 RFCs. +- Tool execution priority clarified and enforced: protocol tools (remember, recall, clarify, escalate, update_status) > local `ToolDef` handlers > remote RFC-0014 server grants. +- 556+ tests passing across test_llm, test_agents, test_server suites. --- @@ -337,18 +440,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- **Tool → ToolDef rename** — `Tool` is now `ToolDef`, `@tool` is now `@define_tool` for clarity. The old names remain as backwards-compatible aliases. -- **Type annotations** — `llm.py` fully type-annotated, passes mypy strict mode. +- **Tool → ToolDef rename** — `Tool` is now `ToolDef`, `@tool` is now `@define_tool` for clarity. The old names remain as backwards-compatible aliases and will not be removed. +- **Type annotations** — `llm.py` is now fully type-annotated and passes mypy strict mode (previously suppressed via `# mypy: disable-error-code`). 
### Added -- **LLM-Powered Agents** — `model=` on `@Agent`/`@Coordinator` for agentic tool loops with `self.think()`, `self.think_stream()`, `self.reset_conversation()`, and protocol-native tools. -- **Custom Tools with ToolDef** — `ToolDef(name, description, parameters, handler)` and `@define_tool` decorator. -- **Automatic Tool Tracing** — Local `ToolDef` invocations emit `tool_invocation` protocol events (best-effort, never blocks). +- **LLM-Powered Agents** — Add `model=` to `@Agent` or `@Coordinator` to enable agentic tool loops with `self.think(prompt)`, streaming via `self.think_stream(prompt)`, conversation reset via `self.reset_conversation()`, and protocol-native tools (remember, recall, clarify, escalate, update_status, delegate, record_decision, create_plan). +- **Custom Tools with ToolDef** — `ToolDef(name, description, parameters, handler)` for rich tool definitions with local execution, and `@define_tool(description=, parameters=)` decorator to turn functions into `ToolDef` objects. +- **Mixed tool sources** — `tools=` on `@Agent`/`@Coordinator` accepts both `ToolDef` objects (local handlers with rich schemas) and plain strings (RFC-0014 protocol grants). +- **Automatic Tool Tracing** — Every local `ToolDef` handler invocation is automatically traced as a `tool_invocation` protocol event when the agent is connected to an OpenIntent server. Each event records tool name, arguments, result, and execution duration. Tracing is best-effort and never blocks tool execution. +- **Backwards-compatible aliases** — `Tool` = `ToolDef`, `@tool` = `@define_tool`. ### Fixed -- Unified tool execution model documentation. +- Unified tool execution model documentation to clarify the three-tier priority: protocol tools > local handlers (`ToolDef`) > remote protocol grants (RFC-0014 strings). --- diff --git a/docs/overrides/home.html b/docs/overrides/home.html index 4fe5fc5..f1a0dce 100644 --- a/docs/overrides/home.html +++ b/docs/overrides/home.html @@ -24,7 +24,7 @@
-v0.16.0 — Human-in-the-Loop Intent Suspension
+v0.17.0 — Suspension Propagation & Retry (RFC-0026)

Stop Duct-Taping Your Agents Together

OpenIntent is a durable, auditable protocol for multi-agent coordination. Structured intents replace fragile chat chains. Versioned state replaces guesswork. Ship agent systems that actually work in production. @@ -38,7 +38,7 @@

Stop Duct-Taping Your Agents Together

-25
+26
RFCs
@@ -50,7 +50,7 @@

Stop Duct-Taping Your Agents Together

Tests
-v0.16.0
+v0.17.0
Latest
@@ -97,7 +97,7 @@

Core Capabilities

Built-in Server
-FastAPI server implementing all 25 RFCs. SQLite or PostgreSQL. One command to start.
+FastAPI server implementing all 26 RFCs. SQLite or PostgreSQL. One command to start.

Learn more
@@ -300,7 +300,7 @@

Protocol Architecture

Complete RFC Coverage

-25 RFCs implemented. From intents to cross-server federation, every primitive you need.
+26 RFCs implemented. From intents to cross-server federation, every primitive you need.

@@ -332,6 +332,7 @@

Complete RFC Coverage

 0023 | Federation Security | Proposed
 0024 | Workflow I/O Contracts | Proposed
 0025 | Human-in-the-Loop Intent Suspension | Proposed
+0026 | Suspension Container Interaction & Human Retry | Proposed
diff --git a/docs/rfcs/0002-intent-graphs.md b/docs/rfcs/0002-intent-graphs.md index 32ddb23..aae8818 100644 --- a/docs/rfcs/0002-intent-graphs.md +++ b/docs/rfcs/0002-intent-graphs.md @@ -71,15 +71,16 @@ Dependencies affect status transitions: - **Blocked by dependencies:** An intent with incomplete dependencies is automatically `blocked` - **Auto-unblock:** When all dependencies complete, intent transitions from `blocked` to `active` -- **Completion gate:** Cannot transition to `completed` until all dependencies are `completed` +- **Completion gate:** Cannot transition to `completed` until all dependencies are `completed`. `suspended_awaiting_input` (RFC-0026) does **not** satisfy this gate. - **Parent completion:** Parent intent cannot complete until all children complete - **Cascade abandonment:** Abandoning a parent MAY cascade to children (configurable) +- **Upstream suspension (RFC-0026):** When an active dependency transitions to `suspended_awaiting_input`, any dependent that is `active` transitions to `blocked`. Auto-unblock fires when the dependency *resumes and subsequently completes*, not on resume alone. ``` draft → active (if no unmet dependencies, else → blocked) blocked → active (when dependencies resolve) active → completed (if all dependencies + children completed) -active → blocked (if dependency becomes incomplete) +active → blocked (if dependency becomes incomplete OR upstream suspends) any → abandoned ``` @@ -94,7 +95,8 @@ Parent intents track aggregate status of their children: "by_status": { "completed": 3, "active": 2, - "blocked": 1 + "blocked": 1, + "suspended_awaiting_input": 0 }, "completion_percentage": 50, "blocking_intents": ["intent-uuid-1"], @@ -103,6 +105,8 @@ Parent intents track aggregate status of their children: } ``` +`suspended_awaiting_input` (RFC-0026) is included in `by_status` but does **not** satisfy the completion gate. A suspended child does not count toward aggregate completion. 
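The aggregate and completion-gate wording in this RFC-0002 patch can be illustrated with a minimal Python sketch. It is an illustration only: `aggregate_status` and `completion_gate_satisfied` are hypothetical helper names, not SDK functions, and child statuses are modelled as plain strings.

```python
from collections import Counter

SUSPENDED = "suspended_awaiting_input"


def aggregate_status(child_statuses: list[str]) -> dict:
    """Build an RFC-0002-style aggregate from child status strings (hypothetical helper)."""
    by_status = Counter(child_statuses)
    # Always report the suspended counter, even when it is zero.
    by_status.setdefault(SUSPENDED, 0)
    total = len(child_statuses)
    completed = by_status.get("completed", 0)
    return {
        "total": total,
        "by_status": dict(by_status),
        # Only `completed` children count toward completion; suspended ones do not.
        "completion_percentage": round(100 * completed / total) if total else 0,
    }


def completion_gate_satisfied(dependency_statuses: list[str]) -> bool:
    """The gate opens only when every dependency is `completed`."""
    return all(status == "completed" for status in dependency_statuses)
```

Under these assumptions a suspended child shows up in `by_status` but neither raises `completion_percentage` nor opens the gate.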
+ ## Endpoints | Method | Path | Description | @@ -170,3 +174,12 @@ ready = client.get_ready_intents(parent.id) # Unblocked intents - **Progress visibility:** Aggregate status shows overall completion percentage - **Multi-agent orchestration:** Different agents can work on different branches in parallel - **Audit trail:** Parent-child relationships provide clear provenance for all work + +## Cross-RFC Interactions + +| RFC | Interaction | +|-----|------------| +| RFC-0001 (Intents) | Extends intent objects with parent_intent_id and depends_on | +| RFC-0006 (Subscriptions) | Parent intent events include aggregate status changes | +| RFC-0025 (HITL) | `suspended_awaiting_input` is a valid child status | +| RFC-0026 (Suspension Containers) | `suspended_awaiting_input` in aggregate `by_status`; `active → blocked` trigger for upstream suspension; completion gate clarification | diff --git a/docs/rfcs/0007-intent-portfolios.md b/docs/rfcs/0007-intent-portfolios.md index 0e534f6..52335d9 100644 --- a/docs/rfcs/0007-intent-portfolios.md +++ b/docs/rfcs/0007-intent-portfolios.md @@ -59,6 +59,34 @@ Portfolios serve as namespaces for organizing work: } ``` +## Suspension-Aware Aggregate Status (RFC-0026) + +When one or more portfolio members are `suspended_awaiting_input`, the portfolio GET response includes: + +```json +{ + "has_suspended_members": true, + "suspended_member_count": 1 +} +``` + +**Aggregate status algorithm:** + +| Condition | Aggregate status | +|---|---| +| All members `completed` | `completed` | +| Any member `failed` or `abandoned` | `failed` | +| Otherwise (including any suspended members) | `in_progress` | + +**New portfolio events (RFC-0026):** + +| Event | When emitted | +|---|---| +| `portfolio.member_suspended` | A member intent transitions to `suspended_awaiting_input` | +| `portfolio.member_resumed` | A suspended member intent resumes | + +**Portfolio deadline precedence:** If `governance.deadline` fires while a member intent is 
`suspended_awaiting_input`, the server MUST abandon that intent with `abandonment_reason: "portfolio_deadline_exceeded"`, bypassing `fallback_policy`. See RFC-0026 §2 Rule 5. + ## Cross-RFC Interactions | RFC | Interaction | @@ -69,6 +97,8 @@ Portfolios serve as namespaces for organizing work: | RFC-0009 (Costs) | Aggregate cost tracking across portfolio intents | | RFC-0012 (Planning) | Plans can scope to portfolio intents | | RFC-0013 (Coordinators) | Coordinator lease can scope to a portfolio | +| RFC-0025 (HITL) | Member intents may suspend awaiting operator input | +| RFC-0026 (Suspension Containers) | Aggregate algorithm, `has_suspended_members`, `suspended_member_count`, portfolio member events | ## Endpoints diff --git a/docs/rfcs/0010-retry-policies.md b/docs/rfcs/0010-retry-policies.md index cc3fc92..6be5580 100644 --- a/docs/rfcs/0010-retry-policies.md +++ b/docs/rfcs/0010-retry-policies.md @@ -124,3 +124,4 @@ curl -X POST http://localhost:8000/api/v1/intents/{id}/failures \ | RFC-0003 (Leasing) | Lease released on fallback; new lease acquired by fallback agent | | RFC-0009 (Costs) | Failed attempts still record costs | | RFC-0012 (Tasks) | Task-level retry policies override intent-level policies | +| RFC-0026 (Suspension Containers) | Parallel construct: RFC-0010 retries when the *agent* fails; RFC-0026 (`HumanRetryPolicy`) retries when the *human* fails to respond. The server SHOULD use the same scheduled-job infrastructure for both. | diff --git a/docs/rfcs/0012-task-decomposition-planning.md b/docs/rfcs/0012-task-decomposition-planning.md index 3a8acda..28059eb 100644 --- a/docs/rfcs/0012-task-decomposition-planning.md +++ b/docs/rfcs/0012-task-decomposition-planning.md @@ -588,6 +588,39 @@ New event types added to the intent event log: | RFC-0009 (Cost Tracking) | Task-level cost tracking. Plan aggregates costs across tasks. | | RFC-0010 (Retry Policies) | Task retry uses existing retry policy definitions. 
| | RFC-0011 (Access Control) | Tasks inherit permissions from intents. Task-level overrides supported. | +| RFC-0025 (HITL) | `request_input()` is the canonical mechanism for checkpoint human approval. | +| RFC-0026 (Suspension Containers) | Bidirectional task/intent suspension mirror; `blocked_reason: "intent_suspended"`; `suspended_tasks` in plan progress. | + +### 9. RFC-0026 Patch: Task/Intent Suspension Mirror + +When an intent transitions to `suspended_awaiting_input` (via `request_input()`), its corresponding plan task MUST transition to `blocked`: + +```json +{ + "state": "blocked", + "blocked_reason": "intent_suspended", + "suspended_intent_id": "" +} +``` + +On `intent.resumed`, the task transitions back to `running`. + +**Plan progress gains `suspended_tasks`:** + +```json +{ + "suspended_tasks": [ + { + "task_id": "task_01XYZ", + "intent_id": "intent_01ABC", + "suspended_since": "2026-03-24T10:00:00Z", + "expires_at": "2026-03-24T13:00:00Z" + } + ] +} +``` + +**Checkpoints as `request_input()` triggers:** RFC-0012 plan checkpoints that `require_approval: true` SHOULD be implemented by the assigned agent calling `request_input()`. This is the canonical pattern for human-in-the-loop gates within plans. The plan transitions to `paused` and the task to `blocked` (via the mirror rule above) until the operator responds. 
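The task/intent mirror rule in this RFC-0012 patch could be sketched as a small mapping from intent events to task patches. This is a hedged illustration: `mirror_task_state` is a hypothetical helper, and clearing `blocked_reason` and `suspended_intent_id` on resume is an assumption the patch text does not spell out.

```python
from typing import Optional


def mirror_task_state(intent_id: str, event: str) -> Optional[dict]:
    """Return the task patch implied by an intent event, or None if no mirror applies."""
    if event == "intent.suspended":
        return {
            "state": "blocked",
            "blocked_reason": "intent_suspended",
            "suspended_intent_id": intent_id,
        }
    if event == "intent.resumed":
        # Assumption: the suspension markers are cleared when the task runs again.
        return {"state": "running", "blocked_reason": None, "suspended_intent_id": None}
    # Any other event leaves the task untouched in this sketch.
    return None
```

A server-side event handler might apply the returned patch to the plan task that owns the intent; how that task is looked up is outside this sketch.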
## Open Questions diff --git a/docs/rfcs/0024-workflow-io-contracts.md b/docs/rfcs/0024-workflow-io-contracts.md index fe3ae02..edfae64 100644 --- a/docs/rfcs/0024-workflow-io-contracts.md +++ b/docs/rfcs/0024-workflow-io-contracts.md @@ -640,11 +640,52 @@ This RFC does not modify: --- +## RFC-0026 Patch: Upstream Suspension Rejection + +When an agent attempts to claim a task whose declared inputs reference an upstream phase that is currently `suspended_awaiting_input`, `validate_claim_inputs()` MUST reject with `UpstreamIntentSuspendedError`: + +```python +from openintent.workflow import UpstreamIntentSuspendedError + +try: + spec.validate_claim_inputs(phase_name, upstream_outputs, task_id=task_id) +except UpstreamIntentSuspendedError as e: + # e.suspended_intent_id — the upstream intent that is suspended + # e.expected_resume_at — ISO-8601 estimate or None + logger.info(f"Claim deferred: upstream intent {e.suspended_intent_id} is suspended") +``` + +**Workflow progress gains `suspended_phases`:** + +```json +{ + "suspended_phases": [ + { + "phase_name": "compliance_review", + "intent_id": "intent_01ABC", + "suspended_since": "2026-03-24T10:00:00Z", + "expires_at": "2026-03-24T13:00:00Z" + } + ] +} +``` + +## Cross-RFC Interactions + +| RFC | Interaction | +|-----|------------| +| RFC-0012 (Planning) | Addendum to RFC-0012; resolves Open Question #4 | +| RFC-0001 (Intents) | Intent state holds _io_inputs/_io_outputs for executor wiring | +| RFC-0004 (Portfolios) | Portfolios scope workflows | +| RFC-0025 (HITL) | Agents calling request_input() affect claim-time validation | +| RFC-0026 (Suspension Containers) | `upstream_intent_suspended` rejection reason; `suspended_phases` in workflow progress | + ## References - [RFC-0012: Task Decomposition & Planning](./0012-task-decomposition-planning.md) — parent RFC; defines Task, Plan, TaskContext - [RFC-0001: Intent Objects](./0001-intent-objects.md) — intent state model - [RFC-0004: Intent 
Portfolios](./0004-governance-arbitration.md) — portfolio boundaries - [RFC-0021: Agent-to-Agent Messaging](./0021-agent-to-agent-messaging.md) — channel messaging (out of scope for this RFC) +- [RFC-0026: Suspension Propagation & Retry](./0026-suspension-container-interaction.md) — upstream suspension rejection - [Temporal Activity Input/Output](https://docs.temporal.io/activities) — reference design for typed activity I/O - [Prefect Task Parameters](https://docs.prefect.io/concepts/tasks/) — reference for task input contracts diff --git a/docs/rfcs/0025-human-in-the-loop.md b/docs/rfcs/0025-human-in-the-loop.md index a44a145..731cc3f 100644 --- a/docs/rfcs/0025-human-in-the-loop.md +++ b/docs/rfcs/0025-human-in-the-loop.md @@ -98,10 +98,11 @@ A `SuspensionRecord` is created by the agent and persisted in `intent.state._sus | `context` | object | — | Structured context for the operator | | `channel_hint` | string | — | Preferred delivery channel (`"slack"`, `"email"`) | | `suspended_at` | ISO-8601 | — | When the suspension started | -| `timeout_seconds` | integer | — | Expiry window (omit for no timeout) | -| `expires_at` | ISO-8601 | — | Computed from `suspended_at + timeout_seconds` | +| `timeout_seconds` | integer | — | Per-attempt expiry window (omit for no timeout). When `retry_policy` is set, this is the per-attempt window, not the total. Total window = `interval_seconds × max_attempts`. | +| `expires_at` | ISO-8601 | — | Total deadline: `suspended_at + (interval_seconds × max_attempts)` when `retry_policy` is set, otherwise `suspended_at + timeout_seconds`. | | `fallback_value` | any | — | Value for `complete_with_fallback` policy | -| `fallback_policy` | enum | ✓ | See §6 | +| `fallback_policy` | enum | ✓ | See §6. Alias for `retry_policy.final_fallback_policy` when `retry_policy` is set. | +| `retry_policy` | HumanRetryPolicy | — | Re-notification and escalation policy (RFC-0026). When absent, single-attempt behaviour (original RFC-0025 semantics). 
| | `confidence_at_suspension` | float [0,1] | — | Agent confidence at suspension time | | `decision_record` | object | — | EngagementDecision that triggered suspension | | `response` | any | — | Operator's response (set on resume) | @@ -298,3 +299,18 @@ if decision.should_ask: - The `response_type` field defaults to `"choice"` — suspensions created without it behave identically to pre-0.16.0 behaviour. - All new event types, endpoint, decorators, and structured choice fields are additive. - Servers that do not implement this suspension protocol will return 404 for `POST /suspend/respond`; agents SHOULD handle this gracefully. +- RFC-0026: `retry_policy` field on `SuspensionRecord` is optional and additive. Existing `fallback_policy` field is unchanged; when `retry_policy` is absent, single-attempt behaviour is preserved. + +## Cross-RFC Interactions + +| RFC | Interaction | +|-----|------------| +| RFC-0001 (Intents) | Adds `suspended_awaiting_input` to the intent lifecycle | +| RFC-0002 (Intent Graphs) | Suspended status in aggregate counter; completion gate clarified | +| RFC-0006 (Subscriptions) | All suspension events propagate via existing subscription infrastructure | +| RFC-0007 (Portfolios) | Portfolio aggregate gains suspension-aware fields (RFC-0026) | +| RFC-0010 (Retry Policies) | Parallel construct: RFC-0010 retries agent failures; RFC-0026 retries human non-response | +| RFC-0012 (Planning) | Task blocked state mirrors intent suspension bidirectionally (RFC-0026) | +| RFC-0019 (Verifiable Logs) | Suspension events are stored in the append-only event log | +| RFC-0024 (Workflow I/O) | validate_claim_inputs() gains upstream_intent_suspended rejection (RFC-0026) | +| RFC-0026 (Suspension Containers) | Defines HumanRetryPolicy, three-level cascade, container rules, UpstreamIntentSuspendedError | diff --git a/docs/rfcs/0026-suspension-container-interaction.md b/docs/rfcs/0026-suspension-container-interaction.md new file mode 100644 index 0000000..b1e0e83 
--- /dev/null +++ b/docs/rfcs/0026-suspension-container-interaction.md @@ -0,0 +1,426 @@ +# RFC-0026: Suspension Propagation & Retry v1.0 + +**Status:** Accepted +**Version:** v0.17.0 +**Date:** 2026-03-24 +**Authors:** OpenIntent Working Group +**Extends:** [RFC-0025 (Human-in-the-Loop)](./0025-human-in-the-loop.md) + +--- + +## Abstract + +RFC-0025 introduced `suspended_awaiting_input` as an intent-level lifecycle state but left three gaps: (1) how suspension interacts with container structures (intent graphs, portfolios, plans, workflows); (2) a single-shot timeout model with no re-notification or escalation ladder; and (3) no platform- or agent-level default for suspension policy. RFC-0026 closes all three gaps in a single coherent extension so the protocol has complete, end-to-end coverage of human engagement. + +--- + +## 1. Motivation + +### Gap 1 — Container semantics + +RFC-0025 defines suspension at the intent level but does not specify how containers observe it: + +- **RFC-0002 (Intent Graphs):** `aggregate_status.by_status` has no entry for `suspended_awaiting_input`. The completion gate does not explicitly say whether a suspended dependency satisfies it. No `active → blocked` trigger is defined for upstream suspension. +- **RFC-0007 (Portfolios):** The aggregate status algorithm does not enumerate suspension. The GET response has no suspension-aware fields. +- **RFC-0012 (Plans & Tasks):** The task `blocked` state was designed before RFC-0025 and has no defined relationship to `suspended_awaiting_input`. When a phase-agent calls `request_input()`, the plan task does not transition to `blocked`. The coordinator sees the task as still running. +- **RFC-0024 (Workflows):** `validate_claim_inputs()` has no rejection reason for upstream suspension. The workflow progress object has no `suspended_phases` field. + +### Gap 2 — Human retry / re-notification + +RFC-0025 timeout model is single-shot: one window, then fallback policy fires. 
A missed Slack notification should not immediately trigger `complete_with_fallback="deny"`. Systems need grace — notify once, re-notify, escalate, then fail. + +### Gap 3 — Platform and agent-level defaults + +Every `request_input()` call must specify its own policy from scratch. There is no platform-level constant and no agent-level default. + +--- + +## 2. Container Rules (Five, Non-Negotiable) + +### Rule 1 — Suspension is always intent-local + +Only the suspended intent changes to `suspended_awaiting_input`. Container structures (parent intents, portfolios, plans, workflows) observe it; they never absorb it into their own state. A portfolio does not become suspended because a member is suspended. + +### Rule 2 — Suspended intent is "not completed"; dependents stay blocked + +`suspended_awaiting_input` does **NOT** satisfy the RFC-0002 completion gate. A dependent intent that is `active` and whose upstream suspends MUST transition to `blocked` (new `active → blocked` trigger). Auto-unblock fires when the dependency *resumes and subsequently completes*, not on resume alone. + +### Rule 3 — RFC-0012 tasks mirror intent suspension bidirectionally + +When an intent transitions to `suspended_awaiting_input`, its corresponding plan task MUST transition to `blocked` with: + +```json +{ + "blocked_reason": "intent_suspended", + "suspended_intent_id": "" +} +``` + +On `intent.resumed`, the task transitions back to `running`. RFC-0012 checkpoints that require human approval SHOULD be implemented via RFC-0025 `request_input()` — this is the canonical pattern going forward. 
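Rule 2 in particular is easy to get wrong, so here is a minimal sketch of the dependent-side transitions it describes. `next_dependent_status` is a hypothetical helper, statuses are plain strings, and only the suspension trigger is modelled (other `active → blocked` causes from RFC-0002 are left out).

```python
SUSPENDED = "suspended_awaiting_input"


def next_dependent_status(current: str, dependency_statuses: list[str]) -> str:
    """Derive a dependent intent's next status from its dependencies (hypothetical helper)."""
    all_completed = all(s == "completed" for s in dependency_statuses)
    if current == "active" and SUSPENDED in dependency_statuses:
        # New trigger: an upstream suspension blocks an active dependent.
        return "blocked"
    if current == "blocked" and all_completed:
        # Auto-unblock only once every dependency has completed.
        return "active"
    # Resume alone (dependency back to `active`) changes nothing.
    return current
```

In this sketch a dependent that was blocked by a suspension stays `blocked` when the upstream merely resumes, and becomes `active` only after the upstream completes.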
+ +### Rule 4 — Container aggregates gain suspension-aware fields + +**RFC-0002 parent intents:** + +```json +{ + "aggregate_status": { + "total": 6, + "by_status": { + "completed": 3, + "active": 2, + "blocked": 0, + "suspended_awaiting_input": 1 + } + } +} +``` + +**RFC-0007 portfolios:** + +Portfolio GET response adds two fields: + +```json +{ + "has_suspended_members": true, + "suspended_member_count": 1 +} +``` + +Aggregate status algorithm (revised): + +| Condition | Aggregate status | +|---|---| +| All members `completed` | `completed` | +| Any member `failed` or `abandoned` | `failed` | +| Otherwise (including any suspended) | `in_progress` | + +**RFC-0012 plans:** + +Plan progress object gains: + +```json +{ + "suspended_tasks": [ + { + "task_id": "task_01XYZ", + "intent_id": "intent_01ABC", + "suspended_since": "2026-03-24T10:00:00Z", + "expires_at": "2026-03-24T13:00:00Z" + } + ] +} +``` + +**RFC-0024 workflows:** + +Workflow progress object gains: + +```json +{ + "suspended_phases": [ + { + "phase_name": "compliance_review", + "intent_id": "intent_01ABC", + "suspended_since": "2026-03-24T10:00:00Z", + "expires_at": "2026-03-24T13:00:00Z" + } + ] +} +``` + +### Rule 5 — Portfolio deadline takes precedence over suspension timeout + +If `governance.deadline` fires while a member intent is `suspended_awaiting_input`, the server MUST abandon the intent with `abandonment_reason: "portfolio_deadline_exceeded"`, bypassing `fallback_policy`. `intent.suspension_expired` is still emitted with `reason: "portfolio_deadline"` for audit. + +--- + +## 3. Coordinator Suspension Policy (RFC-0013 Extension) + +Coordinator leases gain an optional `suspension_policy` field: + +| Value | Behaviour | +|---|---| +| `isolate` | Default. No action beyond aggregate status update. | +| `block_dependents` | Coordinator explicitly pauses RFC-0024-wired downstream phases. | +| `escalate` | Emits `coordinator.escalation_required` or self-suspends. | + +--- + +## 4. 
Human Retry / Re-notification Policy + +### 4.1 The `HumanRetryPolicy` Object + +```json +{ + "max_attempts": 3, + "interval_seconds": 3600, + "strategy": "fixed", + "escalation_ladder": [ + { "attempt": 2, "channel_hint": "email", "notify_to": null }, + { "attempt": 3, "channel_hint": "pagerduty", "notify_to": "supervisor@example.com" } + ], + "final_fallback_policy": "fail" +} +``` + +| Field | Type | Default | Description | +|---|---|---|---| +| `max_attempts` | integer | 1 | Total notification attempts (including initial) | +| `interval_seconds` | integer | — | Seconds between re-notification attempts (≤ `timeout_seconds`) | +| `strategy` | `"fixed"` | `"fixed"` | Re-notification cadence strategy | +| `escalation_ladder` | array | `[]` | Per-attempt channel/recipient overrides | +| `final_fallback_policy` | enum | (inherited) | Policy to apply after all attempts exhausted | + +### 4.2 How It Works + +1. **Attempt 1** fires immediately when `request_input()` is called. `timeout_seconds` becomes the *per-attempt* window. +2. If the operator does not respond within `interval_seconds` (≤ `timeout_seconds`), a re-notification fires and the attempt counter increments. +3. Each `escalation_ladder` entry triggers at its `attempt` number, overriding `channel_hint` and optionally routing to a different `notify_to` identity. +4. After `max_attempts` notifications with no response, `final_fallback_policy` is applied. +5. **Total suspension window** = `interval_seconds × max_attempts`. `expires_at` on `SuspensionRecord` reflects this total deadline. +6. `suspension_id` is **unchanged** across all attempts — the operator can respond to the original request at any point. + +### 4.3 Backwards Compatibility + +The existing `fallback_policy` field on `SuspensionRecord` is kept as an alias: + +- `fallback_policy` with no `retry_policy` is equivalent to `HumanRetryPolicy(max_attempts=1)`. 
+- When a `retry_policy` is present, `final_fallback_policy` inside it takes precedence over the top-level `fallback_policy`. + +### 4.4 New Events + +| Event | When emitted | +|---|---| +| `intent.suspension_renotified` | Before each re-notification attempt (attempt ≥ 2) | +| `intent.suspension_escalated` | When an `escalation_ladder` entry triggers | + +**`intent.suspension_renotified` payload:** + +```json +{ + "suspension_id": "susp-uuid", + "attempt": 2, + "max_attempts": 3, + "channel_hint": "email", + "notify_to": null, + "next_attempt_at": "2026-03-24T11:00:00Z" +} +``` + +**`intent.suspension_escalated` payload:** + +```json +{ + "suspension_id": "susp-uuid", + "attempt": 3, + "escalated_to": "supervisor@example.com", + "channel_hint": "pagerduty" +} +``` + +Existing `intent.suspension_expired` fires after all attempts exhausted, then `final_fallback_policy` executes. + +### 4.5 `@on_input_requested` Re-fired on Each Attempt + +The existing `@on_input_requested` decorator is called again with `attempt` number in the suspension context on each re-notification. Agents can customize messages: + +```python +@on_input_requested +async def notify_operator(self, intent, suspension): + attempt = suspension.context.get("_attempt", 1) + if attempt == 1: + msg = f"Input needed: {suspension.question}" + elif attempt < suspension.context.get("_max_attempts", 1): + msg = f"Reminder ({attempt}): {suspension.question}" + else: + msg = f"URGENT — final reminder: {suspension.question}" + await send_notification(msg, channel=suspension.channel_hint) +``` + +--- + +## 5. 
Three-Level Configuration Cascade + +``` +server config → BaseAgent default → request_input() call +───────────────────── ──────────────────────── ────────────────────── +default_human_retry_ default_human_retry_ retry_policy= + policy: { policy: { HumanRetryPolicy( + max_attempts: 3, max_attempts: 2, max_attempts: 1, + interval_seconds: 3600 interval_seconds: 1800 interval_seconds: 300 +} } ) +``` + +Resolution: the per-suspension policy overrides the agent default, which overrides the platform default. Any field not set at a more specific level inherits from the next broader level. + +**Platform constant location:** Server config file (`openintent.yaml`) under `suspension.default_retry_policy`. Exposed via `GET /api/v1/server/config` (read-only, for client introspection). + +**Agent-level default:** `BaseAgent.default_human_retry_policy`, a `HumanRetryPolicy` instance set in the agent definition or `__init__`. If `None`, the platform default applies. + +--- + +## 6. RFC-0024 Patch: `validate_claim_inputs()` Rejection Reason + +When an agent attempts to claim a task whose declared inputs reference an upstream phase that is currently `suspended_awaiting_input`, `validate_claim_inputs()` MUST reject with: + +```python +raise UpstreamIntentSuspendedError( + task_id=task_id, + phase_name=phase_name, + suspended_intent_id="", + expected_resume_at="", +) +``` + +This is a new exception type (`upstream_intent_suspended`) that the executor surfaces as a claim rejection reason. The downstream task stays in `pending` / `ready` state and retries the claim check after the upstream resumes. + +--- + +## 7. Relationship to RFC-0010 (Retry Policies) + +RFC-0010 defines retry when the *agent* fails (picks a new agent attempt). RFC-0026 defines retry when the *human* fails to respond (resends notification, escalates channel). They are parallel constructs at different layers: + +| Dimension | RFC-0010 | RFC-0026 | +|---|---|---| +| What failed? | Agent execution | Human responsiveness | +| What retries? 
| Agent assignment | Human notification | +| State during retry | Intent may be reassigned | Intent stays `suspended_awaiting_input` | +| Infrastructure | Scheduled retry job | Scheduled re-notification job | + +The server SHOULD use the same scheduled-job infrastructure for both. + +--- + +## 8. Python SDK — `HumanRetryPolicy` + +```python +from openintent import HumanRetryPolicy + +policy = HumanRetryPolicy( + max_attempts=3, + interval_seconds=3600, + escalation_ladder=[ + {"attempt": 2, "channel_hint": "email"}, + {"attempt": 3, "channel_hint": "pagerduty", "notify_to": "supervisor@example.com"}, + ], + final_fallback_policy="fail", +) + +value = await self.request_input( + intent_id, + question="Should we proceed with the refund?", + response_type="choice", + choices=[...], + timeout_seconds=3600, + retry_policy=policy, +) +``` + +`BaseAgent` gains `default_human_retry_policy`: + +```python +@Agent("my-agent") +class MyAgent: + default_human_retry_policy = HumanRetryPolicy( + max_attempts=2, + interval_seconds=1800, + final_fallback_policy="complete_with_fallback", + ) +``` + +--- + +## 9. End-to-End Motivating Example + +**Scenario:** Multi-phase compliance workflow. Phase 2 (`compliance_review`) requires human sign-off before Phase 3 (`generate_report`) can run. 
+ +``` +Phase 1: fetch_data → completes OK +Phase 2: compliance_review → agent calls request_input() +Phase 3: generate_report → depends_on: compliance_review +``` + +**Timeline:** + +| Time | Event | +|---|---| +| T+0 | Phase 2 agent calls `request_input()` with `retry_policy(max_attempts=3, interval_seconds=3600)` | +| T+0 | Intent 2 → `suspended_awaiting_input` | +| T+0 | Task 2 → `blocked` (`blocked_reason: "intent_suspended"`) | +| T+0 | Intent 3 → `blocked` (upstream suspended, does not satisfy completion gate) | +| T+0 | `intent.suspended` emitted; `@on_input_requested` fires (attempt=1) → Slack message sent | +| T+0 | Portfolio: `has_suspended_members: true`, `suspended_member_count: 1` | +| T+3600 | No response. `intent.suspension_renotified` emitted (attempt=2) | +| T+3600 | `@on_input_requested` fires again (attempt=2, channel_hint="email") → email sent | +| T+3600 | `intent.suspension_escalated` emitted (attempt=2) | +| T+5400 | Operator responds via `POST /intents/{id}/suspend/respond` | +| T+5400 | Intent 2 → `active` → `completed` | +| T+5400 | Task 2 → `running` → `completed` | +| T+5400 | Intent 3 → `active` (dependency now completed) | +| T+5400 | Task 3 claims Phase 3 inputs from Phase 2 outputs — validate_claim_inputs() succeeds | +| T+5500 | Phase 3 completes. Workflow done. | + +**What did NOT happen:** Phase 3 did not try to claim while Phase 2 was suspended. The coordinator saw the suspension in the aggregate. The portfolio deadline was not exceeded. + +--- + +## 10. Cross-RFC Patch Summary + +### RFC-0002 patches + +- Status enum: add `suspended_awaiting_input` to `by_status` in `aggregate_status`. +- Completion gate: explicitly states `suspended_awaiting_input` does NOT satisfy the gate. +- New `active → blocked` trigger: upstream dependency transitions to `suspended_awaiting_input`. +- Cross-RFC table: add RFC-0026. + +### RFC-0007 patches + +- Aggregate status algorithm: enumerated explicitly (completed/failed/in_progress). 
+- GET response: add `has_suspended_members: bool`, `suspended_member_count: int`. +- New events: `portfolio.member_suspended`, `portfolio.member_resumed`. +- Cross-RFC table: add RFC-0026. + +### RFC-0012 patches + +- Bidirectional task/intent relationship: task `blocked` ↔ intent `suspended_awaiting_input`. +- `blocked_reason: "intent_suspended"` and `suspended_intent_id` on blocked task. +- Plan progress: add `suspended_tasks` array. +- Checkpoints: explicitly documented as RFC-0025 `request_input()` triggers (canonical pattern). +- Cross-RFC table: add RFC-0026. + +### RFC-0024 patches + +- `validate_claim_inputs()`: add `upstream_intent_suspended` rejection reason (`UpstreamIntentSuspendedError`). +- Workflow progress: add `suspended_phases` array. +- Cross-RFC table: add RFC-0026. + +### RFC-0025 patches + +- `SuspensionRecord`: add `retry_policy` field (optional `HumanRetryPolicy`). +- `timeout_seconds` semantics: clarified as per-attempt window when `retry_policy` is set. +- `fallback_policy`: documented as alias for `HumanRetryPolicy(max_attempts=1, final_fallback_policy=...)`. +- Cross-RFC table: add RFC-0026, RFC-0010. +- Backwards compatibility: note `fallback_policy` unchanged; `retry_policy` is additive. + +### RFC-0010 patches + +- Cross-RFC table: add RFC-0026 with note on parallel retry constructs. + +--- + +## 11. Security Considerations + +- Re-notification payloads to external channels (Slack, PagerDuty) MUST NOT include secrets or PII in `question` or `context` fields. +- `escalation_ladder.notify_to` identity values should be validated against an allowlist before delivery. +- Multiple re-notification attempts increase the attack surface for replay; `suspension_id` SHOULD remain the same (see §4.2 item 6) and the server MUST reject duplicate responses after the first. + +--- + +## 12. Backwards Compatibility + +- `retry_policy` on `SuspensionRecord` is optional. 
Existing `fallback_policy` field continues to work unchanged with single-attempt semantics. +- `UpstreamIntentSuspendedError` is a new exception class; callers that only catch `UnresolvableInputError` will see uncaught exceptions if they don't update. Callers should catch `WorkflowError` for robust handling. +- New events (`intent.suspension_renotified`, `intent.suspension_escalated`) are additive; existing subscriptions propagate them through the same infrastructure. +- `has_suspended_members` / `suspended_member_count` are additive fields on portfolio GET responses; existing clients that ignore unknown fields are unaffected. +- `suspended_tasks` / `suspended_phases` are additive fields on progress objects. diff --git a/mcp-server/package.json b/mcp-server/package.json index ad58044..9b09850 100644 --- a/mcp-server/package.json +++ b/mcp-server/package.json @@ -1,6 +1,6 @@ { "name": "@openintentai/mcp-server", - "version": "0.16.0", + "version": "0.17.0", "description": "MCP server exposing the OpenIntent Coordination Protocol as MCP tools and resources", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/mcp-server/src/index.ts b/mcp-server/src/index.ts index 358b621..2579100 100644 --- a/mcp-server/src/index.ts +++ b/mcp-server/src/index.ts @@ -30,7 +30,7 @@ async function main() { const server = new Server( { name: "openintent-mcp", - version: "0.16.0", + version: "0.17.0", }, { capabilities: { diff --git a/mkdocs.yml b/mkdocs.yml index a02a7f1..d55e384 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -1,5 +1,5 @@ site_name: OpenIntent SDK -site_description: "The Python SDK for structured multi-agent coordination. 25 RFCs. Decorator-first agents. Built-in server. Federation. MCP integration." +site_description: "The Python SDK for structured multi-agent coordination. 26 RFCs. Decorator-first agents. Built-in server. Federation. MCP integration." 
site_url: https://openintent-ai.github.io/openintent/ repo_url: https://github.com/openintent-ai/openintent repo_name: openintent-ai/openintent @@ -172,6 +172,7 @@ nav: - "0023 \u2014 Federation Security": rfcs/0023-federation-security.md - "0024 \u2014 Workflow I/O Contracts": rfcs/0024-workflow-io-contracts.md - "0025 \u2014 Human-in-the-Loop": rfcs/0025-human-in-the-loop.md + - "0026 \u2014 Suspension Containers": rfcs/0026-suspension-container-interaction.md - Changelog: changelog.md - Examples: - Multi-Agent Workflow: examples/multi-agent.md @@ -207,10 +208,10 @@ extra: link: https://pypi.org/project/openintent/ version: provider: mike - announcement: "v0.16.0 is here — RFC-0025 Human-in-the-Loop intent suspension: request_input(), engagement decisions, fallback policies, and lifecycle hooks. Read the changelog →" + announcement: "v0.17.0 is here — RFC-0026 Intent Suspension: container interaction, HumanRetryPolicy with escalation ladders, UpstreamIntentSuspendedError, and platform-level default policies. Read the changelog →" meta: - name: description - content: "OpenIntent Python SDK — structured multi-agent coordination protocol with decorator-first agents, 25 RFCs, 7 LLM adapters, HITL suspension, federation, MCP integration, and built-in FastAPI server." + content: "OpenIntent Python SDK — structured multi-agent coordination protocol with decorator-first agents, 26 RFCs, 7 LLM adapters, HITL suspension, federation, MCP integration, and built-in FastAPI server." - name: og:title content: "OpenIntent SDK — Multi-Agent Coordination Protocol" - name: og:description @@ -224,7 +225,7 @@ extra: - name: twitter:title content: "OpenIntent SDK" - name: twitter:description - content: "The Python SDK for structured multi-agent coordination. 25 RFCs. Decorator-first agents. Federation. MCP integration. Built-in server." + content: "The Python SDK for structured multi-agent coordination. 26 RFCs. Decorator-first agents. Federation. MCP integration. Built-in server." 
extra_css: - stylesheets/extra.css diff --git a/openintent/__init__.py b/openintent/__init__.py index 49ab120..b2ae02c 100644 --- a/openintent/__init__.py +++ b/openintent/__init__.py @@ -127,12 +127,15 @@ Escalation, EscalationPriority, EscalationStatus, + # RFC-0026: Suspension Container Interaction & Human Retry + EscalationStep, EventProof, EventType, GrantConstraints, Guardrails, Heartbeat, HeartbeatConfig, + HumanRetryPolicy, IdentityChallenge, IdentityVerification, InputResponse, @@ -228,6 +231,7 @@ PermissionsConfig, # noqa: F401 PhaseConfig, UnresolvableInputError, + UpstreamIntentSuspendedError, VerificationConfig, WorkflowError, WorkflowNotFoundError, @@ -250,7 +254,7 @@ def get_server() -> tuple[Any, Any, Any]: ) -__version__ = "0.16.0" +__version__ = "0.17.0" __all__ = [ "OpenIntentClient", "AsyncOpenIntentClient", @@ -407,6 +411,7 @@ def get_server() -> tuple[Any, Any, Any]: "MissingOutputError", "OutputTypeMismatchError", "UnresolvableInputError", + "UpstreamIntentSuspendedError", "InputWiringError", # RFC-0025: HITL Models "ResponseType", @@ -415,6 +420,9 @@ def get_server() -> tuple[Any, Any, Any]: "EngagementSignals", "EngagementDecision", "InputResponse", + # RFC-0026: Suspension Container Interaction & Human Retry + "EscalationStep", + "HumanRetryPolicy", "WorkflowSpec", "WorkflowError", "WorkflowValidationError", diff --git a/openintent/agents.py b/openintent/agents.py index 12b4a2f..6dfa0af 100644 --- a/openintent/agents.py +++ b/openintent/agents.py @@ -35,6 +35,7 @@ async def work(self, intent): EngagementSignals, Escalation, EventType, + HumanRetryPolicy, InputResponse, Intent, IntentContext, @@ -790,6 +791,57 @@ class AgentConfig: T = TypeVar("T", bound="BaseAgent") +_SENTINEL = object() + + +def _merge_retry_policies( + *, + call_site: "Optional[HumanRetryPolicy]", + agent_default: "Optional[HumanRetryPolicy]", + platform_default: "Optional[HumanRetryPolicy]", +) -> "Optional[HumanRetryPolicy]": + """RFC-0026 §5.3: field-level merge of 
retry policy levels. + + Merge precedence: call_site overrides agent_default overrides platform_default. + Each level only contributes a field when its value differs from the HumanRetryPolicy + class default, so the highest-priority level that explicitly sets a field wins. + + If all three are ``None``, returns ``None`` (single-attempt semantics). + """ + levels = [p for p in (platform_default, agent_default, call_site) if p is not None] + if not levels: + return None + if len(levels) == 1: + return levels[0] + + _class_defaults = HumanRetryPolicy() + _scalar_fields = ( + "max_attempts", + "interval_seconds", + "strategy", + "final_fallback_policy", + ) + + merged: dict = {} + merged_ladder: list | None = None + + for policy in levels: + for field_name in _scalar_fields: + val = getattr(policy, field_name) + class_default = getattr(_class_defaults, field_name) + if val != class_default: + merged[field_name] = val + if policy.escalation_ladder: + merged_ladder = [s.to_dict() for s in policy.escalation_ladder] + + for field_name in _scalar_fields: + merged.setdefault(field_name, getattr(_class_defaults, field_name)) + + if merged_ladder: + merged["escalation_ladder"] = merged_ladder + + return HumanRetryPolicy.from_dict(merged) + class BaseAgent(ABC): """ @@ -803,6 +855,24 @@ class BaseAgent(ABC): _config: AgentConfig = field(default_factory=AgentConfig) _handlers: dict[str, list[Callable]] = field(default_factory=dict) + default_human_retry_policy: Optional["HumanRetryPolicy"] = None + """Class-level or instance-level default re-notification policy (RFC-0026). + + When set, this policy is applied to all ``request_input()`` calls that do + not supply an explicit ``retry_policy`` argument. Subclasses may override + at class level: + + .. 
code-block:: python + + class MyAgent(BaseAgent): + default_human_retry_policy = HumanRetryPolicy( + max_attempts=3, + interval_seconds=900, + strategy="linear", + final_fallback_policy="complete_with_fallback", + ) + """ + def __init__( self, base_url: Optional[str] = None, @@ -820,6 +890,9 @@ def __init__( self._subscription: Optional[SSESubscription] = None self._running = False self._loop: Optional[asyncio.AbstractEventLoop] = None + self._platform_retry_policy_cache: Optional["HumanRetryPolicy | None"] = ( + _SENTINEL + ) self._discover_handlers() @@ -1206,15 +1279,33 @@ async def request_input( fallback_policy: str = "fail", fallback_value: Optional[Any] = None, confidence: Optional[float] = None, + retry_policy: Optional["HumanRetryPolicy"] = None, ) -> Any: """ - Suspend the intent and request operator input (RFC-0025). + Suspend the intent and request operator input (RFC-0025 / RFC-0026). Transitions the intent to ``suspended_awaiting_input``, fires ``@on_input_requested`` hooks, and polls for a response. When the operator responds (via POST /intents/{id}/suspend/respond), the suspension is resolved and the operator's response value is returned. + When ``retry_policy`` is supplied (RFC-0026), the agent re-notifies the + operator up to ``retry_policy.max_attempts`` times, waiting + ``retry_policy.interval_seconds`` between each attempt, firing + ``@on_input_requested`` hooks and emitting + ``intent.suspension_renotified`` events on each re-attempt. + Escalation steps in ``retry_policy.escalation_ladder`` cause an + ``intent.suspension_escalated`` event to be emitted. After all + attempts are exhausted, ``retry_policy.final_fallback_policy`` is + applied. + + If ``retry_policy`` is omitted, single-attempt behaviour is preserved + (original RFC-0025 semantics). 
+ + The class attribute ``default_human_retry_policy`` can be set on a + ``BaseAgent`` subclass or instance to apply a retry policy to all + ``request_input()`` calls that do not supply an explicit one. + Args: intent_id: The intent to suspend. question: The question or prompt for the operator. @@ -1231,6 +1322,9 @@ async def request_input( ``"complete_with_fallback"``, or ``"use_default_and_continue"``. fallback_value: Value to use for ``"complete_with_fallback"`` policy. confidence: Agent confidence score at suspension time (0.0–1.0). + retry_policy: Optional re-notification / escalation policy + (RFC-0026). When omitted, falls back to + ``self.default_human_retry_policy`` (if set), then single-attempt. Returns: The operator's response value. @@ -1243,10 +1337,43 @@ async def request_input( from datetime import datetime, timedelta from uuid import uuid4 + # RFC-0026 §5.3: three-level cascade with field-level merge. + # Fetch platform default once per agent instance (_SENTINEL = not yet fetched). + if self._platform_retry_policy_cache is _SENTINEL: + try: + cfg = await self.async_client.get_server_config() + policy_dict = (cfg.get("suspension") or {}).get("default_retry_policy") + if policy_dict: + self._platform_retry_policy_cache = HumanRetryPolicy.from_dict( + policy_dict + ) + else: + self._platform_retry_policy_cache = None + except Exception: + self._platform_retry_policy_cache = None + + effective_retry_policy = _merge_retry_policies( + call_site=retry_policy, + agent_default=getattr(self, "default_human_retry_policy", None), + platform_default=self._platform_retry_policy_cache, + ) + suspension_id = str(uuid4()) now = datetime.utcnow() expires_at = None - if timeout_seconds is not None: + if effective_retry_policy is not None: + # RFC-0026: total expiry = interval_seconds × max_attempts. + # Safeguard: if interval_seconds is 0 (e.g. max_attempts=1 single-shot) + # fall back to timeout_seconds so the suspension doesn't hang indefinitely. 
+ total_seconds = ( + effective_retry_policy.interval_seconds + * effective_retry_policy.max_attempts + ) + if total_seconds > 0: + expires_at = now + timedelta(seconds=total_seconds) + elif timeout_seconds is not None: + expires_at = now + timedelta(seconds=timeout_seconds) + elif timeout_seconds is not None: expires_at = now + timedelta(seconds=timeout_seconds) resolved_choices: list[SuspensionChoice] = [] @@ -1274,6 +1401,7 @@ async def request_input( expires_at=expires_at, fallback_value=fallback_value, fallback_policy=fallback_policy, + retry_policy=effective_retry_policy, confidence_at_suspension=confidence, ) @@ -1307,6 +1435,9 @@ async def request_input( "timeout_seconds": timeout_seconds, "fallback_policy": fallback_policy, "confidence_at_suspension": confidence, + "retry_policy": effective_retry_policy.to_dict() + if effective_retry_policy + else None, }, ) @@ -1318,8 +1449,28 @@ async def request_input( except Exception as e: logger.exception(f"on_input_requested handler error: {e}") - # Poll for operator response + # Resolve effective fallback policy (RFC-0026: retry_policy takes precedence) + effective_fallback = fallback_policy + if effective_retry_policy is not None: + effective_fallback = effective_retry_policy.final_fallback_policy + + # Poll for operator response, with optional re-notification (RFC-0026) poll_interval = 2.0 + attempt = 1 + max_attempts = 1 + interval_seconds = 0 + if effective_retry_policy is not None: + max_attempts = effective_retry_policy.max_attempts + interval_seconds = effective_retry_policy.interval_seconds + + next_renotify_at: Optional[datetime] = None + if ( + effective_retry_policy is not None + and interval_seconds > 0 + and max_attempts > 1 + ): + next_renotify_at = now + timedelta(seconds=interval_seconds) + while True: await asyncio.sleep(poll_interval) try: @@ -1365,13 +1516,13 @@ async def request_input( except Exception as e: logger.exception(f"on_suspension_expired handler error: {e}") - if fallback_policy == 
"fail": + if effective_fallback == "fail": raise InputTimeoutError( f"Suspension {suspension_id} expired without operator response", suspension_id=suspension_id, - fallback_policy=fallback_policy, + fallback_policy=effective_fallback, ) - elif fallback_policy == "complete_with_fallback": + elif effective_fallback == "complete_with_fallback": return fallback_value else: return fallback_value @@ -1382,19 +1533,137 @@ async def request_input( suspension_id=suspension_id, ) - # Check if expired by time without server update - if expires_at is not None and datetime.utcnow() > expires_at: + # RFC-0026: re-notification loop + if ( + effective_retry_policy is not None + and next_renotify_at is not None + and datetime.utcnow() >= next_renotify_at + ): + if attempt < max_attempts: + attempt += 1 + logger.info( + f"Re-notifying operator for suspension {suspension_id} " + f"(attempt {attempt}/{max_attempts})" + ) + + # Check escalation ladder for this attempt + escalation_steps = effective_retry_policy.escalation_ladder + triggered_steps = [ + s for s in escalation_steps if s.attempt == attempt + ] + escalation_channel_hint: Optional[str] = None + escalation_notify_to: Optional[str] = None + for step in triggered_steps: + escalation_channel_hint = ( + step.channel_hint or escalation_channel_hint + ) + escalation_notify_to = step.notify_to or escalation_notify_to + try: + await self.async_client.log_event( + intent_id, + EventType.INTENT_SUSPENSION_ESCALATED, + { + "suspension_id": suspension_id, + "attempt": attempt, + "escalated_to": step.notify_to or None, + "channel_hint": step.channel_hint or None, + }, + ) + except Exception as e: + logger.exception(f"suspension_escalated event error: {e}") + + # Compute when the next attempt fires (for telemetry) + next_attempt_at = ( + datetime.utcnow() + timedelta(seconds=interval_seconds) + ).isoformat() + "Z" + + # Emit re-notification event with RFC-0026 field names + try: + await self.async_client.log_event( + intent_id, + 
EventType.INTENT_SUSPENSION_RENOTIFIED, + { + "suspension_id": suspension_id, + "attempt": attempt, + "max_attempts": max_attempts, + "channel_hint": escalation_channel_hint + or suspension.channel_hint, + "notify_to": escalation_notify_to, + "next_attempt_at": next_attempt_at + if attempt < max_attempts + else None, + }, + ) + except Exception as e: + logger.exception(f"suspension_renotified event error: {e}") + + # Build a re-notification SuspensionRecord that carries attempt + # metadata in its context using RFC-0026 key names (_attempt, + # _max_attempts) so @on_input_requested handlers can read them + # without a signature change. + import dataclasses + + renotify_context = dict(suspension.context) + renotify_context["_attempt"] = attempt + renotify_context["_max_attempts"] = max_attempts + if escalation_notify_to: + renotify_context["_notify_to"] = escalation_notify_to + + renotify_suspension = dataclasses.replace( + suspension, + context=renotify_context, + channel_hint=escalation_channel_hint or suspension.channel_hint, + ) + + # Re-fire @on_input_requested hooks with the enriched suspension + for handler in self._handlers.get("input_requested", []): + try: + await self._call_handler( + handler, current, renotify_suspension + ) + except Exception as e: + logger.exception( + f"on_input_requested (renotify) handler error: {e}" + ) + + next_renotify_at = datetime.utcnow() + timedelta( + seconds=interval_seconds + ) + else: + # All attempts exhausted — apply final fallback + for handler in self._handlers.get("suspension_expired", []): + try: + await self._call_handler(handler, current, suspension) + except Exception as e: + logger.exception( + f"on_suspension_expired handler error: {e}" + ) + + if effective_fallback == "fail": + raise InputTimeoutError( + f"Suspension {suspension_id} exhausted all {max_attempts} re-notification attempts", + suspension_id=suspension_id, + fallback_policy=effective_fallback, + ) + return fallback_value + + # Check if expired by 
time without server update (single-attempt path) + if ( + effective_retry_policy is None + and expires_at is not None + and datetime.utcnow() > expires_at + ): for handler in self._handlers.get("suspension_expired", []): try: await self._call_handler(handler, current, suspension) except Exception as e: logger.exception(f"on_suspension_expired handler error: {e}") - if fallback_policy == "fail": + if effective_fallback == "fail": raise InputTimeoutError( f"Suspension {suspension_id} expired (client-side timeout)", suspension_id=suspension_id, - fallback_policy=fallback_policy, + fallback_policy=effective_fallback, ) return fallback_value diff --git a/openintent/client.py b/openintent/client.py index 02aa18b..c75a4b7 100644 --- a/openintent/client.py +++ b/openintent/client.py @@ -3682,6 +3682,21 @@ async def get_intent_portfolios(self, intent_id: str) -> list[IntentPortfolio]: items = data if isinstance(data, list) else data.get("portfolios", []) return [IntentPortfolio.from_dict(p) for p in items] + async def get_server_config(self) -> dict[str, Any]: + """RFC-0026: Fetch read-only server configuration for client introspection. + + Returns a dict with ``protocol_version`` and a ``suspension`` key that + contains ``default_retry_policy`` (serialised HumanRetryPolicy dict or + ``None`` when no platform default is configured). 
+ + Example:: + + cfg = await client.get_server_config() + policy_dict = cfg["suspension"]["default_retry_policy"] + """ + response = await self._client.get("/api/v1/server/config") + return self._handle_response(response) + # ==================== Attachments ==================== async def add_attachment( diff --git a/openintent/models.py b/openintent/models.py index 840a419..43d995f 100644 --- a/openintent/models.py +++ b/openintent/models.py @@ -195,6 +195,12 @@ class EventType(str, Enum): INTENT_SUSPENSION_EXPIRED = "intent.suspension_expired" ENGAGEMENT_DECISION = "engagement.decision" + # RFC-0026: Suspension container interaction & human retry + INTENT_SUSPENSION_RENOTIFIED = "intent.suspension_renotified" + INTENT_SUSPENSION_ESCALATED = "intent.suspension_escalated" + PORTFOLIO_MEMBER_SUSPENDED = "portfolio.member_suspended" + PORTFOLIO_MEMBER_RESUMED = "portfolio.member_resumed" + # Legacy aliases for backward compatibility CREATED = "intent_created" STATE_UPDATED = "state_patched" @@ -4125,7 +4131,7 @@ def from_dict(cls, data: dict[str, Any]) -> "SuspensionChoice": @dataclass class SuspensionRecord: - """A suspension record capturing the full context of an intent suspension (RFC-0025). + """A suspension record capturing the full context of an intent suspension (RFC-0025/RFC-0026). Fields: id: Unique identifier for this suspension record. @@ -4136,10 +4142,16 @@ class SuspensionRecord: context: Structured context dict to help the operator decide. channel_hint: Preferred delivery channel (e.g. "slack", "email"). suspended_at: ISO-8601 timestamp when the intent was suspended. - timeout_seconds: Seconds before the suspension expires (None = no timeout). - expires_at: Computed expiry timestamp (None if no timeout). + timeout_seconds: Per-attempt expiry window (None = no timeout). + When retry_policy is set, this is per-attempt; total expiry is + retry_policy.interval_seconds * retry_policy.max_attempts. + expires_at: Total deadline. 
When retry_policy is set: + suspended_at + (interval_seconds × max_attempts). + When retry_policy is absent: suspended_at + timeout_seconds. fallback_value: Value to use if fallback_policy is "complete_with_fallback". fallback_policy: One of "fail", "complete_with_fallback", "use_default_and_continue". + Alias for retry_policy.final_fallback_policy when retry_policy is set. + retry_policy: Optional RFC-0026 HumanRetryPolicy for re-notification & escalation. confidence_at_suspension: Agent confidence score at time of suspension (0.0–1.0). decision_record: Optional dict capturing the engagement decision rationale. response: The operator's response value (populated on resume). @@ -4158,6 +4170,7 @@ class SuspensionRecord: expires_at: Optional[datetime] = None fallback_value: Optional[Any] = None fallback_policy: str = "fail" + retry_policy: Optional["HumanRetryPolicy"] = None confidence_at_suspension: Optional[float] = None decision_record: Optional[dict[str, Any]] = None response: Optional[Any] = None @@ -4193,6 +4206,8 @@ def to_dict(self) -> dict[str, Any]: result["expires_at"] = self.expires_at.isoformat() if self.fallback_value is not None: result["fallback_value"] = self.fallback_value + if self.retry_policy is not None: + result["retry_policy"] = self.retry_policy.to_dict() if self.confidence_at_suspension is not None: result["confidence_at_suspension"] = self.confidence_at_suspension if self.decision_record is not None: @@ -4218,6 +4233,9 @@ def from_dict(cls, data: dict[str, Any]) -> "SuspensionRecord": responded_at = datetime.fromisoformat(data["responded_at"]) choices_raw = data.get("choices", []) choices = [SuspensionChoice.from_dict(c) for c in choices_raw] + retry_policy = None + if data.get("retry_policy"): + retry_policy = HumanRetryPolicy.from_dict(data["retry_policy"]) return cls( id=data.get("id", ""), question=data.get("question", ""), @@ -4230,6 +4248,7 @@ def from_dict(cls, data: dict[str, Any]) -> "SuspensionRecord": expires_at=expires_at, 
fallback_value=data.get("fallback_value"), fallback_policy=data.get("fallback_policy", "fail"), + retry_policy=retry_policy, confidence_at_suspension=data.get("confidence_at_suspension"), decision_record=data.get("decision_record"), response=data.get("response"), @@ -4353,3 +4372,100 @@ def from_dict(cls, data: dict[str, Any]) -> "InputResponse": responded_at=responded_at, metadata=data.get("metadata"), ) + + +@dataclass +class EscalationStep: + """A single step in a HumanRetryPolicy escalation ladder (RFC-0026). + + Fields: + attempt: Trigger this escalation at this attempt number (RFC-0026 field name). + Alias ``after_attempt`` is accepted on deserialisation for backwards compatibility. + channel_hint: Delivery channel to use at this step (e.g. "pagerduty", "email"). + Alias ``channel`` is accepted on deserialisation for backwards compatibility. + notify_to: Identifier of the human or group to notify at this step. + Alias ``notify`` is accepted on deserialisation for backwards compatibility. 
+ """ + + attempt: int + channel_hint: str = "" + notify_to: str = "" + + @property + def after_attempt(self) -> int: + """Backwards-compatible alias for ``attempt``.""" + return self.attempt + + @property + def channel(self) -> str: + """Backwards-compatible alias for ``channel_hint``.""" + return self.channel_hint + + @property + def notify(self) -> str: + """Backwards-compatible alias for ``notify_to``.""" + return self.notify_to + + def to_dict(self) -> dict[str, Any]: + return { + "attempt": self.attempt, + "channel_hint": self.channel_hint, + "notify_to": self.notify_to, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "EscalationStep": + attempt = data.get("attempt") or data.get("after_attempt", 1) + channel_hint = data.get("channel_hint") or data.get("channel", "") + notify_to = data.get("notify_to") or data.get("notify", "") + return cls( + attempt=attempt, + channel_hint=channel_hint, + notify_to=notify_to, + ) + + +@dataclass +class HumanRetryPolicy: + """Re-notification and escalation policy for suspended intents (RFC-0026). + + When attached to a SuspensionRecord, the server will re-notify the operator + up to `max_attempts` times, waiting `interval_seconds` between each attempt. + After all attempts are exhausted, `final_fallback_policy` is applied. + + Fields: + max_attempts: Maximum number of notification attempts (including first). Default 3. + interval_seconds: Seconds to wait between re-notification attempts. Default 3600. + strategy: Back-off strategy — "fixed" | "linear" | "exponential". Default "fixed". + escalation_ladder: Ordered list of escalation steps triggered at specific attempts. + final_fallback_policy: Fallback policy after all attempts exhausted. + One of "fail", "complete_with_fallback", "use_default_and_continue". 
+ """ + + max_attempts: int = 3 + interval_seconds: int = 3600 + strategy: str = "fixed" + escalation_ladder: list[EscalationStep] = field(default_factory=list) + final_fallback_policy: str = "fail" + + def to_dict(self) -> dict[str, Any]: + result: dict[str, Any] = { + "max_attempts": self.max_attempts, + "interval_seconds": self.interval_seconds, + "strategy": self.strategy, + "final_fallback_policy": self.final_fallback_policy, + } + if self.escalation_ladder: + result["escalation_ladder"] = [s.to_dict() for s in self.escalation_ladder] + return result + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "HumanRetryPolicy": + ladder_raw = data.get("escalation_ladder", []) + return cls( + max_attempts=data.get("max_attempts", 3), + interval_seconds=data.get("interval_seconds", 3600), + strategy=data.get("strategy", "fixed"), + escalation_ladder=[EscalationStep.from_dict(s) for s in ladder_raw], + final_fallback_policy=data.get("final_fallback_policy", "fail"), + ) diff --git a/openintent/server/app.py b/openintent/server/app.py index 75b4ad3..e47c8b0 100644 --- a/openintent/server/app.py +++ b/openintent/server/app.py @@ -1396,6 +1396,20 @@ async def discovery(): "openApiUrl": "/openapi.json", } + @app.get("/api/v1/server/config") + async def get_server_config(): + """RFC-0026: Read-only introspection endpoint for platform-level server config. + + Returns the platform-level suspension default retry policy (if configured) so + that clients can implement the three-level cascade without hard-coding defaults. 
+ """ + return { + "protocol_version": config.protocol_version, + "suspension": { + "default_retry_policy": config.suspension_default_retry_policy, + }, + } + @app.get("/.well-known/openintent-compat.json") async def compatibility(): return { diff --git a/openintent/server/config.py b/openintent/server/config.py index ce6daf6..dd02631 100644 --- a/openintent/server/config.py +++ b/openintent/server/config.py @@ -4,7 +4,7 @@ import os from dataclasses import dataclass, field -from typing import Optional, Set +from typing import Any, Optional, Set @dataclass @@ -32,6 +32,14 @@ class ServerConfig: protocol_version: str = "0.1" + suspension_default_retry_policy: Optional[dict[str, Any]] = None + """RFC-0026: platform-level default HumanRetryPolicy (serialised dict). + + When set, agents that have neither a call-site ``retry_policy`` argument nor a + ``default_human_retry_policy`` class attribute will inherit this policy for + every ``request_input()`` call. Exposed read-only via ``GET /api/v1/server/config``. + """ + def __post_init__(self): if self.database_url is None: self.database_url = os.environ.get( diff --git a/openintent/workflow.py b/openintent/workflow.py index a6e690d..1d85ca9 100644 --- a/openintent/workflow.py +++ b/openintent/workflow.py @@ -136,6 +136,42 @@ def __init__(self, task_id: str, phase_name: str, unresolvable_refs: list[str]): ) +class UpstreamIntentSuspendedError(WorkflowError): + """Raised at claim time when a declared input references an upstream phase + whose intent is currently ``suspended_awaiting_input``. + + Per RFC-0026 §4, an agent MUST NOT proceed with task execution while an + upstream producer intent is suspended — the executor should defer the claim + until the upstream intent resumes. + + Attributes: + task_id: The ID of the task whose claim was rejected. + phase_name: The name of the phase definition. + suspended_intent_id: The intent ID of the upstream suspended producer. 
+ expected_resume_at: ISO-8601 string estimate of when the upstream intent + will resume, or None if unknown. + """ + + def __init__( + self, + task_id: str, + phase_name: str, + suspended_intent_id: str, + expected_resume_at: Optional[str] = None, + ): + self.task_id = task_id + self.phase_name = phase_name + self.suspended_intent_id = suspended_intent_id + self.expected_resume_at = expected_resume_at + msg = ( + f"Task claim deferred for task '{task_id}' (phase '{phase_name}'): " + f"upstream intent '{suspended_intent_id}' is suspended_awaiting_input" + ) + if expected_resume_at: + msg = f"{msg} (expected resume: {expected_resume_at})" + super().__init__(msg) + + class InputWiringError(WorkflowValidationError): """Raised at workflow validation time when an inputs declaration is structurally invalid — e.g. referencing a phase not in depends_on, @@ -1113,13 +1149,22 @@ def validate_claim_inputs( trigger_payload: Optional[dict[str, Any]] = None, initial_state: Optional[dict[str, Any]] = None, task_id: str = "", + upstream_intents_status: Optional[dict[str, dict[str, Any]]] = None, ) -> None: """Validate that all declared inputs are resolvable at claim time. - This is the executor's claim-time check (RFC-0024 §3.1). Call this - when an agent attempts to claim a task. Raises ``UnresolvableInputError`` + This is the executor's claim-time check (RFC-0024 §3.1 / RFC-0026 §4). + Call this when an agent attempts to claim a task. Raises a typed error if the claim should be rejected; returns ``None`` if the claim is safe. + RFC-0026: If ``upstream_intents_status`` is provided, this method checks + whether any upstream phase whose outputs are referenced by the current + phase's inputs has a corresponding intent that is currently + ``suspended_awaiting_input``. 
When such a phase is found, + ``UpstreamIntentSuspendedError`` is raised **before** the resolvability + check, because the upstream outputs may exist but the producer intent is + paused and may mutate its outputs upon resume. + Args: phase_name: The name of the phase being claimed. upstream_outputs: Map of ``{phase_name: {key: value}}`` for all @@ -1127,10 +1172,42 @@ def validate_claim_inputs( trigger_payload: Optional trigger payload for ``$trigger.*`` refs. initial_state: Optional initial state for ``$initial_state.*`` refs. task_id: Optional task ID for error messages. + upstream_intents_status: Optional map of + ``{phase_name: {"status": str, "intent_id": str, + "expected_resume_at": str | None}}`` describing the current + intent status for each upstream phase. When a referenced + upstream phase's status is ``"suspended_awaiting_input"``, + ``UpstreamIntentSuspendedError`` is raised. Raises: + UpstreamIntentSuspendedError: If any referenced upstream phase's + intent is currently suspended (RFC-0026). UnresolvableInputError: If any declared input cannot be resolved. 
""" + # RFC-0026: Check for upstream suspension before resolvability + if upstream_intents_status: + phase = next((p for p in self.phases if p.name == phase_name), None) + if phase and phase.inputs: + for _local_key, mapping_expr in phase.inputs.items(): + if not isinstance(mapping_expr, str): + continue + parts = mapping_expr.split(".", 1) + if len(parts) != 2 or parts[0].startswith("$"): + continue + upstream_phase_name = parts[0] + intent_info = upstream_intents_status.get(upstream_phase_name) + if intent_info is None: + continue + if intent_info.get("status") == "suspended_awaiting_input": + raise UpstreamIntentSuspendedError( + task_id=task_id, + phase_name=phase_name, + suspended_intent_id=intent_info.get( + "intent_id", upstream_phase_name + ), + expected_resume_at=intent_info.get("expected_resume_at"), + ) + self.resolve_task_inputs( phase_name=phase_name, upstream_outputs=upstream_outputs, diff --git a/pyproject.toml b/pyproject.toml index 4b71660..de127fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "openintent" -version = "0.16.0" +version = "0.17.0" description = "Python SDK and Server for the OpenIntent Coordination Protocol" readme = "README.md" license = {text = "MIT"} diff --git a/tests/test_hitl.py b/tests/test_hitl.py index 9fb405c..2f62176 100644 --- a/tests/test_hitl.py +++ b/tests/test_hitl.py @@ -1151,7 +1151,635 @@ def test_decorators_exported(self): assert hasattr(openintent, "on_suspension_expired") assert hasattr(openintent, "on_engagement_decision") - def test_version_is_0_16_0(self): + def test_version_is_0_17_0(self): import openintent - assert openintent.__version__ == "0.16.0" + assert openintent.__version__ == "0.17.0" + + +# =========================================================================== +# RFC-0026: Suspension Container Interaction & Human Retry +# =========================================================================== + + +class 
TestHumanRetryPolicyConstruction: + """HumanRetryPolicy dataclass — construction and defaults.""" + + def test_defaults(self): + from openintent.models import HumanRetryPolicy + + p = HumanRetryPolicy() + assert p.max_attempts == 3 + assert p.interval_seconds == 3600 + assert p.strategy == "fixed" + assert p.escalation_ladder == [] + assert p.final_fallback_policy == "fail" + + def test_custom_values(self): + from openintent.models import HumanRetryPolicy + + p = HumanRetryPolicy( + max_attempts=5, + interval_seconds=900, + strategy="exponential", + final_fallback_policy="complete_with_fallback", + ) + assert p.max_attempts == 5 + assert p.interval_seconds == 900 + assert p.strategy == "exponential" + assert p.final_fallback_policy == "complete_with_fallback" + + def test_to_dict_no_ladder(self): + from openintent.models import HumanRetryPolicy + + p = HumanRetryPolicy(max_attempts=2, interval_seconds=600) + d = p.to_dict() + assert d["max_attempts"] == 2 + assert d["interval_seconds"] == 600 + assert "escalation_ladder" not in d + + def test_from_dict_round_trip(self): + from openintent.models import HumanRetryPolicy + + raw = { + "max_attempts": 4, + "interval_seconds": 1800, + "strategy": "linear", + "final_fallback_policy": "complete_with_fallback", + } + p = HumanRetryPolicy.from_dict(raw) + assert p.max_attempts == 4 + assert p.interval_seconds == 1800 + assert p.strategy == "linear" + assert p.final_fallback_policy == "complete_with_fallback" + + def test_from_dict_defaults_on_empty(self): + from openintent.models import HumanRetryPolicy + + p = HumanRetryPolicy.from_dict({}) + assert p.max_attempts == 3 + assert p.interval_seconds == 3600 + assert p.strategy == "fixed" + assert p.final_fallback_policy == "fail" + + +class TestEscalationStep: + """EscalationStep dataclass — construction and serialization.""" + + def test_construction(self): + from openintent.models import EscalationStep + + s = EscalationStep( + attempt=2, channel_hint="pagerduty", 
notify_to="on-call-team" + ) + assert s.attempt == 2 + assert s.channel_hint == "pagerduty" + assert s.notify_to == "on-call-team" + assert s.after_attempt == 2 + assert s.channel == "pagerduty" + assert s.notify == "on-call-team" + + def test_to_dict(self): + from openintent.models import EscalationStep + + s = EscalationStep(attempt=3, channel_hint="slack", notify_to="#ops") + d = s.to_dict() + assert d == {"attempt": 3, "channel_hint": "slack", "notify_to": "#ops"} + + def test_from_dict_round_trip(self): + from openintent.models import EscalationStep + + raw = { + "attempt": 2, + "channel_hint": "email", + "notify_to": "manager@example.com", + } + s = EscalationStep.from_dict(raw) + assert s.attempt == 2 + assert s.channel_hint == "email" + assert s.notify_to == "manager@example.com" + + def test_from_dict_legacy_field_names(self): + """from_dict accepts legacy after_attempt/channel/notify for backwards compat.""" + from openintent.models import EscalationStep + + raw = {"after_attempt": 2, "channel": "email", "notify": "manager@example.com"} + s = EscalationStep.from_dict(raw) + assert s.attempt == 2 + assert s.channel_hint == "email" + assert s.notify_to == "manager@example.com" + + +class TestHumanRetryPolicyWithLadder: + """HumanRetryPolicy with an escalation_ladder.""" + + def test_with_ladder(self): + from openintent.models import EscalationStep, HumanRetryPolicy + + p = HumanRetryPolicy( + max_attempts=3, + interval_seconds=300, + escalation_ladder=[ + EscalationStep(attempt=2, channel_hint="pagerduty", notify_to="ops"), + ], + ) + d = p.to_dict() + assert "escalation_ladder" in d + assert d["escalation_ladder"][0]["attempt"] == 2 + assert d["escalation_ladder"][0]["channel_hint"] == "pagerduty" + + def test_from_dict_with_ladder(self): + from openintent.models import HumanRetryPolicy + + raw = { + "max_attempts": 3, + "interval_seconds": 300, + "strategy": "fixed", + "final_fallback_policy": "fail", + "escalation_ladder": [ + {"attempt": 2, 
"channel_hint": "slack", "notify_to": "#ops"}, + ], + } + p = HumanRetryPolicy.from_dict(raw) + assert len(p.escalation_ladder) == 1 + assert p.escalation_ladder[0].attempt == 2 + assert p.escalation_ladder[0].channel_hint == "slack" + + +class TestSuspensionRecordRetryPolicy: + """SuspensionRecord.retry_policy field (RFC-0026).""" + + def test_retry_policy_none_by_default(self): + from openintent.models import SuspensionRecord + + s = SuspensionRecord(id="x", question="q?") + assert s.retry_policy is None + + def test_retry_policy_set(self): + from openintent.models import HumanRetryPolicy, SuspensionRecord + + p = HumanRetryPolicy(max_attempts=2, interval_seconds=120) + s = SuspensionRecord(id="x", question="q?", retry_policy=p) + assert s.retry_policy is p + + def test_to_dict_includes_retry_policy(self): + from openintent.models import HumanRetryPolicy, SuspensionRecord + + p = HumanRetryPolicy(max_attempts=2) + s = SuspensionRecord(id="abc", question="Approve?", retry_policy=p) + d = s.to_dict() + assert "retry_policy" in d + assert d["retry_policy"]["max_attempts"] == 2 + + def test_to_dict_no_retry_policy_omits_key(self): + from openintent.models import SuspensionRecord + + s = SuspensionRecord(id="abc", question="Approve?") + d = s.to_dict() + assert "retry_policy" not in d + + def test_from_dict_with_retry_policy(self): + from openintent.models import SuspensionRecord + + raw = { + "id": "abc", + "question": "Approve?", + "fallback_policy": "fail", + "retry_policy": { + "max_attempts": 3, + "interval_seconds": 600, + "strategy": "fixed", + "final_fallback_policy": "complete_with_fallback", + }, + } + s = SuspensionRecord.from_dict(raw) + assert s.retry_policy is not None + assert s.retry_policy.max_attempts == 3 + assert s.retry_policy.final_fallback_policy == "complete_with_fallback" + + def test_from_dict_without_retry_policy(self): + from openintent.models import SuspensionRecord + + raw = {"id": "abc", "question": "Approve?", "fallback_policy": "fail"} 
+ s = SuspensionRecord.from_dict(raw) + assert s.retry_policy is None + + +class TestEventTypeRFC0026: + """RFC-0026 EventType constants.""" + + def test_renotified_event(self): + from openintent.models import EventType + + assert EventType.INTENT_SUSPENSION_RENOTIFIED == "intent.suspension_renotified" + + def test_escalated_event(self): + from openintent.models import EventType + + assert EventType.INTENT_SUSPENSION_ESCALATED == "intent.suspension_escalated" + + def test_portfolio_member_suspended(self): + from openintent.models import EventType + + assert EventType.PORTFOLIO_MEMBER_SUSPENDED == "portfolio.member_suspended" + + def test_portfolio_member_resumed(self): + from openintent.models import EventType + + assert EventType.PORTFOLIO_MEMBER_RESUMED == "portfolio.member_resumed" + + def test_rfc0026_events_in_enum(self): + from openintent.models import EventType + + values = {e.value for e in EventType} + assert "intent.suspension_renotified" in values + assert "intent.suspension_escalated" in values + assert "portfolio.member_suspended" in values + assert "portfolio.member_resumed" in values + + +class TestRequestInputRetryPolicy: + """request_input() accepts retry_policy parameter (RFC-0026).""" + + def test_request_input_signature_accepts_retry_policy(self): + import inspect + + from openintent.agents import BaseAgent + + sig = inspect.signature(BaseAgent.request_input) + assert "retry_policy" in sig.parameters + + def test_retry_policy_default_is_none(self): + import inspect + + from openintent.agents import BaseAgent + + sig = inspect.signature(BaseAgent.request_input) + p = sig.parameters["retry_policy"] + assert p.default is None + + +class TestBaseAgentDefaultHumanRetryPolicy: + """BaseAgent.default_human_retry_policy class attribute (RFC-0026).""" + + def test_default_is_none(self): + from openintent.agents import BaseAgent + + assert BaseAgent.default_human_retry_policy is None + + def test_can_set_on_subclass(self): + from openintent.agents import 
BaseAgent + from openintent.models import HumanRetryPolicy + + class MyAgent(BaseAgent): + default_human_retry_policy = HumanRetryPolicy( + max_attempts=4, interval_seconds=600 + ) + + assert MyAgent.default_human_retry_policy is not None + assert MyAgent.default_human_retry_policy.max_attempts == 4 + + def test_subclass_policy_does_not_affect_base(self): + from openintent.agents import BaseAgent + from openintent.models import HumanRetryPolicy + + class MyAgent(BaseAgent): + default_human_retry_policy = HumanRetryPolicy(max_attempts=2) + + assert BaseAgent.default_human_retry_policy is None + + +class TestRFC0026PackageExports: + """RFC-0026 symbols are exported from the openintent top-level package.""" + + def test_human_retry_policy_exported(self): + import openintent + + assert hasattr(openintent, "HumanRetryPolicy") + + def test_escalation_step_exported(self): + import openintent + + assert hasattr(openintent, "EscalationStep") + + def test_upstream_intent_suspended_error_exported(self): + import openintent + + assert hasattr(openintent, "UpstreamIntentSuspendedError") + + def test_human_retry_policy_instantiable_from_package(self): + import openintent + + p = openintent.HumanRetryPolicy(max_attempts=2, interval_seconds=300) + assert p.max_attempts == 2 + + def test_event_types_renotified_exported(self): + import openintent + + assert ( + openintent.EventType.INTENT_SUSPENSION_RENOTIFIED + == "intent.suspension_renotified" + ) + + def test_event_types_escalated_exported(self): + import openintent + + assert ( + openintent.EventType.INTENT_SUSPENSION_ESCALATED + == "intent.suspension_escalated" + ) + + +class TestRenotificationHandlerInvocation: + """RFC-0026: re-notification fires @on_input_requested with attempt data in suspension.context.""" + + def test_suspension_context_attempt_key_structure(self): + """Verify _attempt/_max_attempts context keys match RFC-0026 spec (not _renotify dict).""" + from openintent.models import HumanRetryPolicy, SuspensionRecord 
+ + p = HumanRetryPolicy(max_attempts=3, interval_seconds=60) + s = SuspensionRecord( + id="x", question="q?", retry_policy=p, context={"foo": "bar"} + ) + + import dataclasses + + renotify_context = dict(s.context) + renotify_context["_attempt"] = 2 + renotify_context["_max_attempts"] = 3 + renotify_suspension = dataclasses.replace(s, context=renotify_context) + + assert renotify_suspension.context["_attempt"] == 2 + assert renotify_suspension.context["_max_attempts"] == 3 + assert renotify_suspension.context["foo"] == "bar" + + def test_escalation_channel_hint_applied_to_suspension(self): + """Escalation step channel_hint is applied on re-notification suspension.""" + from openintent.models import EscalationStep, HumanRetryPolicy, SuspensionRecord + + p = HumanRetryPolicy( + max_attempts=3, + interval_seconds=60, + escalation_ladder=[ + EscalationStep( + attempt=2, channel_hint="pagerduty", notify_to="on-call" + ), + ], + ) + s = SuspensionRecord(id="x", question="q?", retry_policy=p) + + import dataclasses + + step = p.escalation_ladder[0] + renotify_context = dict(s.context) + renotify_context["_attempt"] = 2 + renotify_context["_max_attempts"] = 3 + renotify_context["_notify_to"] = step.notify_to + renotify_suspension = dataclasses.replace( + s, + context=renotify_context, + channel_hint=step.channel_hint, + ) + + assert renotify_suspension.channel_hint == "pagerduty" + assert renotify_suspension.context["_notify_to"] == "on-call" + assert renotify_suspension.context["_attempt"] == 2 + + def test_original_suspension_context_unchanged(self): + """The original suspension.context should not be mutated during re-notification.""" + from openintent.models import HumanRetryPolicy, SuspensionRecord + + original_ctx = {"original_key": "original_value"} + p = HumanRetryPolicy(max_attempts=2, interval_seconds=10) + s = SuspensionRecord( + id="x", question="q?", retry_policy=p, context=original_ctx + ) + + import dataclasses + + renotify_context = dict(s.context) + 
renotify_context["_attempt"] = 2 + renotify_context["_max_attempts"] = 2 + _renotify_suspension = dataclasses.replace(s, context=renotify_context) + + assert s.context == {"original_key": "original_value"} + assert "_attempt" not in s.context + + def test_handler_receives_same_signature_on_renotify(self): + """Handlers receive (intent, suspension) — same signature for first call and re-notifies.""" + from openintent.models import HumanRetryPolicy, SuspensionRecord + + received_args = [] + + async def my_handler(intent, suspension_record): + received_args.append((intent, suspension_record)) + + p = HumanRetryPolicy(max_attempts=3, interval_seconds=60) + s = SuspensionRecord(id="x", question="q?", retry_policy=p) + + import asyncio + import dataclasses + + renotify_suspension = dataclasses.replace( + s, context={"_attempt": 2, "_max_attempts": 3} + ) + + asyncio.run(my_handler("mock_intent", renotify_suspension)) + + assert len(received_args) == 1 + _intent, susp = received_args[0] + assert susp.context["_attempt"] == 2 + + def test_handler_can_read_attempt_from_context_rfc0026_example(self): + """Handlers can read _attempt from context per RFC-0026 example code.""" + from openintent.models import HumanRetryPolicy, SuspensionRecord + + p = HumanRetryPolicy(max_attempts=3, interval_seconds=60) + s = SuspensionRecord(id="x", question="q?", retry_policy=p) + + import dataclasses + + renotify_suspension = dataclasses.replace( + s, context={"_attempt": 2, "_max_attempts": 3} + ) + + attempt = renotify_suspension.context.get("_attempt", 1) + max_att = renotify_suspension.context.get("_max_attempts", 1) + assert attempt == 2 + assert max_att == 3 + + +class TestPlatformLevelCascade: + """RFC-0026 §5.3: three-level retry policy cascade: call-site > agent > platform.""" + + def test_server_config_suspension_field(self): + """ServerConfig supports suspension_default_retry_policy field.""" + from openintent.server.config import ServerConfig + + cfg = ServerConfig( + 
suspension_default_retry_policy={ + "max_attempts": 3, + "interval_seconds": 1800, + "strategy": "linear", + "escalation_ladder": [], + "final_fallback_policy": "fail", + } + ) + assert cfg.suspension_default_retry_policy["max_attempts"] == 3 + assert cfg.suspension_default_retry_policy["interval_seconds"] == 1800 + + def test_server_config_suspension_default_none(self): + """ServerConfig.suspension_default_retry_policy is None by default.""" + from openintent.server.config import ServerConfig + + cfg = ServerConfig() + assert cfg.suspension_default_retry_policy is None + + def test_human_retry_policy_from_dict_roundtrip(self): + """HumanRetryPolicy.from_dict can deserialise a ServerConfig policy dict.""" + from openintent.models import HumanRetryPolicy + + raw = { + "max_attempts": 4, + "interval_seconds": 900, + "strategy": "linear", + "escalation_ladder": [ + { + "after_attempt": 3, + "channel": "pagerduty", + "notify": "ops@example.com", + }, + ], + "final_fallback_policy": "fail", + } + policy = HumanRetryPolicy.from_dict(raw) + assert policy.max_attempts == 4 + assert policy.interval_seconds == 900 + assert len(policy.escalation_ladder) == 1 + assert policy.escalation_ladder[0].channel == "pagerduty" + + def test_expires_at_safeguard_interval_zero(self): + """When interval_seconds=0 and max_attempts=1, timeout_seconds is used for expiry.""" + from datetime import datetime, timedelta, timezone + + from openintent.models import HumanRetryPolicy + + p = HumanRetryPolicy(max_attempts=1, interval_seconds=0) + timeout_seconds = 300 + # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp + now = datetime.now(timezone.utc) + total_seconds = p.interval_seconds * p.max_attempts + if total_seconds > 0: + expires_at = now + timedelta(seconds=total_seconds) + elif timeout_seconds is not None: + expires_at = now + timedelta(seconds=timeout_seconds) + else: + expires_at = None + + assert expires_at is not None + delta = (expires_at - now).total_seconds() + assert abs(delta - 300) < 2 + + def test_renotification_event_payload_fields(self): + 
"""intent.suspension_renotified payload uses RFC-0026 field names.""" + payload = { + "suspension_id": "susp-123", + "attempt": 2, + "max_attempts": 3, + "channel_hint": "email", + "notify_to": None, + "next_attempt_at": "2026-03-24T11:00:00Z", + } + assert "channel_hint" in payload + assert "notify_to" in payload + assert "next_attempt_at" in payload + assert "channel" not in payload + + def test_escalation_event_payload_fields(self): + """intent.suspension_escalated payload uses RFC-0026 field names.""" + payload = { + "suspension_id": "susp-123", + "attempt": 3, + "escalated_to": "supervisor@example.com", + "channel_hint": "pagerduty", + } + assert "escalated_to" in payload + assert "channel_hint" in payload + assert "notify" not in payload + + +class TestMergeRetryPolicies: + """RFC-0026: _merge_retry_policies field-level merge logic.""" + + def test_all_none_returns_none(self): + from openintent.agents import _merge_retry_policies + + result = _merge_retry_policies( + call_site=None, agent_default=None, platform_default=None + ) + assert result is None + + def test_single_policy_returned_as_is(self): + from openintent.agents import _merge_retry_policies + from openintent.models import HumanRetryPolicy + + p = HumanRetryPolicy(max_attempts=5) + result = _merge_retry_policies( + call_site=p, agent_default=None, platform_default=None + ) + assert result is p + + def test_call_site_overrides_platform_max_attempts(self): + from openintent.agents import _merge_retry_policies + from openintent.models import HumanRetryPolicy + + platform = HumanRetryPolicy(max_attempts=3, interval_seconds=3600) + call = HumanRetryPolicy(max_attempts=5) + result = _merge_retry_policies( + call_site=call, agent_default=None, platform_default=platform + ) + assert result is not None + assert result.max_attempts == 5 + assert result.interval_seconds == 3600 + + def test_agent_inherits_platform_interval(self): + from openintent.agents import _merge_retry_policies + from openintent.models 
import HumanRetryPolicy + + platform = HumanRetryPolicy(max_attempts=3, interval_seconds=1800) + agent = HumanRetryPolicy(max_attempts=2) + result = _merge_retry_policies( + call_site=None, agent_default=agent, platform_default=platform + ) + assert result is not None + assert result.max_attempts == 2 + assert result.interval_seconds == 1800 + + def test_call_site_escalation_ladder_overrides_lower_levels(self): + from openintent.agents import _merge_retry_policies + from openintent.models import EscalationStep, HumanRetryPolicy + + platform = HumanRetryPolicy( + escalation_ladder=[EscalationStep(attempt=2, channel_hint="email")] + ) + call = HumanRetryPolicy( + escalation_ladder=[EscalationStep(attempt=3, channel_hint="pagerduty")] + ) + result = _merge_retry_policies( + call_site=call, agent_default=None, platform_default=platform + ) + assert result is not None + assert len(result.escalation_ladder) == 1 + assert result.escalation_ladder[0].attempt == 3 + assert result.escalation_ladder[0].channel_hint == "pagerduty" + + def test_platform_only_returns_platform(self): + from openintent.agents import _merge_retry_policies + from openintent.models import HumanRetryPolicy + + platform = HumanRetryPolicy(max_attempts=4, interval_seconds=900) + result = _merge_retry_policies( + call_site=None, agent_default=None, platform_default=platform + ) + assert result is not None + assert result.max_attempts == 4 + assert result.interval_seconds == 900 diff --git a/tests/test_workflow_io.py b/tests/test_workflow_io.py index 90dcce7..71484ee 100644 --- a/tests/test_workflow_io.py +++ b/tests/test_workflow_io.py @@ -1371,3 +1371,221 @@ def test_phase_config_still_exported(self): import openintent assert hasattr(openintent, "PhaseConfig") + + +# =========================================================================== +# RFC-0026: UpstreamIntentSuspendedError +# =========================================================================== + + +class 
TestUpstreamIntentSuspendedError: + """UpstreamIntentSuspendedError — construction and attributes.""" + + def test_construction_basic(self): + from openintent.workflow import UpstreamIntentSuspendedError + + e = UpstreamIntentSuspendedError( + task_id="task_01", + phase_name="run_analysis", + suspended_intent_id="intent_abc", + ) + assert e.task_id == "task_01" + assert e.phase_name == "run_analysis" + assert e.suspended_intent_id == "intent_abc" + assert e.expected_resume_at is None + + def test_construction_with_resume_estimate(self): + from openintent.workflow import UpstreamIntentSuspendedError + + e = UpstreamIntentSuspendedError( + task_id="task_02", + phase_name="generate_report", + suspended_intent_id="intent_xyz", + expected_resume_at="2026-03-24T15:00:00Z", + ) + assert e.expected_resume_at == "2026-03-24T15:00:00Z" + + def test_message_contains_intent_id(self): + from openintent.workflow import UpstreamIntentSuspendedError + + e = UpstreamIntentSuspendedError( + task_id="t", phase_name="p", suspended_intent_id="intent_abc" + ) + assert "intent_abc" in str(e) + + def test_message_contains_resume_estimate(self): + from openintent.workflow import UpstreamIntentSuspendedError + + e = UpstreamIntentSuspendedError( + task_id="t", + phase_name="p", + suspended_intent_id="intent_abc", + expected_resume_at="2026-03-24T15:00:00Z", + ) + assert "2026-03-24T15:00:00Z" in str(e) + + def test_is_workflow_error(self): + from openintent.workflow import UpstreamIntentSuspendedError, WorkflowError + + e = UpstreamIntentSuspendedError( + task_id="t", phase_name="p", suspended_intent_id="i" + ) + assert isinstance(e, WorkflowError) + + def test_is_exception(self): + from openintent.workflow import UpstreamIntentSuspendedError + + e = UpstreamIntentSuspendedError( + task_id="t", phase_name="p", suspended_intent_id="i" + ) + assert isinstance(e, Exception) + + def test_can_be_caught_as_workflow_error(self): + from openintent.workflow import UpstreamIntentSuspendedError, 
WorkflowError + + with pytest.raises(WorkflowError): + raise UpstreamIntentSuspendedError( + task_id="t", phase_name="p", suspended_intent_id="i" + ) + + def test_exported_from_openintent_package(self): + import openintent + + assert hasattr(openintent, "UpstreamIntentSuspendedError") + + def test_instantiable_from_package(self): + import openintent + + e = openintent.UpstreamIntentSuspendedError( + task_id="t", phase_name="p", suspended_intent_id="i" + ) + assert isinstance(e, openintent.WorkflowError) + + +class TestValidateClaimInputsUpstreamSuspension: + """validate_claim_inputs() raises UpstreamIntentSuspendedError when upstream is suspended (RFC-0026).""" + + _WORKFLOW = """\ +openintent: "1.0" +info: + name: "Suspension Test" +workflow: + fetch: + title: "Fetch" + assign: agent-a + outputs: + data: string + process: + title: "Process" + assign: agent-b + depends_on: [fetch] + inputs: + processed_data: fetch.data + outputs: + result: string +""" + + def test_raises_when_upstream_suspended(self, tmp_path): + from openintent.workflow import UpstreamIntentSuspendedError, WorkflowSpec + + spec = WorkflowSpec.from_string(self._WORKFLOW) + with pytest.raises(UpstreamIntentSuspendedError) as exc_info: + spec.validate_claim_inputs( + phase_name="process", + upstream_outputs={"fetch": {"data": "hello"}}, + task_id="task_process", + upstream_intents_status={ + "fetch": { + "status": "suspended_awaiting_input", + "intent_id": "intent_fetch_001", + "expected_resume_at": None, + } + }, + ) + e = exc_info.value + assert e.task_id == "task_process" + assert e.phase_name == "process" + assert e.suspended_intent_id == "intent_fetch_001" + assert e.expected_resume_at is None + + def test_raises_with_resume_estimate(self, tmp_path): + from openintent.workflow import UpstreamIntentSuspendedError, WorkflowSpec + + spec = WorkflowSpec.from_string(self._WORKFLOW) + with pytest.raises(UpstreamIntentSuspendedError) as exc_info: + spec.validate_claim_inputs( + phase_name="process", + 
upstream_outputs={"fetch": {"data": "hello"}}, + task_id="task_process", + upstream_intents_status={ + "fetch": { + "status": "suspended_awaiting_input", + "intent_id": "intent_fetch_001", + "expected_resume_at": "2026-03-24T15:00:00Z", + } + }, + ) + assert exc_info.value.expected_resume_at == "2026-03-24T15:00:00Z" + + def test_does_not_raise_when_upstream_active(self, tmp_path): + from openintent.workflow import WorkflowSpec + + spec = WorkflowSpec.from_string(self._WORKFLOW) + # Should not raise — upstream is active, outputs available + result = spec.validate_claim_inputs( + phase_name="process", + upstream_outputs={"fetch": {"data": "hello"}}, + task_id="task_process", + upstream_intents_status={ + "fetch": { + "status": "active", + "intent_id": "intent_fetch_001", + "expected_resume_at": None, + } + }, + ) + assert result is None + + def test_does_not_raise_when_status_map_absent(self, tmp_path): + from openintent.workflow import WorkflowSpec + + spec = WorkflowSpec.from_string(self._WORKFLOW) + # Backwards compat: if no upstream_intents_status provided, behaves as before + result = spec.validate_claim_inputs( + phase_name="process", + upstream_outputs={"fetch": {"data": "hello"}}, + task_id="task_process", + ) + assert result is None + + def test_trigger_refs_ignored(self, tmp_path): + """$trigger.* references should not trigger suspension check.""" + from openintent.workflow import WorkflowSpec + + workflow_yaml = """\ +openintent: "1.0" +info: + name: "Trigger Test" +workflow: + process: + title: "Process" + assign: agent-b + inputs: + quarter: $trigger.quarter +""" + spec = WorkflowSpec.from_string(workflow_yaml) + # Should not raise even if upstream_intents_status maps $trigger somehow + result = spec.validate_claim_inputs( + phase_name="process", + upstream_outputs={}, + trigger_payload={"quarter": "Q1"}, + task_id="task_process", + upstream_intents_status={ + "trigger": { + "status": "suspended_awaiting_input", + "intent_id": "intent_trigger", + 
"expected_resume_at": None, + } + }, + ) + assert result is None