fix: forward task_started / task_notification between turns via a background drain #713
Open
julianmesa-gitkraken wants to merge 2 commits into
Force-pushed from 52eac3f to 7ff658b.
…nFollowupCollector

Building blocks for the single-consumer session reader (issue agentclientprotocol#336), in a standalone module so each unit is testable in isolation:

- TurnQueue: single-producer / single-consumer async queue. The reader is the sole producer; prompt() is the sole consumer per turn. close() and error() propagate to the consumer; clear() drops leftovers for an abandoned turn; take() guards against concurrent consumers.
- classifyOffTurn: pure classification of an off-turn message into a task-lifecycle event (emit immediately) or a followup candidate.
- OffTurnFollowupCollector: state machine that buffers off-turn followup candidates until the closing result/idle, then forwards them (for origin.kind === "task-notification") or discards them as aftermath. Bounded by a 256-entry cap.
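The TurnQueue contract described in this commit message can be sketched as follows. This is a minimal illustration, not the code in src/session-reader.ts: the method signatures, the IteratorResult-shaped return value, the choice to drop pushes after close()/error(), and the choice to deliver buffered items before reporting close or error are all assumptions made for the example.

```typescript
// Hypothetical sketch of a single-producer / single-consumer async queue.
type TakeResult<T> = IteratorResult<T, undefined>;

type Waiter<T> = {
  resolve: (result: TakeResult<T>) => void;
  reject: (err: unknown) => void;
};

class TurnQueue<T> {
  private items: T[] = [];
  private waiter: Waiter<T> | null = null;
  private closed = false;
  private failure: { error: unknown } | null = null;

  /** Producer side: hand the item to a parked consumer, or buffer it. */
  push(item: T): void {
    if (this.closed || this.failure) return; // assumption: late pushes are dropped
    if (this.waiter) {
      const w = this.waiter;
      this.waiter = null;
      w.resolve({ value: item, done: false });
    } else {
      this.items.push(item);
    }
  }

  /** Consumer side: next item, `done` once closed, or a rejection after error(). */
  async take(): Promise<TakeResult<T>> {
    if (this.waiter) {
      // Guard: only one consumer may be parked at a time.
      throw new Error("TurnQueue: concurrent take()");
    }
    if (this.items.length > 0) {
      return { value: this.items.shift() as T, done: false };
    }
    if (this.failure) throw this.failure.error;
    if (this.closed) return { value: undefined, done: true };
    return new Promise<TakeResult<T>>((resolve, reject) => {
      this.waiter = { resolve, reject };
    });
  }

  /** Unblocks a parked consumer with `done: true`. */
  close(): void {
    this.closed = true;
    const w = this.waiter;
    this.waiter = null;
    w?.resolve({ value: undefined, done: true });
  }

  /** Unblocks a parked consumer with a rejection. */
  error(err: unknown): void {
    this.failure = { error: err };
    const w = this.waiter;
    this.waiter = null;
    w?.reject(err);
  }

  /** Drops buffered leftovers, e.g. after an abandoned turn. */
  clear(): void {
    this.items.length = 0;
  }
}
```

Because take() runs synchronously up to the point where it parks, a second take() issued while the first is still pending is rejected immediately, which is one way to enforce the single-consumer rule at runtime.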
Force-pushed from 9ab68ec to 6eab829.
…der (agentclientprotocol#336)

Background tasks launched with run_in_background=true emit task_started / task_notification system messages between turns. Previously they sat in the SDK's buffer until the next user prompt, where prompt()'s switch discarded them as a no-op: the agent appeared unresponsive to background-task completion and then acted on a stale prompt.

A per-session reader (#runSessionReader) is now the sole consumer of session.query.next(). It forwards lifecycle events to the client immediately (in-turn or between turns), hands in-turn messages to the active prompt() via a TurnQueue, and forwards autonomous task-notification followups out-of-turn. prompt() consumes the queue instead of touching the iterator, so there is only ever one consumer of the AsyncGenerator.

Key pieces:

- #runSessionReader: single linear loop; lifecycle/raw/followup emits are scheduled on a detached, never-awaited readerSideEffects chain so a slow ACP client can't stall SDK consumption or block the next prompt.
- #emitTaskLifecycleUpdate: renders task_started (SDK description) and task_notification (real SDK status, no fallback) as an agent_message_chunk routed to the reader's bound sessionId.
- #emitFollowup + computeFollowupUsageUpdate: forward a followup's content via the same notification helpers prompt() uses, plus a usage_update derived from the followup's own snapshots, without touching accumulatedUsage / stopReason / pending prompts.
- prompt() consumes turnQueue.take(); raw-SDK emit moved entirely to the reader (no double-emit); the error/cancel finally clears the queue so an abandoned turn's buffered messages don't leak into the next one.
- teardownSession and process-died recovery abort, await readerDone, then drain readerSideEffects (bounded by a 2s timeout) before deleting the session, so queued emits finish first and a wedged client can't hang teardown.
Tests: session-reader describe block covers lifecycle (description vs summary, real status, skip_transcript), raw-SDK emit (off-turn, no in-turn double-emit, FIFO order), followup forwarding (out-of-turn, no usage contamination, no authRequired from a stale login result, lifecycle mid-followup, real #emitFollowup end-to-end), aftermath discard, process-died propagation, teardown drain, and the single-consumer invariant. computeFollowupUsageUpdate has unit coverage for buffered snapshot, result.usage fallback, subagent-only fallback, and inferred context window. acp-agent-settings mocks gain next/close stubs so the reader parks cleanly.
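The detached readerSideEffects chain and its bounded drain at teardown could look roughly like the sketch below. The class name SideEffectChain, the onError hook, and the decision to swallow a failed emit so later emits still run are assumptions for illustration. Only the ideas of a never-awaited promise chain and a timeout-bounded drain come from the commit message.

```typescript
// Hypothetical sketch: serialize client-facing emits on a promise chain the
// reader never awaits, and let teardown wait for it with an upper bound.
class SideEffectChain {
  private tail: Promise<void> = Promise.resolve();
  private readonly onError: (err: unknown) => void;

  constructor(onError: (err: unknown) => void = (err) => console.error(err)) {
    this.onError = onError;
  }

  /** Schedule an emit after all earlier ones; the caller does not await it. */
  enqueue(effect: () => Promise<void>): void {
    // Assumption: a failed emit is reported and skipped so the chain keeps going.
    this.tail = this.tail.then(effect).catch((err) => this.onError(err));
  }

  /** Resolves true once queued emits finish, or false if timeoutMs elapses first. */
  async drain(timeoutMs: number): Promise<boolean> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timedOut = new Promise<boolean>((resolve) => {
      timer = setTimeout(() => resolve(false), timeoutMs);
    });
    try {
      return await Promise.race([this.tail.then(() => true), timedOut]);
    } finally {
      clearTimeout(timer);
    }
  }
}
```

In this sketch a teardown path would call something like `await chain.drain(2000)` after the reader has exited and delete the session whether or not the drain completed, so a client that never acknowledges an emit delays teardown by at most the timeout.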
Force-pushed from 6eab829 to 43b7e08.
Fixes #336.
Summary
When Claude Code launches background tasks via the Task tool with run_in_background=true, task_notification (and task_started) system messages arriving between turns sit in the SDK's internal buffer until the user sends the next message, at which point the wrapper's prompt() switch hits the "// Todo: process via status api" no-op and silently discards them. The user observes this as "the agent doesn't react to my background task completing; I have to type something to wake it, and then it acts on a stale prompt".

The wrapper now runs a single-consumer session reader per session that is the sole consumer of session.query. It forwards task_started / task_notification to the ACP client the moment they arrive (in-turn or between turns), hands in-turn messages to the active prompt() via a queue, and forwards autonomous task-notification followups out-of-turn. prompt() no longer touches the SDK iterator directly.

Architecture
A new module src/session-reader.ts holds three unit-testable building blocks:

- TurnQueue: single-producer / single-consumer async queue. The reader is the only producer; prompt() is the only consumer per turn. close() / error() propagate to the consumer; clear() drops leftovers for an abandoned turn.
- classifyOffTurn: pure classifier. A message is either lifecycle (emit immediately) or a followup-candidate (feed to the collector).
- OffTurnFollowupCollector: small state machine for messages seen while no turn is active. It buffers followup candidates until the closing result/idle, then forwards them (when result.origin.kind === "task-notification") or discards them as aftermath. Bounded by a 256-entry cap.

In src/acp-agent.ts:

- #runSessionReader: the only caller of session.query.next(). Per message: optionally schedule a raw-SDK emit; if lifecycle, emit it; else if a turn is running, push to turnQueue; else feed offTurn. Exits cleanly on abort / iterator close / iterator throw, closing the queue so any consumer is unblocked.
- #enqueueReaderSideEffect: serializes the reader's client-facing emits (raw SDK messages, lifecycle, followup) onto a per-session promise chain the reader never awaits, so a slow ACP client can't stall SDK consumption or block the next prompt.
- #emitTaskLifecycleUpdate: renders task_started (using the SDK's description) and task_notification (using the SDK's real status) as an agent_message_chunk, routed to the reader's bound sessionId.
- #emitFollowup / computeFollowupUsageUpdate: forward an autonomous followup's content using the same notification helpers prompt() uses, plus a usage_update derived from the followup's own snapshots, without ever touching accumulatedUsage, stopReason, or the user's pending prompts.
- prompt() consumes session.turnQueue.take() instead of session.query.next(). Raw-SDK emit moved entirely to the reader (no double-emit). Error/cancel paths clear the queue.

Behaviour
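As a rough illustration of the text that #emitTaskLifecycleUpdate puts into an agent_message_chunk, the rendering could be sketched like this. The field names task_id and output_file, the message type shapes, and the newline before the output line are assumptions for the example. Only description, status, summary, skip_transcript and the bracketed format itself are taken from this PR's description.

```typescript
// Hypothetical message shapes; the real SDK types may differ.
type TaskStarted = {
  subtype: "task_started";
  task_id: string; // assumed field name
  description: string;
  skip_transcript?: boolean;
};

type TaskNotification = {
  subtype: "task_notification";
  task_id: string; // assumed field name
  status: string;
  summary: string;
  output_file?: string; // assumed field name
  skip_transcript?: boolean;
};

type TaskLifecycle = TaskStarted | TaskNotification;

/** Returns the chunk text, or null when the message must stay invisible. */
function renderTaskLifecycle(msg: TaskLifecycle): string | null {
  if (msg.skip_transcript) return null;
  if (msg.subtype === "task_started") {
    return `[task ${msg.task_id}] started: ${msg.description}`;
  }
  // task_notification: use the SDK's real status, with no fallback value.
  const line = `[task ${msg.task_id}] ${msg.status}: ${msg.summary}`;
  // Assumption: the output path goes on its own line.
  return msg.output_file ? `${line}\noutput: ${msg.output_file}` : line;
}
```

Keeping the rendering a pure function of the message makes the in-turn and between-turns paths produce identical text, since both can call the same helper.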
- task_started / task_notification produce a short bracketed agent_message_chunk ([task <id>] started: <description>, or [task <id>] <status>: <summary> plus output: <path> when present) whether the event arrives during a turn or between turns.
- skip_transcript: true lifecycle messages produce no client-visible update.
- task-notification followups (an SDK-driven mini-turn that runs between user turns) are forwarded to the client out-of-turn: their assistant/tool content via the normal notification helpers, plus a usage_update carrying the followup's cost and _claude/origin meta. They never contaminate the next user turn's usage or stop reason.

Lifecycle & robustness
- The reader is the only caller of session.query.next(), eliminating the concurrent-next()-on-an-AsyncGenerator hazard.
- teardownSession and the process-died recovery call abort(), then await readerDone, then drain the detached readerSideEffects chain (bounded by a 2s timeout) before deleting the session, so queued emits finish before the session disappears and a wedged client can't hang teardown.
- prompt() clears the turnQueue on both error and cancel, so messages the reader buffered for an abandoned turn don't leak into the next one.
- _claude/sdkMessage emits are FIFO among themselves but are explicitly not ordered relative to the derived session/update stream (the reader doesn't block on the client). Documented at the emit site.

Tests
- npm run test:run: 355 passed | 13 skipped.
- src/tests/session-reader.test.ts: unit tests for TurnQueue (push/take/close/error/clear, FIFO, concurrent-take guard), classifyOffTurn (every subtype), OffTurnFollowupCollector (all transitions, cap, emit-error swallowing).
- src/tests/acp-agent.test.ts → describe("session reader (issue #336)"): lifecycle (description not summary; real status; skip_transcript), raw-SDK emit (off-turn, no in-turn double-emit, FIFO order), followup forwarding (out-of-turn, no usage contamination, no authRequired from a stale "Please run /login", lifecycle-mid-followup, real #emitFollowup end-to-end render), aftermath (orphan+idle, non-followup result, cancellation aftermath discarded), process-died propagation, teardown awaits reader + drains side-effects, single-consumer invariant.
- computeFollowupUsageUpdate unit cases: buffered snapshot, fallback to result.usage, subagent-only fallback, model with no matching modelUsage key → inferred context window.

Verification
Out of scope (deliberately not in this PR)
Two product enhancements were considered and intentionally left out:
- Live streaming of autonomous followups. Today a followup's content is buffered and forwarded as a block when its closing result arrives. Streaming it token-by-token is blocked by the SDK contract, not by effort: only the result message carries origin; the assistant / stream_event content messages do not. To stream we'd have to emit content before knowing whether the group is a real followup or the aftermath of a cancelled/errored turn, and emitting aftermath optimistically reintroduces exactly the contamination this PR fixes, with no way to retract it over ACP. A safe implementation needs either (a) the SDK tagging followup content messages with origin (not just the result), or (b) a retractable preview channel in the ACP protocol. Neither exists today.
- A dedicated session-update variant / _meta marker for background-task output. Followup and lifecycle updates currently piggyback on agent_message_chunk (the followup's usage_update already carries _claude/origin). A richer scheme (tagging every followup chunk with _meta or a dedicated task_* session-update so clients can render background output distinctly) is additive and backward-compatible, but only produces visible effect once an ACP client consumes it. Left for a follow-up coordinated with client-side rendering rather than shipped speculatively here.
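To make the buffering constraint behind the first item concrete, here is a minimal sketch of a collector that cannot decide what to do with off-turn content until the closing message arrives. It is not the PR's OffTurnFollowupCollector: the message shapes, the forward callback, and the policy of dropping entries beyond the cap are assumptions. The 256-entry cap and the origin.kind === "task-notification" check come from the description above.

```typescript
// Hypothetical message shapes for illustration only.
type OffTurnMessage =
  | { type: "assistant"; text: string }
  | { type: "result"; origin?: { kind: string } }
  | { type: "idle" };

const MAX_BUFFERED = 256;

class FollowupCollectorSketch {
  private buffer: OffTurnMessage[] = [];
  private readonly forward: (group: OffTurnMessage[]) => void;

  constructor(forward: (group: OffTurnMessage[]) => void) {
    this.forward = forward;
  }

  feed(msg: OffTurnMessage): void {
    if (msg.type === "assistant") {
      // Content carries no origin, so it can only be buffered for now.
      // Assumption: entries beyond the cap are dropped.
      if (this.buffer.length < MAX_BUFFERED) this.buffer.push(msg);
      return;
    }
    // A closing result or idle ends the group, whatever its fate.
    const group = this.buffer;
    this.buffer = [];
    if (msg.type === "result" && msg.origin?.kind === "task-notification") {
      this.forward([...group, msg]); // a real followup: forward as one block
    }
    // Otherwise the group is aftermath of an earlier turn and is discarded.
  }
}
```

The sketch shows why token-by-token streaming is unsafe today: at the time an assistant message is fed in, nothing distinguishes a followup from aftermath, so the only choice that never leaks aftermath to the client is to hold the content until the result is seen.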