Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 63 additions & 20 deletions packages/junior/src/chat/runtime/slack-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,21 @@ export interface AssistantLifecycleEvent {
userId?: string;
}

type SteeringMode = "defer" | "interrupt";

export interface SteeringCandidateMessage {
activeRequest: boolean;
inboundMessageId: string;
message: Message;
}

export interface ReplyHooks {
beforeFirstResponsePost?: () => Promise<void>;
drainSteeringMessages?: (
inject: (messages: Message[]) => Promise<void>,
) => Promise<Message[]>;
inject: (
messages: SteeringCandidateMessage[],
) => Promise<readonly string[] | void>,
) => Promise<void>;
messageContext?: MessageContext;
onInputCommitted?: () => Promise<void>;
onToolInvocation?: (invocation: TurnToolInvocation) => void;
Expand Down Expand Up @@ -247,24 +257,30 @@ function getQueuedMessagesFromSlackMessages(
interface SteeringMessageDecision {
context: TurnContext;
decision: SubscribedReplyDecision;
inboundMessageId: string;
mode: SteeringMode;
message: Message;
text: TurnMessageText;
}

interface SteeringMessageSelection {
accepted: Message[];
accepted: Array<{
inboundMessageId: string;
message: Message;
mode: SteeringMode;
}>;
skipped: SteeringMessageDecision[];
}

/** Drain mailbox steering messages only after selecting work Junior will process. */
/** Drain mailbox steering messages after classifying interrupt, defer, and skip. */
function createAcceptedSteeringDrain(
hooks: ReplyHooks,
options: {
explicitMention: boolean;
onAcceptedForProcessing?: (messages: Message[]) => Promise<void>;
onSkipped?: (messages: SteeringMessageDecision[]) => Promise<void>;
selectMessages: (
messages: Message[],
messages: SteeringCandidateMessage[],
context?: SteeringDrainContext,
) => Promise<SteeringMessageSelection>;
stripLeadingBotMention: SlackTurnRuntimeDependencies<unknown>["stripLeadingBotMention"];
Expand All @@ -280,16 +296,29 @@ function createAcceptedSteeringDrain(
}

return async (inject, context) => {
let acceptedMessages: Message[] | undefined;
let interruptedMessages: Message[] | undefined;
await hooks.drainSteeringMessages!(async (messages) => {
const selection = await options.selectMessages(messages, context);
const accepted = selection.accepted;
await options.onSkipped?.(selection.skipped);
await inject(getQueuedMessagesFromSlackMessages(accepted, options));
acceptedMessages = accepted;
await options.onAcceptedForProcessing?.(accepted);
// Deferred accepted messages stay pending so a later worker slice handles
// them after the active answer is delivered.
const interrupted = selection.accepted
.filter((accepted) => accepted.mode === "interrupt")
.map((accepted) => accepted.message);
await inject(getQueuedMessagesFromSlackMessages(interrupted, options));
interruptedMessages = interrupted;
await options.onAcceptedForProcessing?.(interrupted);
return [
...selection.accepted
.filter((accepted) => accepted.mode === "interrupt")
.map((accepted) => accepted.inboundMessageId),
...selection.skipped.map((skipped) => skipped.inboundMessageId),
];
});
return getQueuedMessagesFromSlackMessages(acceptedMessages ?? [], options);
return getQueuedMessagesFromSlackMessages(
interruptedMessages ?? [],
options,
);
};
}

Expand Down Expand Up @@ -448,13 +477,15 @@ export function createSlackTurnRuntime<

const decideSteeringMessage = async (
thread: Thread,
message: Message,
candidate: SteeringCandidateMessage,
conversationContext: string | undefined,
): Promise<{
context: TurnContext;
decision: SubscribedReplyDecision;
mode: SteeringMode;
text: TurnMessageText;
}> => {
const { message } = candidate;
const context: TurnContext = {
threadId: deps.getThreadId(thread, message),
requesterId: message.author.userId,
Expand All @@ -469,18 +500,24 @@ export function createSlackTurnRuntime<
rawText: appendSlackLegacyAttachmentText(message.text, message.raw),
userText: appendSlackLegacyAttachmentText(strippedUserText, message.raw),
};
const isExplicitMention = Boolean(message.isMention);
const isActiveRequest =
candidate.activeRequest || Boolean(message.isMention);

const decision = await deps.decideSubscribedReply({
rawText: text.rawText,
text: text.userText,
conversationContext,
hasAttachments:
message.attachments.length > 0 || legacyAttachmentText !== "",
isExplicitMention,
isExplicitMention: isActiveRequest,
context,
});
return { context, decision, text };
return {
context,
decision,
mode: isActiveRequest ? "interrupt" : "defer",
text,
};
};

const logSkippedSubscribedDecision = (args: {
Expand Down Expand Up @@ -541,20 +578,22 @@ export function createSlackTurnRuntime<

const selectAcceptedSteeringMessages = async (args: {
conversationContext: string | undefined;
messages: Message[];
messages: SteeringCandidateMessage[];
thread: Thread;
}): Promise<SteeringMessageSelection> => {
const selected: SteeringMessageDecision[] = [];
for (const message of args.messages) {
for (const candidate of args.messages) {
const decision = await decideSteeringMessage(
args.thread,
message,
candidate,
args.conversationContext,
);
selected.push({
context: decision.context,
decision: decision.decision,
message,
inboundMessageId: candidate.inboundMessageId,
mode: decision.mode,
message: candidate.message,
text: decision.text,
});
}
Expand All @@ -577,7 +616,11 @@ export function createSlackTurnRuntime<
return {
accepted: selected
.filter((message) => message.decision.shouldReply)
.map((message) => message.message),
.map((message) => ({
inboundMessageId: message.inboundMessageId,
message: message.message,
mode: message.mode,
})),
skipped: selected.filter((message) => !message.decision.shouldReply),
};
};
Expand Down
62 changes: 47 additions & 15 deletions packages/junior/src/chat/task-execution/slack-work.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import {
type SerializedThread,
type StateAdapter,
} from "chat";
import type { SlackTurnRuntime } from "@/chat/runtime/slack-runtime";
import type {
SlackTurnRuntime,
SteeringCandidateMessage,
} from "@/chat/runtime/slack-runtime";
import {
isCooperativeTurnYieldError,
isTurnInputCommitLostError,
Expand Down Expand Up @@ -101,11 +104,29 @@ function compareInboundMessages(
}

function routeForRecords(records: InboundMessage[]): SlackConversationRoute {
return records.some((record) => record.input.metadata?.route === "mention")
return records.some((record) => {
const metadata = record.input.metadata;
if (!isSlackMetadata(metadata)) {
throw new Error("Conversation mailbox record is not Slack metadata");
}
return metadata.route === "mention";
})
? "mention"
: "subscribed";
}

function isSlackAssistantThreadUserMessage(message: Message): boolean {
const raw =
message.raw && typeof message.raw === "object"
? (message.raw as Record<string, unknown>)
: undefined;
return (
raw?.channel_type === "im" &&
typeof raw.thread_ts === "string" &&
raw.thread_ts.trim().length > 0
);
}

/** Rehydrate the Slack message payload before handing it back to runtime code. */
function restoreMessage(args: {
adapter: SlackAdapter;
Expand Down Expand Up @@ -296,21 +317,32 @@ export function createSlackConversationWorker(
);
}
};
// Restore stored mailbox entries as Slack steering candidates; the
// runtime returns only the inbound ids it handled durably.
const drainSteeringMessages = async (
inject: (messages: Message[]) => Promise<void>,
): Promise<Message[]> => {
let restoredMessages: Message[] | undefined;
const drained = await context.drainMailbox(async (pendingRecords) => {
const messages = pendingRecords.map((record) =>
restoreMessage({ adapter, record }),
);
restoredMessages = messages;
await inject(messages);
inject: (
messages: SteeringCandidateMessage[],
) => Promise<readonly string[] | void>,
): Promise<void> => {
await context.drainMailbox(async (pendingRecords) => {
const messages = pendingRecords.map((record) => {
const metadata = record.input.metadata;
if (!isSlackMetadata(metadata)) {
throw new Error(
"Conversation mailbox record is not Slack metadata",
);
}
const message = restoreMessage({ adapter, record });
return {
activeRequest:
metadata.route === "mention" ||
isSlackAssistantThreadUserMessage(message),
inboundMessageId: record.inboundMessageId,
message,
};
});
return await inject(messages);
});
return (
restoredMessages ??
drained.map((record) => restoreMessage({ adapter, record }))
);
};

try {
Expand Down
29 changes: 22 additions & 7 deletions packages/junior/src/chat/task-execution/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1293,10 +1293,15 @@ export async function checkInConversationWork(args: {
});
}

/** Drain pending mailbox entries after the caller has durably injected them. */
/**
* Drain pending mailbox entries after the caller acknowledges durable handling.
*
* Returning ids acknowledges only that subset; returning nothing acknowledges
* every offered pending entry.
*/
export async function drainConversationMailbox(args: {
conversationId: string;
inject: (messages: InboundMessage[]) => Promise<void>;
inject: (messages: InboundMessage[]) => Promise<readonly string[] | void>;
leaseToken: string;
nowMs?: number;
state?: StateAdapter;
Expand All @@ -1315,7 +1320,20 @@ export async function drainConversationMailbox(args: {
return [];
}

await args.inject(pending);
const acknowledgedIds = await args.inject(pending);
const offeredIds = new Set(
pending.map((message) => message.inboundMessageId),
);
for (const inboundMessageId of acknowledgedIds ?? []) {
if (!offeredIds.has(inboundMessageId)) {
throw new Error(
`Conversation mailbox acknowledgement is not pending for ${args.conversationId}`,
);
}
}
const drainedIds = new Set(
acknowledgedIds ?? pending.map((message) => message.inboundMessageId),
);

await withConversationMutation(args, async (state) => {
const current = await readConversation(state, args.conversationId);
Expand All @@ -1324,9 +1342,6 @@ export async function drainConversationMailbox(args: {
`Conversation lease is not held for ${args.conversationId}`,
);
}
const drainedIds = new Set(
pending.map((message) => message.inboundMessageId),
);
const pendingMessages = current.execution.pendingMessages.filter(
(message) => !drainedIds.has(message.inboundMessageId),
);
Expand All @@ -1347,7 +1362,7 @@ export async function drainConversationMailbox(args: {
),
);
});
return pending;
return pending.filter((message) => drainedIds.has(message.inboundMessageId));
}

/** Mark selected leased mailbox entries after their session-log injection succeeds. */
Expand Down
2 changes: 1 addition & 1 deletion packages/junior/src/chat/task-execution/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface ConversationWorkerContext {
conversationId: string;
destination: Destination;
drainMailbox(
inject: (messages: InboundMessage[]) => Promise<void>,
inject: (messages: InboundMessage[]) => Promise<readonly string[] | void>,
): Promise<InboundMessage[]>;
leaseToken: string;
shouldYield(): boolean;
Expand Down
Loading
Loading