Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,52 @@ async def switch_conversation_llm(
return Success()


@conversation_router.post(
    "/{conversation_id}/switch_acp_model",
    responses={
        400: {"description": "Agent is not ACP, or provider can't switch models"},
        404: {"description": "Conversation not found"},
        409: {"description": "ACP session not initialized yet"},
        504: {"description": "ACP server did not answer the model switch in time"},
    },
)
async def switch_conversation_acp_model(
    conversation_id: UUID,
    model: str = Body(..., embed=True),
    conversation_service: ConversationService = Depends(get_conversation_service),
) -> Success:
    """Switch the model of a running ACP conversation, mid-conversation.

    Issues a protocol-level ``session/set_model`` call to the ACP subprocess
    so the new model applies to subsequent turns without losing context. Only
    valid for ACP conversations whose provider supports runtime switching.

    Args:
        conversation_id: Identifier of the conversation to update.
        model: Provider-specific model id, read from the ``model`` key of the
            JSON request body (``embed=True``).
        conversation_service: Injected service used to look up the
            conversation's event service.

    Returns:
        ``Success()`` once the switch has been applied.

    Raises:
        HTTPException: 404 when no event service exists for
            ``conversation_id``; 400 when the switch raises ``ValueError``;
            504 when it raises ``TimeoutError``; 409 when it raises
            ``RuntimeError``. Any other exception propagates unchanged.
    """
    event_service = await conversation_service.get_event_service(conversation_id)
    if event_service is None:
        raise HTTPException(status.HTTP_404_NOT_FOUND)
    try:
        await event_service.switch_acp_model(model)
    except ValueError as e:
        # Chain with ``from e`` so the original failure stays attached as
        # ``__cause__`` instead of showing up as an unrelated secondary error.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except TimeoutError as e:
        # The bounded session/set_model round-trip expired. The ACP server is
        # wedged/slow rather than rejecting the request, so surface a 504
        # instead of an opaque 500.
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail=str(e),
        ) from e
    except RuntimeError as e:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=str(e),
        ) from e
    return Success()


@conversation_router.patch(
"/{conversation_id}", responses={404: {"description": "Item not found"}}
)
Expand Down
23 changes: 23 additions & 0 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,29 @@ async def set_security_analyzer(
None, self._conversation.set_security_analyzer, security_analyzer
)

async def switch_acp_model(self, model: str) -> None:
    """Switch the live ACP conversation to ``model`` and persist the choice.

    The conversation-level ``switch_acp_model`` call is blocking (it performs
    the protocol-level ``session/set_model`` round-trip), so it is dispatched
    to the event loop's default executor. Once it returns, the new model is
    mirrored into ``self.stored`` and written out through ``save_meta()`` so
    the switch survives an agent-server restart: ``start()`` rebuilds the
    agent from ``self.stored.agent`` and ``ConversationState.create()`` copies
    that over the persisted base_state.json on resume. Only ``acp_model`` is
    updated here; ``model_post_init`` re-derives the sentinel ``llm.model``
    on reload.

    Args:
        model: Provider-specific model id to switch to.

    Raises:
        RuntimeError: If the conversation has not been started or has
            already been closed.
    """
    conversation = self._conversation
    if conversation is None:
        raise RuntimeError(
            "Conversation is not active; it has not been started or has "
            "been closed."
        )

    # Blocking round-trip: keep it off the event loop thread.
    await asyncio.get_running_loop().run_in_executor(
        None, conversation.switch_acp_model, model
    )

    # Persist only after the live switch succeeded.
    switched_agent = self.stored.agent.model_copy(update={"acp_model": model})
    self.stored = self.stored.model_copy(update={"agent": switched_agent})
    await self.save_meta()

async def close(self):
if self._lease_task is not None:
self._lease_task.cancel()
Expand Down
240 changes: 214 additions & 26 deletions openhands-sdk/openhands/sdk/agent/acp_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,20 @@ async def _maybe_set_session_model(
session_id: str,
acp_model: str | None,
) -> None:
"""Apply a protocol-level session model override when the server supports it.

Uses :func:`~openhands.sdk.settings.acp_providers.detect_acp_provider_by_agent_name`
to check whether the server supports ``set_session_model``.
claude-agent-acp uses session ``_meta`` via
:func:`~openhands.sdk.settings.acp_providers.build_session_model_meta` instead.
"""Apply the *initial* session model right after session creation.

