From 4755abb64ba48b3a9e08fe588c4dfab547fb215a Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Tue, 26 May 2026 16:32:32 +0200 Subject: [PATCH 01/15] feat(acp): support runtime mid-conversation model switching Add a protocol-level path to switch an ACP agent's model on a live session, so the new model applies to subsequent turns of the *same* conversation without restarting the subprocess or losing context. - ACPAgent.set_acp_model(): issues session/set_model on the live connection (via the portal executor) and reflects the model on the sentinel LLM + metrics. Gated on provider.supports_set_session_model. - LocalConversation.switch_acp_model(): conversation-level entry point; takes the state lock so the switch can't race a running step(). - agent-server: POST /conversations/{id}/switch_acp_model -> EventService.switch_acp_model (run_in_executor), mapping ValueError->400 and RuntimeError->409. - acp_providers: claude-code now supports_set_session_model=True. The flag is repurposed to mean "supports the session/set_model protocol call" (runtime switching); session_meta_key continues to govern only the *initial* model selection. _maybe_set_session_model keeps init behavior identical (claude still uses _meta at creation). Verified end-to-end with real LLM calls (context preserved across the switch): Claude Code (sonnet->haiku) and Codex (gpt-5.5/xhigh->low) via LocalConversation, and the agent-server HTTP route for Claude Code. Note: the frontend/typescript-client still gate ACP model switching on their own copy of supports_set_session_model; surfacing this in the UI is a follow-up (agent-canvas + typescript-client release). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agent_server/conversation_router.py | 37 ++++++++ .../openhands/agent_server/event_service.py | 11 +++ .../openhands/sdk/agent/acp_agent.py | 73 ++++++++++++++-- .../conversation/impl/local_conversation.py | 27 ++++++ .../openhands/sdk/settings/acp_providers.py | 40 ++++++--- .../agent_server/test_conversation_router.py | 85 +++++++++++++++++++ tests/sdk/agent/test_acp_agent.py | 72 +++++++++++++++- tests/sdk/conversation/test_switch_model.py | 7 ++ tests/sdk/settings/test_acp_providers.py | 4 +- 9 files changed, 336 insertions(+), 20 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/conversation_router.py b/openhands-agent-server/openhands/agent_server/conversation_router.py index 73cea2985e..ccfca0d416 100644 --- a/openhands-agent-server/openhands/agent_server/conversation_router.py +++ b/openhands-agent-server/openhands/agent_server/conversation_router.py @@ -394,6 +394,43 @@ async def switch_conversation_llm( return Success() +@conversation_router.post( + "/{conversation_id}/switch_acp_model", + responses={ + 400: {"description": "Agent is not ACP, or provider can't switch models"}, + 404: {"description": "Conversation not found"}, + 409: {"description": "ACP session not initialized yet"}, + }, +) +async def switch_conversation_acp_model( + conversation_id: UUID, + model: str = Body(..., embed=True), + conversation_service: ConversationService = Depends(get_conversation_service), +) -> Success: + """Switch the model of a running ACP conversation, mid-conversation. + + Issues a protocol-level ``session/set_model`` call to the ACP subprocess + so the new model applies to subsequent turns without losing context. Only + valid for ACP conversations whose provider supports runtime switching. + """ + event_service = await conversation_service.get_event_service(conversation_id) + if event_service is None: + raise HTTPException(status.HTTP_404_NOT_FOUND) + try: + await event_service.switch_acp_model(model) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=str(e), + ) + return Success() + + @conversation_router.patch( "/{conversation_id}", responses={404: {"description": "Item not found"}} ) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 0a5a501008..c942e79534 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -911,6 +911,17 @@ async def set_security_analyzer( None, self._conversation.set_security_analyzer, security_analyzer ) + async def switch_acp_model(self, model: str): + """Switch the model on a running ACP conversation. + + Runs the (blocking) protocol-level ``session/set_model`` round-trip in + a worker thread so the event loop is not blocked. + """ + if not self._conversation: + raise ValueError("inactive_service") + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._conversation.switch_acp_model, model) + async def close(self): if self._lease_task is not None: self._lease_task.cancel() diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index 5f9883954c..ff50a51775 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -208,17 +208,26 @@ async def _maybe_set_session_model( session_id: str, acp_model: str | None, ) -> None: - """Apply a protocol-level session model override when the server supports it. + """Apply the *initial* session model right after session creation. - Uses :func:`~openhands.sdk.settings.acp_providers.detect_acp_provider_by_agent_name` - to check whether the server supports ``set_session_model``. - claude-agent-acp uses session ``_meta`` via - :func:`~openhands.sdk.settings.acp_providers.build_session_model_meta` instead. + This is the session-creation path only. Providers that select their + initial model via session ``_meta`` (i.e. ``session_meta_key`` is set — + claude-agent-acp) already received the model in ``new_session()``, so this + is a no-op for them. Providers without a meta key (codex-acp, gemini-cli) + get a one-shot ``set_session_model`` call here. + + Runtime, mid-conversation switches go through + :meth:`ACPAgent.set_acp_model` instead, which always uses + ``set_session_model`` and is gated only on ``supports_set_session_model``. """ if not acp_model: return provider = detect_acp_provider_by_agent_name(agent_name) - if provider is not None and provider.supports_set_session_model: + if ( + provider is not None + and provider.session_meta_key is None + and provider.supports_set_session_model + ): await conn.set_session_model(model_id=acp_model, session_id=session_id) @@ -1828,6 +1837,58 @@ async def _fork_and_prompt() -> str: with client._fork_lock: return self._executor.run_async(_fork_and_prompt) + def set_acp_model(self, model: str) -> None: + """Switch the model on the running ACP session (mid-conversation). + + Issues a protocol-level ``session/set_model`` call on the live + connection so the new model takes effect for subsequent turns in the + *same* session — no subprocess restart, no loss of conversation + context. Verified against claude-agent-acp and codex-acp. + + Thread-safety is the caller's responsibility: drive this through + :meth:`LocalConversation.switch_acp_model`, which holds the state lock + so the switch cannot race a running ``step()``. + + Args: + model: Provider-specific model id to switch to (e.g. + ``"claude-haiku-4-5-20251001"`` or ``"gpt-5.4/low"``). + + Raises: + RuntimeError: If the ACP session has not been initialized yet + (i.e. before the first ``run()``). + ValueError: If the detected provider does not support the + ``session/set_model`` protocol call. + """ + if self._conn is None or self._session_id is None or self._executor is None: + raise RuntimeError( + "ACP session is not initialized; the model can only be switched " + "after the conversation has started (first run())." + ) + provider = detect_acp_provider_by_agent_name(self._agent_name) + if provider is not None and not provider.supports_set_session_model: + raise ValueError( + f"ACP provider '{provider.key}' does not support runtime model " + "switching via set_session_model." + ) + self._executor.run_async( + self._conn.set_session_model(model_id=model, session_id=self._session_id) + ) + # Reflect the live model on the sentinel LLM + metrics so cost/token + # accounting and serialized state show the model actually in use + # (mirrors model_post_init). The ``acp_model`` field itself is frozen + # and stays at its construction-time value; the session now owns the + # authoritative current model. + self.llm.model = model + self.llm.metrics.model_name = model + if self.llm.metrics.accumulated_token_usage is not None: + self.llm.metrics.accumulated_token_usage.model = model + logger.info( + "Switched ACP session model to %s (provider=%s, session=%s)", + model, + provider.key if provider else "unknown", + self._session_id, + ) + def close(self) -> None: """Terminate the ACP subprocess and clean up resources.""" if self._closed: diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index a6bb1ec60e..79351a1b48 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -699,6 +699,33 @@ def switch_profile(self, profile_name: str) -> None: cached = loaded.model_copy(update={"usage_id": usage_id}) self.switch_llm(cached) + def switch_acp_model(self, model: str) -> None: + """Switch the model on a running ACP conversation (mid-conversation). + + Unlike :meth:`switch_llm`, which swaps OpenHands' own LLM object, this + issues a protocol-level ``session/set_model`` call to the ACP + subprocess so the new model applies to subsequent turns of the *same* + session, preserving conversation context. ``switch_llm`` would not + affect an ACP conversation, since the subprocess owns its own model. + + Args: + model: Provider-specific model id to switch to. + + Raises: + ValueError: If the conversation's agent is not an :class:`ACPAgent`, + or the provider does not support runtime model switching. + RuntimeError: If the ACP session is not yet initialized. + """ + if not isinstance(self.agent, ACPAgent): + raise ValueError( + "switch_acp_model is only supported for ACP conversations." + ) + with self._state: + self.agent.set_acp_model(model) + # Re-assign so the (mutated) agent is persisted with the updated + # sentinel-LLM model, matching switch_llm's contract. + self._state.agent = self.agent + @observe(name="conversation.send_message") def send_message(self, message: str | Message, sender: str | None = None) -> None: """Send a message to the agent. diff --git a/openhands-sdk/openhands/sdk/settings/acp_providers.py b/openhands-sdk/openhands/sdk/settings/acp_providers.py index 44c5bc0f3a..400babc041 100644 --- a/openhands-sdk/openhands/sdk/settings/acp_providers.py +++ b/openhands-sdk/openhands/sdk/settings/acp_providers.py @@ -11,8 +11,9 @@ - ``default_session_mode`` ACP mode ID that disables permission prompts - ``agent_name_patterns`` lowercase substrings in the runtime agent name; used by ``ACPAgent`` to auto-detect mode / protocol -- ``supports_set_session_model`` whether to use the ``set_session_model`` - protocol call (vs ``_meta``) for model selection +- ``supports_set_session_model`` whether the server supports the + ``session/set_model`` protocol call (enables + runtime, mid-conversation model switching) Callers outside the SDK (e.g. ``openhands-agent-server``, the ``OpenHands`` frontend) can import :data:`ACP_PROVIDERS` and :func:`get_acp_provider` instead @@ -73,19 +74,31 @@ class ACPProviderInfo: """ supports_set_session_model: bool - """``True`` if this provider uses the ``set_session_model`` protocol call. - - - ``False`` for claude-agent-acp, which uses session ``_meta`` instead. - - ``True`` for codex-acp and gemini-cli. + """``True`` if the server supports the ``session/set_model`` protocol call. + + This is what enables **runtime, mid-conversation model switching**: the + call applies to the live session, so subsequent turns use the new model + without restarting the subprocess or losing context. All three built-in + providers support it (verified against claude-agent-acp, codex-acp, and + gemini-cli). + + Note this is independent of how the model is selected *at session + creation*: see :attr:`session_meta_key`. A provider may select its initial + model via ``_meta`` (claude-agent-acp) yet still support ``set_session_model`` + for later switches. """ session_meta_key: str | None - """Top-level ``_meta`` key for model selection, or ``None``. + """Top-level ``_meta`` key for model selection *at session creation*. + + When non-``None``, the provider selects its **initial** model via ACP + session ``_meta`` using the structure + ``{session_meta_key: {"options": {"model": }}}`` passed to + ``new_session()``. When ``None``, the initial model is applied with a + one-shot ``set_session_model`` call right after the session is created. - When non-``None``, the provider selects its model via ACP session ``_meta`` - using the structure ``{session_meta_key: {"options": {"model": }}}``. - ``None`` means the provider uses the ``set_session_model`` protocol call - instead (see :attr:`supports_set_session_model`). + This only governs the *initial* selection; runtime switches always use + ``set_session_model`` (gated on :attr:`supports_set_session_model`). - ``"claudeCode"`` — claude-agent-acp - ``None`` — codex-acp, gemini-cli @@ -102,7 +115,10 @@ class ACPProviderInfo: base_url_env_var="ANTHROPIC_BASE_URL", default_session_mode="bypassPermissions", agent_name_patterns=("claude-agent",), - supports_set_session_model=False, + # claude-agent-acp selects its *initial* model via session _meta + # (session_meta_key below), but it DOES support the + # session/set_model protocol call for mid-conversation switches. + supports_set_session_model=True, session_meta_key="claudeCode", ), "codex": ACPProviderInfo( diff --git a/tests/agent_server/test_conversation_router.py b/tests/agent_server/test_conversation_router.py index b9cdc12720..f7dc4f5cf6 100644 --- a/tests/agent_server/test_conversation_router.py +++ b/tests/agent_server/test_conversation_router.py @@ -1092,6 +1092,91 @@ def test_run_conversation_not_found( client.app.dependency_overrides.clear() +def test_switch_acp_model_success( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """switch_acp_model endpoint forwards the model to the event service.""" + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.switch_acp_model.return_value = None + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/switch_acp_model", + json={"model": "haiku"}, + ) + assert response.status_code == 200 + assert response.json()["success"] is True + mock_event_service.switch_acp_model.assert_awaited_once_with("haiku") + finally: + client.app.dependency_overrides.clear() + + +def test_switch_acp_model_not_found( + client, mock_conversation_service, sample_conversation_id +): + """switch_acp_model returns 404 when the conversation is unknown.""" + mock_conversation_service.get_event_service.return_value = None + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/switch_acp_model", + json={"model": "haiku"}, + ) + assert response.status_code == 404 + finally: + client.app.dependency_overrides.clear() + + +def test_switch_acp_model_non_acp_returns_400( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """A ValueError (e.g. non-ACP agent / unsupported provider) maps to 400.""" + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.switch_acp_model.side_effect = ValueError( + "switch_acp_model is only supported for ACP conversations." + ) + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/switch_acp_model", + json={"model": "haiku"}, + ) + assert response.status_code == 400 + finally: + client.app.dependency_overrides.clear() + + +def test_switch_acp_model_uninitialized_returns_409( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """A RuntimeError (session not initialized yet) maps to 409.""" + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.switch_acp_model.side_effect = RuntimeError( + "ACP session is not initialized" + ) + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/switch_acp_model", + json={"model": "haiku"}, + ) + assert response.status_code == 409 + finally: + client.app.dependency_overrides.clear() + + def test_run_conversation_already_running( client, mock_conversation_service, mock_event_service, sample_conversation_id ): diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 7f0bd2f689..a51b70c11f 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -3188,7 +3188,10 @@ async def test_codex_agent_uses_protocol_model_override(self): ) @pytest.mark.asyncio - async def test_non_codex_agent_skips_protocol_override(self): + async def test_meta_key_provider_skips_protocol_override_at_init(self): + # claude-agent-acp selects its *initial* model via session _meta, so the + # one-shot init set_session_model call is skipped (even though the + # provider now supports the protocol call for runtime switches). conn = AsyncMock() await _maybe_set_session_model( conn, @@ -3205,6 +3208,73 @@ async def test_missing_model_skips_protocol_override(self): conn.set_session_model.assert_not_called() +class TestSetACPModel: + """Runtime (mid-conversation) model switching via set_session_model.""" + + @staticmethod + def _wire(agent: ACPAgent, agent_name: str) -> ACPAgent: + agent._conn = MagicMock() + agent._session_id = "sess-1" + agent._agent_name = agent_name + executor = MagicMock() + executor.run_async = MagicMock() + agent._executor = executor + return agent + + def test_switches_model_on_live_codex_session(self): + agent = self._wire(_make_agent(), "codex-acp") + agent.set_acp_model("gpt-5.4/low") + agent._conn.set_session_model.assert_called_once_with( + model_id="gpt-5.4/low", session_id="sess-1" + ) + agent._executor.run_async.assert_called_once() + # Sentinel LLM + metrics reflect the live model for cost/token tracking. + assert agent.llm.model == "gpt-5.4/low" + assert agent.llm.metrics.model_name == "gpt-5.4/low" + + def test_claude_provider_supports_runtime_switch(self): + agent = self._wire(_make_agent(), "claude-agent-acp") + agent.set_acp_model("claude-haiku-4-5-20251001") + agent._conn.set_session_model.assert_called_once_with( + model_id="claude-haiku-4-5-20251001", session_id="sess-1" + ) + + def test_unknown_provider_still_attempts_switch(self): + # A custom/unrecognised server (provider=None) is allowed to attempt + # the call; the ACP layer errors if it isn't actually supported. + agent = self._wire(_make_agent(), "some-custom-acp") + agent.set_acp_model("whatever") + agent._conn.set_session_model.assert_called_once() + + def test_raises_before_session_initialized(self): + agent = _make_agent() # no _conn / _session_id / _executor + with pytest.raises(RuntimeError, match="not initialized"): + agent.set_acp_model("gpt-5.4") + + def test_raises_for_provider_without_protocol_support(self): + from openhands.sdk.settings.acp_providers import ACPProviderInfo + + unsupported = ACPProviderInfo( + key="legacy", + display_name="Legacy", + default_command=("legacy",), + api_key_env_var=None, + base_url_env_var=None, + default_session_mode="default", + agent_name_patterns=("legacy",), + supports_set_session_model=False, + session_meta_key=None, + ) + agent = self._wire(_make_agent(), "legacy-acp") + with patch( + "openhands.sdk.agent.acp_agent.detect_acp_provider_by_agent_name", + return_value=unsupported, + ): + with pytest.raises(ValueError, match="does not support runtime"): + agent.set_acp_model("x") + agent._conn.set_session_model.assert_not_called() + + # --------------------------------------------------------------------------- # acp_session_mode field # --------------------------------------------------------------------------- diff --git a/tests/sdk/conversation/test_switch_model.py b/tests/sdk/conversation/test_switch_model.py index cb40b54cec..7a292afe9f 100644 --- a/tests/sdk/conversation/test_switch_model.py +++ b/tests/sdk/conversation/test_switch_model.py @@ -42,6 +42,13 @@ def _make_conversation() -> LocalConversation: ) +def test_switch_acp_model_rejects_non_acp_agent(): + """switch_acp_model is only valid for ACP conversations.""" + conv = _make_conversation() # plain Agent, not ACPAgent + with pytest.raises(ValueError, match="only supported for ACP"): + conv.switch_acp_model("haiku") + + def test_switch_profile(profile_store): """switch_profile switches the agent's LLM.""" conv = _make_conversation() diff --git a/tests/sdk/settings/test_acp_providers.py b/tests/sdk/settings/test_acp_providers.py index 1aee88951b..b16960038b 100644 --- a/tests/sdk/settings/test_acp_providers.py +++ b/tests/sdk/settings/test_acp_providers.py @@ -33,7 +33,9 @@ def test_claude_code_metadata(self): assert info.base_url_env_var == "ANTHROPIC_BASE_URL" assert info.default_session_mode == "bypassPermissions" assert "claude-agent" in info.agent_name_patterns - assert info.supports_set_session_model is False + # claude-agent-acp selects its initial model via _meta (session_meta_key) + # but DOES support session/set_model for mid-conversation switches. + assert info.supports_set_session_model is True assert info.session_meta_key == "claudeCode" def test_codex_metadata(self): From a35d333b1f84f24d02627536177e63061b326559 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Tue, 26 May 2026 20:00:24 +0200 Subject: [PATCH 02/15] fix(acp): address review on runtime model switch Addresses the four review findings on the runtime switch_acp_model path: - Persist the switched model authoritatively. switch_acp_model now replaces the agent with a model_copy carrying the new acp_model, instead of re-assigning the same mutated object. The old approach was an autosave no-op (old == new) and left the frozen acp_model field stale, so reload / resume (model_post_init, _start_acp_server) reverted to the original model. model_copy preserves the live ACP connection in private attrs. - Bound the set_session_model round-trip with acp_prompt_timeout. It runs under the conversation state lock, so an unresponsive server could otherwise wedge the lock indefinitely. The sentinel LLM is only updated after the RPC succeeds. - Map ACP protocol failures to 400, not 500. acp.exceptions.RequestError derives from Exception (not RuntimeError); set_acp_model now translates it to ValueError (method-not-found / invalid model), which the agent-server route already maps to 400. The server stays decoupled from the acp package. - Stop repurposing the exported supports_set_session_model flag. Restored its original initial-selection semantics (claude-code = False) and added a separate supports_runtime_model_switch capability flag (all three providers = True). _maybe_set_session_model keeps using the original flag at session creation; set_acp_model gates runtime switches on the new flag. Tests: switch -> reload persistence regression, RPC timeout, ACPRequestError -> ValueError translation, and a router protocol-error -> 400 case. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/agent/acp_agent.py | 60 +++++++++++------ .../conversation/impl/local_conversation.py | 18 ++++- .../openhands/sdk/settings/acp_providers.py | 65 +++++++++++++------ .../agent_server/test_conversation_router.py | 28 ++++++++ tests/sdk/agent/test_acp_agent.py | 22 +++++++ tests/sdk/conversation/test_switch_model.py | 60 +++++++++++++++++ tests/sdk/settings/test_acp_providers.py | 10 ++- 7 files changed, 217 insertions(+), 46 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index ff50a51775..ae15ac283f 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -210,24 +210,23 @@ async def _maybe_set_session_model( ) -> None: """Apply the *initial* session model right after session creation. - This is the session-creation path only. Providers that select their - initial model via session ``_meta`` (i.e. ``session_meta_key`` is set — - claude-agent-acp) already received the model in ``new_session()``, so this - is a no-op for them. Providers without a meta key (codex-acp, gemini-cli) - get a one-shot ``set_session_model`` call here. + This is the session-creation path only, gated on + :attr:`~openhands.sdk.settings.acp_providers.ACPProviderInfo.supports_set_session_model`. + Providers that select their initial model via session ``_meta`` + (claude-agent-acp, ``supports_set_session_model=False``) already received + the model in ``new_session()``, so this is a no-op for them. Providers that + use the protocol call for initial selection (codex-acp, gemini-cli) get a + one-shot ``set_session_model`` call here. Runtime, mid-conversation switches go through :meth:`ACPAgent.set_acp_model` instead, which always uses - ``set_session_model`` and is gated only on ``supports_set_session_model``. + ``set_session_model`` and is gated on the separate + ``supports_runtime_model_switch`` capability flag. """ if not acp_model: return provider = detect_acp_provider_by_agent_name(agent_name) - if ( - provider is not None - and provider.session_meta_key is None - and provider.supports_set_session_model - ): + if provider is not None and provider.supports_set_session_model: await conn.set_session_model(model_id=acp_model, session_id=session_id) @@ -1856,8 +1855,12 @@ def set_acp_model(self, model: str) -> None: Raises: RuntimeError: If the ACP session has not been initialized yet (i.e. before the first ``run()``). - ValueError: If the detected provider does not support the - ``session/set_model`` protocol call. + ValueError: If the detected provider does not support runtime model + switching, or the ACP server rejects the ``session/set_model`` + call (e.g. method-not-found on a custom server, or an invalid + model id). + TimeoutError: If the server does not answer within + ``acp_prompt_timeout`` seconds. """ if self._conn is None or self._session_id is None or self._executor is None: raise RuntimeError( @@ -1865,19 +1868,36 @@ def set_acp_model(self, model: str) -> None: "after the conversation has started (first run())." ) provider = detect_acp_provider_by_agent_name(self._agent_name) - if provider is not None and not provider.supports_set_session_model: + if provider is not None and not provider.supports_runtime_model_switch: raise ValueError( f"ACP provider '{provider.key}' does not support runtime model " "switching via set_session_model." ) - self._executor.run_async( - self._conn.set_session_model(model_id=model, session_id=self._session_id) - ) + # Bounded round-trip: this runs while LocalConversation.switch_acp_model + # holds the state lock, so a server that accepts the call but never + # answers must not wedge the lock indefinitely. On timeout / protocol + # error we propagate *before* mutating any local state, so the sentinel + # LLM is only updated once the live session has actually switched. + try: + self._executor.run_async( + self._conn.set_session_model( + model_id=model, session_id=self._session_id + ), + timeout=self.acp_prompt_timeout, + ) + except ACPRequestError as e: + # acp.exceptions.RequestError derives from Exception (not + # RuntimeError); surface it as a ValueError so callers — and the + # agent-server route — treat a rejected switch as a 400-class + # client error rather than an opaque 500. + raise ValueError( + f"ACP server rejected set_session_model(model={model!r}): {e}" + ) from e # Reflect the live model on the sentinel LLM + metrics so cost/token # accounting and serialized state show the model actually in use - # (mirrors model_post_init). The ``acp_model`` field itself is frozen - # and stays at its construction-time value; the session now owns the - # authoritative current model. + # (mirrors model_post_init). The ``acp_model`` field is frozen, so the + # authoritative current model is persisted by + # :meth:`LocalConversation.switch_acp_model` via an agent ``model_copy``. self.llm.model = model self.llm.metrics.model_name = model if self.llm.metrics.accumulated_token_usage is not None: diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 79351a1b48..0963344a0e 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -713,7 +713,8 @@ def switch_acp_model(self, model: str) -> None: Raises: ValueError: If the conversation's agent is not an :class:`ACPAgent`, - or the provider does not support runtime model switching. + or the provider does not support runtime model switching, or + the ACP server rejects the switch. RuntimeError: If the ACP session is not yet initialized. """ if not isinstance(self.agent, ACPAgent): @@ -721,9 +722,20 @@ def switch_acp_model(self, model: str) -> None: "switch_acp_model is only supported for ACP conversations." ) with self._state: + # Perform the live protocol switch first; if it fails we leave the + # persisted state untouched. self.agent.set_acp_model(model) - # Re-assign so the (mutated) agent is persisted with the updated - # sentinel-LLM model, matching switch_llm's contract. + # Persist the switched model as the authoritative value. ``acp_model`` + # is frozen, so we replace the agent with a copy carrying the new + # value (model_copy preserves the live ACP connection in private + # attrs). This matters on two counts the in-place mutation missed: + # 1. A fresh object identity makes the autosave path actually + # write base_state.json (re-assigning the same object is a + # no-op because old == new). + # 2. model_post_init / _start_acp_server derive the sentinel model + # and the resumed session model from ``acp_model`` on reload, so + # it must hold the switched value, not the construction-time one. + self.agent = self.agent.model_copy(update={"acp_model": model}) self._state.agent = self.agent @observe(name="conversation.send_message") diff --git a/openhands-sdk/openhands/sdk/settings/acp_providers.py b/openhands-sdk/openhands/sdk/settings/acp_providers.py index 400babc041..bfcc47f4a4 100644 --- a/openhands-sdk/openhands/sdk/settings/acp_providers.py +++ b/openhands-sdk/openhands/sdk/settings/acp_providers.py @@ -11,9 +11,12 @@ - ``default_session_mode`` ACP mode ID that disables permission prompts - ``agent_name_patterns`` lowercase substrings in the runtime agent name; used by ``ACPAgent`` to auto-detect mode / protocol -- ``supports_set_session_model`` whether the server supports the - ``session/set_model`` protocol call (enables - runtime, mid-conversation model switching) +- ``supports_set_session_model`` whether the provider selects its *initial* + model via the ``set_session_model`` protocol + call (vs session ``_meta``) at session creation +- ``supports_runtime_model_switch`` whether the server supports the + ``session/set_model`` protocol call for + runtime, mid-conversation model switching Callers outside the SDK (e.g. ``openhands-agent-server``, the ``OpenHands`` frontend) can import :data:`ACP_PROVIDERS` and :func:`get_acp_provider` instead @@ -74,18 +77,35 @@ class ACPProviderInfo: """ supports_set_session_model: bool - """``True`` if the server supports the ``session/set_model`` protocol call. - - This is what enables **runtime, mid-conversation model switching**: the - call applies to the live session, so subsequent turns use the new model - without restarting the subprocess or losing context. All three built-in - providers support it (verified against claude-agent-acp, codex-acp, and - gemini-cli). - - Note this is independent of how the model is selected *at session - creation*: see :attr:`session_meta_key`. A provider may select its initial - model via ``_meta`` (claude-agent-acp) yet still support ``set_session_model`` - for later switches. + """``True`` if this provider selects its *initial* model via the + ``set_session_model`` protocol call (rather than session ``_meta``). + + This governs the **session-creation** path only: + + - ``False`` for claude-agent-acp, which selects its initial model via + session ``_meta`` (see :attr:`session_meta_key`). + - ``True`` for codex-acp and gemini-cli, which get a one-shot + ``set_session_model`` call right after the session is created. + + This is **independent of** runtime switching capability — see + :attr:`supports_runtime_model_switch`. The original meaning of this flag + is preserved so external consumers that use it to pick the initial + selection path keep working. + """ + + supports_runtime_model_switch: bool + """``True`` if the server supports the ``session/set_model`` protocol call + for **runtime, mid-conversation model switching**. + + The call applies to the live session, so subsequent turns use the new + model without restarting the subprocess or losing context. All three + built-in providers support it (verified against claude-agent-acp, + codex-acp, and gemini-cli). + + Unlike :attr:`supports_set_session_model`, this is about switching the + model of an *already-running* session, not the initial selection. A + provider may select its initial model via ``_meta`` (claude-agent-acp) + yet still support ``set_session_model`` for later switches. """ session_meta_key: str | None @@ -95,10 +115,11 @@ class ACPProviderInfo: session ``_meta`` using the structure ``{session_meta_key: {"options": {"model": }}}`` passed to ``new_session()``. When ``None``, the initial model is applied with a - one-shot ``set_session_model`` call right after the session is created. + one-shot ``set_session_model`` call right after the session is created + (gated on :attr:`supports_set_session_model`). This only governs the *initial* selection; runtime switches always use - ``set_session_model`` (gated on :attr:`supports_set_session_model`). + ``set_session_model`` (gated on :attr:`supports_runtime_model_switch`). - ``"claudeCode"`` — claude-agent-acp - ``None`` — codex-acp, gemini-cli @@ -116,9 +137,11 @@ class ACPProviderInfo: default_session_mode="bypassPermissions", agent_name_patterns=("claude-agent",), # claude-agent-acp selects its *initial* model via session _meta - # (session_meta_key below), but it DOES support the - # session/set_model protocol call for mid-conversation switches. - supports_set_session_model=True, + # (session_meta_key below), so the init path does NOT use + # set_session_model. It DOES, however, support session/set_model + # for mid-conversation switches. + supports_set_session_model=False, + supports_runtime_model_switch=True, session_meta_key="claudeCode", ), "codex": ACPProviderInfo( @@ -130,6 +153,7 @@ class ACPProviderInfo: default_session_mode="full-access", agent_name_patterns=("codex-acp",), supports_set_session_model=True, + supports_runtime_model_switch=True, session_meta_key=None, ), "gemini-cli": ACPProviderInfo( @@ -141,6 +165,7 @@ class ACPProviderInfo: default_session_mode="yolo", agent_name_patterns=("gemini-cli",), supports_set_session_model=True, + supports_runtime_model_switch=True, session_meta_key=None, ), } diff --git a/tests/agent_server/test_conversation_router.py b/tests/agent_server/test_conversation_router.py index f7dc4f5cf6..d93039b0ea 100644 --- a/tests/agent_server/test_conversation_router.py +++ b/tests/agent_server/test_conversation_router.py @@ -1177,6 +1177,34 @@ def test_switch_acp_model_uninitialized_returns_409( client.app.dependency_overrides.clear() +def test_switch_acp_model_protocol_error_returns_400( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """A rejected ACP ``session/set_model`` call maps to 400, not 500. + + ``ACPAgent.set_acp_model`` translates ``acp.exceptions.RequestError`` (e.g. + method-not-found on a custom server, or an invalid model id) into a + ValueError, so a protocol-level rejection surfaces as a 400 client error + rather than an opaque 500. + """ + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.switch_acp_model.side_effect = ValueError( + "ACP server rejected set_session_model(model='bogus'): method not found" + ) + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/switch_acp_model", + json={"model": "bogus"}, + ) + assert response.status_code == 400 + finally: + client.app.dependency_overrides.clear() + + def test_run_conversation_already_running( client, mock_conversation_service, mock_event_service, sample_conversation_id ): diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index a51b70c11f..0a340016b9 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -3263,6 +3263,7 @@ def test_raises_for_provider_without_protocol_support(self): default_session_mode="default", agent_name_patterns=("legacy",), supports_set_session_model=False, + supports_runtime_model_switch=False, session_meta_key=None, ) agent = self._wire(_make_agent(), "legacy-acp") @@ -3274,6 +3275,27 @@ def test_raises_for_provider_without_protocol_support(self): agent.set_acp_model("x") agent._conn.set_session_model.assert_not_called() + def test_translates_acp_request_error_to_value_error(self): + # A protocol-level rejection (e.g. method-not-found on a custom server, + # or an invalid model id) must surface as a ValueError — not leak as a + # raw acp.exceptions.RequestError — so the agent-server maps it to 400. + agent = self._wire(_make_agent(), "codex-acp") + agent._executor.run_async.side_effect = ACPRequestError( + code=-32601, message="method not found" + ) + with pytest.raises(ValueError, match="rejected set_session_model"): + agent.set_acp_model("bogus-model") + # The sentinel LLM must not be mutated when the switch fails. + assert agent.llm.model != "bogus-model" + + def test_passes_timeout_to_run_async(self): + # The protocol round-trip runs under the conversation state lock, so it + # must be bounded to avoid wedging the lock if the server never answers. + agent = self._wire(_make_agent(acp_prompt_timeout=42.0), "codex-acp") + agent.set_acp_model("gpt-5.4/low") + _, kwargs = agent._executor.run_async.call_args + assert kwargs["timeout"] == 42.0 + # --------------------------------------------------------------------------- # acp_session_mode field diff --git a/tests/sdk/conversation/test_switch_model.py b/tests/sdk/conversation/test_switch_model.py index 7a292afe9f..b669c1c903 100644 --- a/tests/sdk/conversation/test_switch_model.py +++ b/tests/sdk/conversation/test_switch_model.py @@ -1,10 +1,15 @@ +import json from pathlib import Path +from unittest.mock import MagicMock import pytest from pydantic import SecretStr from openhands.sdk import LLM, LocalConversation from openhands.sdk.agent import Agent +from openhands.sdk.agent.acp_agent import ACPAgent +from openhands.sdk.conversation.persistence_const import BASE_STATE +from openhands.sdk.conversation.state import ConversationState from openhands.sdk.llm import llm_profile_store from openhands.sdk.llm.llm_profile_store import LLMProfileStore from openhands.sdk.testing import TestLLM @@ -49,6 +54,61 @@ def test_switch_acp_model_rejects_non_acp_agent(): conv.switch_acp_model("haiku") +def _make_acp_conversation(tmp_path) -> tuple[LocalConversation, ACPAgent]: + """A persisted ACP conversation with a faked-out live session. + + The fake ``_conn`` / ``_executor`` let ``set_acp_model`` issue its + protocol call without launching a real ACP subprocess. + """ + agent = ACPAgent(acp_command=["echo", "test"], acp_model="model-a") + agent._conn = MagicMock() + agent._session_id = "sess-1" + agent._agent_name = "codex-acp" + executor = MagicMock() + executor.run_async = MagicMock() + agent._executor = executor + conv = LocalConversation( + agent=agent, + workspace=tmp_path, + persistence_dir=str(tmp_path / "persist"), + ) + return conv, agent + + +def test_switch_acp_model_persists_authoritative_model(tmp_path): + """A runtime switch persists as the authoritative ``acp_model``. + + Regression for the review finding that re-assigning the same (mutated) + agent object was an autosave no-op, and that the frozen ``acp_model`` + field — which ``model_post_init`` / ``_start_acp_server`` read on + reload/resume — stayed at its construction-time value. + """ + conv, agent = _make_acp_conversation(tmp_path) + live_conn = agent._conn + + conv.switch_acp_model("model-b") + + # In-memory: agent + state agree on the new model, and the live connection + # survived the model_copy so the conversation can keep running. + switched = conv.agent + assert isinstance(switched, ACPAgent) + assert switched.acp_model == "model-b" + assert isinstance(conv.state.agent, ACPAgent) + assert conv.state.agent.acp_model == "model-b" + assert switched.llm.model == "model-b" + assert switched._conn is live_conn + assert switched._session_id == "sess-1" + + # On disk: base_state.json actually changed (not an autosave no-op), and the + # persisted agent reconstructs with the switched model as authoritative. + base_text = conv.state._fs.read(BASE_STATE) + reloaded = ConversationState.model_validate(json.loads(base_text)) + assert isinstance(reloaded.agent, ACPAgent) + assert reloaded.agent.acp_model == "model-b" + # model_post_init derives the sentinel LLM model from the persisted acp_model. + assert reloaded.agent.llm.model == "model-b" + + def test_switch_profile(profile_store): """switch_profile switches the agent's LLM.""" conv = _make_conversation() diff --git a/tests/sdk/settings/test_acp_providers.py b/tests/sdk/settings/test_acp_providers.py index b16960038b..b7ea5fc0f0 100644 --- a/tests/sdk/settings/test_acp_providers.py +++ b/tests/sdk/settings/test_acp_providers.py @@ -33,9 +33,11 @@ def test_claude_code_metadata(self): assert info.base_url_env_var == "ANTHROPIC_BASE_URL" assert info.default_session_mode == "bypassPermissions" assert "claude-agent" in info.agent_name_patterns - # claude-agent-acp selects its initial model via _meta (session_meta_key) - # but DOES support session/set_model for mid-conversation switches. - assert info.supports_set_session_model is True + # claude-agent-acp selects its *initial* model via _meta (session_meta_key), + # so it does NOT use set_session_model at session creation ... + assert info.supports_set_session_model is False + # ... but it DOES support session/set_model for mid-conversation switches. + assert info.supports_runtime_model_switch is True assert info.session_meta_key == "claudeCode" def test_codex_metadata(self): @@ -48,6 +50,7 @@ def test_codex_metadata(self): assert info.default_session_mode == "full-access" assert "codex-acp" in info.agent_name_patterns assert info.supports_set_session_model is True + assert info.supports_runtime_model_switch is True assert info.session_meta_key is None def test_gemini_cli_metadata(self): @@ -60,6 +63,7 @@ def test_gemini_cli_metadata(self): assert info.default_session_mode == "yolo" assert "gemini-cli" in info.agent_name_patterns assert info.supports_set_session_model is True + assert info.supports_runtime_model_switch is True assert info.session_meta_key is None def test_provider_info_is_frozen(self): From e99aaa07c16cdbda4d5ab66cb2847b1a4a08a1d1 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Tue, 26 May 2026 20:34:16 +0200 Subject: [PATCH 03/15] fix(acp): non-breaking provider field + 409 on inactive switch - Move supports_runtime_model_switch to the end of ACPProviderInfo with a default of False so it is a backward-compatible addition, not a moved positional / newly-required field (fixes Python API breakage check). - switch_acp_model: raise RuntimeError("Conversation is not active.") instead of ValueError("inactive_service") so an inactive/None conversation maps to 409 (per the documented endpoint contract) and surfaces a human-readable detail. Use an explicit `is None` guard. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 4 +-- .../openhands/sdk/settings/acp_providers.py | 34 +++++++++++-------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index c942e79534..a857a57782 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -917,8 +917,8 @@ async def switch_acp_model(self, model: str): Runs the (blocking) protocol-level ``session/set_model`` round-trip in a worker thread so the event loop is not blocked. """ - if not self._conversation: - raise ValueError("inactive_service") + if self._conversation is None: + raise RuntimeError("Conversation is not active.") loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._conversation.switch_acp_model, model) diff --git a/openhands-sdk/openhands/sdk/settings/acp_providers.py b/openhands-sdk/openhands/sdk/settings/acp_providers.py index 3bed026357..c34e7d5b6a 100644 --- a/openhands-sdk/openhands/sdk/settings/acp_providers.py +++ b/openhands-sdk/openhands/sdk/settings/acp_providers.py @@ -109,21 +109,6 @@ class ACPProviderInfo: selection path keep working. """ - supports_runtime_model_switch: bool - """``True`` if the server supports the ``session/set_model`` protocol call - for **runtime, mid-conversation model switching**. - - The call applies to the live session, so subsequent turns use the new - model without restarting the subprocess or losing context. All three - built-in providers support it (verified against claude-agent-acp, - codex-acp, and gemini-cli). - - Unlike :attr:`supports_set_session_model`, this is about switching the - model of an *already-running* session, not the initial selection. A - provider may select its initial model via ``_meta`` (claude-agent-acp) - yet still support ``set_session_model`` for later switches. - """ - session_meta_key: str | None """Top-level ``_meta`` key for model selection *at session creation*. @@ -158,6 +143,25 @@ class ACPProviderInfo: the ACP server pick its own default. """ + supports_runtime_model_switch: bool = False + """``True`` if the server supports the ``session/set_model`` protocol call + for **runtime, mid-conversation model switching**. + + The call applies to the live session, so subsequent turns use the new + model without restarting the subprocess or losing context. All three + built-in providers support it (verified against claude-agent-acp, + codex-acp, and gemini-cli). + + Unlike :attr:`supports_set_session_model`, this is about switching the + model of an *already-running* session, not the initial selection. A + provider may select its initial model via ``_meta`` (claude-agent-acp) + yet still support ``set_session_model`` for later switches. + + Defaults to ``False`` so forward-compat providers — and any external + caller constructing this dataclass positionally — keep working without a + signature break; the built-in providers set it explicitly. + """ + # --------------------------------------------------------------------------- # Curated ``acp_model`` candidate lists for the built-in providers. From d8a7d0728544ab75cc014b4e60a477250307c16f Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Tue, 26 May 2026 20:41:20 +0200 Subject: [PATCH 04/15] fix(acp): persist model switch to meta.json + map TimeoutError to 504 Addresses review on switch_acp_model: - Persistence: EventService.switch_acp_model now mirrors the new acp_model into meta.json (self.stored). start() rebuilds the runtime agent from meta.json and ConversationState.create() copies it over the resumed base_state.json, so without this a restart silently reverted the switch. model_post_init re-derives the sentinel llm.model from acp_model on reload, so only acp_model needs mirroring. - TimeoutError -> 504: ACPAgent.set_acp_model bounds the session/set_model round-trip and raises TimeoutError on a wedged/slow server. The route only mapped ValueError (400) and RuntimeError (409), so a timeout surfaced as an opaque 500. Catch TimeoutError -> 504 and document it. Adds regression tests for both (meta.json persistence + 504 mapping). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agent_server/conversation_router.py | 9 ++++ .../openhands/agent_server/event_service.py | 13 +++++- .../agent_server/test_conversation_router.py | 27 ++++++++++++ tests/agent_server/test_event_service.py | 42 +++++++++++++++++++ 4 files changed, 90 insertions(+), 1 deletion(-) diff --git a/openhands-agent-server/openhands/agent_server/conversation_router.py b/openhands-agent-server/openhands/agent_server/conversation_router.py index ccfca0d416..fbe0353647 100644 --- a/openhands-agent-server/openhands/agent_server/conversation_router.py +++ b/openhands-agent-server/openhands/agent_server/conversation_router.py @@ -400,6 +400,7 @@ async def switch_conversation_llm( 400: {"description": "Agent is not ACP, or provider can't switch models"}, 404: {"description": "Conversation not found"}, 409: {"description": "ACP session not initialized yet"}, + 504: {"description": "ACP server did not answer the model switch in time"}, }, ) async def switch_conversation_acp_model( @@ -423,6 +424,14 @@ async def switch_conversation_acp_model( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) + except TimeoutError as e: + # The bounded session/set_model round-trip expired. The ACP server is + # wedged/slow rather than rejecting the request, so surface a 504 + # instead of an opaque 500. + raise HTTPException( + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + detail=str(e), + ) except RuntimeError as e: raise HTTPException( status_code=status.HTTP_409_CONFLICT, diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index a857a57782..a715b82625 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -915,12 +915,23 @@ async def switch_acp_model(self, model: str): """Switch the model on a running ACP conversation. Runs the (blocking) protocol-level ``session/set_model`` round-trip in - a worker thread so the event loop is not blocked. + a worker thread so the event loop is not blocked, then mirrors the new + model into ``meta.json`` so the switch survives an agent-server restart. """ if self._conversation is None: raise RuntimeError("Conversation is not active.") loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._conversation.switch_acp_model, model) + # Persist the switch into meta.json. ``start()`` rebuilds the runtime + # agent from ``self.stored.agent``, and ``ConversationState.create()`` + # copies that agent over the persisted base_state.json on resume — so + # without mirroring the new model here, a restart would silently revert + # to the old one. Only ``acp_model`` needs updating: ``model_post_init`` + # re-derives the sentinel ``llm.model`` from it on reload. + self.stored = self.stored.model_copy( + update={"agent": self.stored.agent.model_copy(update={"acp_model": model})} + ) + await self.save_meta() async def close(self): if self._lease_task is not None: diff --git a/tests/agent_server/test_conversation_router.py b/tests/agent_server/test_conversation_router.py index d93039b0ea..38cb74c269 100644 --- a/tests/agent_server/test_conversation_router.py +++ b/tests/agent_server/test_conversation_router.py @@ -1205,6 +1205,33 @@ def test_switch_acp_model_protocol_error_returns_400( client.app.dependency_overrides.clear() +def test_switch_acp_model_timeout_returns_504( + client, mock_conversation_service, mock_event_service, sample_conversation_id +): + """A TimeoutError (wedged/slow ACP server) maps to 504, not 500. + + ``ACPAgent.set_acp_model`` bounds the ``session/set_model`` round-trip with + ``acp_prompt_timeout``; an expired call raises ``TimeoutError``, which the + route surfaces as a Gateway Timeout rather than an opaque 500. + """ + mock_conversation_service.get_event_service.return_value = mock_event_service + mock_event_service.switch_acp_model.side_effect = TimeoutError( + "ACP server did not answer set_session_model within 600s" + ) + + client.app.dependency_overrides[get_conversation_service] = ( + lambda: mock_conversation_service + ) + try: + response = client.post( + f"/api/conversations/{sample_conversation_id}/switch_acp_model", + json={"model": "haiku"}, + ) + assert response.status_code == 504 + finally: + client.app.dependency_overrides.clear() + + def test_run_conversation_already_running( client, mock_conversation_service, mock_event_service, sample_conversation_id ): diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 2058052c10..7dc415bd8b 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -1308,6 +1308,48 @@ async def test_save_meta_preserves_updated_at(self, event_service, tmp_path): loaded = StoredConversation.model_validate_json(meta_file.read_text()) assert loaded.updated_at == original_updated_at + @pytest.mark.asyncio + async def test_switch_acp_model_persists_to_meta(self, tmp_path): + """switch_acp_model mirrors the new model into meta.json. + + start() rebuilds the runtime agent from meta.json (self.stored.agent), + and ConversationState.create() copies that agent over the persisted + base_state.json on resume. So the switched model must also be written + to meta.json, otherwise a restart silently reverts to the old model. + """ + from openhands.sdk.agent import ACPAgent + + stored = StoredConversation( + id=uuid4(), + agent=ACPAgent(acp_command=["echo", "test"], acp_model="old-model"), + workspace=LocalWorkspace(working_dir=str(tmp_path)), + confirmation_policy=NeverConfirm(), + initial_message=None, + metrics=None, + ) + service = EventService(stored=stored, conversations_dir=tmp_path) + conv_dir = tmp_path / stored.id.hex + conv_dir.mkdir(parents=True, exist_ok=True) + + # Stand in for a live conversation; the protocol-level switch is + # covered elsewhere — here we only assert the meta.json mirroring. + service._conversation = MagicMock() + + await service.switch_acp_model("new-model") + + # Live switch was delegated to the conversation... + service._conversation.switch_acp_model.assert_called_once_with("new-model") + # ...the in-memory stored agent was updated... + assert isinstance(service.stored.agent, ACPAgent) + assert service.stored.agent.acp_model == "new-model" + # ...and the new model was persisted to meta.json so it survives a + # restart. + loaded = StoredConversation.model_validate_json( + (conv_dir / "meta.json").read_text() + ) + assert isinstance(loaded.agent, ACPAgent) + assert loaded.agent.acp_model == "new-model" + class TestEventServiceStartWithRunningStatus: """Test cases for EventService.start handling of RUNNING execution status.""" From 575e6fd4373926ee48aa5dba42d613fa48a2d0ac Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Tue, 26 May 2026 20:42:48 +0200 Subject: [PATCH 05/15] docs(acp): document TimeoutError in switch_acp_model Raises section set_acp_model propagates TimeoutError from the bounded session/set_model round-trip; surface it in the public contract. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/conversation/impl/local_conversation.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 3b860e119e..5ce4ecbd99 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -721,6 +721,8 @@ def switch_acp_model(self, model: str) -> None: or the provider does not support runtime model switching, or the ACP server rejects the switch. RuntimeError: If the ACP session is not yet initialized. + TimeoutError: If the ACP server does not respond within + ``acp_prompt_timeout`` seconds. """ if not isinstance(self.agent, ACPAgent): raise ValueError( From 46a1cf4d069ab423b762bb9a898a524e5dd49970 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Tue, 26 May 2026 23:14:03 +0200 Subject: [PATCH 06/15] fix(acp): release live runtime from discarded agent on model switch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit switch_acp_model swaps the agent for a shallow model_copy that shares the live ACP runtime (_conn/_executor/_process) with the old agent. ACPAgent defines __del__ -> close(), so when the discarded old agent is garbage collected it would close the connection, kill the subprocess and shut down the executor — out from under the copy, breaking the next turn even though the live switch succeeded. Add ACPAgent.release_runtime() to disarm the finalizer (mark closed + clear the shared references) and call it on the old agent before dropping it. Adds a regression test asserting the discarded agent is disarmed and its close()/__del__ leaves the copy's shared connection/executor intact. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/agent/acp_agent.py | 19 ++++++++++ .../conversation/impl/local_conversation.py | 16 ++++++--- tests/sdk/conversation/test_switch_model.py | 36 +++++++++++++++++++ 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index ae15ac283f..8e1f518396 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -1945,6 +1945,25 @@ def _cleanup(self) -> None: logger.debug("Error closing executor: %s", e) self._executor = None + def release_runtime(self) -> None: + """Relinquish ownership of the live ACP runtime without tearing it down. + + Call this on an agent whose live session has been handed to a shallow + :meth:`~pydantic.BaseModel.model_copy` — which shares the same + ``_conn`` / ``_executor`` / ``_process`` references. It marks the agent + closed and clears those references, disarming the + ``__del__`` -> :meth:`close` finalizer so dropping this now-stale + instance cannot close resources the copy still owns. + + See :meth:`LocalConversation.switch_acp_model`. + """ + self._closed = True + self._conn = None + self._executor = None + self._process = None + self._client = None + self._filtered_reader = None + def __del__(self) -> None: try: self.close() diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 5ce4ecbd99..e14707c6d0 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -734,16 +734,24 @@ def switch_acp_model(self, model: str) -> None: self.agent.set_acp_model(model) # Persist the switched model as the authoritative value. ``acp_model`` # is frozen, so we replace the agent with a copy carrying the new - # value (model_copy preserves the live ACP connection in private - # attrs). This matters on two counts the in-place mutation missed: + # value. This matters on two counts the in-place mutation missed: # 1. A fresh object identity makes the autosave path actually # write base_state.json (re-assigning the same object is a # no-op because old == new). # 2. model_post_init / _start_acp_server derive the sentinel model # and the resumed session model from ``acp_model`` on reload, so # it must hold the switched value, not the construction-time one. - self.agent = self.agent.model_copy(update={"acp_model": model}) - self._state.agent = self.agent + # + # model_copy is shallow, so the copy shares the live ACP runtime + # (_conn/_executor/_process) with the old agent. Release it from the + # old agent before dropping it: otherwise ACPAgent.__del__ -> close() + # on the discarded agent would tear down the session the copy now + # owns, leaving the next turn pointing at a dead connection. + old_agent = self.agent + new_agent = old_agent.model_copy(update={"acp_model": model}) + old_agent.release_runtime() + self.agent = new_agent + self._state.agent = new_agent @observe(name="conversation.send_message") def send_message(self, message: str | Message, sender: str | None = None) -> None: diff --git a/tests/sdk/conversation/test_switch_model.py b/tests/sdk/conversation/test_switch_model.py index 1bdd78e26f..362ca8e818 100644 --- a/tests/sdk/conversation/test_switch_model.py +++ b/tests/sdk/conversation/test_switch_model.py @@ -110,6 +110,42 @@ def test_switch_acp_model_persists_authoritative_model(tmp_path): assert reloaded.agent.llm.model == "model-b" +def test_switch_acp_model_disarms_discarded_agent_finalizer(tmp_path): + """The pre-switch agent must not tear down the shared live session. + + Regression: ``switch_acp_model`` swaps in a shallow ``model_copy`` that + shares ``_conn`` / ``_executor`` / ``_process`` with the old agent. Without + releasing the runtime first, ``ACPAgent.__del__`` -> ``close()`` on the + discarded agent closes the connection, kills the subprocess and shuts down + the executor — out from under the copy, breaking the next turn. + """ + conv, old_agent = _make_acp_conversation(tmp_path) + live_conn = old_agent._conn + live_executor = old_agent._executor + + conv.switch_acp_model("model-b") + + # The copy took over the live runtime... + switched = conv.agent + assert isinstance(switched, ACPAgent) + assert switched._conn is live_conn + assert switched._executor is live_executor + + # ...and the discarded agent was disarmed: marked closed with its runtime + # references cleared, so its finalizer is a no-op. + assert old_agent._closed is True + assert old_agent._conn is None + assert old_agent._executor is None + assert old_agent._process is None + + # Simulating GC (__del__ -> close()) on the old agent leaves the copy's + # shared connection/executor untouched. + live_executor.run_async.reset_mock() + old_agent.close() + live_executor.run_async.assert_not_called() + live_executor.close.assert_not_called() + + def test_switch_profile(profile_store): """switch_profile switches the agent's LLM.""" conv = _make_conversation() From 5a302cb426bedb1b9739267d91911c83fb2e16a6 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Tue, 26 May 2026 23:44:25 +0200 Subject: [PATCH 07/15] fix(acp): robustness on resume, error mapping, races, and atomicity Addresses review on switch_acp_model: - Resume reapplies the persisted model. load_session() carries no model _meta, so a session resumed after a runtime switch ran on the server default while serialized state claimed the switched model. Add _reapply_session_model_on_resume (gated on supports_runtime_model_switch) and call it on the resumed-session path; keep _meta/initial-selection for fresh sessions. - set_acp_model no longer mislabels server errors. JSON-RPC -32603 (server-internal) now propagates as a 5xx instead of being converted to ValueError -> 400; only true client/protocol rejections become ValueError (reuses _RETRIABLE_SERVER_ERROR_CODES, mirroring the prompt path). - release_runtime() only disarms the finalizer (sets _closed) and leaves the shared _conn/_executor/_process intact, so an in-flight ask_agent()/fork holding the pre-switch agent keeps a valid connection. Still prevents the discarded agent's __del__ -> close() from tearing down the copy's runtime. - EventService.switch_acp_model serializes the live switch + meta.json mirror under a per-conversation asyncio.Lock so concurrent switches can't leave persisted metadata inconsistent with the live session. Adds tests for resume reapplication, -32603 propagation, and the disarm (no-clear) behavior. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 45 ++++++--- .../openhands/sdk/agent/acp_agent.py | 99 ++++++++++++++----- .../conversation/impl/local_conversation.py | 8 +- tests/sdk/agent/test_acp_agent.py | 62 ++++++++++++ tests/sdk/conversation/test_switch_model.py | 20 ++-- 5 files changed, 182 insertions(+), 52 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index a715b82625..d174af5364 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -72,6 +72,9 @@ class EventService: # wrapping up; consumed by _run_and_publish to re-run the stranded message. _rerun_requested: bool = field(default=False, init=False) _run_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False) + _acp_model_switch_lock: asyncio.Lock = field( + default_factory=asyncio.Lock, init=False + ) _callback_wrapper: AsyncCallbackWrapper | None = field(default=None, init=False) _lease: ConversationLease | None = field(default=None, init=False) _lease_generation: int | None = field(default=None, init=False) @@ -918,20 +921,34 @@ async def switch_acp_model(self, model: str): a worker thread so the event loop is not blocked, then mirrors the new model into ``meta.json`` so the switch survives an agent-server restart. """ - if self._conversation is None: - raise RuntimeError("Conversation is not active.") - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, self._conversation.switch_acp_model, model) - # Persist the switch into meta.json. ``start()`` rebuilds the runtime - # agent from ``self.stored.agent``, and ``ConversationState.create()`` - # copies that agent over the persisted base_state.json on resume — so - # without mirroring the new model here, a restart would silently revert - # to the old one. Only ``acp_model`` needs updating: ``model_post_init`` - # re-derives the sentinel ``llm.model`` from it on reload. - self.stored = self.stored.model_copy( - update={"agent": self.stored.agent.model_copy(update={"acp_model": model})} - ) - await self.save_meta() + # Serialize concurrent switches for this conversation. The live switch + # and the meta.json mirror must commit as one unit; otherwise two + # interleaved requests could leave meta.json pointing at a different + # model than the last live switch (A switches live->A, B switches + # live->B and saves meta=B, then A saves meta=A while the session runs + # on B). The lock keeps persisted metadata consistent with the live + # session. + async with self._acp_model_switch_lock: + if self._conversation is None: + raise RuntimeError("Conversation is not active.") + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, self._conversation.switch_acp_model, model + ) + # Persist the switch into meta.json. ``start()`` rebuilds the runtime + # agent from ``self.stored.agent``, and ``ConversationState.create()`` + # copies that agent over the persisted base_state.json on resume — so + # without mirroring the new model here, a restart would silently + # revert to the old one. Only ``acp_model`` needs updating: + # ``model_post_init`` re-derives the sentinel ``llm.model`` on reload. + self.stored = self.stored.model_copy( + update={ + "agent": self.stored.agent.model_copy( + update={"acp_model": model} + ) + } + ) + await self.save_meta() async def close(self): if self._lease_task is not None: diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index 8e1f518396..1226225a01 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -230,6 +230,34 @@ async def _maybe_set_session_model( await conn.set_session_model(model_id=acp_model, session_id=session_id) +async def _reapply_session_model_on_resume( + conn: ClientSideConnection, + agent_name: str, + session_id: str, + acp_model: str | None, +) -> None: + """Reapply the persisted model to a *resumed* session. + + ``load_session()`` carries no model ``_meta``, so a session resumed after a + runtime switch (or with any persisted ``acp_model``) would otherwise run on + the ACP server's default. This issues ``set_session_model`` gated on + :attr:`~openhands.sdk.settings.acp_providers.ACPProviderInfo.supports_runtime_model_switch` + — the capability for switching an *already-running* session — so the resumed + live session matches the serialized ``acp_model``. + + This deliberately uses the runtime-switch gate, not the initial-selection + one: claude-agent-acp selects its initial model via ``_meta`` + (``supports_set_session_model=False``) yet supports ``set_session_model`` + for later switches, so on resume it needs this call to honour the persisted + model rather than the server default. + """ + if not acp_model: + return + provider = detect_acp_provider_by_agent_name(agent_name) + if provider is not None and provider.supports_runtime_model_switch: + await conn.set_session_model(model_id=acp_model, session_id=session_id) + + def _extract_token_usage( response: Any, ) -> tuple[int, int, int, int, int]: @@ -1188,18 +1216,34 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]: ) if session_id is None: - # Build _meta content for session options (e.g. model selection). - # Extra kwargs to new_session() become the _meta dict in the - # JSON-RPC request — do NOT wrap in _meta= (that double-nests). + # Fresh session. Build _meta content for session options (e.g. + # model selection). Extra kwargs to new_session() become the + # _meta dict in the JSON-RPC request — do NOT wrap in _meta= + # (that double-nests). session_meta = build_session_model_meta(agent_name, self.acp_model) response = await conn.new_session(cwd=working_dir, **session_meta) session_id = response.session_id - await _maybe_set_session_model( - conn, - agent_name, - session_id, - self.acp_model, - ) + # Initial-selection protocol call for providers that use it + # (codex-acp, gemini-cli); no-op for claude, which selected its + # model via the _meta above. + await _maybe_set_session_model( + conn, + agent_name, + session_id, + self.acp_model, + ) + else: + # Resumed session. load_session() does not carry model _meta, so + # reapply the persisted (possibly runtime-switched) acp_model via + # the runtime-switch capability — otherwise the resumed live + # session would run on the server default while serialized state + # claims the switched model. + await _reapply_session_model_on_resume( + conn, + agent_name, + session_id, + self.acp_model, + ) # Resolve the permission mode. Known providers each have their # own mode ID (bypassPermissions, full-access, yolo …). @@ -1886,10 +1930,17 @@ def set_acp_model(self, model: str) -> None: timeout=self.acp_prompt_timeout, ) except ACPRequestError as e: + # Server-internal failures (JSON-RPC -32603) are not the caller's + # fault, and the prompt path already treats them as retriable. Let + # them propagate (-> 5xx) instead of mislabeling them as a 400 + # client error. + if e.code in _RETRIABLE_SERVER_ERROR_CODES: + raise # acp.exceptions.RequestError derives from Exception (not - # RuntimeError); surface it as a ValueError so callers — and the - # agent-server route — treat a rejected switch as a 400-class - # client error rather than an opaque 500. + # RuntimeError); surface a true client/protocol rejection (e.g. + # method-not-found, invalid model id) as a ValueError so callers — + # and the agent-server route — treat it as a 400-class client error + # rather than an opaque 500. raise ValueError( f"ACP server rejected set_session_model(model={model!r}): {e}" ) from e @@ -1946,23 +1997,23 @@ def _cleanup(self) -> None: self._executor = None def release_runtime(self) -> None: - """Relinquish ownership of the live ACP runtime without tearing it down. + """Disarm this agent's finalizer after handing its live ACP runtime to a + shallow :meth:`~pydantic.BaseModel.model_copy`. + + The copy shares this agent's ``_conn`` / ``_executor`` / ``_process`` + references (``model_copy`` is shallow). Marking this now-stale instance + closed makes its ``__del__`` -> :meth:`close` a no-op, so dropping it + cannot tear down the runtime the copy now owns. - Call this on an agent whose live session has been handed to a shallow - :meth:`~pydantic.BaseModel.model_copy` — which shares the same - ``_conn`` / ``_executor`` / ``_process`` references. It marks the agent - closed and clears those references, disarming the - ``__del__`` -> :meth:`close` finalizer so dropping this now-stale - instance cannot close resources the copy still owns. + The runtime references are intentionally left intact: an in-flight + :meth:`ask_agent` fork — which is thread-safe and may still hold this + pre-switch agent — keeps a valid connection until it finishes. Sole + ownership for teardown passes to the copy (the live ``self.agent`` + going forward), which is closed on conversation shutdown. See :meth:`LocalConversation.switch_acp_model`. """ self._closed = True - self._conn = None - self._executor = None - self._process = None - self._client = None - self._filtered_reader = None def __del__(self) -> None: try: diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index e14707c6d0..1d2e7cfb2e 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -743,10 +743,10 @@ def switch_acp_model(self, model: str) -> None: # it must hold the switched value, not the construction-time one. # # model_copy is shallow, so the copy shares the live ACP runtime - # (_conn/_executor/_process) with the old agent. Release it from the - # old agent before dropping it: otherwise ACPAgent.__del__ -> close() - # on the discarded agent would tear down the session the copy now - # owns, leaving the next turn pointing at a dead connection. + # (_conn/_executor/_process) with the old agent. Disarm the old + # agent's finalizer before dropping it: otherwise ACPAgent.__del__ + # -> close() on the discarded agent would tear down the session the + # copy now owns, leaving the next turn pointing at a dead connection. old_agent = self.agent new_agent = old_agent.model_copy(update={"acp_model": model}) old_agent.release_runtime() diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 0a340016b9..fc216d7e49 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -19,6 +19,7 @@ _image_url_to_acp_block, _maybe_set_session_model, _OpenHandsACPBridge, + _reapply_session_model_on_resume, _select_auth_method, _serialize_tool_content, ) @@ -3208,6 +3209,53 @@ async def test_missing_model_skips_protocol_override(self): conn.set_session_model.assert_not_called() +class TestReapplySessionModelOnResume: + """Resume reapplies the persisted model via the runtime-switch gate.""" + + @pytest.mark.asyncio + async def test_claude_reapplies_persisted_model_on_resume(self): + # claude selects its initial model via _meta (supports_set_session_model + # =False) but DOES support set_session_model for runtime switches. + # load_session() carries no _meta, so on resume the persisted model must + # be reapplied via the runtime-switch gate — _maybe_set_session_model + # would skip it. + conn = AsyncMock() + await _reapply_session_model_on_resume( + conn, "claude-agent-acp", "sess-1", "claude-haiku-4-5-20251001" + ) + conn.set_session_model.assert_awaited_once_with( + model_id="claude-haiku-4-5-20251001", session_id="sess-1" + ) + + @pytest.mark.asyncio + async def test_codex_reapplies_persisted_model_on_resume(self): + conn = AsyncMock() + await _reapply_session_model_on_resume( + conn, "codex-acp", "sess-1", "gpt-5.4/low" + ) + conn.set_session_model.assert_awaited_once_with( + model_id="gpt-5.4/low", session_id="sess-1" + ) + + @pytest.mark.asyncio + async def test_missing_model_skips_reapply(self): + conn = AsyncMock() + await _reapply_session_model_on_resume( + conn, "claude-agent-acp", "sess-1", None + ) + conn.set_session_model.assert_not_called() + + @pytest.mark.asyncio + async def test_unknown_provider_skips_reapply(self): + # provider=None (custom server) is out of scope here; resume + # reapplication is gated on a known provider that supports the switch. + conn = AsyncMock() + await _reapply_session_model_on_resume( + conn, "some-custom-acp", "sess-1", "whatever" + ) + conn.set_session_model.assert_not_called() + + class TestSetACPModel: """Runtime (mid-conversation) model switching via set_session_model.""" @@ -3288,6 +3336,20 @@ def test_translates_acp_request_error_to_value_error(self): # The sentinel LLM must not be mutated when the switch fails. assert agent.llm.model != "bogus-model" + def test_propagates_server_internal_error(self): + # JSON-RPC -32603 is a server-internal failure, not a bad client + # request. It must propagate (as the raw ACPRequestError -> 5xx) rather + # than be mislabeled as a 400-class ValueError, mirroring the retriable + # handling on the prompt path. + agent = self._wire(_make_agent(), "codex-acp") + agent._executor.run_async.side_effect = ACPRequestError( + code=-32603, message="internal error" + ) + with pytest.raises(ACPRequestError): + agent.set_acp_model("some-model") + # The sentinel LLM must not be mutated when the switch fails. + assert agent.llm.model != "some-model" + def test_passes_timeout_to_run_async(self): # The protocol round-trip runs under the conversation state lock, so it # must be bounded to avoid wedging the lock if the server never answers. diff --git a/tests/sdk/conversation/test_switch_model.py b/tests/sdk/conversation/test_switch_model.py index 362ca8e818..ccd591ff81 100644 --- a/tests/sdk/conversation/test_switch_model.py +++ b/tests/sdk/conversation/test_switch_model.py @@ -115,9 +115,9 @@ def test_switch_acp_model_disarms_discarded_agent_finalizer(tmp_path): Regression: ``switch_acp_model`` swaps in a shallow ``model_copy`` that shares ``_conn`` / ``_executor`` / ``_process`` with the old agent. Without - releasing the runtime first, ``ACPAgent.__del__`` -> ``close()`` on the - discarded agent closes the connection, kills the subprocess and shuts down - the executor — out from under the copy, breaking the next turn. + disarming it first, ``ACPAgent.__del__`` -> ``close()`` on the discarded + agent closes the connection, kills the subprocess and shuts down the + executor — out from under the copy, breaking the next turn. """ conv, old_agent = _make_acp_conversation(tmp_path) live_conn = old_agent._conn @@ -131,15 +131,15 @@ def test_switch_acp_model_disarms_discarded_agent_finalizer(tmp_path): assert switched._conn is live_conn assert switched._executor is live_executor - # ...and the discarded agent was disarmed: marked closed with its runtime - # references cleared, so its finalizer is a no-op. + # ...and the discarded agent's finalizer was disarmed (marked closed) + # WITHOUT clearing its runtime references — an in-flight ask_agent()/fork + # still holding the old agent keeps a valid connection. assert old_agent._closed is True - assert old_agent._conn is None - assert old_agent._executor is None - assert old_agent._process is None + assert old_agent._conn is live_conn + assert old_agent._executor is live_executor - # Simulating GC (__del__ -> close()) on the old agent leaves the copy's - # shared connection/executor untouched. + # Simulating GC (__del__ -> close()) on the disarmed old agent is a no-op: + # the copy's shared connection/executor are left intact. live_executor.run_async.reset_mock() old_agent.close() live_executor.run_async.assert_not_called() From c93ffba5f2f4e6e72be8b2a2c7e8230f1f3b6a3b Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Tue, 26 May 2026 23:47:06 +0200 Subject: [PATCH 08/15] style: apply ruff format to acp switch changes Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 8 ++------ tests/sdk/agent/test_acp_agent.py | 4 +--- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index d174af5364..9859086945 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -932,9 +932,7 @@ async def switch_acp_model(self, model: str): if self._conversation is None: raise RuntimeError("Conversation is not active.") loop = asyncio.get_running_loop() - await loop.run_in_executor( - None, self._conversation.switch_acp_model, model - ) + await loop.run_in_executor(None, self._conversation.switch_acp_model, model) # Persist the switch into meta.json. ``start()`` rebuilds the runtime # agent from ``self.stored.agent``, and ``ConversationState.create()`` # copies that agent over the persisted base_state.json on resume — so @@ -943,9 +941,7 @@ async def switch_acp_model(self, model: str): # ``model_post_init`` re-derives the sentinel ``llm.model`` on reload. self.stored = self.stored.model_copy( update={ - "agent": self.stored.agent.model_copy( - update={"acp_model": model} - ) + "agent": self.stored.agent.model_copy(update={"acp_model": model}) } ) await self.save_meta() diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index fc216d7e49..46a2f4279c 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -3240,9 +3240,7 @@ async def test_codex_reapplies_persisted_model_on_resume(self): @pytest.mark.asyncio async def test_missing_model_skips_reapply(self): conn = AsyncMock() - await _reapply_session_model_on_resume( - conn, "claude-agent-acp", "sess-1", None - ) + await _reapply_session_model_on_resume(conn, "claude-agent-acp", "sess-1", None) conn.set_session_model.assert_not_called() @pytest.mark.asyncio From eff6fbc97499ac295d68bef07742e13f1506c8e8 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Wed, 27 May 2026 00:00:49 +0200 Subject: [PATCH 09/15] docs(acp): clarify inactive-conversation message and agent-swap comment - EventService.switch_acp_model: spell out that an inactive conversation means it was not started or has been closed (it maps to 409, not 404). - LocalConversation.switch_acp_model: note why both self.agent and self._state.agent are reassigned (live reference vs. autosaved state). Addresses review suggestions; no behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 5 ++++- .../openhands/sdk/conversation/impl/local_conversation.py | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 9859086945..be9f56de31 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -930,7 +930,10 @@ async def switch_acp_model(self, model: str): # session. async with self._acp_model_switch_lock: if self._conversation is None: - raise RuntimeError("Conversation is not active.") + raise RuntimeError( + "Conversation is not active; it has not been started or has " + "been closed." + ) loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._conversation.switch_acp_model, model) # Persist the switch into meta.json. ``start()`` rebuilds the runtime diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 1d2e7cfb2e..23296b1972 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -750,6 +750,10 @@ def switch_acp_model(self, model: str) -> None: old_agent = self.agent new_agent = old_agent.model_copy(update={"acp_model": model}) old_agent.release_runtime() + # ``self.agent`` is the live reference used by subsequent ``step()`` + # calls; ``self._state.agent`` is what the autosave path serializes + # to base_state.json. Update both so the running conversation and the + # persisted state agree on the switched model. self.agent = new_agent self._state.agent = new_agent From 881291e649aca7141c65e784bcda08bdebf01db0 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Wed, 27 May 2026 00:09:51 +0200 Subject: [PATCH 10/15] fix(acp): consistent resume reapply for custom servers + cancel-safe switch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review on switch_acp_model: - Resume reapply now mirrors set_acp_model's gating: it attempts the set_session_model reapply for custom/unknown servers (provider is None) — which set_acp_model also lets switch and whose switches are persisted as authoritative — and skips only known providers that don't support runtime switching. A server that rejects the call has the error swallowed/logged (like the load_session fallback) so resume can't break. - EventService.switch_acp_model is now cancel-safe. run_in_executor can't cancel the worker thread, so the live switch + base_state could complete while meta.json was never mirrored (and the lock released mid-flight). The switch + meta mirror now run as one shielded unit; on cancellation it finishes (holding the lock) before propagating, keeping persisted metadata consistent with the live session. Adds tests for custom-provider resume reapply, rejection-swallowing, the known-unsupported skip, and the cancel-during-switch meta mirror. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 48 ++++++++++----- .../openhands/sdk/agent/acp_agent.py | 35 ++++++++--- tests/agent_server/test_event_service.py | 61 +++++++++++++++++++ tests/sdk/agent/test_acp_agent.py | 47 +++++++++++++- 4 files changed, 165 insertions(+), 26 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index be9f56de31..2a52fbe71a 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -934,20 +934,40 @@ async def switch_acp_model(self, model: str): "Conversation is not active; it has not been started or has " "been closed." ) - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, self._conversation.switch_acp_model, model) - # Persist the switch into meta.json. ``start()`` rebuilds the runtime - # agent from ``self.stored.agent``, and ``ConversationState.create()`` - # copies that agent over the persisted base_state.json on resume — so - # without mirroring the new model here, a restart would silently - # revert to the old one. Only ``acp_model`` needs updating: - # ``model_post_init`` re-derives the sentinel ``llm.model`` on reload. - self.stored = self.stored.model_copy( - update={ - "agent": self.stored.agent.model_copy(update={"acp_model": model}) - } - ) - await self.save_meta() + conversation = self._conversation + + async def _switch_and_persist() -> None: + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, conversation.switch_acp_model, model) + # Persist the switch into meta.json. ``start()`` rebuilds the + # runtime agent from ``self.stored.agent``, and + # ``ConversationState.create()`` copies that agent over the + # persisted base_state.json on resume — so without mirroring the + # new model here, a restart would silently revert to the old one. + # Only ``acp_model`` needs updating: ``model_post_init`` + # re-derives the sentinel ``llm.model`` on reload. + self.stored = self.stored.model_copy( + update={ + "agent": self.stored.agent.model_copy( + update={"acp_model": model} + ) + } + ) + await self.save_meta() + + # ``run_in_executor`` cannot cancel the worker thread once it starts, + # so a cancellation (client disconnect / shutdown) could leave the + # live session + base_state.json switched while meta.json is never + # mirrored — and would release the lock with the worker still in + # flight. Run the switch + mirror as one shielded unit: on + # cancellation, let it finish (keeping the lock held) so persisted + # metadata matches the live session, then propagate the cancel. + task = asyncio.create_task(_switch_and_persist()) + try: + await asyncio.shield(task) + except asyncio.CancelledError: + await task + raise async def close(self): if self._lease_task is not None: diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index 1226225a01..ec7930ae73 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -240,22 +240,39 @@ async def _reapply_session_model_on_resume( ``load_session()`` carries no model ``_meta``, so a session resumed after a runtime switch (or with any persisted ``acp_model``) would otherwise run on - the ACP server's default. This issues ``set_session_model`` gated on - :attr:`~openhands.sdk.settings.acp_providers.ACPProviderInfo.supports_runtime_model_switch` - — the capability for switching an *already-running* session — so the resumed + the ACP server's default. This issues ``set_session_model`` so the resumed live session matches the serialized ``acp_model``. - This deliberately uses the runtime-switch gate, not the initial-selection - one: claude-agent-acp selects its initial model via ``_meta`` - (``supports_set_session_model=False``) yet supports ``set_session_model`` - for later switches, so on resume it needs this call to honour the persisted - model rather than the server default. + The gating mirrors :meth:`ACPAgent.set_acp_model` exactly — attempt for + custom/unknown servers (``provider is None``, which ``set_acp_model`` also + lets switch and whose switches are persisted as authoritative) and for known + providers that support runtime switching; skip only known providers that + don't. This deliberately differs from the *initial-selection* gate + (``supports_set_session_model``): claude-agent-acp selects its initial model + via ``_meta`` yet supports ``set_session_model`` for later switches, so on + resume it needs this call to honour the persisted model rather than the + server default. + + A server that doesn't actually support the call rejects it; that rejection + is swallowed (logged) here — like the ``load_session`` fallback — so a + stale-capability server can't break resume. The session simply keeps the + server default until the next explicit switch. """ if not acp_model: return provider = detect_acp_provider_by_agent_name(agent_name) - if provider is not None and provider.supports_runtime_model_switch: + if provider is not None and not provider.supports_runtime_model_switch: + return + try: await conn.set_session_model(model_id=acp_model, session_id=session_id) + except ACPRequestError as e: + logger.warning( + "Could not reapply model %r on resumed session %s (%s); the live " + "session may run on the server default until the next switch", + acp_model, + session_id, + e, + ) def _extract_token_usage( diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 7dc415bd8b..74c008f60d 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -1350,6 +1350,67 @@ async def test_switch_acp_model_persists_to_meta(self, tmp_path): assert isinstance(loaded.agent, ACPAgent) assert loaded.agent.acp_model == "new-model" + @pytest.mark.asyncio + async def test_switch_acp_model_finishes_meta_on_cancel(self, tmp_path): + """A cancel mid-switch still mirrors meta.json before propagating. + + ``run_in_executor`` cannot cancel the worker thread, so the live switch + may complete; ``switch_acp_model`` must finish the meta.json mirror + (holding the lock) before re-raising ``CancelledError``, or a restart + resumes from stale metadata. + """ + from openhands.sdk.agent import ACPAgent + + stored = StoredConversation( + id=uuid4(), + agent=ACPAgent(acp_command=["echo", "test"], acp_model="old-model"), + workspace=LocalWorkspace(working_dir=str(tmp_path)), + confirmation_policy=NeverConfirm(), + initial_message=None, + metrics=None, + ) + service = EventService(stored=stored, conversations_dir=tmp_path) + conv_dir = tmp_path / stored.id.hex + conv_dir.mkdir(parents=True, exist_ok=True) + + started = threading.Event() + release = threading.Event() + + def _slow_switch(model): + # Stand in for the blocking worker-thread switch; block until the + # test releases it so we can cancel while it is in flight. + started.set() + release.wait(timeout=5) + + conversation = MagicMock() + conversation.switch_acp_model.side_effect = _slow_switch + service._conversation = conversation + + task = asyncio.create_task(service.switch_acp_model("new-model")) + loop = asyncio.get_running_loop() + # Wait until the worker thread is mid-switch, then cancel. + await loop.run_in_executor(None, started.wait, 5) + task.cancel() + # Let the cancellation reach the shield (coroutine enters the except and + # starts awaiting the in-flight worker) before releasing the worker. + for _ in range(3): + await asyncio.sleep(0) + release.set() + + with pytest.raises(asyncio.CancelledError): + await task + + # Despite the cancel, the worker ran to completion and meta.json was + # mirrored with the new model. + conversation.switch_acp_model.assert_called_once_with("new-model") + assert isinstance(service.stored.agent, ACPAgent) + assert service.stored.agent.acp_model == "new-model" + loaded = StoredConversation.model_validate_json( + (conv_dir / "meta.json").read_text() + ) + assert isinstance(loaded.agent, ACPAgent) + assert loaded.agent.acp_model == "new-model" + class TestEventServiceStartWithRunningStatus: """Test cases for EventService.start handling of RUNNING execution status.""" diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 46a2f4279c..d87c6f5e83 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -3244,15 +3244,56 @@ async def test_missing_model_skips_reapply(self): conn.set_session_model.assert_not_called() @pytest.mark.asyncio - async def test_unknown_provider_skips_reapply(self): - # provider=None (custom server) is out of scope here; resume - # reapplication is gated on a known provider that supports the switch. + async def test_unknown_provider_attempts_reapply(self): + # provider=None (custom server) is allowed to attempt the switch by + # set_acp_model, and such switches are persisted as authoritative — so + # resume must mirror that and attempt the reapply too (otherwise the + # resumed session would silently revert to the server default). conn = AsyncMock() await _reapply_session_model_on_resume( conn, "some-custom-acp", "sess-1", "whatever" ) + conn.set_session_model.assert_awaited_once_with( + model_id="whatever", session_id="sess-1" + ) + + @pytest.mark.asyncio + async def test_known_unsupported_provider_skips_reapply(self): + from openhands.sdk.settings.acp_providers import ACPProviderInfo + + unsupported = ACPProviderInfo( + key="legacy", + display_name="Legacy", + default_command=("legacy",), + api_key_env_var=None, + base_url_env_var=None, + default_session_mode="default", + agent_name_patterns=("legacy",), + supports_set_session_model=False, + supports_runtime_model_switch=False, + session_meta_key=None, + ) + conn = AsyncMock() + with patch( + "openhands.sdk.agent.acp_agent.detect_acp_provider_by_agent_name", + return_value=unsupported, + ): + await _reapply_session_model_on_resume(conn, "legacy-acp", "sess-1", "x") conn.set_session_model.assert_not_called() + @pytest.mark.asyncio + async def test_rejection_is_swallowed_on_resume(self): + # A server that rejects the reapply must not break resume (mirrors the + # load_session fallback). The error is logged, not raised. + conn = AsyncMock() + conn.set_session_model.side_effect = ACPRequestError( + code=-32601, message="method not found" + ) + await _reapply_session_model_on_resume( + conn, "some-custom-acp", "sess-1", "whatever" + ) + conn.set_session_model.assert_awaited_once() + class TestSetACPModel: """Runtime (mid-conversation) model switching via set_session_model.""" From 4760fa0860d9655f618984e469a3a850ab15c2c7 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Wed, 27 May 2026 00:30:11 +0200 Subject: [PATCH 11/15] fix(acp): cancel precedence in switch + document timeout ambiguity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review on switch_acp_model: - EventService.switch_acp_model: on cancellation, suppress any exception from the in-flight switch task before re-raising CancelledError, so a switch failure during the cancellation window can't mask the cancel. - ACPAgent.set_acp_model: document that a TimeoutError means the client stopped waiting, not that the switch was rejected — the request may still be applied server-side, leaving the server model indeterminate; local state is intentionally left unchanged (conservative, self-heals on the next switch; the agent always runs whatever model the live ACP session holds). Adds a test asserting CancelledError takes precedence over a switch failure during cancellation. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 7 ++- .../openhands/sdk/agent/acp_agent.py | 11 +++++ tests/agent_server/test_event_service.py | 46 +++++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 2a52fbe71a..f5dbf8dd1c 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -966,7 +966,12 @@ async def _switch_and_persist() -> None: try: await asyncio.shield(task) except asyncio.CancelledError: - await task + # Let the in-flight switch settle (the worker thread can't be + # cancelled) so meta.json is mirrored, but don't let a failure + # of the switch itself mask the cancellation — suppress any + # task exception and always propagate CancelledError. + with suppress(Exception): + await task raise async def close(self): diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index ec7930ae73..bbdb3ebe94 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -1922,6 +1922,17 @@ def set_acp_model(self, model: str) -> None: model id). TimeoutError: If the server does not answer within ``acp_prompt_timeout`` seconds. + + Note: + A timeout means the client stopped waiting, not that the switch was + rejected: the ``session/set_model`` request may already have been + written and could still be applied server-side. The connection and + session stay alive and the local sentinel model is intentionally + left unchanged, so a timed-out switch leaves the server-side model + indeterminate. The conservative choice (treat it as failed locally) + keeps cost/token accounting on the previously-known model and + self-heals on the next successful switch; the agent itself always + runs whatever model the live ACP session holds. """ if self._conn is None or self._session_id is None or self._executor is None: raise RuntimeError( diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 74c008f60d..7acfe1490a 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -1411,6 +1411,52 @@ def _slow_switch(model): assert isinstance(loaded.agent, ACPAgent) assert loaded.agent.acp_model == "new-model" + @pytest.mark.asyncio + async def test_switch_acp_model_cancel_takes_precedence_over_failure( + self, tmp_path + ): + """A switch failure during the cancellation window must not mask the + cancel: ``CancelledError`` still propagates, not the switch's error. + """ + from openhands.sdk.agent import ACPAgent + + stored = StoredConversation( + id=uuid4(), + agent=ACPAgent(acp_command=["echo", "test"], acp_model="old-model"), + workspace=LocalWorkspace(working_dir=str(tmp_path)), + confirmation_policy=NeverConfirm(), + initial_message=None, + metrics=None, + ) + service = EventService(stored=stored, conversations_dir=tmp_path) + conv_dir = tmp_path / stored.id.hex + conv_dir.mkdir(parents=True, exist_ok=True) + + started = threading.Event() + release = threading.Event() + + def _failing_switch(model): + started.set() + release.wait(timeout=5) + raise RuntimeError("switch boom") + + conversation = MagicMock() + conversation.switch_acp_model.side_effect = _failing_switch + service._conversation = conversation + + task = asyncio.create_task(service.switch_acp_model("new-model")) + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, started.wait, 5) + task.cancel() + for _ in range(3): + await asyncio.sleep(0) + release.set() + + # The switch raises RuntimeError after release, but the cancellation + # takes precedence — the caller sees CancelledError, not "switch boom". + with pytest.raises(asyncio.CancelledError): + await task + class TestEventServiceStartWithRunningStatus: """Test cases for EventService.start handling of RUNNING execution status.""" From 340cf451fc5704a9381742f745b8f25e98464fa9 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Wed, 27 May 2026 00:57:27 +0200 Subject: [PATCH 12/15] fix(acp): bound + classify resume reapply errors; switch annotations/comments Addresses review on switch_acp_model / resume: - _reapply_session_model_on_resume now bounds the set_session_model call with the agent's acp_prompt_timeout (a hung server can no longer block session startup forever) and mirrors set_acp_model's error classification: server-internal failures (-32603) propagate rather than silently leaving the resumed session on the wrong model; client/protocol rejections (method-not-found, invalid model id) are swallowed/logged so a stale-capability server can't break resume. - EventService.switch_acp_model: add -> None return annotation; expand the cancellation comment to note CancelledError (a BaseException) is never suppressed by suppress(Exception). Adds resume tests for -32603 propagation and the timeout bound. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 8 +-- .../openhands/sdk/agent/acp_agent.py | 26 ++++++++-- tests/sdk/agent/test_acp_agent.py | 51 +++++++++++++++---- 3 files changed, 68 insertions(+), 17 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index f5dbf8dd1c..725e5ae447 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -914,7 +914,7 @@ async def set_security_analyzer( None, self._conversation.set_security_analyzer, security_analyzer ) - async def switch_acp_model(self, model: str): + async def switch_acp_model(self, model: str) -> None: """Switch the model on a running ACP conversation. Runs the (blocking) protocol-level ``session/set_model`` round-trip in @@ -968,8 +968,10 @@ async def _switch_and_persist() -> None: except asyncio.CancelledError: # Let the in-flight switch settle (the worker thread can't be # cancelled) so meta.json is mirrored, but don't let a failure - # of the switch itself mask the cancellation — suppress any - # task exception and always propagate CancelledError. + # of the switch itself mask the cancellation. suppress(Exception) + # swallows only the task's own error — CancelledError is a + # BaseException, not an Exception, so it is never suppressed — + # then we always re-raise it. with suppress(Exception): await task raise diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index bbdb3ebe94..cf077a1ca8 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -235,6 +235,7 @@ async def _reapply_session_model_on_resume( agent_name: str, session_id: str, acp_model: str | None, + timeout: float, ) -> None: """Reapply the persisted model to a *resumed* session. @@ -253,10 +254,19 @@ async def _reapply_session_model_on_resume( resume it needs this call to honour the persisted model rather than the server default. - A server that doesn't actually support the call rejects it; that rejection - is swallowed (logged) here — like the ``load_session`` fallback — so a - stale-capability server can't break resume. The session simply keeps the - server default until the next explicit switch. + Error handling mirrors :meth:`ACPAgent.set_acp_model`: + + - The call is bounded by ``timeout`` (the agent's ``acp_prompt_timeout``) so + a server that hangs after reconnect cannot block session startup forever; + a ``TimeoutError`` propagates. + - A server-internal failure (JSON-RPC ``-32603``) propagates — silently + continuing on the wrong model while serialized state claims otherwise + would be worse than a loud startup failure. + - A client/protocol rejection (e.g. ``method-not-found`` on a server that + doesn't actually support the call, or an invalid persisted model id) is + swallowed and logged — like the ``load_session`` fallback — so a + stale-capability/stale-model server can't break resume. The session keeps + the server default until the next explicit switch. """ if not acp_model: return @@ -264,8 +274,13 @@ async def _reapply_session_model_on_resume( if provider is not None and not provider.supports_runtime_model_switch: return try: - await conn.set_session_model(model_id=acp_model, session_id=session_id) + await asyncio.wait_for( + conn.set_session_model(model_id=acp_model, session_id=session_id), + timeout=timeout, + ) except ACPRequestError as e: + if e.code in _RETRIABLE_SERVER_ERROR_CODES: + raise logger.warning( "Could not reapply model %r on resumed session %s (%s); the live " "session may run on the server default until the next switch", @@ -1260,6 +1275,7 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]: agent_name, session_id, self.acp_model, + self.acp_prompt_timeout, ) # Resolve the permission mode. Known providers each have their diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index d87c6f5e83..084c8383d0 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -3221,7 +3221,7 @@ async def test_claude_reapplies_persisted_model_on_resume(self): # would skip it. conn = AsyncMock() await _reapply_session_model_on_resume( - conn, "claude-agent-acp", "sess-1", "claude-haiku-4-5-20251001" + conn, "claude-agent-acp", "sess-1", "claude-haiku-4-5-20251001", 600.0 ) conn.set_session_model.assert_awaited_once_with( model_id="claude-haiku-4-5-20251001", session_id="sess-1" @@ -3231,7 +3231,7 @@ async def test_claude_reapplies_persisted_model_on_resume(self): async def test_codex_reapplies_persisted_model_on_resume(self): conn = AsyncMock() await _reapply_session_model_on_resume( - conn, "codex-acp", "sess-1", "gpt-5.4/low" + conn, "codex-acp", "sess-1", "gpt-5.4/low", 600.0 ) conn.set_session_model.assert_awaited_once_with( model_id="gpt-5.4/low", session_id="sess-1" @@ -3240,7 +3240,9 @@ async def test_codex_reapplies_persisted_model_on_resume(self): @pytest.mark.asyncio async def test_missing_model_skips_reapply(self): conn = AsyncMock() - await _reapply_session_model_on_resume(conn, "claude-agent-acp", "sess-1", None) + await _reapply_session_model_on_resume( + conn, "claude-agent-acp", "sess-1", None, 600.0 + ) conn.set_session_model.assert_not_called() @pytest.mark.asyncio @@ -3251,7 +3253,7 @@ async def test_unknown_provider_attempts_reapply(self): # resumed session would silently revert to the server default). conn = AsyncMock() await _reapply_session_model_on_resume( - conn, "some-custom-acp", "sess-1", "whatever" + conn, "some-custom-acp", "sess-1", "whatever", 600.0 ) conn.set_session_model.assert_awaited_once_with( model_id="whatever", session_id="sess-1" @@ -3278,22 +3280,53 @@ async def test_known_unsupported_provider_skips_reapply(self): "openhands.sdk.agent.acp_agent.detect_acp_provider_by_agent_name", return_value=unsupported, ): - await _reapply_session_model_on_resume(conn, "legacy-acp", "sess-1", "x") + await _reapply_session_model_on_resume( + conn, "legacy-acp", "sess-1", "x", 600.0 + ) conn.set_session_model.assert_not_called() @pytest.mark.asyncio - async def test_rejection_is_swallowed_on_resume(self): - # A server that rejects the reapply must not break resume (mirrors the - # load_session fallback). The error is logged, not raised. + async def test_client_rejection_is_swallowed_on_resume(self): + # A client/protocol rejection (method-not-found = server doesn't support + # the call, or invalid model id) must not break resume — mirrors the + # load_session fallback. The error is logged, not raised. conn = AsyncMock() conn.set_session_model.side_effect = ACPRequestError( code=-32601, message="method not found" ) await _reapply_session_model_on_resume( - conn, "some-custom-acp", "sess-1", "whatever" + conn, "some-custom-acp", "sess-1", "whatever", 600.0 ) conn.set_session_model.assert_awaited_once() + @pytest.mark.asyncio + async def test_server_error_propagates_on_resume(self): + # A server-internal failure (-32603) must NOT be silently swallowed: + # continuing on the wrong model while serialized state claims otherwise + # is worse than a loud startup failure (matches set_acp_model). + conn = AsyncMock() + conn.set_session_model.side_effect = ACPRequestError( + code=-32603, message="internal error" + ) + with pytest.raises(ACPRequestError): + await _reapply_session_model_on_resume( + conn, "codex-acp", "sess-1", "gpt-5.4/low", 600.0 + ) + + @pytest.mark.asyncio + async def test_hang_times_out_on_resume(self): + # A server that hangs after reconnect must not block session startup + # forever; the bounded call raises TimeoutError instead. + async def _hang(**kwargs): + await asyncio.sleep(10) + + conn = AsyncMock() + conn.set_session_model = AsyncMock(side_effect=_hang) + with pytest.raises((TimeoutError, asyncio.TimeoutError)): + await _reapply_session_model_on_resume( + conn, "codex-acp", "sess-1", "gpt-5.4/low", 0.01 + ) + class TestSetACPModel: """Runtime (mid-conversation) model switching via set_session_model.""" From 0baa3128940d954c2981b428ce7fb323244d7ef3 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Wed, 27 May 2026 01:17:52 +0200 Subject: [PATCH 13/15] fix(acp): no subprocess leak on init failure; close() coordinates with switch Addresses review on the resume reapply / switch lifecycle: - ACPAgent._init now assigns self._conn/_process/_filtered_reader as soon as the subprocess and connection are created, instead of only via the returned tuple at the end. A mid-init failure (e.g. the resume model reapply now raises on timeout / -32603) therefore leaves them reachable by init_state()'s _cleanup(), so the subprocess/connection are torn down rather than leaked. The "session initialized" gating keys off _session_id (assigned last), so an early _conn doesn't make the agent look ready prematurely. - EventService.close() now acquires _acp_model_switch_lock around the conversation teardown, so it waits for an in-flight (shielded) switch to finish mirroring meta.json instead of closing the ACP runtime out from under the worker. Adds a test asserting close() waits for an in-flight switch before tearing the conversation down. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 15 +++-- .../openhands/sdk/agent/acp_agent.py | 30 ++++++---- tests/agent_server/test_event_service.py | 55 +++++++++++++++++++ 3 files changed, 85 insertions(+), 15 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 725e5ae447..c16a9ff0c6 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -1008,10 +1008,17 @@ async def close(self): self._run_task = None await self._pub_sub.close() - if self._conversation: - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, self._conversation.close) - self._conversation = None + # Coordinate with in-flight model switches: switch_acp_model holds this + # lock while its (shielded) worker applies the live switch and mirrors + # meta.json. Acquiring it here ensures we don't tear the conversation + # down from under an in-flight switch — which would leave the worker + # running against a closed ACP runtime and mirror meta.json for a + # session that has already been torn down. + async with self._acp_model_switch_lock: + if self._conversation: + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._conversation.close) + self._conversation = None if self._lease is not None and self._lease_generation is not None: self._lease.release(self._lease_generation) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index cf077a1ca8..ff20cf0f4c 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -1144,7 +1144,7 @@ def _start_acp_server(self, state: ConversationState) -> None: ) prior_session_id = None - async def _init() -> tuple[Any, Any, Any, str, str, str]: + async def _init() -> tuple[str, str, str]: # Spawn the subprocess directly so we can install a # filtering reader that skips non-JSON-RPC lines some # ACP servers (e.g. claude-code-acp v0.1.x) write to @@ -1174,6 +1174,17 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]: filtered_reader, # read filtered output ) + # Track the subprocess/connection on self as soon as they exist, so + # that if a *later* init step fails (e.g. the resume model reapply + # times out or the server errors), init_state()'s _cleanup() can + # still tear them down instead of leaking the subprocess/connection. + # The "session initialized" gating keys off _session_id (assigned + # last, on full success), so an early _conn here does not make the + # agent look ready before _init completes. + self._process = process + self._conn = conn + self._filtered_reader = filtered_reader + # Initialize the protocol and discover server identity init_response = await conn.initialize(protocol_version=1) agent_name = "" @@ -1290,17 +1301,14 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]: logger.info("Setting ACP session mode: %s", mode_id) await conn.set_session_mode(mode_id=mode_id, session_id=session_id) - return conn, process, filtered_reader, session_id, agent_name, agent_version + return session_id, agent_name, agent_version - result = self._executor.run_async(_init) - ( - self._conn, - self._process, - self._filtered_reader, - self._session_id, - self._agent_name, - self._agent_version, - ) = result + # _conn / _process / _filtered_reader are assigned inside _init() (right + # after creation) so a mid-init failure can be cleaned up; only the + # success-only fields are returned here. + self._session_id, self._agent_name, self._agent_version = ( + self._executor.run_async(_init) + ) self._working_dir = working_dir def _reset_client_for_turn( diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 7acfe1490a..12e5a51b4b 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -1457,6 +1457,61 @@ def _failing_switch(model): with pytest.raises(asyncio.CancelledError): await task + @pytest.mark.asyncio + async def test_close_waits_for_in_flight_switch(self, tmp_path): + """close() acquires the switch lock, so it waits for an in-flight switch + to finish (mirroring meta.json) before tearing down the conversation. + """ + from openhands.sdk.agent import ACPAgent + + stored = StoredConversation( + id=uuid4(), + agent=ACPAgent(acp_command=["echo", "test"], acp_model="old-model"), + workspace=LocalWorkspace(working_dir=str(tmp_path)), + confirmation_policy=NeverConfirm(), + initial_message=None, + metrics=None, + ) + service = EventService(stored=stored, conversations_dir=tmp_path) + conv_dir = tmp_path / stored.id.hex + conv_dir.mkdir(parents=True, exist_ok=True) + + started = threading.Event() + release = threading.Event() + order: list[str] = [] + + def _slow_switch(model): + started.set() + release.wait(timeout=5) + order.append("switch") + + conversation = MagicMock() + conversation.switch_acp_model.side_effect = _slow_switch + conversation.close.side_effect = lambda: order.append("close") + service._conversation = conversation + + switch_task = asyncio.create_task(service.switch_acp_model("new-model")) + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, started.wait, 5) + + # Start close() while the switch holds the lock; it must block on the + # lock rather than tear the conversation down mid-switch. + close_task = asyncio.create_task(service.close()) + for _ in range(3): + await asyncio.sleep(0) + assert order == [] # neither the switch nor close() has progressed yet + + release.set() + await switch_task + await close_task + + # The switch finished (and mirrored the model) before close() tore the + # conversation down. + assert order == ["switch", "close"] + assert isinstance(service.stored.agent, ACPAgent) + assert service.stored.agent.acp_model == "new-model" + assert service._conversation is None + class TestEventServiceStartWithRunningStatus: """Test cases for EventService.start handling of RUNNING execution status.""" From 00f4eb8c08c7216631e18ddced683cfe98f50a8a Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Wed, 27 May 2026 01:48:37 +0200 Subject: [PATCH 14/15] fix(acp): bound close() lock wait, surface cancel-time failures, guard model Addresses review: - EventService.close() now waits for the switch lock with a bounded timeout (ACP_SWITCH_LOCK_CLOSE_TIMEOUT_SECONDS) instead of unconditionally: a switch is normally sub-second, but a wedged server could otherwise hold the lock for the worker's full acp_prompt_timeout (30 min) while lease renewal is already stopped. After the bound, close() proceeds with teardown. - The cancellation handler now logs a failure of the in-flight switch task (e.g. a meta.json write that fails after the live switch) instead of swallowing it silently, while still always re-raising CancelledError. - set_acp_model rejects empty/whitespace model ids early with a clear ValueError (-> 400) rather than forwarding them to the ACP server, and its docstring now warns that direct callers bypass acp_model persistence / cost attribution (use LocalConversation.switch_acp_model). Adds a test for the empty-model guard. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 56 +++++++++++++++---- .../openhands/sdk/agent/acp_agent.py | 22 +++++--- tests/sdk/agent/test_acp_agent.py | 8 ++- 3 files changed, 67 insertions(+), 19 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index c16a9ff0c6..cc1bb3ed23 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -44,6 +44,12 @@ LEASE_RENEW_INTERVAL_SECONDS = 15.0 +# How long close() waits for an in-flight model switch to release its lock +# before tearing the conversation down anyway. A switch is normally sub-second; +# this only bounds the pathological wedged-server case so close() can't block +# for the worker's full acp_prompt_timeout (default 30 min) while lease renewal +# is already stopped. +ACP_SWITCH_LOCK_CLOSE_TIMEOUT_SECONDS = 10.0 # Bounds initial-state push so subscribe_to_events does not stall on a # subscriber whose __call__ blocks (e.g. WS with a full TCP send buffer). INITIAL_STATE_PUSH_TIMEOUT_SECONDS = 0.5 @@ -967,13 +973,22 @@ async def _switch_and_persist() -> None: await asyncio.shield(task) except asyncio.CancelledError: # Let the in-flight switch settle (the worker thread can't be - # cancelled) so meta.json is mirrored, but don't let a failure - # of the switch itself mask the cancellation. suppress(Exception) - # swallows only the task's own error — CancelledError is a - # BaseException, not an Exception, so it is never suppressed — - # then we always re-raise it. - with suppress(Exception): + # cancelled) so meta.json is mirrored, but always re-raise the + # cancellation. CancelledError is a BaseException, so the + # except below only catches the task's own failure — which we + # log rather than swallow silently, so e.g. a meta.json write + # that fails after the live switch is visible (meta could be + # left at the old model while base_state.json has the new one) + # instead of disappearing — then we re-raise the cancel. + try: await task + except Exception: + logger.warning( + "ACP model switch task failed while handling " + "cancellation for conversation %s", + self.stored.id, + exc_info=True, + ) raise async def close(self): @@ -1010,15 +1025,34 @@ async def close(self): await self._pub_sub.close() # Coordinate with in-flight model switches: switch_acp_model holds this # lock while its (shielded) worker applies the live switch and mirrors - # meta.json. Acquiring it here ensures we don't tear the conversation - # down from under an in-flight switch — which would leave the worker - # running against a closed ACP runtime and mirror meta.json for a - # session that has already been torn down. - async with self._acp_model_switch_lock: + # meta.json. Waiting for it here avoids tearing the conversation down + # from under an in-flight switch (which would leave the worker running + # against a closed ACP runtime and mirror meta.json for a torn-down + # session). The wait is bounded: a switch is normally sub-second, but a + # wedged server could otherwise hold the lock for the full + # acp_prompt_timeout while lease renewal is already stopped, so after + # the bound we proceed with teardown anyway. + switch_lock_held = False + try: + await asyncio.wait_for( + self._acp_model_switch_lock.acquire(), + timeout=ACP_SWITCH_LOCK_CLOSE_TIMEOUT_SECONDS, + ) + switch_lock_held = True + except TimeoutError: + logger.warning( + "Tearing down conversation %s without the model-switch lock; " + "a switch may still be in flight", + self.stored.id, + ) + try: if self._conversation: loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._conversation.close) self._conversation = None + finally: + if switch_lock_held: + self._acp_model_switch_lock.release() if self._lease is not None and self._lease_generation is not None: self._lease.release(self._lease_generation) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index ff20cf0f4c..dcb9daf740 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -1929,21 +1929,27 @@ def set_acp_model(self, model: str) -> None: *same* session — no subprocess restart, no loss of conversation context. Verified against claude-agent-acp and codex-acp. - Thread-safety is the caller's responsibility: drive this through - :meth:`LocalConversation.switch_acp_model`, which holds the state lock - so the switch cannot race a running ``step()``. + This is the low-level agent primitive; prefer + :meth:`LocalConversation.switch_acp_model` as the entry point. That + wrapper (a) holds the state lock so the switch cannot race a running + ``step()``, and (b) persists the new value by swapping in an agent + ``model_copy`` — ``acp_model`` is frozen, so this method updates only + the live session and the sentinel ``llm.model``/metrics, **not** + ``self.acp_model``. A direct caller therefore leaves ``acp_model`` + (which ``_record_usage`` reads for cost attribution) stale and the + switch unpersisted; go through ``switch_acp_model`` instead. Args: model: Provider-specific model id to switch to (e.g. ``"claude-haiku-4-5-20251001"`` or ``"gpt-5.4/low"``). Raises: + ValueError: If ``model`` is empty or whitespace-only, if the + detected provider does not support runtime model switching, or + if the ACP server rejects the ``session/set_model`` call (e.g. + method-not-found on a custom server, or an invalid model id). RuntimeError: If the ACP session has not been initialized yet (i.e. before the first ``run()``). - ValueError: If the detected provider does not support runtime model - switching, or the ACP server rejects the ``session/set_model`` - call (e.g. method-not-found on a custom server, or an invalid - model id). TimeoutError: If the server does not answer within ``acp_prompt_timeout`` seconds. @@ -1958,6 +1964,8 @@ def set_acp_model(self, model: str) -> None: self-heals on the next successful switch; the agent itself always runs whatever model the live ACP session holds. """ + if not model or not model.strip(): + raise ValueError("model must be a non-empty string") if self._conn is None or self._session_id is None or self._executor is None: raise RuntimeError( "ACP session is not initialized; the model can only be switched " diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 084c8383d0..053217dd5f 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -3322,7 +3322,7 @@ async def _hang(**kwargs): conn = AsyncMock() conn.set_session_model = AsyncMock(side_effect=_hang) - with pytest.raises((TimeoutError, asyncio.TimeoutError)): + with pytest.raises(TimeoutError): await _reapply_session_model_on_resume( conn, "codex-acp", "sess-1", "gpt-5.4/low", 0.01 ) @@ -3366,6 +3366,12 @@ def test_unknown_provider_still_attempts_switch(self): agent.set_acp_model("whatever") agent._conn.set_session_model.assert_called_once() + def test_rejects_empty_model(self): + agent = self._wire(_make_agent(), "codex-acp") + with pytest.raises(ValueError, match="non-empty"): + agent.set_acp_model(" ") + agent._conn.set_session_model.assert_not_called() + def test_raises_before_session_initialized(self): agent = _make_agent() # no _conn / _session_id / _executor with pytest.raises(RuntimeError, match="not initialized"): From 6094c645dfb185445587425176012c365627c59a Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Wed, 27 May 2026 09:52:39 +0200 Subject: [PATCH 15/15] refactor(acp): simplify switch_acp_model to match switch_llm's footprint The agent-server switch endpoint had accreted concurrency/cancellation machinery far beyond its sibling switch_llm (a one-liner). Strip it back: - EventService.switch_acp_model: drop the per-conversation lock, the shield/await-task cancellation handling, and the cancel-time logging. It is now guard + run_in_executor(live switch) + mirror acp_model into meta.json + save_meta (~12 lines). close() reverts to its original form (no lock coordination / bounded wait). The guarded scenarios (concurrent switches on one conversation; cancellation mid-switch) are not handled by switch_llm either and are not realistic for a user-driven picker. - _reapply_session_model_on_resume: drop the timeout param and the -32603-vs-client-error distinction; tolerate (log) any ACPRequestError on resume, exactly like the load_session fallback in the same function. This makes it consistent with the other unbounded _init protocol calls (initialize/new_session/set_session_mode). Removes obsolete tests for the deleted machinery. Behavior preserved on the real paths: re-validated end-to-end against the pinned @agentclientprotocol/claude-agent-acp@0.30.0 via the live agent-server (switch -> 200, edge cases 409/400/404), plus 373 unit tests. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/agent_server/event_service.py | 131 +++----------- .../openhands/sdk/agent/acp_agent.py | 39 +---- tests/agent_server/test_event_service.py | 162 ------------------ tests/sdk/agent/test_acp_agent.py | 46 ++--- 4 files changed, 46 insertions(+), 332 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index cc1bb3ed23..4ec523f031 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -44,12 +44,6 @@ LEASE_RENEW_INTERVAL_SECONDS = 15.0 -# How long close() waits for an in-flight model switch to release its lock -# before tearing the conversation down anyway. A switch is normally sub-second; -# this only bounds the pathological wedged-server case so close() can't block -# for the worker's full acp_prompt_timeout (default 30 min) while lease renewal -# is already stopped. -ACP_SWITCH_LOCK_CLOSE_TIMEOUT_SECONDS = 10.0 # Bounds initial-state push so subscribe_to_events does not stall on a # subscriber whose __call__ blocks (e.g. WS with a full TCP send buffer). INITIAL_STATE_PUSH_TIMEOUT_SECONDS = 0.5 @@ -78,9 +72,6 @@ class EventService: # wrapping up; consumed by _run_and_publish to re-run the stranded message. _rerun_requested: bool = field(default=False, init=False) _run_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False) - _acp_model_switch_lock: asyncio.Lock = field( - default_factory=asyncio.Lock, init=False - ) _callback_wrapper: AsyncCallbackWrapper | None = field(default=None, init=False) _lease: ConversationLease | None = field(default=None, init=False) _lease_generation: int | None = field(default=None, init=False) @@ -921,75 +912,27 @@ async def set_security_analyzer( ) async def switch_acp_model(self, model: str) -> None: - """Switch the model on a running ACP conversation. - - Runs the (blocking) protocol-level ``session/set_model`` round-trip in - a worker thread so the event loop is not blocked, then mirrors the new - model into ``meta.json`` so the switch survives an agent-server restart. + """Switch the model on a running ACP conversation, mid-conversation. + + Runs the (blocking) protocol-level ``session/set_model`` round-trip in a + worker thread, then mirrors the new model into ``meta.json`` so the + switch survives an agent-server restart: ``start()`` rebuilds the agent + from ``self.stored.agent`` and ``ConversationState.create()`` copies + that over the persisted base_state.json on resume. Only ``acp_model`` + needs updating — ``model_post_init`` re-derives the sentinel + ``llm.model`` on reload. """ - # Serialize concurrent switches for this conversation. The live switch - # and the meta.json mirror must commit as one unit; otherwise two - # interleaved requests could leave meta.json pointing at a different - # model than the last live switch (A switches live->A, B switches - # live->B and saves meta=B, then A saves meta=A while the session runs - # on B). The lock keeps persisted metadata consistent with the live - # session. - async with self._acp_model_switch_lock: - if self._conversation is None: - raise RuntimeError( - "Conversation is not active; it has not been started or has " - "been closed." - ) - conversation = self._conversation - - async def _switch_and_persist() -> None: - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, conversation.switch_acp_model, model) - # Persist the switch into meta.json. ``start()`` rebuilds the - # runtime agent from ``self.stored.agent``, and - # ``ConversationState.create()`` copies that agent over the - # persisted base_state.json on resume — so without mirroring the - # new model here, a restart would silently revert to the old one. - # Only ``acp_model`` needs updating: ``model_post_init`` - # re-derives the sentinel ``llm.model`` on reload. - self.stored = self.stored.model_copy( - update={ - "agent": self.stored.agent.model_copy( - update={"acp_model": model} - ) - } - ) - await self.save_meta() - - # ``run_in_executor`` cannot cancel the worker thread once it starts, - # so a cancellation (client disconnect / shutdown) could leave the - # live session + base_state.json switched while meta.json is never - # mirrored — and would release the lock with the worker still in - # flight. Run the switch + mirror as one shielded unit: on - # cancellation, let it finish (keeping the lock held) so persisted - # metadata matches the live session, then propagate the cancel. - task = asyncio.create_task(_switch_and_persist()) - try: - await asyncio.shield(task) - except asyncio.CancelledError: - # Let the in-flight switch settle (the worker thread can't be - # cancelled) so meta.json is mirrored, but always re-raise the - # cancellation. CancelledError is a BaseException, so the - # except below only catches the task's own failure — which we - # log rather than swallow silently, so e.g. a meta.json write - # that fails after the live switch is visible (meta could be - # left at the old model while base_state.json has the new one) - # instead of disappearing — then we re-raise the cancel. - try: - await task - except Exception: - logger.warning( - "ACP model switch task failed while handling " - "cancellation for conversation %s", - self.stored.id, - exc_info=True, - ) - raise + if self._conversation is None: + raise RuntimeError( + "Conversation is not active; it has not been started or has " + "been closed." + ) + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._conversation.switch_acp_model, model) + self.stored = self.stored.model_copy( + update={"agent": self.stored.agent.model_copy(update={"acp_model": model})} + ) + await self.save_meta() async def close(self): if self._lease_task is not None: @@ -1023,36 +966,10 @@ async def close(self): self._run_task = None await self._pub_sub.close() - # Coordinate with in-flight model switches: switch_acp_model holds this - # lock while its (shielded) worker applies the live switch and mirrors - # meta.json. Waiting for it here avoids tearing the conversation down - # from under an in-flight switch (which would leave the worker running - # against a closed ACP runtime and mirror meta.json for a torn-down - # session). The wait is bounded: a switch is normally sub-second, but a - # wedged server could otherwise hold the lock for the full - # acp_prompt_timeout while lease renewal is already stopped, so after - # the bound we proceed with teardown anyway. - switch_lock_held = False - try: - await asyncio.wait_for( - self._acp_model_switch_lock.acquire(), - timeout=ACP_SWITCH_LOCK_CLOSE_TIMEOUT_SECONDS, - ) - switch_lock_held = True - except TimeoutError: - logger.warning( - "Tearing down conversation %s without the model-switch lock; " - "a switch may still be in flight", - self.stored.id, - ) - try: - if self._conversation: - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, self._conversation.close) - self._conversation = None - finally: - if switch_lock_held: - self._acp_model_switch_lock.release() + if self._conversation: + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._conversation.close) + self._conversation = None if self._lease is not None and self._lease_generation is not None: self._lease.release(self._lease_generation) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index dcb9daf740..9511074c9d 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -235,7 +235,6 @@ async def _reapply_session_model_on_resume( agent_name: str, session_id: str, acp_model: str | None, - timeout: float, ) -> None: """Reapply the persisted model to a *resumed* session. @@ -244,29 +243,13 @@ async def _reapply_session_model_on_resume( the ACP server's default. This issues ``set_session_model`` so the resumed live session matches the serialized ``acp_model``. - The gating mirrors :meth:`ACPAgent.set_acp_model` exactly — attempt for - custom/unknown servers (``provider is None``, which ``set_acp_model`` also - lets switch and whose switches are persisted as authoritative) and for known - providers that support runtime switching; skip only known providers that - don't. This deliberately differs from the *initial-selection* gate - (``supports_set_session_model``): claude-agent-acp selects its initial model - via ``_meta`` yet supports ``set_session_model`` for later switches, so on - resume it needs this call to honour the persisted model rather than the - server default. - - Error handling mirrors :meth:`ACPAgent.set_acp_model`: - - - The call is bounded by ``timeout`` (the agent's ``acp_prompt_timeout``) so - a server that hangs after reconnect cannot block session startup forever; - a ``TimeoutError`` propagates. - - A server-internal failure (JSON-RPC ``-32603``) propagates — silently - continuing on the wrong model while serialized state claims otherwise - would be worse than a loud startup failure. - - A client/protocol rejection (e.g. ``method-not-found`` on a server that - doesn't actually support the call, or an invalid persisted model id) is - swallowed and logged — like the ``load_session`` fallback — so a - stale-capability/stale-model server can't break resume. The session keeps - the server default until the next explicit switch. + The gating mirrors :meth:`ACPAgent.set_acp_model` (attempt for custom/unknown + servers and known providers that support runtime switching; skip only known + providers that don't), deliberately differing from the initial-selection + gate: claude-agent-acp selects its initial model via ``_meta`` yet supports + ``set_session_model`` for later switches. A server that rejects the call is + tolerated (logged) — like the ``load_session`` fallback above — so resume + can't break; the session keeps the server default until the next switch. """ if not acp_model: return @@ -274,13 +257,8 @@ async def _reapply_session_model_on_resume( if provider is not None and not provider.supports_runtime_model_switch: return try: - await asyncio.wait_for( - conn.set_session_model(model_id=acp_model, session_id=session_id), - timeout=timeout, - ) + await conn.set_session_model(model_id=acp_model, session_id=session_id) except ACPRequestError as e: - if e.code in _RETRIABLE_SERVER_ERROR_CODES: - raise logger.warning( "Could not reapply model %r on resumed session %s (%s); the live " "session may run on the server default until the next switch", @@ -1286,7 +1264,6 @@ async def _init() -> tuple[str, str, str]: agent_name, session_id, self.acp_model, - self.acp_prompt_timeout, ) # Resolve the permission mode. Known providers each have their diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 12e5a51b4b..7dc415bd8b 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -1350,168 +1350,6 @@ async def test_switch_acp_model_persists_to_meta(self, tmp_path): assert isinstance(loaded.agent, ACPAgent) assert loaded.agent.acp_model == "new-model" - @pytest.mark.asyncio - async def test_switch_acp_model_finishes_meta_on_cancel(self, tmp_path): - """A cancel mid-switch still mirrors meta.json before propagating. - - ``run_in_executor`` cannot cancel the worker thread, so the live switch - may complete; ``switch_acp_model`` must finish the meta.json mirror - (holding the lock) before re-raising ``CancelledError``, or a restart - resumes from stale metadata. - """ - from openhands.sdk.agent import ACPAgent - - stored = StoredConversation( - id=uuid4(), - agent=ACPAgent(acp_command=["echo", "test"], acp_model="old-model"), - workspace=LocalWorkspace(working_dir=str(tmp_path)), - confirmation_policy=NeverConfirm(), - initial_message=None, - metrics=None, - ) - service = EventService(stored=stored, conversations_dir=tmp_path) - conv_dir = tmp_path / stored.id.hex - conv_dir.mkdir(parents=True, exist_ok=True) - - started = threading.Event() - release = threading.Event() - - def _slow_switch(model): - # Stand in for the blocking worker-thread switch; block until the - # test releases it so we can cancel while it is in flight. - started.set() - release.wait(timeout=5) - - conversation = MagicMock() - conversation.switch_acp_model.side_effect = _slow_switch - service._conversation = conversation - - task = asyncio.create_task(service.switch_acp_model("new-model")) - loop = asyncio.get_running_loop() - # Wait until the worker thread is mid-switch, then cancel. - await loop.run_in_executor(None, started.wait, 5) - task.cancel() - # Let the cancellation reach the shield (coroutine enters the except and - # starts awaiting the in-flight worker) before releasing the worker. - for _ in range(3): - await asyncio.sleep(0) - release.set() - - with pytest.raises(asyncio.CancelledError): - await task - - # Despite the cancel, the worker ran to completion and meta.json was - # mirrored with the new model. - conversation.switch_acp_model.assert_called_once_with("new-model") - assert isinstance(service.stored.agent, ACPAgent) - assert service.stored.agent.acp_model == "new-model" - loaded = StoredConversation.model_validate_json( - (conv_dir / "meta.json").read_text() - ) - assert isinstance(loaded.agent, ACPAgent) - assert loaded.agent.acp_model == "new-model" - - @pytest.mark.asyncio - async def test_switch_acp_model_cancel_takes_precedence_over_failure( - self, tmp_path - ): - """A switch failure during the cancellation window must not mask the - cancel: ``CancelledError`` still propagates, not the switch's error. - """ - from openhands.sdk.agent import ACPAgent - - stored = StoredConversation( - id=uuid4(), - agent=ACPAgent(acp_command=["echo", "test"], acp_model="old-model"), - workspace=LocalWorkspace(working_dir=str(tmp_path)), - confirmation_policy=NeverConfirm(), - initial_message=None, - metrics=None, - ) - service = EventService(stored=stored, conversations_dir=tmp_path) - conv_dir = tmp_path / stored.id.hex - conv_dir.mkdir(parents=True, exist_ok=True) - - started = threading.Event() - release = threading.Event() - - def _failing_switch(model): - started.set() - release.wait(timeout=5) - raise RuntimeError("switch boom") - - conversation = MagicMock() - conversation.switch_acp_model.side_effect = _failing_switch - service._conversation = conversation - - task = asyncio.create_task(service.switch_acp_model("new-model")) - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, started.wait, 5) - task.cancel() - for _ in range(3): - await asyncio.sleep(0) - release.set() - - # The switch raises RuntimeError after release, but the cancellation - # takes precedence — the caller sees CancelledError, not "switch boom". - with pytest.raises(asyncio.CancelledError): - await task - - @pytest.mark.asyncio - async def test_close_waits_for_in_flight_switch(self, tmp_path): - """close() acquires the switch lock, so it waits for an in-flight switch - to finish (mirroring meta.json) before tearing down the conversation. - """ - from openhands.sdk.agent import ACPAgent - - stored = StoredConversation( - id=uuid4(), - agent=ACPAgent(acp_command=["echo", "test"], acp_model="old-model"), - workspace=LocalWorkspace(working_dir=str(tmp_path)), - confirmation_policy=NeverConfirm(), - initial_message=None, - metrics=None, - ) - service = EventService(stored=stored, conversations_dir=tmp_path) - conv_dir = tmp_path / stored.id.hex - conv_dir.mkdir(parents=True, exist_ok=True) - - started = threading.Event() - release = threading.Event() - order: list[str] = [] - - def _slow_switch(model): - started.set() - release.wait(timeout=5) - order.append("switch") - - conversation = MagicMock() - conversation.switch_acp_model.side_effect = _slow_switch - conversation.close.side_effect = lambda: order.append("close") - service._conversation = conversation - - switch_task = asyncio.create_task(service.switch_acp_model("new-model")) - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, started.wait, 5) - - # Start close() while the switch holds the lock; it must block on the - # lock rather than tear the conversation down mid-switch. - close_task = asyncio.create_task(service.close()) - for _ in range(3): - await asyncio.sleep(0) - assert order == [] # neither the switch nor close() has progressed yet - - release.set() - await switch_task - await close_task - - # The switch finished (and mirrored the model) before close() tore the - # conversation down. - assert order == ["switch", "close"] - assert isinstance(service.stored.agent, ACPAgent) - assert service.stored.agent.acp_model == "new-model" - assert service._conversation is None - class TestEventServiceStartWithRunningStatus: """Test cases for EventService.start handling of RUNNING execution status.""" diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 053217dd5f..6d2557b83e 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -3221,7 +3221,7 @@ async def test_claude_reapplies_persisted_model_on_resume(self): # would skip it. conn = AsyncMock() await _reapply_session_model_on_resume( - conn, "claude-agent-acp", "sess-1", "claude-haiku-4-5-20251001", 600.0 + conn, "claude-agent-acp", "sess-1", "claude-haiku-4-5-20251001" ) conn.set_session_model.assert_awaited_once_with( model_id="claude-haiku-4-5-20251001", session_id="sess-1" @@ -3231,7 +3231,7 @@ async def test_claude_reapplies_persisted_model_on_resume(self): async def test_codex_reapplies_persisted_model_on_resume(self): conn = AsyncMock() await _reapply_session_model_on_resume( - conn, "codex-acp", "sess-1", "gpt-5.4/low", 600.0 + conn, "codex-acp", "sess-1", "gpt-5.4/low" ) conn.set_session_model.assert_awaited_once_with( model_id="gpt-5.4/low", session_id="sess-1" @@ -3240,9 +3240,7 @@ async def test_codex_reapplies_persisted_model_on_resume(self): @pytest.mark.asyncio async def test_missing_model_skips_reapply(self): conn = AsyncMock() - await _reapply_session_model_on_resume( - conn, "claude-agent-acp", "sess-1", None, 600.0 - ) + await _reapply_session_model_on_resume(conn, "claude-agent-acp", "sess-1", None) conn.set_session_model.assert_not_called() @pytest.mark.asyncio @@ -3253,7 +3251,7 @@ async def test_unknown_provider_attempts_reapply(self): # resumed session would silently revert to the server default). conn = AsyncMock() await _reapply_session_model_on_resume( - conn, "some-custom-acp", "sess-1", "whatever", 600.0 + conn, "some-custom-acp", "sess-1", "whatever" ) conn.set_session_model.assert_awaited_once_with( model_id="whatever", session_id="sess-1" @@ -3280,9 +3278,7 @@ async def test_known_unsupported_provider_skips_reapply(self): "openhands.sdk.agent.acp_agent.detect_acp_provider_by_agent_name", return_value=unsupported, ): - await _reapply_session_model_on_resume( - conn, "legacy-acp", "sess-1", "x", 600.0 - ) + await _reapply_session_model_on_resume(conn, "legacy-acp", "sess-1", "x") conn.set_session_model.assert_not_called() @pytest.mark.asyncio @@ -3295,37 +3291,23 @@ async def test_client_rejection_is_swallowed_on_resume(self): code=-32601, message="method not found" ) await _reapply_session_model_on_resume( - conn, "some-custom-acp", "sess-1", "whatever", 600.0 + conn, "some-custom-acp", "sess-1", "whatever" ) conn.set_session_model.assert_awaited_once() @pytest.mark.asyncio - async def test_server_error_propagates_on_resume(self): - # A server-internal failure (-32603) must NOT be silently swallowed: - # continuing on the wrong model while serialized state claims otherwise - # is worse than a loud startup failure (matches set_acp_model). + async def test_any_request_error_is_swallowed_on_resume(self): + # Any ACPRequestError (here a -32603 server error) is tolerated on + # resume — like the load_session fallback — so a flaky/stale server + # can't break session startup; the session keeps the server default. conn = AsyncMock() conn.set_session_model.side_effect = ACPRequestError( code=-32603, message="internal error" ) - with pytest.raises(ACPRequestError): - await _reapply_session_model_on_resume( - conn, "codex-acp", "sess-1", "gpt-5.4/low", 600.0 - ) - - @pytest.mark.asyncio - async def test_hang_times_out_on_resume(self): - # A server that hangs after reconnect must not block session startup - # forever; the bounded call raises TimeoutError instead. - async def _hang(**kwargs): - await asyncio.sleep(10) - - conn = AsyncMock() - conn.set_session_model = AsyncMock(side_effect=_hang) - with pytest.raises(TimeoutError): - await _reapply_session_model_on_resume( - conn, "codex-acp", "sess-1", "gpt-5.4/low", 0.01 - ) + await _reapply_session_model_on_resume( + conn, "codex-acp", "sess-1", "gpt-5.4/low" + ) + conn.set_session_model.assert_awaited_once() class TestSetACPModel: