[codex] Accept ACP user messages during async turns#3376
Conversation
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: openhands <openhands@all-hands.dev>
Python API breakage checks — ✅ PASSEDResult: ✅ PASSED |
REST API breakage checks (OpenAPI) — ✅ PASSEDResult: ✅ PASSED |
Coverage Report •
|
||||||||||||||||||||||||||||||||||||||||
all-hands-bot
left a comment
There was a problem hiding this comment.
Clean approach to the live-streaming problem: reusing the existing StreamingDeltaEvent / _pub_sub path for ACP deltas is exactly right — transient chunks don't belong in state.events, and the new helper _publish_streaming_delta_from_thread correctly encapsulates the cross-thread scheduling so neither the LLM token path nor the ACP path has to repeat the run_coroutine_threadsafe / suppress boilerplate. Test coverage is solid (wiring, unwiring, and actual delta delivery are all exercised). Two items worth a look before merge are called out below.
This review was generated by an AI agent (OpenHands) on behalf of the repository owner via OpenHands Automation.
dc3d0a3 to
ca04e8c
Compare
Co-authored-by: openhands <openhands@all-hands.dev>
|
Merged |
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable, but I found two ACP concurrency races that should be addressed before merge: one can replay a just-arrived user prompt, and one can let cancelled prompt updates leak into the next turn. Risk: 🟡 medium, since this changes conversation loop/cancellation behavior.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26368622109
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the two ACP async concurrency review items in 6c89026 and verified locally with ruff plus the full EventService test file (83 passed). Ready for another automated review.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: openhands <openhands@all-hands.dev>
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few ACP concurrency ordering gaps that should be addressed before merge. Risk: 🟡 medium because this changes async conversation loop/cancellation behavior.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26369467316
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest ACP concurrency review in c9bbb68: atomic async finalization, finish-gap queued-message reconciliation, and cancellation drain-before-failure ordering. Verified locally with ruff, pyright, and targeted ACP async tests.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few ACP cancellation/queued-message edge cases plus one slow test that should be addressed before merge. Risk: 🟡 medium because this changes async conversation and cancellation behavior.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26369954489
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest ACP review in ce68215: cancelled-prompt drain timeout now restarts/quarantines the ACP session, timeout ordering drains before terminal failure emission, queued messages at the iteration cap are left IDLE for a follow-up run, and the slow cancellation test now releases its fake prompt promptly. Verified locally with ruff, pyright, ACP arun tests, and the ACP cancellation test.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found several ACP cancellation/session-restart and queued-message edge cases that should be addressed before merge. Risk: 🟡 medium because this changes async conversation and cancellation behavior.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26370828864
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest ACP cancellation/session review in f581a01: shielded interrupt waits, bounded ACP cancel send, suffix reset on forced restarts, deferred user-cancel restarts, FIFO processing for multiple queued messages, and immediate stop after ACP ERROR/STUCK. Verified locally with ruff, pyright, targeted ACP arun tests, and ACP cancellation regression.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few ACP async queue/cancellation races to address before merge. Risk: 🟡 medium because this changes agent/conversation loop behavior; a human maintainer should also decide on eval coverage before approval.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26371395754
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest ACP queue/cancellation review in ffeb881: persisted FIFO cursor across arun invocations, kept explicit rerun requests through ACP PAUSED interrupts, and serialized timeout/error finalization under the state lock. Verified locally with ruff, pyright, ACP arun regression tests, and ACP cancellation regression.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
Co-authored-by: openhands <openhands@all-hands.dev>
|
Added a follow-up fix in c3749ff after investigating conversation Root cause: after two rapid user interruptions, the ACP bridge timed out waiting for the cancelled prompt to drain. Fix: preserve the persisted ACP session id/cwd across restart-after-drain-timeout so the restarted subprocess attempts Verification:
|
all-hands-bot
left a comment
There was a problem hiding this comment.
The iterative hardening across this PR (FIFO message tracking, shielded drain, deferred cancel-path restart, suffix-marker clearing on fallback) is solid work. Earlier rounds of review + fixes addressed the major concurrency races. Three remaining points below — two in production code, one in tests.
This review was generated by an AI agent (OpenHands) on behalf of the user via OpenHands Automation.
| with state: | ||
| self._emit_turn_timeout(time.monotonic() - t0, state, on_event) | ||
| if not drained: | ||
| self._restart_session_after_drain_timeout(state, on_event) |
There was a problem hiding this comment.
🟡 Suggestion: The cancel path defers subprocess restart to the next turn (_restart_session_on_next_turn = True) to avoid holding the state lock during slow I/O. The timeout path calls _restart_session_after_drain_timeout here while with state: is already held, which runs _cleanup() + init_state() (subprocess spawn + ACP connection) under that lock. This blocks send_message() for the duration of the restart.
For consistency you could mirror the cancel path: set _restart_session_on_next_turn = True here too, and let the flag be consumed at the top of the next astep(). The timeout path already sets ERROR via _emit_turn_timeout, so arun() will break; the restart would then be picked up by the subsequent init_state() call at the start of the next run (which re-opens the server anyway). If the inline-restart ordering is intentional for the timeout case, a brief comment explaining why it differs from the cancel path would help future readers.
|
|
||
| acp_user_message_changed = ( | ||
| self._state.last_user_message_id is not None | ||
| and self._state.last_user_message_id != acp_step_user_message_id |
There was a problem hiding this comment.
🟡 Suggestion: acp_step_user_message_id is None when no queued message was found (e.g., last_acp_prompt_user_message_id already points to the newest event). In that case astep() is called with prompt_message=None and falls back to scanning state.events in reverse — but last_acp_prompt_user_message_id is not updated afterward (line 1096 has the if acp_step_user_message_id is not None: guard). Then this check evaluates as last_user_message_id is not None and last_user_message_id != None → True, spuriously signalling that a new message arrived and setting RUNNING for another iteration. On the next iteration the pattern repeats, so the loop burns through max_iteration_per_run re-prompting the last already-processed message.
In practice EventService only calls run() when a new message exists, so this edge case is unlikely to surface there. Standalone callers that loop arun() without a new message could hit it. A guard like if acp_step_user_message is None: break # no queued message before the await self.agent.astep(...) call would close the gap and make the intent explicit.
|
|
||
| completed_promptly = True | ||
| try: | ||
| await asyncio.wait_for(send_done.wait(), timeout=0.2) |
There was a problem hiding this comment.
🟡 Suggestion: The 0.2 s deadline is a design assertion ("send_message must not block on the in-flight prompt"), not a performance benchmark, so the intent is good. However it can produce false negatives on heavily loaded CI runners where thread scheduling latency alone exceeds 200 ms. Consider replacing the wait_for + completed_promptly flag pattern with a threading.Event that send_message sets synchronously once the lock is acquired, then assert that the event was set before release_first_step. That approach is instant rather than time-bounded and cannot flake under load.
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
Verified ACP live message behavior through a real stdio ACP JSON-RPC harness; the PR achieves its stated goal with no QA issues found.
Does this PR achieve its stated goal?
Yes. I exercised the changed SDK and agent-server paths with a minimal ACP server instead of relying on unit tests: on the base commit, ACP send_message() waited behind an in-flight prompt and ACP string chunks did not produce streaming deltas; on the PR, the intervening user message persisted immediately, the queued ACP prompt ran, streaming deltas were published, and EventService sent session/cancel before restarting with the replacement prompt. The condenser settings path also constructs separate default and condenser LLM usage IDs on the PR.
| Phase | Result |
|---|---|
| Environment Setup | ✅ uv sync --dev --frozen completed (Checked 235 packages in 2ms) |
| CI Status | 🟡 Core build/test/pre-commit checks are green; OpenHands pr-review and qa-changes automation checks are still in progress |
| Functional Verification | ✅ ACP queued messages, streaming deltas, interrupt/cancel restart, and condenser usage IDs verified |
Functional Verification
Test 1: Local ACP conversation accepts a user message while a prompt is in flight
Step 1 — Reproduce baseline without the PR:
Checked out origin/main (2aa5256e2147a3252be8d1f96600f627ec27abbb) and ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py local_queue:
QA_RESULT={"cancel_events": [], "completed_within_250ms": false, "mode": "local_queue", "prompts": ["initial request", "intervening request"], "second_prompt_auto": true, "send_elapsed_if_prompt": null, "send_elapsed_total": 2.7864337420000425}This confirms the old behavior: the intervening send_message() did not persist promptly; it waited ~2.8s for the in-flight ACP prompt/usage wait to complete.
Step 2 — Apply the PR's changes:
Checked out c3749ffa83e04f8f64ff102aa5529ef5c65e591c.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py local_queue:
QA_RESULT={"cancel_events": [], "completed_within_250ms": true, "mode": "local_queue", "prompts": ["initial request", "intervening request"], "second_prompt_auto": true, "send_elapsed_if_prompt": 0.0020447240000294187, "send_elapsed_total": 0.0020454750000453714}This shows the PR fixes the core async-turn issue: the new user message persisted in ~2ms while the first ACP prompt was still active, and the second ACP prompt was processed automatically in FIFO order.
Test 2: ACP plain-string token callbacks publish streaming deltas
Step 1 — Reproduce baseline without the PR:
Checked out origin/main and ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py event_streaming:
QA_RESULT={"delta_contents": [], "delta_count": 0, "mode": "event_streaming", "prompts": ["stream this"]}This confirms the previous agent-server streaming path did not surface ACP plain-string chunks as StreamingDeltaEvents.
Step 2 — Apply the PR's changes:
Checked out c3749ffa83e04f8f64ff102aa5529ef5c65e591c.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py event_streaming:
QA_RESULT={"delta_contents": ["delta for stream this"], "delta_count": 1, "mode": "event_streaming", "prompts": ["stream this"]}This shows the ACP string chunk reached subscribers as a transient streaming delta.
Test 3: Agent-server interrupts an in-flight ACP turn and sends session/cancel
Step 1 — Reproduce baseline without the PR:
Checked out origin/main and ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py event_interrupt:
QA_RESULT={"cancel_count": 0, "mode": "event_interrupt", "prompts": ["initial long request", "replacement request"], "replacement_prompt_seen": true, "send_elapsed": 7.073722684000018}This shows the old EventService path did not send session/cancel; the second send_message(run=True) waited for the first long ACP prompt and then for the replacement run.
Step 2 — Apply the PR's changes:
Checked out c3749ffa83e04f8f64ff102aa5529ef5c65e591c.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py event_interrupt:
QA_RESULT={"cancel_count": 1, "mode": "event_interrupt", "prompts": ["initial long request", "replacement request"], "replacement_prompt_seen": true, "send_elapsed": 2.009888352999951}This confirms the PR sends ACP session/cancel during the interrupted turn and restarts against the replacement user prompt instead of waiting for the old long prompt to complete.
Test 4: Condenser LLM usage IDs stay separated
Step 1 — Establish baseline:
On origin/main, ran a real settings construction script using OpenHandsAgentSettings, CondenserSettings(enabled=True), and create_agent_from_settings(...):
Agent
agent_llm_usage_id= default
condenser= LLMSummarizingCondenser
all_llm_usage_ids= ['default', 'condenser']
Step 2 — Apply the PR's changes and re-run:
On c3749ffa83e04f8f64ff102aa5529ef5c65e591c, ran the same script:
Agent
agent_llm_usage_id= default
condenser= LLMSummarizingCondenser
all_llm_usage_ids= ['default', 'condenser']
This verifies the PR branch exposes distinct runtime usage IDs for agent and condenser LLMs; the current origin/main already has the same behavior, so this was a no-regression check for the copied fix.
Issues Found
None.
This review was created by an AI agent (OpenHands) on behalf of the user.
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few ACP async queue/cancellation edge cases that should be addressed before merge. Risk: 🟡 medium because this changes agent/conversation loop behavior; a human maintainer should decide eval coverage before approval.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26379406959
| # the message and we are FINISHED here, so the guard avoids a | ||
| # redundant run. A deliberate run=False append, or an IDLE | ||
| # reached via another path, never sets the flag. | ||
| if self._rerun_requested: |
There was a problem hiding this comment.
🟠 Important: _rerun_requested now stays set when the current run finishes in a non-restartable state (for example, FINISHED after the still-running loop already absorbed the run=True message). That stale flag can be consumed by a later IDLE/ACP-PAUSED cleanup and start an unexpected run, including after a later run=False append. Please clear/consume the flag before checking status, and set it back only when the replacement run() attempt races with another active task.
| ] | ||
| if last_acp_prompt_user_message_id is None: | ||
| acp_step_user_message = ( | ||
| user_messages[-1] if user_messages else None |
There was a problem hiding this comment.
🟠 Important: With no persisted ACP cursor, this starts from the latest user message. If a client queued multiple user messages before the first arun() (e.g. run=False sends followed by run=True), every earlier message is skipped and the latest id is persisted as the cursor. Since this path is now responsible for FIFO ACP processing, the initial cursor case should also process the earliest unprocessed user message, or establish an explicit baseline before selecting the next prompt.
| except TimeoutError: | ||
| self._emit_turn_timeout(time.monotonic() - t0, state, on_event) | ||
| await self._arequest_session_cancel() | ||
| drained = await self._drain_cancelled_prompt(prompt_future) |
There was a problem hiding this comment.
🟠 Important: If the shielded prompt completes during _drain_cancelled_prompt(), drained is True but the completed PromptResponse is discarded and the code still emits a timeout while preserving the same ACP session. That can leave ACP server history with a completed turn that OpenHands records as a timeout. Please have the drain helper return the completed result/exception so it can be finalized, or restart/discard the session whenever a completed turn is intentionally ignored.
| with state: | ||
| self._emit_turn_timeout(time.monotonic() - t0, state, on_event) | ||
| if not drained: | ||
| self._restart_session_after_drain_timeout(state, on_event) |
There was a problem hiding this comment.
🟠 Important: On drain timeout, _restart_session_after_drain_timeout() replaces _client; the finally block then calls _clear_turn_callbacks() on the new bridge, leaving the old bridge (the one still owned by the undrained prompt) with on_event/on_token wired. Clear the old callbacks before cleanup/restart, or have the restart helper clear them before swapping clients, so late old-session updates cannot append after the synthetic timeout/failure events.
| await asyncio.sleep(60) | ||
| # Block until cancellation releases the prompt, exercising | ||
| # prompt quiescing without leaving a long-running portal task. | ||
| await asyncio.to_thread(prompt_released.wait) |
There was a problem hiding this comment.
🟡 Suggestion: If the cancellation path regresses before _fake_cancel runs, this to_thread worker blocks forever and executor.close() can hang after the test times out, masking the actual failure. Release prompt_released in a cleanup finally (or use a bounded wait) before closing the executor so the test fails cleanly.
Summary
StreamingDeltaEvents, including ACP providers that invoke token callbacks with plain string chunksastep()awaits so websocket/API user messages can be persisted immediatelysession/cancelwhen an in-flight turn is interruptedRegression Tests
test_acp_string_token_callback_publishes_deltafails on the prior implementation because ACP plain-string token callbacks raised before publishing anyStreamingDeltaEventtest_acp_arun_accepts_user_message_while_step_is_in_flightfails on main becausesend_message()waits behind the in-flight ACP prompttests/sdk/test_settings.py -k condensercovers the copied condenser LLM usage-id behavior from fix(sdk): assign condenser LLM usage id #3368Validation
uv run pytest -q tests/sdk/test_settings.py -k condenseruv run pytest -q tests/agent_server/test_event_streaming.py tests/agent_server/test_event_service.py::TestEventServiceSendMessage tests/sdk/agent/test_acp_agent.py::TestACPAgentAstep tests/sdk/conversation/local/test_conversation_send_message.pyuv run ruff check openhands-sdk/openhands/sdk/settings/model.py tests/sdk/test_settings.py openhands-agent-server/openhands/agent_server/event_service.py tests/agent_server/test_event_streaming.pyAgent Server images for this PR
• GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server
Variants & Base Images
eclipse-temurin:17-jdknikolaik/python-nodejs:python3.13-nodejs22-slimgolang:1.21-bookwormPull (multi-arch manifest)
# Each variant is a multi-arch manifest supporting both amd64 and arm64 docker pull ghcr.io/openhands/agent-server:c3749ff-pythonRun
All tags pushed for this build
About Multi-Architecture Support
c3749ff-python) is a multi-arch manifest supporting both amd64 and arm64c3749ff-python-amd64) are also available if needed