Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,33 @@ Response:

- success => `{"items": [...], "default_by_provider": {...}, "connected": [...]}` (JSON-RPC result)

## Interrupt Recovery (Provider-Private Extension)

The runtime also exposes provider-private recovery queries for pending
interactive interrupts:

- `opencode.permissions.list`
- `opencode.questions.list`

These methods return recovery views over the local interrupt binding registry.
They do not replace the shared `a2a.interrupt.*` callback methods.

Response shape:

- success => `{"items": [{"request_id", "session_id", "interrupt_type", "task_id", "context_id", "details", "expires_at"}]}` (JSON-RPC result)

Notes:

- Recovery results are scoped to the current authenticated caller identity when
the runtime can resolve one.
- The runtime stores normalized interrupt `details` alongside request bindings,
so recovery results match the shape emitted in
`metadata.shared.interrupt.details`.
- The first implementation stage reads from the local interrupt registry rather
than proxying upstream global `/permission` or `/question` pending lists.
- Use recovery queries to rediscover pending requests after reconnecting; use
`a2a.interrupt.*` methods to resolve them.

## Shared Interrupt Callback (A2A Extension)

When stream metadata reports an interrupt request at `metadata.shared.interrupt`,
Expand All @@ -871,7 +898,8 @@ clients can reply through JSON-RPC extension methods:
Notes:

- `request_id` must be a live interrupt request observed from stream metadata
(`metadata.shared.interrupt.request_id`).
(`metadata.shared.interrupt.request_id`) or rediscovered through
`opencode.permissions.list` / `opencode.questions.list`.
- The server keeps an interrupt binding registry; callbacks with unknown or
expired `request_id` are rejected.
- The cache retention windows are controlled by
Expand Down
123 changes: 120 additions & 3 deletions src/opencode_a2a/contracts/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
SESSION_QUERY_EXTENSION_URI = "urn:opencode-a2a:session-query/v1"
PROVIDER_DISCOVERY_EXTENSION_URI = "urn:opencode-a2a:provider-discovery/v1"
INTERRUPT_CALLBACK_EXTENSION_URI = "urn:a2a:interactive-interrupt/v1"
INTERRUPT_RECOVERY_EXTENSION_URI = "urn:opencode-a2a:interrupt-recovery/v1"
COMPATIBILITY_PROFILE_EXTENSION_URI = "urn:a2a:compatibility-profile/v1"
WIRE_CONTRACT_EXTENSION_URI = "urn:a2a:wire-contract/v1"
SERVICE_BEHAVIOR_CLASSIFICATION = "service-level-semantic-enhancement"
Expand Down Expand Up @@ -57,6 +58,16 @@ class ProviderDiscoveryMethodContract:
notification_response_status: int | None = None


@dataclass(frozen=True)
class InterruptRecoveryMethodContract:
method: str
required_params: tuple[str, ...] = ()
optional_params: tuple[str, ...] = ()
result_fields: tuple[str, ...] = ()
items_type: str | None = None
notification_response_status: int | None = None


PROMPT_ASYNC_REQUEST_REQUIRED_FIELDS: tuple[str, ...] = ("parts",)
PROMPT_ASYNC_REQUEST_OPTIONAL_FIELDS: tuple[str, ...] = (
"messageID",
Expand Down Expand Up @@ -254,6 +265,25 @@ class ProviderDiscoveryMethodContract:
key: contract.method for key, contract in PROVIDER_DISCOVERY_METHOD_CONTRACTS.items()
}

INTERRUPT_RECOVERY_METHOD_CONTRACTS: dict[str, InterruptRecoveryMethodContract] = {
"list_permissions": InterruptRecoveryMethodContract(
method="opencode.permissions.list",
result_fields=("items",),
items_type="InterruptRequest[]",
notification_response_status=204,
),
"list_questions": InterruptRecoveryMethodContract(
method="opencode.questions.list",
result_fields=("items",),
items_type="InterruptRequest[]",
notification_response_status=204,
),
}

