diff --git a/packages/junior/src/chat/runtime/slack-runtime.ts b/packages/junior/src/chat/runtime/slack-runtime.ts index 39ba9bee5..d5f661b81 100644 --- a/packages/junior/src/chat/runtime/slack-runtime.ts +++ b/packages/junior/src/chat/runtime/slack-runtime.ts @@ -51,11 +51,21 @@ export interface AssistantLifecycleEvent { userId?: string; } +type SteeringMode = "defer" | "interrupt"; + +export interface SteeringCandidateMessage { + activeRequest: boolean; + inboundMessageId: string; + message: Message; +} + export interface ReplyHooks { beforeFirstResponsePost?: () => Promise; drainSteeringMessages?: ( - inject: (messages: Message[]) => Promise, - ) => Promise; + inject: ( + messages: SteeringCandidateMessage[], + ) => Promise, + ) => Promise; messageContext?: MessageContext; onInputCommitted?: () => Promise; onToolInvocation?: (invocation: TurnToolInvocation) => void; @@ -247,16 +257,22 @@ function getQueuedMessagesFromSlackMessages( interface SteeringMessageDecision { context: TurnContext; decision: SubscribedReplyDecision; + inboundMessageId: string; + mode: SteeringMode; message: Message; text: TurnMessageText; } interface SteeringMessageSelection { - accepted: Message[]; + accepted: Array<{ + inboundMessageId: string; + message: Message; + mode: SteeringMode; + }>; skipped: SteeringMessageDecision[]; } -/** Drain mailbox steering messages only after selecting work Junior will process. */ +/** Drain mailbox steering messages after classifying interrupt, defer, and skip. */ function createAcceptedSteeringDrain( hooks: ReplyHooks, options: { @@ -264,7 +280,7 @@ function createAcceptedSteeringDrain( onAcceptedForProcessing?: (messages: Message[]) => Promise; onSkipped?: (messages: SteeringMessageDecision[]) => Promise; selectMessages: ( - messages: Message[], + messages: SteeringCandidateMessage[], context?: SteeringDrainContext, ) => Promise; stripLeadingBotMention: SlackTurnRuntimeDependencies["stripLeadingBotMention"]; @@ -280,16 +296,29 @@ function createAcceptedSteeringDrain( } return async (inject, context) => { - let acceptedMessages: Message[] | undefined; + let interruptedMessages: Message[] | undefined; await hooks.drainSteeringMessages!(async (messages) => { const selection = await options.selectMessages(messages, context); - const accepted = selection.accepted; await options.onSkipped?.(selection.skipped); - await inject(getQueuedMessagesFromSlackMessages(accepted, options)); - acceptedMessages = accepted; - await options.onAcceptedForProcessing?.(accepted); + // Deferred accepted messages stay pending so a later worker slice handles + // them after the active answer is delivered. + const interrupted = selection.accepted + .filter((accepted) => accepted.mode === "interrupt") + .map((accepted) => accepted.message); + await inject(getQueuedMessagesFromSlackMessages(interrupted, options)); + interruptedMessages = interrupted; + await options.onAcceptedForProcessing?.(interrupted); + return [ + ...selection.accepted + .filter((accepted) => accepted.mode === "interrupt") + .map((accepted) => accepted.inboundMessageId), + ...selection.skipped.map((skipped) => skipped.inboundMessageId), + ]; }); - return getQueuedMessagesFromSlackMessages(acceptedMessages ?? [], options); + return getQueuedMessagesFromSlackMessages( + interruptedMessages ?? [], + options, + ); }; } @@ -448,13 +477,15 @@ export function createSlackTurnRuntime< const decideSteeringMessage = async ( thread: Thread, - message: Message, + candidate: SteeringCandidateMessage, conversationContext: string | undefined, ): Promise<{ context: TurnContext; decision: SubscribedReplyDecision; + mode: SteeringMode; text: TurnMessageText; }> => { + const { message } = candidate; const context: TurnContext = { threadId: deps.getThreadId(thread, message), requesterId: message.author.userId, @@ -469,7 +500,8 @@ export function createSlackTurnRuntime< rawText: appendSlackLegacyAttachmentText(message.text, message.raw), userText: appendSlackLegacyAttachmentText(strippedUserText, message.raw), }; - const isExplicitMention = Boolean(message.isMention); + const isActiveRequest = + candidate.activeRequest || Boolean(message.isMention); const decision = await deps.decideSubscribedReply({ rawText: text.rawText, @@ -477,10 +509,15 @@ export function createSlackTurnRuntime< conversationContext, hasAttachments: message.attachments.length > 0 || legacyAttachmentText !== "", - isExplicitMention, + isExplicitMention: isActiveRequest, context, }); - return { context, decision, text }; + return { + context, + decision, + mode: isActiveRequest ? "interrupt" : "defer", + text, + }; }; const logSkippedSubscribedDecision = (args: { @@ -541,20 +578,22 @@ export function createSlackTurnRuntime< const selectAcceptedSteeringMessages = async (args: { conversationContext: string | undefined; - messages: Message[]; + messages: SteeringCandidateMessage[]; thread: Thread; }): Promise => { const selected: SteeringMessageDecision[] = []; - for (const message of args.messages) { + for (const candidate of args.messages) { const decision = await decideSteeringMessage( args.thread, - message, + candidate, args.conversationContext, ); selected.push({ context: decision.context, decision: decision.decision, - message, + inboundMessageId: candidate.inboundMessageId, + mode: decision.mode, + message: candidate.message, text: decision.text, }); } @@ -577,7 +616,11 @@ export function createSlackTurnRuntime< return { accepted: selected .filter((message) => message.decision.shouldReply) - .map((message) => message.message), + .map((message) => ({ + inboundMessageId: message.inboundMessageId, + message: message.message, + mode: message.mode, + })), skipped: selected.filter((message) => !message.decision.shouldReply), }; }; diff --git a/packages/junior/src/chat/task-execution/slack-work.ts b/packages/junior/src/chat/task-execution/slack-work.ts index e2830f02a..f420c1d62 100644 --- a/packages/junior/src/chat/task-execution/slack-work.ts +++ b/packages/junior/src/chat/task-execution/slack-work.ts @@ -7,7 +7,10 @@ import { type SerializedThread, type StateAdapter, } from "chat"; -import type { SlackTurnRuntime } from "@/chat/runtime/slack-runtime"; +import type { + SlackTurnRuntime, + SteeringCandidateMessage, +} from "@/chat/runtime/slack-runtime"; import { isCooperativeTurnYieldError, isTurnInputCommitLostError, @@ -101,11 +104,29 @@ function compareInboundMessages( } function routeForRecords(records: InboundMessage[]): SlackConversationRoute { - return records.some((record) => record.input.metadata?.route === "mention") + return records.some((record) => { + const metadata = record.input.metadata; + if (!isSlackMetadata(metadata)) { + throw new Error("Conversation mailbox record is not Slack metadata"); + } + return metadata.route === "mention"; + }) ? "mention" : "subscribed"; } +function isSlackAssistantThreadUserMessage(message: Message): boolean { + const raw = + message.raw && typeof message.raw === "object" + ? (message.raw as Record) + : undefined; + return ( + raw?.channel_type === "im" && + typeof raw.thread_ts === "string" && + raw.thread_ts.trim().length > 0 + ); +} + /** Rehydrate the Slack message payload before handing it back to runtime code. */ function restoreMessage(args: { adapter: SlackAdapter; @@ -296,21 +317,32 @@ export function createSlackConversationWorker( ); } }; + // Restore stored mailbox entries as Slack steering candidates; the + // runtime returns only the inbound ids it handled durably. const drainSteeringMessages = async ( - inject: (messages: Message[]) => Promise, - ): Promise => { - let restoredMessages: Message[] | undefined; - const drained = await context.drainMailbox(async (pendingRecords) => { - const messages = pendingRecords.map((record) => - restoreMessage({ adapter, record }), - ); - restoredMessages = messages; - await inject(messages); + inject: ( + messages: SteeringCandidateMessage[], + ) => Promise, + ): Promise => { + await context.drainMailbox(async (pendingRecords) => { + const messages = pendingRecords.map((record) => { + const metadata = record.input.metadata; + if (!isSlackMetadata(metadata)) { + throw new Error( + "Conversation mailbox record is not Slack metadata", + ); + } + const message = restoreMessage({ adapter, record }); + return { + activeRequest: + metadata.route === "mention" || + isSlackAssistantThreadUserMessage(message), + inboundMessageId: record.inboundMessageId, + message, + }; + }); + return await inject(messages); }); - return ( - restoredMessages ?? - drained.map((record) => restoreMessage({ adapter, record })) - ); }; try { diff --git a/packages/junior/src/chat/task-execution/state.ts b/packages/junior/src/chat/task-execution/state.ts index 8a471b17b..52ea1e548 100644 --- a/packages/junior/src/chat/task-execution/state.ts +++ b/packages/junior/src/chat/task-execution/state.ts @@ -1293,10 +1293,15 @@ export async function checkInConversationWork(args: { }); } -/** Drain pending mailbox entries after the caller has durably injected them. */ +/** + * Drain pending mailbox entries after the caller acknowledges durable handling. + * + * Returning ids acknowledges only that subset; returning nothing acknowledges + * every offered pending entry. + */ export async function drainConversationMailbox(args: { conversationId: string; - inject: (messages: InboundMessage[]) => Promise; + inject: (messages: InboundMessage[]) => Promise; leaseToken: string; nowMs?: number; state?: StateAdapter; @@ -1315,7 +1320,20 @@ export async function drainConversationMailbox(args: { return []; } - await args.inject(pending); + const acknowledgedIds = await args.inject(pending); + const offeredIds = new Set( + pending.map((message) => message.inboundMessageId), + ); + for (const inboundMessageId of acknowledgedIds ?? []) { + if (!offeredIds.has(inboundMessageId)) { + throw new Error( + `Conversation mailbox acknowledgement is not pending for ${args.conversationId}`, + ); + } + } + const drainedIds = new Set( + acknowledgedIds ?? pending.map((message) => message.inboundMessageId), + ); await withConversationMutation(args, async (state) => { const current = await readConversation(state, args.conversationId); @@ -1324,9 +1342,6 @@ export async function drainConversationMailbox(args: { `Conversation lease is not held for ${args.conversationId}`, ); } - const drainedIds = new Set( - pending.map((message) => message.inboundMessageId), - ); const pendingMessages = current.execution.pendingMessages.filter( (message) => !drainedIds.has(message.inboundMessageId), ); @@ -1347,7 +1362,7 @@ export async function drainConversationMailbox(args: { ), ); }); - return pending; + return pending.filter((message) => drainedIds.has(message.inboundMessageId)); } /** Mark selected leased mailbox entries after their session-log injection succeeds. */ diff --git a/packages/junior/src/chat/task-execution/worker.ts b/packages/junior/src/chat/task-execution/worker.ts index e32c44988..306a5d2ba 100644 --- a/packages/junior/src/chat/task-execution/worker.ts +++ b/packages/junior/src/chat/task-execution/worker.ts @@ -31,7 +31,7 @@ export interface ConversationWorkerContext { conversationId: string; destination: Destination; drainMailbox( - inject: (messages: InboundMessage[]) => Promise, + inject: (messages: InboundMessage[]) => Promise, ): Promise; leaseToken: string; shouldYield(): boolean; diff --git a/packages/junior/tests/component/task-execution/conversation-work.test.ts b/packages/junior/tests/component/task-execution/conversation-work.test.ts index e03ba2363..d2f8cc096 100644 --- a/packages/junior/tests/component/task-execution/conversation-work.test.ts +++ b/packages/junior/tests/component/task-execution/conversation-work.test.ts @@ -667,6 +667,67 @@ describe("conversation work execution", () => { expect(state ? countPendingConversationMessages(state) : 0).toBe(0); }); + it("keeps unacknowledged drained messages pending for a later slice", async () => { + const queue = createConversationWorkQueueTestAdapter(); + await appendInboundMessage({ message: inboundMessage("m1"), nowMs: 1_000 }); + await appendInboundMessage({ + message: inboundMessage("m2", { + createdAtMs: 2_000, + receivedAtMs: 2_100, + }), + nowMs: 2_100, + }); + const injected: string[][] = []; + + await expect( + processConversationWork(conversationQueueMessage(), { + queue, + run: async (context) => { + await context.drainMailbox(async (messages) => { + injected.push(messages.map((message) => message.inboundMessageId)); + return messages + .filter((message) => message.inboundMessageId === "m1") + .map((message) => message.inboundMessageId); + }); + return { status: "completed" }; + }, + }), + ).resolves.toEqual({ status: "pending_requeued" }); + + expect(injected).toEqual([["m1", "m2"]]); + const state = await getConversationWorkState({ + conversationId: CONVERSATION_ID, + }); + expect(state?.needsRun).toBe(true); + expect(state?.messages.map((message) => message.inboundMessageId)).toEqual([ + "m2", + ]); + }); + + it("rejects mailbox acknowledgements outside the offered pending set", async () => { + const queue = createConversationWorkQueueTestAdapter(); + await appendInboundMessage({ message: inboundMessage("m1"), nowMs: 1_000 }); + + await expect( + processConversationWork(conversationQueueMessage(), { + queue, + run: async (context) => { + await context.drainMailbox(async () => ["different-message"]); + return { status: "completed" }; + }, + }), + ).rejects.toThrow( + "Conversation mailbox acknowledgement is not pending for", + ); + + const state = await getConversationWorkState({ + conversationId: CONVERSATION_ID, + }); + expect(state?.messages.map((message) => message.inboundMessageId)).toEqual([ + "m1", + ]); + }); + it("does not block new mailbox appends while injection is in progress", async () => { const queue = createConversationWorkQueueTestAdapter(); const observed = observeConversationMutationLock({ diff --git a/packages/junior/tests/component/task-execution/slack-conversation-work.test.ts b/packages/junior/tests/component/task-execution/slack-conversation-work.test.ts index f9f38422c..188ea23fc 100644 --- a/packages/junior/tests/component/task-execution/slack-conversation-work.test.ts +++ b/packages/junior/tests/component/task-execution/slack-conversation-work.test.ts @@ -1,8 +1,9 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { Message, StateAdapter, Thread } from "chat"; +import { Message, ThreadImpl, type StateAdapter, type Thread } from "chat"; import { CooperativeTurnYieldError } from "@/chat/runtime/turn"; import { recoverConversationWork } from "@/chat/task-execution/heartbeat"; import { + appendInboundMessage, CONVERSATION_WORK_LEASE_TTL_MS, countPendingConversationMessages, getConversationWorkState, @@ -12,7 +13,10 @@ import { } from "@/chat/task-execution/store"; import { processConversationWork } from "@/chat/task-execution/worker"; import { processConversationQueueMessage } from "@/chat/task-execution/vercel-callback"; -import { createSlackConversationWorker } from "@/chat/task-execution/slack-work"; +import { + buildSlackInboundMessage, + createSlackConversationWorker, +} from "@/chat/task-execution/slack-work"; import { getMessageActorIdentity } from "@/chat/services/message-actor-identity"; import { disconnectStateAdapter, getStateAdapter } from "@/chat/state/adapter"; import { @@ -603,7 +607,6 @@ describe("Slack conversation work execution", () => { }); const injected: string[][] = []; - const drained: string[][] = []; const runtime: SlackWorkerOptions["runtime"] = { handleNewMention: async (_thread, _message, hooks) => { await hooks.onInputCommitted?.(); @@ -617,11 +620,10 @@ describe("Slack conversation work execution", () => { ), services: ingressServices, }); - const messages = - (await hooks.drainSteeringMessages?.(async (steering) => { - injected.push(steering.map((message) => message.id)); - })) ?? []; - drained.push(messages.map((message) => message.id)); + await hooks.drainSteeringMessages?.(async (steering) => { + injected.push(steering.map((candidate) => candidate.message.id)); + return steering.map((candidate) => candidate.inboundMessageId); + }); }, handleSubscribedMessage: async () => { throw new Error("unexpected subscribed route"); @@ -637,7 +639,6 @@ describe("Slack conversation work execution", () => { ).resolves.toEqual({ status: "completed" }); expect(injected).toEqual([["1712345.0002"]]); - expect(drained).toEqual([["1712345.0002"]]); const work = await getConversationWorkState({ conversationId: CONVERSATION_ID, state, @@ -656,6 +657,107 @@ describe("Slack conversation work execution", () => { }); }); + it("treats Slack assistant-thread user messages as active steering", async () => { + const queue = createConversationWorkQueueTestAdapter(); + const state = getStateAdapter(); + await state.connect(); + const slackAdapter = createSlackAdapterFixture(); + const dmChannelId = "D123"; + const threadTs = "1712345.1001"; + const conversationId = `slack:${dmChannelId}:${threadTs}`; + const ingressServices = { + getSlackAdapter: () => slackAdapter, + queue, + runtime: createNoopSlackWebhookRuntime(), + state, + }; + await handleSlackWebhookAndFlush({ + request: slackWebhookRequest( + slackEnvelope({ + channel: dmChannelId, + eventType: "message", + text: "first", + threadTs, + ts: threadTs, + }), + ), + services: ingressServices, + }); + + const observed: Array> = []; + const runtime: SlackWorkerOptions["runtime"] = { + handleNewMention: async (_thread, _message, hooks) => { + await hooks.onInputCommitted?.(); + const followUp = new Message({ + id: "1712345.1002", + threadId: conversationId, + text: "steer this assistant thread", + attachments: [], + metadata: { dateSent: new Date(1_000), edited: false }, + formatted: { type: "root", children: [] }, + raw: { + channel: dmChannelId, + channel_type: "im", + thread_ts: threadTs, + ts: "1712345.1002", + user: "U123", + }, + author: { + userId: "U123", + userName: "", + fullName: "", + isBot: false, + isMe: false, + }, + }); + const thread = new ThreadImpl({ + adapter: slackAdapter, + stateAdapter: state, + id: conversationId, + channelId: dmChannelId, + currentMessage: followUp, + initialMessage: followUp, + isDM: true, + isSubscribedContext: true, + }); + await appendInboundMessage({ + message: buildSlackInboundMessage({ + conversationId, + installation: { teamId: "T123" }, + message: followUp, + receivedAtMs: 1_100, + route: "subscribed", + thread, + }), + state, + }); + await hooks.drainSteeringMessages?.(async (steering) => { + observed.push( + steering.map((candidate) => ({ + activeRequest: candidate.activeRequest, + id: candidate.message.id, + })), + ); + return steering.map((candidate) => candidate.inboundMessageId); + }); + }, + handleSubscribedMessage: async () => { + throw new Error("unexpected subscribed route"); + }, + }; + + await expect( + processNextQueuedSlackWork({ + getSlackAdapter: () => slackAdapter, + queue, + runtime, + state, + }), + ).resolves.toEqual({ status: "completed" }); + + expect(observed).toEqual([[{ activeRequest: true, id: "1712345.1002" }]]); + }); + it("does not replay injected Slack mailbox records after lease recovery", async () => { const queue = createConversationWorkQueueTestAdapter(); const state = getStateAdapter(); diff --git a/packages/junior/tests/integration/slack/conversation-turn-steering-behavior.test.ts b/packages/junior/tests/integration/slack/conversation-turn-steering-behavior.test.ts index ea2a60cdd..f4d95a2b6 100644 --- a/packages/junior/tests/integration/slack/conversation-turn-steering-behavior.test.ts +++ b/packages/junior/tests/integration/slack/conversation-turn-steering-behavior.test.ts @@ -233,9 +233,10 @@ describe("Slack behavior: durable turn steering", () => { expect(work ? countPendingConversationMessages(work) : 0).toBe(1); }); - it("steers rapid Slack webhook follow-ups into one active worker turn", async () => { + it("interrupts explicit follow-ups and defers passive follow-ups during an active turn", async () => { const agentEntered = deferred(); const releaseAgent = deferred(); + let blockingCallReleased = false; const agentCalls: Array<{ prompt: string; steeringTexts: string[]; @@ -244,8 +245,11 @@ describe("Slack behavior: durable turn steering", () => { const generateAssistantReply: ReplyExecutorServices["generateAssistantReply"] = async (prompt, context) => { await context?.onInputCommitted?.(); - agentEntered.resolve(); - await releaseAgent.promise; + if (!blockingCallReleased) { + agentEntered.resolve(); + await releaseAgent.promise; + blockingCallReleased = true; + } const steeringMessages: ReplySteeringMessage[] = []; const drained = await context?.drainSteeringMessages?.( @@ -270,7 +274,7 @@ describe("Slack behavior: durable turn steering", () => { const { conversationId, queue, runNextQueuedWork, services } = createTurnHarness({ completeObject: completeObjectWithDecision((prompt) => - prompt.includes("thanks folks") + prompt.includes("thanks folks") ? { should_reply: false, confidence: 1, @@ -305,13 +309,17 @@ describe("Slack behavior: durable turn steering", () => { for (const followUp of [ { text: "add customer impact", ts: "1712345.000200" }, { text: "thanks folks", ts: "1712345.000250" }, - { text: "include the rollback owner", ts: "1712345.000300" }, + { + eventType: "app_mention" as const, + text: `<@${SLACK_BOT_USER_ID}> include the rollback owner`, + ts: "1712345.000300", + }, { text: "finish with the next action", ts: "1712345.000400" }, ]) { const response = await handleSlackWebhookAndFlush({ request: slackWebhookRequest( makeMessageEvent({ - eventType: "message", + eventType: followUp.eventType ?? "message", text: followUp.text, ts: followUp.ts, }), @@ -322,17 +330,13 @@ describe("Slack behavior: durable turn steering", () => { } releaseAgent.resolve(); - await expect(activeTurn).resolves.toEqual({ status: "completed" }); - expect(queue.sentRecords()).toHaveLength(5); + await expect(activeTurn).resolves.toEqual({ status: "pending_requeued" }); + expect(queue.sentRecords()).toHaveLength(6); expect(agentCalls).toEqual([ { prompt: "start the incident summary", - steeringTexts: [ - "add customer impact", - "include the rollback owner", - "finish with the next action", - ], + steeringTexts: ["include the rollback owner"], }, ]); @@ -342,16 +346,36 @@ describe("Slack behavior: durable turn steering", () => { expect.objectContaining({ channel: CHANNEL_ID, thread_ts: THREAD_TS, - text: expect.stringContaining("Steered: add customer impact"), + text: expect.stringContaining("Steered: include the rollback owner"), }), ); + const queuedResults: string[] = []; while (queue.hasQueuedMessages()) { - await expect(runNextQueuedWork()).resolves.toEqual({ status: "no_work" }); + queuedResults.push((await runNextQueuedWork()).status); } + expect( + queuedResults.filter((status) => status === "completed"), + ).toHaveLength(1); - expect(agentCalls).toHaveLength(1); - expect(slackApiOutbox.messages()).toHaveLength(1); + expect(agentCalls).toEqual([ + { + prompt: "start the incident summary", + steeringTexts: ["include the rollback owner"], + }, + { + prompt: expect.stringMatching( + /add customer impact\s+finish with the next action/, + ), + steeringTexts: [], + }, + ]); + const deliveredMessages = slackApiOutbox.messages(); + expect(deliveredMessages).toHaveLength(2); + expect(deliveredMessages.map((message) => message.params.text)).toEqual([ + "Handled initial: start the incident summary\n\nSteered: include the rollback owner", + `Handled initial: ${agentCalls[1]?.prompt}\n\nSteered:`, + ]); const work = await getConversationWorkState({ conversationId, state, diff --git a/specs/slack-agent-delivery.md b/specs/slack-agent-delivery.md index 2e81663f2..cfa2cf2c7 100644 --- a/specs/slack-agent-delivery.md +++ b/specs/slack-agent-delivery.md @@ -220,7 +220,7 @@ Current rules: 7. Automatic auth resumes must not post a separate public "account connected, continuing..." banner before the real resumed answer. The resumed answer itself is the visible continuation. 8. If auth completes after a newer thread message already replaced the blocked request, Junior stores the credentials but does not post a stale resumed answer. 9. Routine cooperative continuation must not post a visible "continuing in the background" thread acknowledgement. User-visible progress belongs to assistant status and `reportProgress`; final answers still use the finalized reply path. -10. If a user follow-up or duplicate delivery arrives while a turn is active, Junior should fold reply-eligible steering messages into the active conversation at the next safe execution boundary instead of creating a second visible turn. Skipped steering messages, including passive no-reply and opt-out decisions, are consumed and persisted for later context without agent injection or automatic processing reactions. Mailbox and worker mechanics belong to `./task-execution.md`. +10. If an active request arrives while a turn is active, Junior should fold it into the active conversation at the next safe execution boundary instead of creating a second visible turn. Explicit mentions, DMs, and active assistant-thread user messages interrupt the active turn through Pi steering. Passive subscribed-thread messages that the reply policy accepts defer until the active turn completes and delivers its answer, then run as ordinary queued subscribed-message work. Passive no-reply messages and opt-out decisions are consumed through existing skipped-message handling without agent injection or automatic processing reactions. Mailbox and worker mechanics belong to `./task-execution.md`. 11. Any explicit pause acknowledgement that remains for auth or exceptional failure handling is not a final assistant reply. It does not mark the original turn completed, and the final resumed answer must still be delivered through the normal finalized-reply path. ### 12. Testing Contract diff --git a/specs/task-execution.md b/specs/task-execution.md index 5f28d3058..2657d3541 100644 --- a/specs/task-execution.md +++ b/specs/task-execution.md @@ -447,22 +447,28 @@ metadata. A worker that owns the lease advances the conversation: 1. Start the lease check-in timer. -2. Drain pending mailbox messages into the agent session log. +2. Offer pending mailbox messages to the runtime and acknowledge the entries it + durably handled. 3. Restore Pi state from the reduced session log. 4. Call `continue()`. -5. At each safe boundary, drain newly pending mailbox messages into the same - active conversation before another model call starts. +5. At each safe boundary, offer newly pending mailbox messages before another + model call starts. The runtime may acknowledge a subset when only some + messages should be injected into the active run. 6. If cooperative yield is due, enqueue `{ conversationId, destination }`, release the lease, acknowledge the queue delivery, and exit. -7. If the agent is final, drain the mailbox one last time before delivery. If new - messages were pending, continue instead of posting a stale answer. +7. If the agent is final, offer the mailbox one last time before delivery. If + the runtime acknowledged active work for the current run, continue instead of + posting a stale answer. If unacknowledged passive work remains pending, the + current answer may be delivered and the pending work is requeued for a later + slice. 8. Deliver the finalized reply through the destination delivery port. 9. Persist completion state, release the lease, and acknowledge the queue delivery. -Inbound messages that arrive during an active run are part of the active -conversation. They are injected at the next safe boundary, not treated as a -separate concurrent agent run. +Inbound messages that arrive during an active run are conversation mailbox +entries, not separate concurrent agent runs. Runtime-specific routing decides +whether each entry interrupts the active run, is consumed without agent +injection, or remains pending for a later slice. ### Cooperative Yield Contract @@ -731,8 +737,9 @@ Required invariants, using the lowest layer that proves the contract: 16. Component: the all-conversation index is sorted by `lastActivityAtMs`; the active-conversation index contains only non-idle conversations and is sorted by `execution.updatedAtMs`. -17. Evals: realistic multi-message Slack follow-ups during long work are folded - into the active answer without losing user intent. +17. Integration: realistic multi-message Slack follow-ups during long work + preserve user intent by interrupting active requests and deferring passive + reply-eligible messages according to the Slack delivery contract. ## Related Specs