Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0a180f1
fix(sdk): assign condenser LLM usage id
openhands-agent May 23, 2026
43032a6
fix(sdk): reset condenser LLM metrics
openhands-agent May 24, 2026
ca04e8c
fix(acp): accept user messages during async turns
neubig May 24, 2026
f50ed72
fix(agent-server): stream ACP text deltas
neubig May 24, 2026
71f8148
fix(acp): interrupt running turn on new user message
neubig May 24, 2026
b9dc93b
fix(agent-server): handle ACP string token deltas
neubig May 24, 2026
b336d05
merge main into PR #3376
openhands-agent May 24, 2026
6c89026
fix: address ACP async turn races (#3376)
openhands-agent May 24, 2026
49583a2
fix: satisfy ACP prompt future typing (#3376)
openhands-agent May 24, 2026
55b6c38
test: update ACP arun prompt snapshot test (#3376)
openhands-agent May 24, 2026
c9bbb68
fix: close ACP async ordering gaps (#3376)
openhands-agent May 24, 2026
ce68215
fix: address ACP cancellation edge cases (#3376)
openhands-agent May 24, 2026
f581a01
fix: address ACP review edge cases (#3376)
openhands-agent May 24, 2026
f1ca28d
Merge branch 'pr-3368-fix-condenser-usage-id' into codex/acp-live-mes…
neubig May 24, 2026
ffeb881
fix: close ACP rerun race windows (#3376)
openhands-agent May 24, 2026
c1c37ca
Merge branch 'main' into codex/acp-live-message-deltas
neubig May 24, 2026
1c390af
fix(acp): reassign agent state for prompt tracking
neubig May 24, 2026
c3749ff
fix(acp): resume session after cancel drain timeout
neubig May 25, 2026
6246d1e
Address ACP async review races
May 25, 2026
682fad9
Clarify ACP queued-message cleanup fixes
May 25, 2026
741399e
Fix ACP resume cursor after cancellation
May 25, 2026
89a68ff
Treat ACP prompt timeout as idle timeout
May 25, 2026
81b4713
Restore hard ACP prompt timeout
May 25, 2026
70e44e2
Fix ACP interrupt cursor races
May 25, 2026
ff3b45b
Format EventService ACP rerun logic
May 25, 2026
af1ac3f
Use reassignment-safe ACP cursor state updates
May 25, 2026
c988567
Fix remaining ACP async race reviews
May 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 134 additions & 34 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
)
from openhands.agent_server.pub_sub import PubSub, Subscriber
from openhands.sdk import LLM, AgentBase, Event, Message, get_logger
from openhands.sdk.agent import ACPAgent
from openhands.sdk.conversation.base import BaseConversation
from openhands.sdk.conversation.impl.local_conversation import LocalConversation
from openhands.sdk.conversation.impl.local_conversation import (
ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID,
ACP_SUPERSEDE_INFLIGHT_PROMPT,
LocalConversation,
)
from openhands.sdk.conversation.response_utils import get_agent_final_response
from openhands.sdk.conversation.secret_registry import SecretValue
from openhands.sdk.conversation.state import (
Expand Down Expand Up @@ -71,6 +76,10 @@ class EventService:
# Set when a send_message(run=True) is rejected because a run is still
# wrapping up; consumed by _run_and_publish to re-run the stranded message.
_rerun_requested: bool = field(default=False, init=False)
# Set only for the internal ACP interrupt/restart path triggered by a new
# send_message(run=True). Explicit user pause/interrupt clears it so user
# stop intent wins over an earlier automatic restart request.
_acp_internal_rerun_requested: bool = field(default=False, init=False)
_run_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
_callback_wrapper: AsyncCallbackWrapper | None = field(default=None, init=False)
_lease: ConversationLease | None = field(default=None, init=False)
Expand Down Expand Up @@ -422,8 +431,18 @@ async def send_message(self, message: Message, run: bool = False):
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._conversation.send_message, message)
if run:
(
did_mark_acp_prompt_superseded,
active_acp_prompt_has_latest_message,
) = await self._mark_running_acp_prompt_superseded()
interrupted_acp = False
if did_mark_acp_prompt_superseded:
self._acp_internal_rerun_requested = True
interrupted_acp = True
await self.interrupt(internal_acp_rerun=True)
try:
await self.run()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Important: After an internal ACP supersede, this coroutine awaits interrupt(internal_acp_rerun=True) and then always calls run(). If the user explicitly presses Stop/Pause while that internal interrupt is draining, that explicit call clears the rerun flags, but this suspended send_message() still resumes here and restarts from PAUSED. Please re-check an internal rerun generation/flag after interrupt() returns, or let _run_and_publish own the restart, so explicit user stop intent wins.

self._acp_internal_rerun_requested = False
except ValueError as e:
# run() refused. If a run is still wrapping up (its
# wait_for_pending tail), the message we just appended won't be
Expand All @@ -433,8 +452,53 @@ async def send_message(self, message: Message, run: bool = False):
# is what keeps a deliberate run=False append, or an IDLE reached
# via another path, from triggering an unwanted run.
# "inactive_service" is terminal and must not re-arm.
if str(e) == "conversation_already_running":
if (
str(e) == "conversation_already_running"
and not active_acp_prompt_has_latest_message
):
self._rerun_requested = True
if interrupted_acp:
self._acp_internal_rerun_requested = True

def _mark_running_acp_prompt_superseded_sync(self) -> tuple[bool, bool]:
    """Flag the in-flight ACP prompt as superseded when a newer message exists.

    Returns:
        ``(did_mark_superseded, active_prompt_has_latest_message)``.

        * ``(False, False)`` -- nothing to do: there is no conversation, no
          live run task, the agent is not an ``ACPAgent``, the conversation
          is not ``RUNNING``, or either message id is missing.
        * ``(False, True)`` -- the running prompt already targets the latest
          user message. Interrupting it would cancel the replacement prompt
          and strand that message behind the persisted cursor, so the state
          is left untouched.
        * ``(True, False)`` -- the supersede flag was written to
          ``agent_state``.
    """
    nothing_to_do = (False, False)

    conversation = self._conversation
    if not conversation:
        return nothing_to_do

    run_task = self._run_task
    if run_task is None or run_task.done():
        return nothing_to_do

    if not isinstance(conversation.agent, ACPAgent):
        return nothing_to_do

    # Read and update the state inside its context manager so the check and
    # the flag write happen together.
    with conversation._state as state:
        if state.execution_status != ConversationExecutionStatus.RUNNING:
            return nothing_to_do

        inflight_id = state.agent_state.get(ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID)
        latest_id = state.last_user_message_id
        if inflight_id is None or latest_id is None:
            return nothing_to_do

        if inflight_id == latest_id:
            return (False, True)

        # Reassign (rather than mutate in place) so the change is applied
        # through the attribute setter on ``state``.
        state.agent_state = {
            **state.agent_state,
            ACP_SUPERSEDE_INFLIGHT_PROMPT: True,
        }
        return (True, False)

async def _mark_running_acp_prompt_superseded(self) -> tuple[bool, bool]:
    """Run the synchronous supersede check in the loop's default executor.

    Returns the same ``(did_mark_superseded, active_prompt_has_latest_message)``
    tuple as ``_mark_running_acp_prompt_superseded_sync``.
    """
    event_loop = asyncio.get_running_loop()
    sync_marker = self._mark_running_acp_prompt_superseded_sync
    # ``None`` selects the event loop's default executor.
    return await event_loop.run_in_executor(None, sync_marker)

async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID:
subscriber_id = self._pub_sub.subscribe(subscriber)
Expand Down Expand Up @@ -624,41 +688,55 @@ async def start(self):
self._pub_sub, loop=asyncio.get_running_loop()
)

# Only wire token streaming if at least one LLM has stream=True.
# The LLM silently ignores on_token when stream is off, but skipping
# the wiring lets us log the decision so operators can tell from a
# log line whether deltas will flow.
streaming_enabled = any(llm.stream for llm in agent.get_all_llms())
from openhands.sdk.agent import ACPAgent

# Only wire token streaming for agents that can actually emit token
# callbacks. SDK LLM agents need stream=True, while ACP agents emit
# AgentMessageChunk text through their bridge without exposing an LLM.
streaming_enabled = isinstance(agent, ACPAgent) or any(
llm.stream for llm in agent.get_all_llms()
)
logger.debug(
"Token streaming: %s",
"enabled" if streaming_enabled else "disabled (no LLM has stream=True)",
)

def _token_streaming_callback(chunk: LLMStreamChunk) -> None:
def _publish_stream_delta(
content: str | None = None,
reasoning_content: str | None = None,
) -> None:
# Published directly to _pub_sub (not via _callback_wrapper) so
# deltas reach subscribers but are NOT persisted to
# ConversationState.events. See StreamingDeltaEvent docstring.
if not self._main_loop or not self._main_loop.is_running():
return
# Use `is not None` rather than truthiness: some providers
# emit legitimate empty-string chunks at stream boundaries
# (e.g. after a tool call) that we still want to forward.
if content is None and reasoning_content is None:
return
event = StreamingDeltaEvent(
content=content,
reasoning_content=reasoning_content,
)
with suppress(RuntimeError):
asyncio.run_coroutine_threadsafe(self._pub_sub(event), self._main_loop)

def _token_streaming_callback(chunk: LLMStreamChunk | str) -> None:
if isinstance(chunk, str):
_publish_stream_delta(content=chunk)
return

for choice in chunk.choices or ():
delta = choice.delta
if delta is None:
continue
content = getattr(delta, "content", None)
reasoning = getattr(delta, "reasoning_content", None)
# Use `is not None` rather than truthiness: some providers
# emit legitimate empty-string chunks at stream boundaries
# (e.g. after a tool call) that we still want to forward.
if content is None and reasoning is None:
continue
event = StreamingDeltaEvent(
_publish_stream_delta(
content=content if isinstance(content, str) else None,
reasoning_content=reasoning if isinstance(reasoning, str) else None,
)
with suppress(RuntimeError):
asyncio.run_coroutine_threadsafe(
self._pub_sub(event), self._main_loop
)

conversation = LocalConversation(
agent=agent,
Expand Down Expand Up @@ -812,21 +890,38 @@ async def _run_and_publish():
# wrapping up. A send_message(run=True) that arrived during
# the wait_for_pending() tail above had its run() rejected as
# "conversation_already_running" and suppressed, setting
# _rerun_requested. Honor it only while the conversation is
# still IDLE — i.e. that message is genuinely pending. If the
# run loop was still alive it already absorbed the message
# (LocalConversation.run() keeps looping on FINISHED) and we
# are FINISHED here, so the IDLE guard avoids a redundant run.
# A deliberate run=False append, or an IDLE reached via
# another path, never sets the flag.
if self._rerun_requested:
self._rerun_requested = False
if (
await self._get_execution_status()
== ConversationExecutionStatus.IDLE
):
with suppress(ValueError):
# _rerun_requested. Honor it while the conversation is IDLE
# (pending input) or internally ACP-interrupted PAUSED (the
# old task finished its interrupt before the replacement run
# could start). Explicit user pause/interrupt clears the
# internal ACP flag, so user stop intent wins over an older
# automatic restart request. If the run loop was still alive
# it already absorbed the message and we are FINISHED here,
# so the guard avoids a redundant run. A deliberate
# run=False append, or an IDLE reached via another path,
# never sets the flag.
rerun_requested = self._rerun_requested
acp_internal_rerun_requested = self._acp_internal_rerun_requested
self._rerun_requested = False
self._acp_internal_rerun_requested = False
if rerun_requested:
status = await self._get_execution_status()
should_restart = status == ConversationExecutionStatus.IDLE or (
acp_internal_rerun_requested
and status == ConversationExecutionStatus.PAUSED
and isinstance(conversation.agent, ACPAgent)
)
if should_restart:
try:
await self.run()
except ValueError as e:
if str(e) == "conversation_already_running":
self._rerun_requested = True
self._acp_internal_rerun_requested = (
acp_internal_rerun_requested
)
else:
raise

# Create task but don't await it - runs in background
self._run_task = asyncio.create_task(_run_and_publish())
Expand Down Expand Up @@ -857,25 +952,30 @@ async def reject_pending_actions(self, reason: str):

async def pause(self):
    """Pause the conversation and discard any pending automatic re-run.

    Does nothing when no conversation is attached. Otherwise both re-run
    flags are cleared before the pause call is handed to the default
    executor, and a state update is published afterwards.
    """
    conversation = self._conversation
    if not conversation:
        return

    # Clear the flags first so an earlier automatic restart request cannot
    # fire after this pause.
    self._rerun_requested = self._acp_internal_rerun_requested = False

    await asyncio.get_running_loop().run_in_executor(None, conversation.pause)

    # Publish state update after pause to ensure stats are updated
    await self._publish_state_update()

async def interrupt(self):
async def interrupt(self, *, internal_acp_rerun: bool = False):
"""Immediately cancel an in-flight async LLM call.

Delegates to :meth:`LocalConversation.interrupt` which cancels the
``arun()`` task. If no async run is in progress the call falls
back to :meth:`pause`.
"""
if self._conversation:
if not internal_acp_rerun:
self._rerun_requested = False
self._acp_internal_rerun_requested = False
self._conversation.interrupt()
# Wait for the run task to finish so we can publish the final
# state update (PAUSED + InterruptEvent) cleanly.
if self._run_task is not None and not self._run_task.done():
with suppress(Exception):
await asyncio.wait_for(self._run_task, timeout=5.0)
await asyncio.wait_for(asyncio.shield(self._run_task), timeout=5.0)
# Only clear _run_task if it actually finished; if
# wait_for timed out the task may still be running and
# clearing prematurely would allow a second run() to
Expand Down
Loading
Loading