INTERRUPT_RECOVERY_METHODS: dict[str, str] = {
key: contract.method for key, contract in INTERRUPT_RECOVERY_METHOD_CONTRACTS.items()
}

INTERRUPT_SUCCESS_RESULT_FIELDS: tuple[str, ...] = ("ok", "request_id")
INTERRUPT_ERROR_BUSINESS_CODES: dict[str, int] = {
"INTERRUPT_REQUEST_NOT_FOUND": -32004,
Expand Down Expand Up @@ -299,6 +329,11 @@ class ProviderDiscoveryMethodContract:
"field",
"fields",
)
INTERRUPT_RECOVERY_INVALID_PARAMS_DATA_FIELDS: tuple[str, ...] = (
"type",
"field",
"fields",
)


@dataclass(frozen=True)
Expand Down Expand Up @@ -358,6 +393,9 @@ def session_control_methods(self) -> dict[str, str]:
def provider_discovery_methods(self) -> dict[str, str]:
return dict(PROVIDER_DISCOVERY_METHODS)

def interrupt_recovery_methods(self) -> dict[str, str]:
return dict(INTERRUPT_RECOVERY_METHODS)

def interrupt_callback_methods(self) -> dict[str, str]:
return dict(INTERRUPT_CALLBACK_METHODS)

Expand All @@ -369,6 +407,7 @@ def supported_jsonrpc_methods(self) -> list[str]:
SESSION_CONTROL_METHODS["prompt_async"],
SESSION_CONTROL_METHODS["command"],
*PROVIDER_DISCOVERY_METHODS.values(),
*INTERRUPT_RECOVERY_METHODS.values(),
*INTERRUPT_CALLBACK_METHODS.values(),
]
if self.is_method_enabled(SESSION_CONTROL_METHODS["shell"]):
Expand All @@ -382,6 +421,7 @@ def extension_jsonrpc_methods(self) -> list[str]:
SESSION_CONTROL_METHODS["prompt_async"],
SESSION_CONTROL_METHODS["command"],
*PROVIDER_DISCOVERY_METHODS.values(),
*INTERRUPT_RECOVERY_METHODS.values(),
*INTERRUPT_CALLBACK_METHODS.values(),
]
if self.is_method_enabled(SESSION_CONTROL_METHODS["shell"]):
Expand Down Expand Up @@ -684,6 +724,66 @@ def build_interrupt_callback_extension_params(
}


def build_interrupt_recovery_extension_params(
*,
runtime_profile: RuntimeProfile,
) -> dict[str, Any]:
method_contracts: dict[str, Any] = {}

for method_contract in INTERRUPT_RECOVERY_METHOD_CONTRACTS.values():
params_contract = _build_method_contract_params(
required=method_contract.required_params,
optional=method_contract.optional_params,
unsupported=(),
)
result_contract: dict[str, Any] = {"fields": list(method_contract.result_fields)}
if method_contract.items_type:
result_contract["items_type"] = method_contract.items_type
contract_doc: dict[str, Any] = {
"params": params_contract,
"result": result_contract,
}
if method_contract.notification_response_status is not None:
contract_doc["notification_response_status"] = (
method_contract.notification_response_status
)
method_contracts[method_contract.method] = contract_doc

return {
"methods": dict(INTERRUPT_RECOVERY_METHODS),
"method_contracts": method_contracts,
"supported_metadata": [],
"provider_private_metadata": [],
"item_fields": {
"request_id": "items[].request_id",
"session_id": "items[].session_id",
"interrupt_type": "items[].interrupt_type",
"task_id": "items[].task_id",
"context_id": "items[].context_id",
"details": "items[].details",
"expires_at": "items[].expires_at",
},
"errors": {
"invalid_params_data_fields": list(INTERRUPT_RECOVERY_INVALID_PARAMS_DATA_FIELDS),
},
"profile": runtime_profile.summary_dict(),
"notes": [
(
"Interrupt recovery methods read from the local interrupt binding registry "
"instead of directly proxying upstream global pending lists."
),
(
"Results are scoped to the current authenticated caller identity when the "
"runtime can resolve one."
),
(
"Use a2a.interrupt.* methods to resolve requests; opencode.permissions.list "
"and opencode.questions.list are recovery surfaces only."
),
],
}


