Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,20 @@ No extra custom REST endpoint is introduced.
- session list => `Task` with `status.state=completed`
- message history => `Message`
- limit pagination defaults to `20`; requests above `100` are rejected
- `opencode.sessions.messages.list` also returns `result.next_cursor`
when older messages are available
- `contextId` is an A2A context key derived by the adapter
(format: `ctx:opencode-session:<session_id>`, not raw OpenCode session ID)
- OpenCode session identity is exposed explicitly at `metadata.shared.session.id`
- session title is available at `metadata.shared.session.title`
- Session list filters:
- optional `directory`, `roots`, `start`, `search`, `limit`
- `directory` is normalized through the same workspace-boundary rules used by
other OpenCode directory overrides before reaching upstream
- Session message history filters:
- optional `limit`, `before`
- `before` is an opaque cursor for loading older messages and is only
supported on `opencode.sessions.messages.list`

### Session List (`opencode.sessions.list`)

Expand All @@ -664,7 +674,12 @@ curl -sS http://127.0.0.1:8000/ \
"jsonrpc": "2.0",
"id": 1,
"method": "opencode.sessions.list",
"params": {"limit": 20}
"params": {
"directory": "services/api",
"roots": true,
"search": "planner",
"limit": 20
}
}'
```

Expand All @@ -680,11 +695,18 @@ curl -sS http://127.0.0.1:8000/ \
"method": "opencode.sessions.messages.list",
"params": {
"session_id": "<session_id>",
"before": "<next_cursor_from_previous_page>",
"limit": 50
}
}'
```

Message history responses include:

- `result.items`: normalized A2A `Message[]`
- `result.next_cursor`: opaque cursor for the next older page, or `null` when
no older page is available

### Session Prompt Async (`opencode.sessions.prompt_async`)

