Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/think-prompt-recovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@cloudflare/think": minor
---

Add Durable Object (DO) chat recovery to the `step.prompt()` retry loop.

When a prompt wait times out (e.g. the Think Durable Object died during a deploy), the retry loop now checks whether the DO's built-in chat recovery has picked up the interrupted submission before cancelling and re-submitting. If the submission is still `pending` or `running` (recovery in progress) or already `completed`, the workflow re-waits for the original completion event instead of wasting the in-flight turn.

This leverages Think's existing `_recoverSubmissionsOnStart()` and fiber recovery mechanisms — no new RPC is needed (`inspectSubmission` already exists). The workflow uses a single event type across all retry attempts so the recovered submission's completion event reaches any retry's `waitForEvent`.

Recovery is only attempted for `ThinkPromptTimeoutError` with `retryOnTimeout` enabled. Non-timeout errors (provider errors, validation failures) still go through the existing cancel + full retry path. If the recovery re-wait also times out, the loop falls through to cancel + full retry (no infinite recovery loop).
256 changes: 256 additions & 0 deletions packages/think/src/tests/workflows.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@ type DisposableSubmissionResult = SubmitMessagesResult & {
[Symbol.dispose](): void;
};

/**
 * Shape resolved by the fake agent's `inspectSubmission` in these tests.
 * The tests in this file use the `status` values "running" and "error".
 */
type FakeInspectResult = {
submissionId: string;
status: string;
};

/**
 * Minimal agent stand-in that tests hand to `createWorkflow`.
 * `inspectSubmission` is optional and may resolve to `null`.
 */
type FakeThinkAgent = {
submitMessages(): Promise<DisposableSubmissionResult>;
cancelSubmission(submissionId: string, reason: string): Promise<void>;
inspectSubmission?(submissionId: string): Promise<FakeInspectResult | null>;
};

function createWorkflow(agent: FakeThinkAgent): PromptStepRunner {
Expand Down Expand Up @@ -588,4 +594,254 @@ describe("ThinkWorkflow", () => {
}
});
});

