diff --git a/.changeset/cookbook-child-workflows-hook-resume.md b/.changeset/cookbook-child-workflows-hook-resume.md new file mode 100644 index 0000000000..a845151cc8 --- /dev/null +++ b/.changeset/cookbook-child-workflows-hook-resume.md @@ -0,0 +1,2 @@ +--- +--- diff --git a/docs/content/docs/v4/cookbook/advanced/child-workflows.mdx b/docs/content/docs/v4/cookbook/advanced/child-workflows.mdx index 1dddee5ef5..2fb8c1e202 100644 --- a/docs/content/docs/v4/cookbook/advanced/child-workflows.mdx +++ b/docs/content/docs/v4/cookbook/advanced/child-workflows.mdx @@ -1,8 +1,8 @@ --- title: Child Workflows -description: Spawn child workflows from a parent and poll their progress for batch processing, report generation, and other multi-workflow orchestration scenarios. +description: Spawn child workflows from a parent and wait for completion via hook resume. type: guide -summary: Orchestrate independent child workflows from a parent workflow using start(), sleep(), and getRun() to fan out work with isolated failure boundaries. +summary: Orchestrate independent child workflows from a parent using start(), defineHook(), and startAndWait() — the child resumes the parent's hook when done instead of polling getRun().status. --- Use child workflows when a single workflow needs to orchestrate many independent units of work. Each child runs as its own workflow with a separate event log, retry boundary, and failure scope -- if one child fails, it doesn't take down the parent or siblings. @@ -18,189 +18,213 @@ Child workflows are the right choice when: For simpler cases where steps share a single event log, use [direct await composition](/cookbook/common-patterns/workflow-composition#direct-await-flattening) instead. -## Basic pattern: spawn and poll +## Basic pattern: spawn and wait via hook -The core pattern has three parts: +The recommended pattern has four parts: -1. A **step** that calls `start()` to spawn a child workflow and returns the run ID -2. A **polling loop** in the parent workflow that checks child status with `getRun()` -3. A **step** that retrieves the child's return value once it completes +1. A **completion hook** the parent creates and awaits — zero compute while waiting +2. A **wrapped child export** that runs the real child in try/catch/finally and resumes the parent's hook from a step in `finally` +3. A **spawn step** that calls `start()` with the wrapped child and the hook token +4. A **`startAndWait()` helper** that ties the hook, spawn, and typed result together ```typescript -import { sleep } from "workflow"; -import { getRun, start } from "workflow/api"; +import { defineHook, getWorkflowMetadata } from "workflow"; +import { start } from "workflow/api"; +import { z } from "zod"; -declare function pollUntilComplete(runIds: string[]): Promise; // @setup -declare function collectResults(runIds: string[]): Promise>; // @setup +declare function fetchDocument(documentId: string): Promise; // @setup +declare function analyzeContent(content: string): Promise; // @setup +declare function generateSummary(analysis: string): Promise; // @setup -// Child workflow -- processes a single document -export async function processDocument(documentId: string) { - "use workflow"; +const childCompletionHook = defineHook({ + schema: z.discriminatedUnion("status", [ + z.object({ status: z.literal("completed"), value: z.unknown() }), + z.object({ status: z.literal("failed"), error: z.string() }), + ]), +}); - const content = await fetchDocument(documentId); - const analysis = await analyzeContent(content); - const summary = await generateSummary(analysis); - - return { documentId, summary }; +function completionToken(parentRunId: string, key: string) { + return `child-completion:${parentRunId}:${key}`; } -async function fetchDocument(documentId: string): Promise { +async function resumeParentCompletion( + token: string, + result: + | { status: "completed"; value: unknown } + | { status: "failed"; error: string } +) { "use step"; - const res = await fetch(`https://docs.example.com/api/${documentId}`); - return res.text(); + await childCompletionHook.resume(token, result); } -async function analyzeContent(content: string): Promise { - "use step"; - // Call analysis API - return `analysis of ${content.length} chars`; -} - -async function generateSummary(analysis: string): Promise { - "use step"; - // Generate summary from analysis - return `Summary: ${analysis}`; +async function withChildCompletionHook( + runChild: () => Promise, + completionTokenArg: string +) { + let result: + | { status: "completed"; value: TResult } + | { status: "failed"; error: string } + | undefined; + + try { + const value = await runChild(); + result = { status: "completed", value }; + } catch (error) { + result = { + status: "failed", + error: error instanceof Error ? error.message : String(error), + }; + } finally { + if (result) { + await resumeParentCompletion(completionTokenArg, result); + } + } } -// Parent workflow -- orchestrates document processing -export async function processDocumentBatch(documentIds: string[]) { +// Child workflow -- processes a single document +export async function processDocument(documentId: string) { "use workflow"; - // Spawn a child workflow for each document - const runIds = await spawnChildren(documentIds); + const content = await fetchDocument(documentId); + const analysis = await analyzeContent(content); + const summary = await generateSummary(analysis); - // Poll until all children complete - await pollUntilComplete(runIds); + return { documentId, summary }; +} - // Collect results - const results = await collectResults(runIds); +// Spawnable wrapper -- explicit export so `start()` can register it +export async function processDocumentWithCompletion( + documentId: string, + completionTokenArg: string +) { + "use workflow"; - return { processed: results.length, results }; + await withChildCompletionHook( + () => processDocument(documentId), + completionTokenArg + ); } -async function spawnChildren( - documentIds: string[] -): Promise { +async function spawnProcessDocument( + documentId: string, + completionTokenArg: string +): Promise { "use step"; // [!code highlight] - const runIds: string[] = []; - for (const docId of documentIds) { - const run = await start(processDocument, [docId]); // [!code highlight] - runIds.push(run.runId); - } - return runIds; + const run = await start(processDocumentWithCompletion, [ + documentId, + completionTokenArg, + ]); // [!code highlight] + return run.runId; } -``` - -### Polling loop - -The parent workflow polls child statuses in a loop, sleeping between checks. This is durable -- if the parent replays, the sleep and status checks replay from the event log. - -```typescript -import { sleep } from "workflow"; -import { getRun } from "workflow/api"; -const POLL_INTERVAL = "30s"; -const MAX_POLL_ITERATIONS = 120; // 60 minutes at 30s intervals +async function startAndWait( + key: string, + startChild: (completionTokenArg: string) => Promise +): Promise { + const { workflowRunId } = getWorkflowMetadata(); + const token = completionToken(workflowRunId, key); + const hook = childCompletionHook.create({ token }); // [!code highlight] -async function pollUntilComplete(runIds: string[]): Promise { - let iteration = 0; + await startChild(token); - while (iteration < MAX_POLL_ITERATIONS) { - const status = await checkStatuses(runIds); // [!code highlight] - - if (status.running === 0) { - if (status.failed > 0) { - throw new Error( - `${status.failed} of ${runIds.length} children failed` - ); - } - return; // All completed successfully - } - - iteration += 1; - await sleep(POLL_INTERVAL); // [!code highlight] + const completion = await hook; // [!code highlight] + if (completion.status === "failed") { + throw new Error(completion.error); } - - throw new Error("Timed out waiting for children to complete"); + return completion.value as TResult; } -async function checkStatuses( - runIds: string[] -): Promise<{ running: number; completed: number; failed: number }> { - "use step"; // [!code highlight] +// Parent workflow -- orchestrates document processing +export async function processDocumentBatch(documentIds: string[]) { + "use workflow"; - let running = 0; - let completed = 0; - let failed = 0; + const results = await Promise.all( + documentIds.map((documentId) => + startAndWait<{ documentId: string; summary: string }>(documentId, (token) => + spawnProcessDocument(documentId, token).then(() => undefined) + ) + ) + ); - for (const runId of runIds) { - const run = getRun(runId); // [!code highlight] - const status = await run.status; // [!code highlight] + return { processed: results.length, results }; +} +``` - if (status === "completed") completed += 1; - else if (status === "failed" || status === "cancelled") failed += 1; - else running += 1; // pending, running - } +### Why hooks instead of polling? - return { running, completed, failed }; -} +Polling with `getRun().status` in a `sleep()` loop works, but hook resume is preferable because: -async function collectResults( - runIds: string[] -): Promise> { - "use step"; +- **Zero compute while waiting** — the parent suspends on the hook instead of waking every poll interval +- **Immediate wake-up** — the parent resumes as soon as the child finishes, not on the next poll tick +- **Typed payloads** — the child sends `{ status, value | error }` directly; no separate `returnValue` fetch step +- **No worker-pool pressure** — `Run#returnValue` polling inside steps can hold worker slots while waiting for children (see [Eager Processing](/changelog/eager-processing)) - const results = []; - for (const runId of runIds) { - const run = getRun(runId); - const value = await run.returnValue; - results.push(value as { documentId: string; summary: string }); - } - return results; -} -``` +When a parent calls a child workflow inline with `await` (flattened into the same run), the same wrapper and hook handshake still works — pass the token and `await processDocumentWithCompletion(...)` inside `startAndWait()` instead of calling `start()`. ## Fan-out pattern: chunked spawning -When spawning hundreds of children, batch the `start()` calls to avoid overwhelming the system. Use multiple spawn steps, each launching a chunk of children. +When spawning hundreds of children, batch the `start()` calls to avoid overwhelming the system. Each child still gets its own completion hook keyed by a stable identifier (document ID, report ID, index). ```typescript import { start } from "workflow/api"; -declare function pollUntilComplete(runIds: string[]): Promise; // @setup +declare function startAndWait( + key: string, + startChild: (completionTokenArg: string) => Promise +): Promise; // @setup const CHUNK_SIZE = 10; -export async function largeReportBatch(reportConfigs: Array<{ id: string; query: string }>) { +export async function largeReportBatch( + reportConfigs: Array<{ id: string; query: string }> +) { "use workflow"; - // Spawn children in chunks - const allRunIds: string[] = []; + const results = []; for (let i = 0; i < reportConfigs.length; i += CHUNK_SIZE) { const chunk = reportConfigs.slice(i, i + CHUNK_SIZE); - const runIds = await spawnReportChunk(chunk); // [!code highlight] - allRunIds.push(...runIds); + const chunkResults = await Promise.all( + chunk.map((config) => + startAndWait<{ reportId: string; formatted: string }>(config.id, (token) => + spawnReportWithCompletion(config.id, config.query, token).then( + () => undefined + ) + ) + ) + ); + results.push(...chunkResults); } - // Poll until all complete - await pollUntilComplete(allRunIds); - - const results = await collectReportResults(allRunIds); return { total: results.length, results }; } -async function spawnReportChunk( - configs: Array<{ id: string; query: string }> -): Promise { +async function spawnReportWithCompletion( + reportId: string, + query: string, + completionTokenArg: string +): Promise { "use step"; - const runIds: string[] = []; - for (const config of configs) { - const run = await start(generateReport, [config.id, config.query]); - runIds.push(run.runId); - } - return runIds; + const run = await start(generateReportWithCompletion, [ + reportId, + query, + completionTokenArg, + ]); + return run.runId; +} + +async function generateReportWithCompletion( + reportId: string, + query: string, + completionTokenArg: string +) { + "use workflow"; + + await withChildCompletionHook( + () => generateReport(reportId, query), + completionTokenArg + ); } async function generateReport(reportId: string, query: string) { @@ -213,160 +237,100 @@ async function generateReport(reportId: string, query: string) { declare function queryDatabase(reportId: string, query: string): Promise; // @setup declare function formatReport(reportId: string, data: string): Promise; // @setup - -declare function collectReportResults( - runIds: string[] -): Promise>; // @setup +declare function withChildCompletionHook( + runChild: () => Promise, + completionTokenArg: string +): Promise; // @setup ``` ## Error handling ### Tolerating partial failures -Not every batch requires 100% success. Use `allowFailures` logic to let the parent continue when some children fail, while still surfacing the failures. +Use `Promise.allSettled` with `startAndWait()` so one failing child doesn't abort siblings. The hook payload already carries `{ status: "failed", error }` — no status polling required. ```typescript -import { sleep } from "workflow"; -import { getRun } from "workflow/api"; - -const POLL_INTERVAL = "30s"; -const MAX_POLL_ITERATIONS = 120; - -async function pollWithPartialFailures( - runIds: string[], - maxFailureRate: number -): Promise<{ completed: string[]; failed: string[] }> { - let iteration = 0; - const completedIds: string[] = []; - const failedIds: string[] = []; - - while (iteration < MAX_POLL_ITERATIONS) { - const status = await checkDetailedStatuses(runIds); - - completedIds.length = 0; - failedIds.length = 0; - - for (const entry of status) { - if (entry.status === "completed") completedIds.push(entry.runId); - else if (entry.status === "failed" || entry.status === "cancelled") - failedIds.push(entry.runId); - } - - const active = runIds.length - completedIds.length - failedIds.length; - - // Check if failure rate exceeds threshold - const failureRate = failedIds.length / Math.max(1, runIds.length); // [!code highlight] - if (failureRate > maxFailureRate) { // [!code highlight] - throw new Error( // [!code highlight] - `Failure rate ${(failureRate * 100).toFixed(1)}% exceeds ` + // [!code highlight] - `threshold of ${(maxFailureRate * 100).toFixed(1)}%` // [!code highlight] - ); // [!code highlight] - } // [!code highlight] - - if (active === 0) { - return { completed: completedIds, failed: failedIds }; - } +declare function startAndWait( + key: string, + startChild: (completionTokenArg: string) => Promise +): Promise; // @setup +declare function spawnProcessDocument( + documentId: string, + completionTokenArg: string +): Promise; // @setup + +export async function processDocumentBatchTolerant(documentIds: string[]) { + "use workflow"; - iteration += 1; - await sleep(POLL_INTERVAL); - } + const settled = await Promise.allSettled( + documentIds.map((documentId) => + startAndWait<{ documentId: string; summary: string }>(documentId, (token) => + spawnProcessDocument(documentId, token).then(() => undefined) + ) + ) + ); - throw new Error("Timed out waiting for children"); -} + const results = settled + .filter( + (entry): entry is PromiseFulfilledResult<{ documentId: string; summary: string }> => + entry.status === "fulfilled" + ) + .map((entry) => entry.value); -async function checkDetailedStatuses( - runIds: string[] -): Promise> { - "use step"; + const failed = settled.filter((entry) => entry.status === "rejected").length; - const statuses = []; - for (const runId of runIds) { - const run = getRun(runId); - const status = await run.status; - statuses.push({ runId, status }); - } - return statuses; + return { processed: results.length, failed, results }; } ``` ### Retrying failed children -When a child fails, the parent can spawn a replacement and continue polling. Track restart counts to prevent infinite retry loops. +When a child fails, spawn a replacement with a fresh hook token. Track restart counts to prevent infinite retry loops. ```typescript -import { sleep } from "workflow"; - -declare function checkDetailedStatuses(runIds: string[]): Promise>; // @setup - -const POLL_INTERVAL = "30s"; -const MAX_POLL_ITERATIONS = 120; - -async function pollWithRetries( - initialRunIds: string[], - maxRestartsPerChild: number, - spawnReplacement: (index: number) => Promise -): Promise { - const activeRuns = new Map(); - const restartCounts = new Map(); - - initialRunIds.forEach((runId, index) => activeRuns.set(index, runId)); - - let iteration = 0; - - while (iteration < MAX_POLL_ITERATIONS) { - const statuses = await checkDetailedStatuses( - Array.from(activeRuns.values()) - ); - const statusByRunId = new Map( - statuses.map((s) => [s.runId, s.status]) - ); - - for (const [index, runId] of activeRuns.entries()) { - const status = statusByRunId.get(runId) ?? "running"; - - if (status === "completed") { - activeRuns.delete(index); - continue; - } - - if (status === "failed" || status === "cancelled") { - const restarts = (restartCounts.get(index) ?? 0) + 1; // [!code highlight] - restartCounts.set(index, restarts); // [!code highlight] - - if (restarts > maxRestartsPerChild) { // [!code highlight] - throw new Error( // [!code highlight] - `Child ${index} exceeded restart limit (${maxRestartsPerChild})` // [!code highlight] - ); // [!code highlight] - } // [!code highlight] - - const newRunId = await spawnReplacement(index); // [!code highlight] - activeRuns.set(index, newRunId); // [!code highlight] - } +declare function startAndWait( + key: string, + startChild: (completionTokenArg: string) => Promise +): Promise; // @setup +declare function spawnProcessDocument( + documentId: string, + completionTokenArg: string +): Promise; // @setup + +async function startAndWaitWithRetries( + documentId: string, + maxRestarts: number +): Promise<{ documentId: string; summary: string }> { + for (let attempt = 0; attempt <= maxRestarts; attempt++) { + try { + return await startAndWait<{ documentId: string; summary: string }>( + `${documentId}:${attempt}`, + (token) => spawnProcessDocument(documentId, token).then(() => undefined) + ); + } catch (error) { + if (attempt === maxRestarts) throw error; } - - if (activeRuns.size === 0) return; - - iteration += 1; - await sleep(POLL_INTERVAL); } - throw new Error("Timed out waiting for children"); + throw new Error("unreachable"); } ``` ## Tips -- **`start()` must be called from a step**, not directly from a workflow function. Wrap it in a `"use step"` function. -- **`getRun()` must also be called from a step.** The polling loop lives in the workflow, but the actual status check is a step. -- **Set a max iteration count on polling loops** to prevent runaway workflows. Calculate the count from your expected max duration and poll interval. +- **`start()` must be called from a step** in v4, not directly from a workflow function. Bake the wrapped workflow reference into the step — don't pass workflow functions as step arguments. +- **`defineHook().resume()` must be called from a step.** The wrapped child's `finally` block calls a step that resumes the parent hook. +- **Export wrapped children at module scope.** The SDK registers `"use workflow"` functions statically — a runtime higher-order function returned from `withChildCompletionHook()` cannot be passed to `start()`. +- **Use stable hook keys** — document ID, job ID, or index — so parallel children inside one parent run don't collide on tokens. - **Use chunked spawning for large batches.** Spawning 500 children in a single step can time out. Break it into chunks of 10-50. -- **Each child has its own retry semantics.** Steps inside child workflows retry independently. The parent only sees the child's final status. +- **Each child has its own retry semantics.** Steps inside child workflows retry independently. The parent sees the final `{ status, value | error }` payload from the hook. - **Use `deploymentId: "latest"`** if children should run on the most recent deployment. See [Versioning](/docs/foundations/versioning) for the full model and the [`start()` API reference](/docs/api-reference/workflow-api/start#using-deploymentid-latest) for compatibility considerations. ## Key APIs - [`start()`](/docs/api-reference/workflow-api/start) -- spawn a new workflow run and get its run ID -- [`getRun()`](/docs/api-reference/workflow-api/get-run) -- retrieve a workflow run's status and return value -- [`sleep()`](/docs/api-reference/workflow/sleep) -- durably pause between polling iterations +- [`defineHook()`](/docs/api-reference/workflow/define-hook) -- typed hook for parent/child completion handshakes +- [`resumeHook()`](/docs/api-reference/workflow-api/resume-hook) -- resume a waiting parent from a step (called by the child wrapper) +- [`getWorkflowMetadata()`](/docs/api-reference/workflow/get-workflow-metadata) -- read the parent run ID for deterministic hook tokens - [`"use workflow"`](/docs/foundations/workflows-and-steps) -- marks the orchestrator function - [`"use step"`](/docs/foundations/workflows-and-steps) -- marks functions with full Node.js access diff --git a/docs/content/docs/v4/cookbook/common-patterns/workflow-composition.mdx b/docs/content/docs/v4/cookbook/common-patterns/workflow-composition.mdx index d950155075..9cd7f47d81 100644 --- a/docs/content/docs/v4/cookbook/common-patterns/workflow-composition.mdx +++ b/docs/content/docs/v4/cookbook/common-patterns/workflow-composition.mdx @@ -9,7 +9,7 @@ related: - /docs/api-reference/workflow-api/get-run --- -Workflows can call other workflows. Choose between two composition modes depending on whether the parent needs the child's result inline (direct await) or wants to fire the child off as an independent run (background spawn). For massive fan-out with polling and partial-failure handling, see [Child Workflows](/cookbook/advanced/child-workflows). +Workflows can call other workflows. Choose between two composition modes depending on whether the parent needs the child's result inline (direct await) or wants to fire the child off as an independent run (background spawn). For massive fan-out with hook-based waiting and partial-failure handling, see [Child Workflows](/cookbook/advanced/child-workflows). ## When to use this @@ -106,9 +106,9 @@ If you want the child workflow to run on the latest deployment rather than the c ## Adapting to your use case -- **Spawn many children at once** — call `start()` in a loop inside a step. For more advanced fan-out (chunking, polling, partial-failure handling), graduate to the [Child Workflows](/cookbook/advanced/child-workflows) recipe. -- **Wait for a background child to finish** — combine `start()` with `getRun()` polling. The [Child Workflows](/cookbook/advanced/child-workflows) page covers the full polling loop. -- **Pass results back from background children** — the spawn step returns the `runId`; later, a poll step uses `getRun(runId).returnValue` to fetch the final result. +- **Spawn many children at once** — call `start()` in a loop inside a step. For more advanced fan-out (chunking, hook-based waiting, partial-failure handling), graduate to the [Child Workflows](/cookbook/advanced/child-workflows) recipe. +- **Wait for a background child to finish** — combine `start()` with a completion hook the child resumes when done. The [Child Workflows](/cookbook/advanced/child-workflows) page covers the recommended `startAndWait()` pattern. +- **Pass results back from background children** — the wrapped child resumes the parent's hook in `finally` with `{ status, value | error }`; the parent awaits the hook instead of polling `getRun().status`. ## Key APIs diff --git a/docs/content/docs/v5/cookbook/advanced/child-workflows.mdx b/docs/content/docs/v5/cookbook/advanced/child-workflows.mdx index e4cb6c5815..c311193fa7 100644 --- a/docs/content/docs/v5/cookbook/advanced/child-workflows.mdx +++ b/docs/content/docs/v5/cookbook/advanced/child-workflows.mdx @@ -1,8 +1,8 @@ --- title: Child Workflows -description: Spawn child workflows from a parent and poll their progress for batch processing, report generation, and other multi-workflow orchestration scenarios. +description: Spawn child workflows from a parent and wait for completion via hook resume. type: guide -summary: Orchestrate independent child workflows from a parent workflow using start(), sleep(), and getRun() to fan out work with isolated failure boundaries. +summary: Orchestrate independent child workflows from a parent using start(), defineHook(), and startAndWait() — the child resumes the parent's hook when done instead of polling getRun().status. --- Use child workflows when a single workflow needs to orchestrate many independent units of work. Each child runs as its own workflow with a separate event log, retry boundary, and failure scope -- if one child fails, it doesn't take down the parent or siblings. @@ -18,20 +18,68 @@ Child workflows are the right choice when: For simpler cases where steps share a single event log, use [direct await composition](/cookbook/common-patterns/workflow-composition#direct-await-flattening) instead. -## Basic pattern: spawn and poll +## Basic pattern: spawn and wait via hook -The core pattern has three parts: +The recommended pattern has four parts: -1. A parent workflow that calls `start()` to spawn child workflows and records their run IDs -2. A **polling loop** in the parent workflow that checks child status with `getRun()` -3. A **step** that retrieves the child's return value once it completes +1. A **completion hook** the parent creates and awaits — zero compute while waiting +2. A **wrapped child export** that runs the real child in try/catch/finally and resumes the parent's hook from a step in `finally` +3. A **`start()` call** that spawns the wrapped child with the hook token (directly from the workflow in v5) +4. A **`startAndWait()` helper** that ties the hook, spawn, and typed result together ```typescript -import { sleep } from "workflow"; -import { getRun, start } from "workflow/api"; +import { defineHook, getWorkflowMetadata } from "workflow"; +import { start } from "workflow/api"; +import { z } from "zod"; + +declare function fetchDocument(documentId: string): Promise; // @setup +declare function analyzeContent(content: string): Promise; // @setup +declare function generateSummary(analysis: string): Promise; // @setup + +const childCompletionHook = defineHook({ + schema: z.discriminatedUnion("status", [ + z.object({ status: z.literal("completed"), value: z.unknown() }), + z.object({ status: z.literal("failed"), error: z.string() }), + ]), +}); + +function completionToken(parentRunId: string, key: string) { + return `child-completion:${parentRunId}:${key}`; +} -declare function pollUntilComplete(runIds: string[]): Promise; // @setup -declare function collectResults(runIds: string[]): Promise>; // @setup +async function resumeParentCompletion( + token: string, + result: + | { status: "completed"; value: unknown } + | { status: "failed"; error: string } +) { + "use step"; + await childCompletionHook.resume(token, result); +} + +async function withChildCompletionHook( + runChild: () => Promise, + completionTokenArg: string +) { + let result: + | { status: "completed"; value: TResult } + | { status: "failed"; error: string } + | undefined; + + try { + const value = await runChild(); + result = { status: "completed", value }; + } catch (error) { + result = { + status: "failed", + error: error instanceof Error ? error.message : String(error), + }; + } finally { + if (result) { + await resumeParentCompletion(completionTokenArg, result); + } + } +} // Child workflow -- processes a single document export async function processDocument(documentId: string) { @@ -44,152 +92,113 @@ export async function processDocument(documentId: string) { return { documentId, summary }; } -async function fetchDocument(documentId: string): Promise { - "use step"; - const res = await fetch(`https://docs.example.com/api/${documentId}`); - return res.text(); -} +// Spawnable wrapper -- explicit export so `start()` can register it +export async function processDocumentWithCompletion( + documentId: string, + completionTokenArg: string +) { + "use workflow"; -async function analyzeContent(content: string): Promise { - "use step"; - // Call analysis API - return `analysis of ${content.length} chars`; + await withChildCompletionHook( + () => processDocument(documentId), + completionTokenArg + ); } -async function generateSummary(analysis: string): Promise { - "use step"; - // Generate summary from analysis - return `Summary: ${analysis}`; +async function startAndWait( + key: string, + startChild: (completionTokenArg: string) => Promise +): Promise { + const { workflowRunId } = getWorkflowMetadata(); + const token = completionToken(workflowRunId, key); + const hook = childCompletionHook.create({ token }); // [!code highlight] + + await startChild(token); + + const completion = await hook; // [!code highlight] + if (completion.status === "failed") { + throw new Error(completion.error); + } + return completion.value as TResult; } // Parent workflow -- orchestrates document processing export async function processDocumentBatch(documentIds: string[]) { "use workflow"; - // Spawn a child workflow for each document - const runIds: string[] = []; - for (const docId of documentIds) { - const run = await start(processDocument, [docId]); // [!code highlight] - runIds.push(run.runId); - } - - // Poll until all children complete - await pollUntilComplete(runIds); - - // Collect results - const results = await collectResults(runIds); + const results = await Promise.all( + documentIds.map((documentId) => + startAndWait<{ documentId: string; summary: string }>(documentId, (token) => + start(processDocumentWithCompletion, [documentId, token]).then(() => undefined) // [!code highlight] + ) + ) + ); return { processed: results.length, results }; } ``` -### Polling loop - -The parent workflow polls child statuses in a loop, sleeping between checks. This is durable -- if the parent replays, the sleep and status checks replay from the event log. - -```typescript -import { sleep } from "workflow"; -import { getRun } from "workflow/api"; - -const POLL_INTERVAL = "30s"; -const MAX_POLL_ITERATIONS = 120; // 60 minutes at 30s intervals - -async function pollUntilComplete(runIds: string[]): Promise { - let iteration = 0; - - while (iteration < MAX_POLL_ITERATIONS) { - const status = await checkStatuses(runIds); // [!code highlight] - - if (status.running === 0) { - if (status.failed > 0) { - throw new Error( - `${status.failed} of ${runIds.length} children failed` - ); - } - return; // All completed successfully - } - - iteration += 1; - await sleep(POLL_INTERVAL); // [!code highlight] - } - - throw new Error("Timed out waiting for children to complete"); -} - -async function checkStatuses( - runIds: string[] -): Promise<{ running: number; completed: number; failed: number }> { - "use step"; // [!code highlight] +### Why hooks instead of polling? - let running = 0; - let completed = 0; - let failed = 0; +Polling with `getRun().status` in a `sleep()` loop works, but hook resume is preferable because: - for (const runId of runIds) { - const run = getRun(runId); // [!code highlight] - const status = await run.status; // [!code highlight] +- **Zero compute while waiting** — the parent suspends on the hook instead of waking every poll interval +- **Immediate wake-up** — the parent resumes as soon as the child finishes, not on the next poll tick +- **Typed payloads** — the child sends `{ status, value | error }` directly; no separate `returnValue` fetch step +- **No worker-pool pressure** — `Run#returnValue` polling inside steps can hold worker slots while waiting for children (see [Eager Processing](/changelog/eager-processing)) - if (status === "completed") completed += 1; - else if (status === "failed" || status === "cancelled") failed += 1; - else running += 1; // pending, running - } - - return { running, completed, failed }; -} - -async function collectResults( - runIds: string[] -): Promise> { - "use step"; - - const results = []; - for (const runId of runIds) { - const run = getRun(runId); - const value = await run.returnValue; - results.push(value as { documentId: string; summary: string }); - } - return results; -} -``` +When a parent calls a child workflow inline with `await` (flattened into the same run), the same wrapper and hook handshake still works — pass the token and `await processDocumentWithCompletion(...)` inside `startAndWait()` instead of calling `start()`. ## Fan-out pattern: chunked spawning -When spawning hundreds of children, batch the `start()` calls to avoid overwhelming the system. Start one chunk at a time from the parent workflow. +When spawning hundreds of children, batch the `start()` calls to avoid overwhelming the system. Each child still gets its own completion hook keyed by a stable identifier (document ID, report ID, index). ```typescript import { start } from "workflow/api"; -declare function pollUntilComplete(runIds: string[]): Promise; // @setup +declare function startAndWait( + key: string, + startChild: (completionTokenArg: string) => Promise +): Promise; // @setup const CHUNK_SIZE = 10; -export async function largeReportBatch(reportConfigs: Array<{ id: string; query: string }>) { +export async function largeReportBatch( + reportConfigs: Array<{ id: string; query: string }> +) { "use workflow"; - // Spawn children in chunks - const allRunIds: string[] = []; + const results = []; for (let i = 0; i < reportConfigs.length; i += CHUNK_SIZE) { const chunk = reportConfigs.slice(i, i + CHUNK_SIZE); - const runIds = await startReportChunk(chunk); // [!code highlight] - allRunIds.push(...runIds); + const chunkResults = await Promise.all( + chunk.map((config) => + startAndWait<{ reportId: string; formatted: string }>(config.id, (token) => + start(generateReportWithCompletion, [ + config.id, + config.query, + token, + ]).then(() => undefined) + ) + ) + ); + results.push(...chunkResults); } - // Poll until all complete - await pollUntilComplete(allRunIds); - - const results = await collectReportResults(allRunIds); return { total: results.length, results }; } -async function startReportChunk( - configs: Array<{ id: string; query: string }> -): Promise { - const runIds: string[] = []; - for (const config of configs) { - const run = await start(generateReport, [config.id, config.query]); // [!code highlight] - runIds.push(run.runId); - } - return runIds; +async function generateReportWithCompletion( + reportId: string, + query: string, + completionTokenArg: string +) { + "use workflow"; + + await withChildCompletionHook( + () => generateReport(reportId, query), + completionTokenArg + ); } async function generateReport(reportId: string, query: string) { @@ -202,160 +211,103 @@ async function generateReport(reportId: string, query: string) { declare function queryDatabase(reportId: string, query: string): Promise; // @setup declare function formatReport(reportId: string, data: string): Promise; // @setup - -declare function collectReportResults( - runIds: string[] -): Promise>; // @setup +declare function withChildCompletionHook( + runChild: () => Promise, + completionTokenArg: string +): Promise; // @setup ``` ## Error handling ### Tolerating partial failures -Not every batch requires 100% success. Use `allowFailures` logic to let the parent continue when some children fail, while still surfacing the failures. +Use `Promise.allSettled` with `startAndWait()` so one failing child doesn't abort siblings. The hook payload already carries `{ status: "failed", error }` — no status polling required. ```typescript -import { sleep } from "workflow"; -import { getRun } from "workflow/api"; - -const POLL_INTERVAL = "30s"; -const MAX_POLL_ITERATIONS = 120; - -async function pollWithPartialFailures( - runIds: string[], - maxFailureRate: number -): Promise<{ completed: string[]; failed: string[] }> { - let iteration = 0; - const completedIds: string[] = []; - const failedIds: string[] = []; - - while (iteration < MAX_POLL_ITERATIONS) { - const status = await checkDetailedStatuses(runIds); - - completedIds.length = 0; - failedIds.length = 0; - - for (const entry of status) { - if (entry.status === "completed") completedIds.push(entry.runId); - else if (entry.status === "failed" || entry.status === "cancelled") - failedIds.push(entry.runId); - } - - const active = runIds.length - completedIds.length - failedIds.length; - - // Check if failure rate exceeds threshold - const failureRate = failedIds.length / Math.max(1, runIds.length); // [!code highlight] - if (failureRate > maxFailureRate) { // [!code highlight] - throw new Error( // [!code highlight] - `Failure rate ${(failureRate * 100).toFixed(1)}% exceeds ` + // [!code highlight] - `threshold of ${(maxFailureRate * 100).toFixed(1)}%` // [!code highlight] - ); // [!code highlight] - } // [!code highlight] +import { start } from "workflow/api"; - if (active === 0) { - return { completed: completedIds, failed: failedIds }; - } +declare function startAndWait( + key: string, + startChild: (completionTokenArg: string) => Promise +): Promise; // @setup +declare function processDocumentWithCompletion( + documentId: string, + completionTokenArg: string +): Promise; // @setup - iteration += 1; - await sleep(POLL_INTERVAL); - } - - throw new Error("Timed out waiting for children"); -} - -async function checkDetailedStatuses( - runIds: string[] -): Promise> { - "use step"; +export async function processDocumentBatchTolerant(documentIds: string[]) { + "use workflow"; - const statuses = []; - for (const runId of runIds) { - const run = getRun(runId); - const status = await run.status; - statuses.push({ runId, status }); - } - return statuses; + const settled = await Promise.allSettled( + documentIds.map((documentId) => + startAndWait<{ documentId: string; summary: string }>(documentId, (token) => + start(processDocumentWithCompletion, [documentId, token]).then( + () => undefined + ) + ) + ) + ); + + const results = settled + .filter( + (entry): entry is PromiseFulfilledResult<{ documentId: string; summary: string }> => + entry.status === "fulfilled" + ) + .map((entry) => entry.value); + + const failed = settled.filter((entry) => entry.status === "rejected").length; + + return { processed: results.length, failed, results }; } ``` ### Retrying failed children -When a child fails, the parent can spawn a replacement and continue polling. Track restart counts to prevent infinite retry loops. +When a child fails, spawn a replacement with a fresh hook token. Track restart counts to prevent infinite retry loops. ```typescript -import { sleep } from "workflow"; - -declare function checkDetailedStatuses(runIds: string[]): Promise>; // @setup - -const POLL_INTERVAL = "30s"; -const MAX_POLL_ITERATIONS = 120; - -async function pollWithRetries( - initialRunIds: string[], - maxRestartsPerChild: number, - spawnReplacement: (index: number) => Promise -): Promise { - const activeRuns = new Map(); - const restartCounts = new Map(); - - initialRunIds.forEach((runId, index) => activeRuns.set(index, runId)); - - let iteration = 0; - - while (iteration < MAX_POLL_ITERATIONS) { - const statuses = await checkDetailedStatuses( - Array.from(activeRuns.values()) - ); - const statusByRunId = new Map( - statuses.map((s) => [s.runId, s.status]) - ); - - for (const [index, runId] of activeRuns.entries()) { - const status = statusByRunId.get(runId) ?? "running"; - - if (status === "completed") { - activeRuns.delete(index); - continue; - } - - if (status === "failed" || status === "cancelled") { - const restarts = (restartCounts.get(index) ?? 0) + 1; // [!code highlight] - restartCounts.set(index, restarts); // [!code highlight] - - if (restarts > maxRestartsPerChild) { // [!code highlight] - throw new Error( // [!code highlight] - `Child ${index} exceeded restart limit (${maxRestartsPerChild})` // [!code highlight] - ); // [!code highlight] - } // [!code highlight] - - const newRunId = await spawnReplacement(index); // [!code highlight] - activeRuns.set(index, newRunId); // [!code highlight] - } +declare function startAndWait( + key: string, + startChild: (completionTokenArg: string) => Promise +): Promise; // @setup +declare function spawnProcessDocument( + documentId: string, + completionTokenArg: string +): Promise; // @setup + +async function startAndWaitWithRetries( + documentId: string, + maxRestarts: number +): Promise<{ documentId: string; summary: string }> { + for (let attempt = 0; attempt <= maxRestarts; attempt++) { + try { + return await startAndWait<{ documentId: string; summary: string }>( + `${documentId}:${attempt}`, + (token) => spawnProcessDocument(documentId, token) + ); + } catch (error) { + if (attempt === maxRestarts) throw error; } - - if (activeRuns.size === 0) return; - - iteration += 1; - await sleep(POLL_INTERVAL); } - throw new Error("Timed out waiting for children"); + throw new Error("unreachable"); } ``` ## Tips -- **`start()` can be called directly from a workflow function in v5.** It records a step-backed boundary in the parent event log and returns a serializable `Run`. -- **`getRun()` must also be called from a step.** The polling loop lives in the workflow, but the actual status check is a step. -- **Set a max iteration count on polling loops** to prevent runaway workflows. Calculate the count from your expected max duration and poll interval. +- **`defineHook().resume()` must be called from a step.** The wrapped child's `finally` block calls a step that resumes the parent hook. +- **Export wrapped children at module scope.** The SDK registers `"use workflow"` functions statically — a runtime higher-order function returned from `withChildCompletionHook()` cannot be passed to `start()`. +- **Use stable hook keys** — document ID, job ID, or index — so parallel children inside one parent run don't collide on tokens. - **Use chunked spawning for large batches.** Starting 500 children at once can create a large burst of work. Break it into chunks of 10-50. -- **Each child has its own retry semantics.** Steps inside child workflows retry independently. The parent only sees the child's final status. +- **Each child has its own retry semantics.** Steps inside child workflows retry independently. The parent sees the final `{ status, value | error }` payload from the hook. - **Use `deploymentId: "latest"`** if children should run on the most recent deployment. See [Versioning](/docs/foundations/versioning) for the full model and the [`start()` API reference](/docs/api-reference/workflow-api/start#using-deploymentid-latest) for compatibility considerations. ## Key APIs - [`start()`](/docs/api-reference/workflow-api/start) -- spawn a new workflow run and get its run ID -- [`getRun()`](/docs/api-reference/workflow-api/get-run) -- retrieve a workflow run's status and return value -- [`sleep()`](/docs/api-reference/workflow/sleep) -- durably pause between polling iterations +- [`defineHook()`](/docs/api-reference/workflow/define-hook) -- typed hook for parent/child completion handshakes +- [`resumeHook()`](/docs/api-reference/workflow-api/resume-hook) -- resume a waiting parent from a step (called by the child wrapper) +- [`getWorkflowMetadata()`](/docs/api-reference/workflow/get-workflow-metadata) -- read the parent run ID for deterministic hook tokens - [`"use workflow"`](/docs/foundations/workflows-and-steps) -- marks the orchestrator function - [`"use step"`](/docs/foundations/workflows-and-steps) -- marks functions with full Node.js access diff --git a/docs/content/docs/v5/cookbook/common-patterns/workflow-composition.mdx b/docs/content/docs/v5/cookbook/common-patterns/workflow-composition.mdx index 1e328d8263..2a14745672 100644 --- a/docs/content/docs/v5/cookbook/common-patterns/workflow-composition.mdx +++ b/docs/content/docs/v5/cookbook/common-patterns/workflow-composition.mdx @@ -9,7 +9,7 @@ related: - /docs/api-reference/workflow-api/get-run --- -Workflows can call other workflows. Choose between two composition modes depending on whether the parent needs the child's result inline (direct await) or wants to fire the child off as an independent run (background spawn). For massive fan-out with polling and partial-failure handling, see [Child Workflows](/cookbook/advanced/child-workflows). +Workflows can call other workflows. Choose between two composition modes depending on whether the parent needs the child's result inline (direct await) or wants to fire the child off as an independent run (background spawn). For massive fan-out with hook-based waiting and partial-failure handling, see [Child Workflows](/cookbook/advanced/child-workflows). ## When to use this @@ -99,9 +99,9 @@ If you want the child workflow to run on the latest deployment rather than the c ## Adapting to your use case -- **Spawn many children at once** — call `start()` in a loop from the workflow. For more advanced fan-out (chunking, polling, partial-failure handling), graduate to the [Child Workflows](/cookbook/advanced/child-workflows) recipe. -- **Wait for a background child to finish** — combine `start()` with `getRun()` polling. The [Child Workflows](/cookbook/advanced/child-workflows) page covers the full polling loop. -- **Pass results back from background children** — `start()` returns the `runId`; later, a poll step uses `getRun(runId).returnValue` to fetch the final result. +- **Spawn many children at once** — call `start()` in a loop from the workflow. For more advanced fan-out (chunking, hook-based waiting, partial-failure handling), graduate to the [Child Workflows](/cookbook/advanced/child-workflows) recipe. +- **Wait for a background child to finish** — combine `start()` with a completion hook the child resumes when done. The [Child Workflows](/cookbook/advanced/child-workflows) page covers the recommended `startAndWait()` pattern. +- **Pass results back from background children** — the wrapped child resumes the parent's hook in `finally` with `{ status, value | error }`; the parent awaits the hook instead of polling `getRun().status`. ## Key APIs diff --git a/docs/lib/cookbook-tree.ts b/docs/lib/cookbook-tree.ts index 25e6748bd5..dc878466cd 100644 --- a/docs/lib/cookbook-tree.ts +++ b/docs/lib/cookbook-tree.ts @@ -179,7 +179,7 @@ export const recipes: Record = { slug: 'child-workflows', title: 'Child Workflows', description: - 'Spawn and orchestrate child workflows from a parent, polling for completion and handling partial failures.', + 'Spawn and orchestrate child workflows from a parent, waiting for completion via hook resume and handling partial failures.', category: 'advanced', }, 'distributed-abort-controller': { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 06e6d40da3..a32d75e3a8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2462,6 +2462,9 @@ importers: workflow: specifier: workspace:* version: link:../../packages/workflow + zod: + specifier: 'catalog:' + version: 4.3.6 packages: @@ -8113,6 +8116,7 @@ packages: '@ungap/structured-clone@1.3.0': resolution: {integrity: sha512-WmoN8qaIAo7WTYWbAZuG8PYEhn5fkz7dZrqTBZ7dtt//lL2Gwms1IcnQ5yHqjDfX8Ft5j4YzDM23f87zBfDe9g==} + deprecated: Potential CWE-502 - Update to 1.3.1 or higher '@unhead/vue@2.1.12': resolution: {integrity: sha512-zEWqg0nZM8acpuTZE40wkeUl8AhIe0tU0OkilVi1D4fmVjACrwoh5HP6aNqJ8kUnKsoy6D+R3Vi/O+fmdNGO7g==} @@ -23199,6 +23203,14 @@ snapshots: optionalDependencies: vite: 7.3.2(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3) + '@vitest/mocker@4.0.18(vite@7.3.2(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3))': + dependencies: + '@vitest/spy': 4.0.18 + estree-walker: 3.0.3 + magic-string: 0.30.21 + optionalDependencies: + vite: 7.3.2(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3) + '@vitest/mocker@4.0.18(vite@7.3.2(@types/node@24.6.2)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3))': dependencies: '@vitest/spy': 4.0.18 @@ -32231,7 +32243,7 @@ snapshots: vitest@4.0.18(@opentelemetry/api@1.9.0)(@types/node@22.19.0)(jiti@2.6.1)(jsdom@26.1.0)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3): dependencies: '@vitest/expect': 4.0.18 - '@vitest/mocker': 4.0.18(vite@7.3.2(@types/node@24.6.2)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3)) + '@vitest/mocker': 4.0.18(vite@7.3.2(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3)) '@vitest/pretty-format': 4.0.18 '@vitest/runner': 4.0.18 '@vitest/snapshot': 4.0.18 @@ -32270,7 +32282,7 @@ snapshots: vitest@4.0.18(@opentelemetry/api@1.9.1)(@types/node@22.19.0)(jiti@2.6.1)(jsdom@26.1.0)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3): dependencies: '@vitest/expect': 4.0.18 - '@vitest/mocker': 4.0.18(vite@7.3.2(@types/node@24.6.2)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3)) + '@vitest/mocker': 4.0.18(vite@7.3.2(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.3)) '@vitest/pretty-format': 4.0.18 '@vitest/runner': 4.0.18 '@vitest/snapshot': 4.0.18 diff --git a/workbench/vitest/package.json b/workbench/vitest/package.json index 5045603b0e..deec39d029 100644 --- a/workbench/vitest/package.json +++ b/workbench/vitest/package.json @@ -12,6 +12,7 @@ "@workflow/vitest": "workspace:*", "ms": "2.1.3", "vitest": "catalog:", - "workflow": "workspace:*" + "workflow": "workspace:*", + "zod": "catalog:" } } diff --git a/workbench/vitest/workflows/cookbook/child-workflows.ts b/workbench/vitest/workflows/cookbook/child-workflows.ts index 9dc8f1e919..b11d6f65b8 100644 --- a/workbench/vitest/workflows/cookbook/child-workflows.ts +++ b/workbench/vitest/workflows/cookbook/child-workflows.ts @@ -1,10 +1,29 @@ -import { getRun, start } from 'workflow/api'; +import { defineHook } from 'workflow'; +import { start } from 'workflow/api'; +import { z } from 'zod'; async function processItem(item: string): Promise { 'use step'; return `processed-${item}`; } +const childCompletionHook = defineHook({ + schema: z.discriminatedUnion('status', [ + z.object({ status: z.literal('completed'), value: z.unknown() }), + z.object({ status: z.literal('failed'), error: z.string() }), + ]), +}); + +async function resumeParentCompletion( + token: string, + result: + | { status: 'completed'; value: unknown } + | { status: 'failed'; error: string } +) { + 'use step'; + await childCompletionHook.resume(token, result); +} + // Child workflow export async function childWorkflow(item: string) { 'use workflow'; @@ -13,29 +32,59 @@ export async function childWorkflow(item: string) { return { item, result }; } -async function spawnChild(item: string): Promise { - 'use step'; +export async function childWorkflowWithCompletion( + item: string, + completionToken: string +) { + 'use workflow'; - const run = await start(childWorkflow, [item]); - return run.runId; + let payload: + | { status: 'completed'; value: { item: string; result: string } } + | { status: 'failed'; error: string } + | undefined; + + try { + const value = await childWorkflow(item); + payload = { status: 'completed', value }; + } catch (error) { + payload = { + status: 'failed', + error: error instanceof Error ? error.message : String(error), + }; + } finally { + if (payload) { + await resumeParentCompletion(completionToken, payload); + } + } } -async function collectResult( - runId: string -): Promise<{ item: string; result: string }> { +async function spawnChildWithCompletion( + item: string, + completionToken: string +): Promise { 'use step'; - const run = getRun(runId); - const value = await run.returnValue; - return value as { item: string; result: string }; + const run = await start(childWorkflowWithCompletion, [item, completionToken]); + return run.runId; } -// Parent workflow — spawns one child and collects its result +// Parent workflow — spawns one child and waits via hook resume export async function parentWorkflow(item: string) { 'use workflow'; - const runId = await spawnChild(item); - const result = await collectResult(runId); + const hook = childCompletionHook.create({ + token: `child-completion:${item}`, + }); + + const childRunId = await spawnChildWithCompletion(item, hook.token); + const completion = await hook; + + if (completion.status === 'failed') { + throw new Error(completion.error); + } - return { childRunId: runId, result }; + return { + childRunId, + result: completion.value as { item: string; result: string }, + }; }