```bash
Expand Down
24 changes: 19 additions & 5 deletions src/opencode_a2a/contracts/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,28 @@ class InterruptRecoveryMethodContract:
*SHELL_REQUEST_OPTIONAL_FIELDS,
)

SESSION_QUERY_PAGINATION_MODE = "limit"
SESSION_QUERY_PAGINATION_MODE = "limit_and_optional_cursor"
SESSION_QUERY_PAGINATION_BEHAVIOR = "passthrough"
SESSION_QUERY_DEFAULT_LIMIT = 20
SESSION_QUERY_MAX_LIMIT = 100
SESSION_QUERY_PAGINATION_PARAMS: tuple[str, ...] = ("limit",)
SESSION_QUERY_PAGINATION_PARAMS: tuple[str, ...] = ("limit", "before")
SESSION_QUERY_PAGINATION_UNSUPPORTED: tuple[str, ...] = ("cursor", "page", "size")

SESSION_QUERY_METHOD_CONTRACTS: dict[str, SessionQueryMethodContract] = {
"list_sessions": SessionQueryMethodContract(
method="opencode.sessions.list",
optional_params=("limit", "query.limit"),
optional_params=(
"limit",
"directory",
"roots",
"start",
"search",
"query.limit",
"query.directory",
"query.roots",
"query.start",
"query.search",
),
unsupported_params=SESSION_QUERY_PAGINATION_UNSUPPORTED,
result_fields=("items",),
items_type="Task[]",
Expand All @@ -122,9 +133,9 @@ class InterruptRecoveryMethodContract:
"get_session_messages": SessionQueryMethodContract(
method="opencode.sessions.messages.list",
required_params=("session_id",),
optional_params=("limit", "query.limit"),
optional_params=("limit", "before", "query.limit", "query.before"),
unsupported_params=SESSION_QUERY_PAGINATION_UNSUPPORTED,
result_fields=("items",),
result_fields=("items", "next_cursor"),
items_type="Message[]",
notification_response_status=204,
pagination_mode=SESSION_QUERY_PAGINATION_MODE,
Expand Down Expand Up @@ -660,7 +671,10 @@ def build_session_query_extension_params(
"max_limit": SESSION_QUERY_MAX_LIMIT,
"behavior": SESSION_QUERY_PAGINATION_BEHAVIOR,
"params": list(SESSION_QUERY_PAGINATION_PARAMS),
"cursor_param": "before",
"result_cursor_field": "next_cursor",
"applies_to": pagination_applies_to,
"cursor_applies_to": [SESSION_QUERY_METHODS["get_session_messages"]],
},
"method_contracts": method_contracts,
"errors": {
Expand Down
34 changes: 30 additions & 4 deletions src/opencode_a2a/jsonrpc/handlers/session_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ async def handle_session_query_request(
params: dict[str, Any],
request: Request,
) -> Response:
del request
try:
if base_request.method == context.method_list_sessions:
query = parse_list_sessions_params(params)
Expand All @@ -60,9 +59,33 @@ async def handle_session_query_request(
)

limit = int(query["limit"])
directory = None
if base_request.method == context.method_list_sessions:
requested_directory = query.pop("directory", None)
if requested_directory is not None and not isinstance(requested_directory, str):
return context.error_response(
base_request.id,
invalid_params_error(
"directory must be a string",
data={"type": "INVALID_FIELD", "field": "directory"},
),
)
try:
directory = context.directory_resolver(requested_directory)
except ValueError as exc:
return context.error_response(
base_request.id,
invalid_params_error(
str(exc),
data={"type": "INVALID_FIELD", "field": "directory"},
),
)
try:
if base_request.method == context.method_list_sessions:
raw_result = await context.upstream_client.list_sessions(params=query)
raw_result = await context.upstream_client.list_sessions(
params=query,
directory=directory,
)
else:
assert session_id is not None
raw_result = await context.upstream_client.list_messages(session_id, params=query)
Expand Down Expand Up @@ -105,7 +128,7 @@ async def handle_session_query_request(
if base_request.method == context.method_list_sessions:
raw_items = _extract_raw_items(raw_result, kind="sessions")
else:
raw_items = _extract_raw_items(raw_result, kind="messages")
raw_items = _extract_raw_items(raw_result.payload, kind="messages")
except ValueError as exc:
logger.warning("Upstream OpenCode payload mismatch: %s", exc)
return build_upstream_payload_error_response(
Expand All @@ -131,4 +154,7 @@ async def handle_session_query_request(
mapped.append(message)
items = mapped

return build_success_response(context, base_request.id, {"items": items})
result: dict[str, Any] = {"items": items}
if base_request.method == context.method_get_session_messages:
result["next_cursor"] = raw_result.next_cursor
return build_success_response(context, base_request.id, result)
134 changes: 132 additions & 2 deletions src/opencode_a2a/jsonrpc/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,66 @@ def _parse_positive_int(value: Any, *, field: str) -> int | None:
return parsed


def _parse_non_negative_int(value: Any, *, field: str) -> int | None:
if value is None:
return None
if isinstance(value, bool):
raise JsonRpcParamsValidationError(
message=f"{field} must be an integer",
data={"type": "INVALID_FIELD", "field": field},
)
if isinstance(value, int):
parsed = value
elif isinstance(value, str):
try:
parsed = int(value)
except ValueError as exc:
raise JsonRpcParamsValidationError(
message=f"{field} must be an integer",
data={"type": "INVALID_FIELD", "field": field},
) from exc
else:
raise JsonRpcParamsValidationError(
message=f"{field} must be an integer",
data={"type": "INVALID_FIELD", "field": field},
)
if parsed < 0:
raise JsonRpcParamsValidationError(
message=f"{field} must be >= 0",
data={"type": "INVALID_FIELD", "field": field},
)
return parsed


def _parse_string_field(value: Any, *, field: str) -> str | None:
if value is None:
return None
if not isinstance(value, str):
raise JsonRpcParamsValidationError(
message=f"{field} must be a string",
data={"type": "INVALID_FIELD", "field": field},
)
normalized = value.strip()
return normalized or None


def _parse_bool_field(value: Any, *, field: str) -> bool | None:
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"true", "1", "yes", "on"}:
return True
if normalized in {"false", "0", "no", "off"}:
return False
raise JsonRpcParamsValidationError(
message=f"{field} must be a boolean",
data={"type": "INVALID_FIELD", "field": field},
)


def _parse_query_object(params: dict[str, Any]) -> dict[str, Any]:
raw_query = params.get("query")
if raw_query is None:
Expand Down Expand Up @@ -104,10 +164,69 @@ def _normalize_session_query_limit(
return normalized_query


def _normalize_alias_field(
*,
params: dict[str, Any],
query: dict[str, Any],
field: str,
parser,
) -> Any:
top_level_value = parser(params.get(field), field=field)
query_value = parser(query.get(field), field=field)
if top_level_value is not None and query_value is not None and top_level_value != query_value:
raise JsonRpcParamsValidationError(
message=f"{field} is ambiguous between params.{field} and params.query.{field}",
data={"type": "INVALID_FIELD", "field": field},
)
return top_level_value if top_level_value is not None else query_value


def parse_list_sessions_params(params: dict[str, Any]) -> dict[str, Any]:
query = _parse_query_object(params)
_validate_pagination_fields(params, query)
return _normalize_session_query_limit(params=params, query=query)
normalized_query = _normalize_session_query_limit(params=params, query=query)
directory = _normalize_alias_field(
params=params,
query=query,
field="directory",
parser=_parse_string_field,
)
roots = _normalize_alias_field(
params=params,
query=query,
field="roots",
parser=_parse_bool_field,
)
start = _normalize_alias_field(
params=params,
query=query,
field="start",
parser=_parse_non_negative_int,
)
search = _normalize_alias_field(
params=params,
query=query,
field="search",
parser=_parse_string_field,
)

if directory is not None:
normalized_query["directory"] = directory
else:
normalized_query.pop("directory", None)
if roots is not None:
normalized_query["roots"] = roots
else:
normalized_query.pop("roots", None)
if start is not None:
normalized_query["start"] = start
else:
normalized_query.pop("start", None)
if search is not None:
normalized_query["search"] = search
else:
normalized_query.pop("search", None)
return normalized_query


def parse_get_session_messages_params(params: dict[str, Any]) -> tuple[str, dict[str, Any]]:
Expand All @@ -120,4 +239,15 @@ def parse_get_session_messages_params(params: dict[str, Any]) -> tuple[str, dict

query = _parse_query_object(params)
_validate_pagination_fields(params, query)
return raw_session_id.strip(), _normalize_session_query_limit(params=params, query=query)
normalized_query = _normalize_session_query_limit(params=params, query=query)
before = _normalize_alias_field(
params=params,
query=query,
field="before",
parser=_parse_string_field,
)
if before is not None:
normalized_query["before"] = before
else:
normalized_query.pop("before", None)
return raw_session_id.strip(), normalized_query
44 changes: 34 additions & 10 deletions src/opencode_a2a/opencode_upstream_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class OpencodeMessage:
raw: dict[str, Any]


@dataclass(frozen=True)
class OpencodeMessagePage:
payload: Any
next_cursor: str | None


class _FastFailConcurrencyBudget:
def __init__(self, *, category: str, limit: int) -> None:
self._category = category
Expand Down Expand Up @@ -410,23 +416,41 @@ async def abort_session(self, session_id: str, *, directory: str | None = None)
params=self._query_params(directory=directory),
)

async def list_sessions(self, *, params: dict[str, Any] | None = None) -> Any:
async def list_sessions(
self,
*,
params: dict[str, Any] | None = None,
directory: str | None = None,
) -> Any:
"""List sessions from OpenCode."""
# Note: directory override is not explicitly supported by list_sessions params yet.
# If needed, we can add it later. For now we use the default.
return await self._get_json(
"/session",
endpoint="/session",
params=self._merge_params(params),
params=self._merge_params(params, directory=directory),
)

async def list_messages(self, session_id: str, *, params: dict[str, Any] | None = None) -> Any:
async def list_messages(
self,
session_id: str,
*,
params: dict[str, Any] | None = None,
) -> OpencodeMessagePage:
"""List messages for a session from OpenCode."""
return await self._get_json(
f"/session/{session_id}/message",
endpoint="/session/{sessionID}/message",
params=self._merge_params(params),
)
endpoint = "/session/{sessionID}/message"
async with self._request_budget.reserve(operation=endpoint):
response = await self._client.get(
f"/session/{session_id}/message",
params=self._merge_params(params),
)
response.raise_for_status()
payload = self._decode_json_response(response, endpoint=endpoint)
raw_next_cursor = response.headers.get("X-Next-Cursor")
next_cursor = None
if isinstance(raw_next_cursor, str):
normalized_cursor = raw_next_cursor.strip()
if normalized_cursor:
next_cursor = normalized_cursor
return OpencodeMessagePage(payload=payload, next_cursor=next_cursor)

async def session_prompt_async(
self,
Expand Down
Loading