def build_provider_discovery_extension_params(
*,
runtime_profile: RuntimeProfile,
Expand Down Expand Up @@ -800,6 +900,17 @@ def build_compatibility_profile_params(
for method in PROVIDER_DISCOVERY_METHODS.values()
}
)
method_retention.update(
{
method: {
"surface": "extension",
"availability": "always",
"retention": "stable",
"extension_uri": INTERRUPT_RECOVERY_EXTENSION_URI,
}
for method in INTERRUPT_RECOVERY_METHODS.values()
}
)
method_retention.update(
{
method: {
Expand Down Expand Up @@ -843,6 +954,11 @@ def build_compatibility_profile_params(
"availability": "always",
"retention": "stable",
},
INTERRUPT_RECOVERY_EXTENSION_URI: {
"surface": "jsonrpc-extension",
"availability": "always",
"retention": "stable",
},
INTERRUPT_CALLBACK_EXTENSION_URI: {
"surface": "jsonrpc-extension",
"availability": "always",
Expand All @@ -862,9 +978,9 @@ def build_compatibility_profile_params(
"surface for the main chat path; provider defaults still belong to OpenCode."
),
(
"Treat opencode.sessions.*, opencode.providers.*, and opencode.models.* as "
"provider-private operational surfaces rather than portable A2A baseline "
"capabilities."
"Treat opencode.sessions.*, opencode.providers.*, opencode.models.*, "
"opencode.permissions.list, and opencode.questions.list as provider-private "
"operational surfaces rather than portable A2A baseline capabilities."
),
(
"Treat a2a.interrupt.* methods as declared shared extensions and opencode.* "
Expand Down Expand Up @@ -911,6 +1027,7 @@ def build_wire_contract_params(
STREAMING_EXTENSION_URI,
SESSION_QUERY_EXTENSION_URI,
PROVIDER_DISCOVERY_EXTENSION_URI,
INTERRUPT_RECOVERY_EXTENSION_URI,
INTERRUPT_CALLBACK_EXTENSION_URI,
],
},
Expand Down
1 change: 1 addition & 0 deletions src/opencode_a2a/execution/stream_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ def _tool_chunks(
identity=identity,
task_id=task_id,
context_id=context_id,
details=asked["details"],
)
await _emit_interrupt_status(
state=TaskState.input_required,
Expand Down
4 changes: 4 additions & 0 deletions src/opencode_a2a/jsonrpc/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def __init__(
self._method_shell = methods.get("shell")
self._method_list_providers = methods["list_providers"]
self._method_list_models = methods["list_models"]
self._method_list_permissions = methods["list_permissions"]
self._method_list_questions = methods["list_questions"]
self._method_reply_permission = methods["reply_permission"]
self._method_reply_question = methods["reply_question"]
self._method_reject_question = methods["reject_question"]
Expand Down Expand Up @@ -116,6 +118,8 @@ def __init__(
method_shell=self._method_shell,
method_list_providers=self._method_list_providers,
method_list_models=self._method_list_models,
method_list_permissions=self._method_list_permissions,
method_list_questions=self._method_list_questions,
method_reply_permission=self._method_reply_permission,
method_reply_question=self._method_reply_question,
method_reject_question=self._method_reject_question,
Expand Down
13 changes: 13 additions & 0 deletions src/opencode_a2a/jsonrpc/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ExtensionHandlerContext:
method_shell: str | None
method_list_providers: str
method_list_models: str
method_list_permissions: str
method_list_questions: str
method_reply_permission: str
method_reply_question: str
method_reject_question: str
Expand Down Expand Up @@ -89,6 +91,7 @@ def build_extension_method_registry(
context: ExtensionHandlerContext,
) -> ExtensionMethodRegistry:
from .handlers.interrupt_callbacks import handle_interrupt_callback_request
from .handlers.interrupt_queries import handle_interrupt_query_request
from .handlers.provider_discovery import handle_provider_discovery_request
from .handlers.session_control import handle_session_control_request
from .handlers.session_queries import handle_session_query_request
Expand Down Expand Up @@ -119,6 +122,16 @@ def build_extension_method_registry(
),
handler=handle_provider_discovery_request,
),
ExtensionMethodSpec(
name="interrupt_query",
methods=frozenset(
{
context.method_list_permissions,
context.method_list_questions,
}
),
handler=handle_interrupt_query_request,
),
ExtensionMethodSpec(
name="session_control",
methods=frozenset(session_control_methods),
Expand Down
64 changes: 64 additions & 0 deletions src/opencode_a2a/jsonrpc/handlers/interrupt_queries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from __future__ import annotations

from typing import Any

from a2a.types import JSONRPCRequest
from starlette.requests import Request
from starlette.responses import Response

from ..dispatch import ExtensionHandlerContext
from ..error_responses import invalid_params_error
from .common import build_internal_error_response, build_success_response


def _binding_to_result_item(binding: Any) -> dict[str, Any]:
return {
"request_id": binding.request_id,
"session_id": binding.session_id,
"interrupt_type": binding.interrupt_type,
"task_id": binding.task_id,
"context_id": binding.context_id,
"details": dict(binding.details) if isinstance(binding.details, dict) else None,
"expires_at": binding.expires_at,
}


async def handle_interrupt_query_request(
context: ExtensionHandlerContext,
base_request: JSONRPCRequest,
params: dict[str, Any],
request: Request,
) -> Response:
unknown_fields = sorted(params)
if unknown_fields:
return context.error_response(
base_request.id,
invalid_params_error(
f"Unsupported fields: {', '.join(unknown_fields)}",
data={"type": "INVALID_FIELD", "fields": unknown_fields},
),
)

request_identity = getattr(request.state, "user_identity", None)
identity = request_identity.strip() if isinstance(request_identity, str) else ""
if not identity:
return build_success_response(context, base_request.id, {"items": []})

try:
if base_request.method == context.method_list_permissions:
items = await context.upstream_client.list_permission_requests(identity=identity)
else:
items = await context.upstream_client.list_question_requests(identity=identity)
except Exception as exc:
return build_internal_error_response(
context,
base_request.id,
log_message="Interrupt recovery JSON-RPC method failed",
exc=exc,
)

return build_success_response(
context,
base_request.id,
{"items": [_binding_to_result_item(item) for item in items]},
)
4 changes: 1 addition & 3 deletions src/opencode_a2a/jsonrpc/handlers/provider_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@

logger = logging.getLogger(__name__)

ERR_DISCOVERY_UPSTREAM_UNREACHABLE = PROVIDER_DISCOVERY_ERROR_BUSINESS_CODES[
"UPSTREAM_UNREACHABLE"
]
ERR_DISCOVERY_UPSTREAM_UNREACHABLE = PROVIDER_DISCOVERY_ERROR_BUSINESS_CODES["UPSTREAM_UNREACHABLE"]
ERR_DISCOVERY_UPSTREAM_HTTP_ERROR = PROVIDER_DISCOVERY_ERROR_BUSINESS_CODES["UPSTREAM_HTTP_ERROR"]
ERR_DISCOVERY_UPSTREAM_PAYLOAD_ERROR = PROVIDER_DISCOVERY_ERROR_BUSINESS_CODES[
"UPSTREAM_PAYLOAD_ERROR"
Expand Down
Loading