describe("prompt step recovery", () => {
it("recovers via DO chat recovery instead of re-submitting on timeout", async () => {
  // Scenario: the first wait rejects with "timed out" while inspectSubmission
  // reports the submission as "running"; the second wait resolves with the
  // completion payload for submission-1.
  const stepNames: string[] = [];
  const awaitedEvents: string[] = [];
  const cancellations: Array<{ submissionId: string; reason: string }> = [];
  let submissions = 0;
  let waits = 0;

  const agent: FakeThinkAgent = {
    submitMessages: async () => {
      submissions += 1;
      return createSubmissionResult(`submission-${submissions}`, () => {});
    },
    cancelSubmission: async (submissionId: string, reason: string) => {
      cancellations.push({ submissionId, reason });
    },
    inspectSubmission: async (submissionId: string) => ({
      submissionId,
      status: "running"
    })
  };

  const step = {
    // Record every step name, then run the callback straight away.
    async do(name: string, callback: () => Promise<unknown>) {
      stepNames.push(name);
      return callback();
    },
    // Record every awaited event name; only the very first wait fails.
    async waitForEvent(name: string) {
      awaitedEvents.push(name);
      waits += 1;
      const isFirstWait = waits === 1;
      if (isFirstWait) {
        throw new Error("timed out");
      }
      return {
        payload: {
          submissionId: "submission-1",
          status: "completed",
          output: { answer: "recovered" }
        },
        [Symbol.dispose]: () => {}
      };
    },
    async sleep() {}
  } as unknown as AgentWorkflowStep;

  const output = await createWorkflow(agent)._promptStep(
    "structure",
    {
      prompt: "Return structured output",
      output: z.object({ answer: z.string() }),
      timeout: "1 minute",
      retries: { maxAttempts: 3, baseDelayMs: 100, maxDelayMs: 1000 }
    },
    step,
    createEvent()
  );

  expect(output).toEqual({ answer: "recovered" });
  // The original submission was recovered, so nothing was submitted again.
  expect(submissions).toBe(1);
  // A successful recovery must not cancel anything.
  expect(cancellations).toEqual([]);
  expect(waits).toBe(2);
  expect(stepNames).toContain("structure:recovery-check-0");
  expect(awaitedEvents).toContain("structure:recovery-wait-0");
});

it("falls back to full retry when submission is dead (error status)", async () => {
  // Scenario: the first wait rejects with "timed out" and inspectSubmission
  // reports status "error"; the next wait resolves with a completion for
  // whichever submission is the latest at that moment.
  const stepNames: string[] = [];
  const cancellations: Array<{ submissionId: string; reason: string }> = [];
  let submissions = 0;
  let waits = 0;

  const agent: FakeThinkAgent = {
    submitMessages: async () => {
      submissions += 1;
      return createSubmissionResult(`submission-${submissions}`, () => {});
    },
    cancelSubmission: async (submissionId: string, reason: string) => {
      cancellations.push({ submissionId, reason });
    },
    inspectSubmission: async (submissionId: string) => ({
      submissionId,
      status: "error"
    })
  };

  const step = {
    // Record every step name, then run the callback straight away.
    async do(name: string, callback: () => Promise<unknown>) {
      stepNames.push(name);
      return callback();
    },
    // Only the very first wait fails; later waits complete.
    async waitForEvent() {
      waits += 1;
      const isFirstWait = waits === 1;
      if (isFirstWait) {
        throw new Error("timed out");
      }
      return {
        payload: {
          submissionId: `submission-${submissions}`,
          status: "completed",
          output: { answer: "retry" }
        },
        [Symbol.dispose]: () => {}
      };
    },
    async sleep() {}
  } as unknown as AgentWorkflowStep;

  const output = await createWorkflow(agent)._promptStep(
    "structure",
    {
      prompt: "Return structured output",
      output: z.object({ answer: z.string() }),
      timeout: "1 minute",
      retries: { maxAttempts: 3, baseDelayMs: 100, maxDelayMs: 1000 }
    },
    step,
    createEvent()
  );

  expect(output).toEqual({ answer: "retry" });
  // Recovery was not possible, so the retry issued a second submission.
  expect(submissions).toBe(2);
  expect(cancellations).toEqual([
    {
      submissionId: "submission-1",
      reason: "Think workflow retrying prompt step"
    }
  ]);
  expect(stepNames).toContain("structure:recovery-check-0");
});

it("falls back to full retry when inspectSubmission throws (DO still down)", async () => {
  // Scenario: the first wait rejects with "timed out" and inspectSubmission
  // itself rejects; the next wait resolves with a completion for whichever
  // submission is the latest at that moment.
  const cancellations: Array<{ submissionId: string; reason: string }> = [];
  let submissions = 0;
  let waits = 0;

  const agent: FakeThinkAgent = {
    submitMessages: async () => {
      submissions += 1;
      return createSubmissionResult(`submission-${submissions}`, () => {});
    },
    cancelSubmission: async (submissionId: string, reason: string) => {
      cancellations.push({ submissionId, reason });
    },
    inspectSubmission: async () => {
      throw new Error("DO is down");
    }
  };

  const step = {
    // Step names are not tracked in this test; just run the callback.
    async do(_name: string, callback: () => Promise<unknown>) {
      return callback();
    },
    // Only the very first wait fails; later waits complete.
    async waitForEvent() {
      waits += 1;
      const isFirstWait = waits === 1;
      if (isFirstWait) {
        throw new Error("timed out");
      }
      return {
        payload: {
          submissionId: `submission-${submissions}`,
          status: "completed",
          output: { answer: "retry" }
        },
        [Symbol.dispose]: () => {}
      };
    },
    async sleep() {}
  } as unknown as AgentWorkflowStep;

  const output = await createWorkflow(agent)._promptStep(
    "structure",
    {
      prompt: "Return structured output",
      output: z.object({ answer: z.string() }),
      timeout: "1 minute",
      retries: { maxAttempts: 3, baseDelayMs: 100, maxDelayMs: 1000 }
    },
    step,
    createEvent()
  );

  expect(output).toEqual({ answer: "retry" });
  expect(submissions).toBe(2);
  expect(cancellations).toEqual([
    {
      submissionId: "submission-1",
      reason: "Think workflow retrying prompt step"
    }
  ]);
});

it("does not attempt recovery for non-timeout errors", async () => {
  // Scenario: no wait ever rejects. While fewer than two submissions exist the
  // event carries an "error" payload; afterwards it carries a completion.
  const stepNames: string[] = [];
  const inspectedIds: string[] = [];
  let submissions = 0;

  const agent: FakeThinkAgent = {
    submitMessages: async () => {
      submissions += 1;
      return createSubmissionResult(`submission-${submissions}`, () => {});
    },
    cancelSubmission: async () => {},
    // Track any inspection so the test can prove none happened.
    inspectSubmission: async (submissionId: string) => {
      inspectedIds.push(submissionId);
      return null;
    }
  };

  const step = {
    // Record every step name, then run the callback straight away.
    async do(name: string, callback: () => Promise<unknown>) {
      stepNames.push(name);
      return callback();
    },
    async waitForEvent() {
      const submissionId = `submission-${submissions}`;
      const failing = submissions < 2;
      // Both shapes keep the same four keys, in the same order.
      const payload = failing
        ? {
            submissionId,
            status: "error",
            error: "provider error",
            output: undefined
          }
        : {
            submissionId,
            status: "completed",
            error: undefined,
            output: { answer: "ok" }
          };
      return {
        payload,
        [Symbol.dispose]: () => {}
      };
    },
    async sleep() {}
  } as unknown as AgentWorkflowStep;

  const output = await createWorkflow(agent)._promptStep(
    "structure",
    {
      prompt: "Return structured output",
      output: z.object({ answer: z.string() }),
      retries: { maxAttempts: 3, baseDelayMs: 100, maxDelayMs: 1000 }
    },
    step,
    createEvent()
  );

  expect(output).toEqual({ answer: "ok" });
  expect(submissions).toBe(2);
  // A non-timeout failure must never trigger inspectSubmission.
  expect(inspectedIds).toEqual([]);
  expect(stepNames).not.toContain("structure:recovery-check-0");
});
});
});
Loading
Loading