This is the session-creation path only, gated on
:attr:`~openhands.sdk.settings.acp_providers.ACPProviderInfo.supports_set_session_model`.
Providers that select their initial model via session ``_meta``
(claude-agent-acp, ``supports_set_session_model=False``) already received
the model in ``new_session()``, so this is a no-op for them. Providers that
Comment thread
simonrosenberg marked this conversation as resolved.
use the protocol call for initial selection (codex-acp, gemini-cli) get a
one-shot ``set_session_model`` call here.

Runtime, mid-conversation switches go through
:meth:`ACPAgent.set_acp_model` instead, which always uses
``set_session_model`` and is gated on the separate
``supports_runtime_model_switch`` capability flag.
"""
if not acp_model:
return
Expand All @@ -222,6 +230,44 @@ async def _maybe_set_session_model(
await conn.set_session_model(model_id=acp_model, session_id=session_id)


async def _reapply_session_model_on_resume(
    conn: ClientSideConnection,
    agent_name: str,
    session_id: str,
    acp_model: str | None,
) -> None:
    """Push the persisted ``acp_model`` onto a session that was just resumed.

    ``load_session()`` carries no model ``_meta``, so without this call a
    resumed session (after a runtime switch, or with any persisted
    ``acp_model``) would run on the ACP server's default model while the
    serialized state claims otherwise.

    Gating: the call is attempted for custom/unknown servers and for known
    providers whose ``supports_runtime_model_switch`` flag is set; it is
    skipped only for known providers that lack the flag. This deliberately
    differs from the initial-selection gate, because claude-agent-acp selects
    its initial model via ``_meta`` yet supports ``set_session_model`` for
    later switches.

    A server that rejects the request is tolerated: the ``ACPRequestError``
    is logged as a warning and swallowed, so the session keeps the server
    default until the next switch.

    Args:
        conn: Live client-side ACP connection.
        agent_name: Server-reported agent name used for provider detection.
        session_id: Identifier of the resumed session.
        acp_model: Persisted model id; nothing happens when it is empty or
            ``None``.
    """
    if not acp_model:
        return

    provider = detect_acp_provider_by_agent_name(agent_name)
    known_without_switch = (
        provider is not None and not provider.supports_runtime_model_switch
    )
    if known_without_switch:
        return

    try:
        await conn.set_session_model(model_id=acp_model, session_id=session_id)
    except ACPRequestError as exc:
        # Best effort only: a rejected reapply is logged, not raised.
        logger.warning(
            "Could not reapply model %r on resumed session %s (%s); the live "
            "session may run on the server default until the next switch",
            acp_model,
            session_id,
            exc,
        )


def _extract_token_usage(
response: Any,
) -> tuple[int, int, int, int, int]:
Expand Down Expand Up @@ -1076,7 +1122,7 @@ def _start_acp_server(self, state: ConversationState) -> None:
)
prior_session_id = None

async def _init() -> tuple[Any, Any, Any, str, str, str]:
async def _init() -> tuple[str, str, str]:
# Spawn the subprocess directly so we can install a
# filtering reader that skips non-JSON-RPC lines some
# ACP servers (e.g. claude-code-acp v0.1.x) write to
Expand Down Expand Up @@ -1106,6 +1152,17 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]:
filtered_reader, # read filtered output
)

# Track the subprocess/connection on self as soon as they exist, so
# that if a *later* init step fails (e.g. the resume model reapply
# times out or the server errors), init_state()'s _cleanup() can
# still tear them down instead of leaking the subprocess/connection.
# The "session initialized" gating keys off _session_id (assigned
# last, on full success), so an early _conn here does not make the
# agent look ready before _init completes.
self._process = process
self._conn = conn
self._filtered_reader = filtered_reader

# Initialize the protocol and discover server identity
init_response = await conn.initialize(protocol_version=1)
agent_name = ""
Expand Down Expand Up @@ -1180,18 +1237,34 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]:
)

if session_id is None:
# Build _meta content for session options (e.g. model selection).
# Extra kwargs to new_session() become the _meta dict in the
# JSON-RPC request — do NOT wrap in _meta= (that double-nests).
# Fresh session. Build _meta content for session options (e.g.
# model selection). Extra kwargs to new_session() become the
# _meta dict in the JSON-RPC request — do NOT wrap in _meta=
# (that double-nests).
session_meta = build_session_model_meta(agent_name, self.acp_model)
response = await conn.new_session(cwd=working_dir, **session_meta)
session_id = response.session_id
await _maybe_set_session_model(
conn,
agent_name,
session_id,
self.acp_model,
)
# Initial-selection protocol call for providers that use it
# (codex-acp, gemini-cli); no-op for claude, which selected its
# model via the _meta above.
await _maybe_set_session_model(
conn,
agent_name,
session_id,
self.acp_model,
)
else:
# Resumed session. load_session() does not carry model _meta, so
# reapply the persisted (possibly runtime-switched) acp_model via
# the runtime-switch capability — otherwise the resumed live
# session would run on the server default while serialized state
# claims the switched model.
await _reapply_session_model_on_resume(
Comment thread
simonrosenberg marked this conversation as resolved.
conn,
agent_name,
session_id,
self.acp_model,
)

# Resolve the permission mode. Known providers each have their
# own mode ID (bypassPermissions, full-access, yolo …).
Expand All @@ -1205,17 +1278,14 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]:
logger.info("Setting ACP session mode: %s", mode_id)
await conn.set_session_mode(mode_id=mode_id, session_id=session_id)

return conn, process, filtered_reader, session_id, agent_name, agent_version
return session_id, agent_name, agent_version

result = self._executor.run_async(_init)
(
self._conn,
self._process,
self._filtered_reader,
self._session_id,
self._agent_name,
self._agent_version,
) = result
# _conn / _process / _filtered_reader are assigned inside _init() (right
# after creation) so a mid-init failure can be cleaned up; only the
# success-only fields are returned here.
self._session_id, self._agent_name, self._agent_version = (
self._executor.run_async(_init)
)
self._working_dir = working_dir

def _reset_client_for_turn(
Expand Down Expand Up @@ -1828,6 +1898,105 @@ async def _fork_and_prompt() -> str:
with client._fork_lock:
return self._executor.run_async(_fork_and_prompt)

def set_acp_model(self, model: str) -> None:
    """Change the model of the live ACP session without restarting it.

    Sends a protocol-level ``session/set_model`` request over the existing
    connection, so the new model applies to subsequent turns of the *same*
    session: no subprocess restart and no loss of conversation context.
    Verified against claude-agent-acp and codex-acp.

    This is the low-level agent primitive. Prefer
    :meth:`LocalConversation.switch_acp_model` as the entry point: that
    wrapper (a) holds the state lock so the switch cannot race a running
    ``step()``, and (b) persists the new value by swapping in an agent
    ``model_copy``. Because ``acp_model`` is frozen, this method only updates
    the live session and the sentinel ``llm.model``/metrics, **not**
    ``self.acp_model``. Calling it directly therefore leaves ``acp_model``
    (which ``_record_usage`` reads for cost attribution) stale and the switch
    unpersisted.

    Args:
        model: Provider-specific model id to switch to (e.g.
            ``"claude-haiku-4-5-20251001"`` or ``"gpt-5.4/low"``).

    Raises:
        ValueError: If ``model`` is empty or whitespace-only, if the detected
            provider does not support runtime model switching, or if the ACP
            server rejects the ``session/set_model`` call (e.g.
            method-not-found on a custom server, or an invalid model id).
        RuntimeError: If the ACP session has not been initialized yet (i.e.
            before the first ``run()``).
        TimeoutError: If the server does not answer within
            ``acp_prompt_timeout`` seconds.

    Note:
        A timeout means the client stopped waiting, not that the switch was
        rejected: the request may already have been written and could still
        be applied server-side. The connection and session stay alive and the
        local sentinel model is intentionally left unchanged, so a timed-out
        switch leaves the server-side model indeterminate. Treating it as
        failed locally keeps cost/token accounting on the previously-known
        model and self-heals on the next successful switch; the agent itself
        always runs whatever model the live ACP session holds.
    """
    if not model or not model.strip():
        raise ValueError("model must be a non-empty string")

    conn, session_id, executor = self._conn, self._session_id, self._executor
    if conn is None or session_id is None or executor is None:
        raise RuntimeError(
            "ACP session is not initialized; the model can only be switched "
            "after the conversation has started (first run())."
        )

    provider = detect_acp_provider_by_agent_name(self._agent_name)
    if provider is not None and not provider.supports_runtime_model_switch:
        raise ValueError(
            f"ACP provider '{provider.key}' does not support runtime model "
            "switching via set_session_model."
        )

    # Bounded round-trip: this runs while LocalConversation.switch_acp_model
    # holds the state lock, so a server that accepts the call but never
    # answers must not wedge the lock indefinitely. Timeouts and protocol
    # errors propagate *before* any local state is touched, so the sentinel
    # LLM is only updated once the live session has actually switched.
    try:
        executor.run_async(
            conn.set_session_model(model_id=model, session_id=session_id),
            timeout=self.acp_prompt_timeout,
        )
    except ACPRequestError as exc:
        # Server-internal failures (JSON-RPC -32603) are not the caller's
        # fault, and the prompt path already treats them as retriable. Let
        # them propagate (-> 5xx) instead of mislabeling them as a 400
        # client error.
        if exc.code in _RETRIABLE_SERVER_ERROR_CODES:
            raise
        # acp.exceptions.RequestError derives from Exception (not
        # RuntimeError); surface a true client/protocol rejection (e.g.
        # method-not-found, invalid model id) as a ValueError so callers,
        # including the agent-server route, treat it as a 400-class client
        # error rather than an opaque 500.
        raise ValueError(
            f"ACP server rejected set_session_model(model={model!r}): {exc}"
        ) from exc

    # Mirror the live model onto the sentinel LLM and its metrics so cost and
    # token accounting, plus serialized state, show the model actually in use
    # (same effect as model_post_init). The frozen ``acp_model`` field is
    # persisted separately by LocalConversation.switch_acp_model.
    sentinel = self.llm
    sentinel.model = model
    sentinel.metrics.model_name = model
    usage = sentinel.metrics.accumulated_token_usage
    if usage is not None:
        usage.model = model

    provider_label = provider.key if provider else "unknown"
    logger.info(
        "Switched ACP session model to %s (provider=%s, session=%s)",
        model,
        provider_label,
        session_id,
    )

def close(self) -> None:
"""Terminate the ACP subprocess and clean up resources."""
if self._closed:
Expand Down Expand Up @@ -1864,6 +2033,25 @@ def _cleanup(self) -> None:
logger.debug("Error closing executor: %s", e)
self._executor = None

def release_runtime(self) -> None:
    """Give up teardown ownership of the live ACP runtime.

    Call this on the pre-switch agent after its runtime has been handed to a
    shallow :meth:`~pydantic.BaseModel.model_copy`. Because ``model_copy`` is
    shallow, the copy shares this agent's ``_conn`` / ``_executor`` /
    ``_process`` references. Flagging this now-stale instance as closed turns
    its ``__del__`` -> :meth:`close` into a no-op, so dropping it cannot tear
    down the runtime the copy now owns.

    The runtime references themselves are deliberately left in place: an
    in-flight :meth:`ask_agent` fork may still hold this pre-switch agent and
    keeps a valid connection until it finishes. Teardown responsibility moves
    to the copy (the live ``self.agent`` going forward), which is closed on
    conversation shutdown.

    See :meth:`LocalConversation.switch_acp_model`.
    """
    # Only the flag changes; _conn / _executor / _process stay untouched.
    self._closed = True

def __del__(self) -> None:
try:
self.close()
Expand Down
Loading
Loading