diff --git a/AGENTS.md b/AGENTS.md index 174a961..660f9b0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -24,6 +24,7 @@ - **`z.output<>` over `z.infer<>`** — use `z.output` for types after transforms/defaults are applied (what `schema.parse()` returns at runtime). Use `z.input` only when representing pre-validation types. - **`const` generics on definitions** — any function that accepts Zod schemas and passes them to callbacks must use `const` generic parameters to preserve literal types (e.g. `>`). +- **Streaming commands use `async *run`** — typed client/typegen stream detection is based on the handler being an async generator function. Do not hide streaming behind `run() { return stream() }` when generated client types should be streaming-aware. - **Flow schemas through generics** — when a factory function accepts Zod schemas, use generics to flow `z.output<>` through to callbacks (`run`, `next`), return types, and constraint types (`alias`). Never fall back to `any` in callback signatures. - **Type tests in `.test-d.ts`** — use vitest's `expectTypeOf` in colocated `.test-d.ts` files to assert generic inference works. Type tests are first-class — write them alongside implementation, not as an afterthought. - **Avoid global declaration merging in type tests** — module augmentation in `.test-d.ts` affects the full `tsc -b` project, so prefer explicit generics/local helper types unless the test is specifically about global registration behavior. diff --git a/README.md b/README.md index 444ca12..18d9e9f 100644 --- a/README.md +++ b/README.md @@ -576,7 +576,7 @@ cli.command('deploy', { ### Streaming -Use `async *run` to stream chunks incrementally. Yield objects for structured data or plain strings for text: +Use `async *run` to stream chunks incrementally. Yield objects for structured data or plain strings for text. Generated client types recognize streaming commands from this `async *run` shape: ```ts cli.command('logs', { diff --git a/SKILL.md b/SKILL.md index cafda0c..3fe7615 100644 --- a/SKILL.md +++ b/SKILL.md @@ -931,7 +931,7 @@ Bun.serve(cli) ## Streaming -Use `async *run` to stream chunks incrementally. Yield objects for structured data or plain strings for text: +Use `async *run` to stream chunks incrementally. Yield objects for structured data or plain strings for text. Generated client types recognize streaming commands from this `async *run` shape: ```ts cli.command('logs', { diff --git a/src/Cli.test.ts b/src/Cli.test.ts index d89a8c1..3dbf1fa 100644 --- a/src/Cli.test.ts +++ b/src/Cli.test.ts @@ -4372,6 +4372,147 @@ describe('fetch', () => { ]) }) + test('POST /_incur/rpc streams async generator output as NDJSON', async () => { + const cli = Cli.create('test') + cli.command('stream', { + args: z.object({ start: z.number() }), + options: z.object({ step: z.number() }), + async *run(c) { + yield { progress: c.args.start } + yield { progress: c.args.start + c.options.step } + return { done: c.args.start + c.options.step * 2 } + }, + }) + const res = await cli.fetch( + new Request('http://localhost/_incur/rpc', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + command: 'stream', + args: { start: 2 }, + options: { step: 3 }, + }), + }), + ) + + expect(res.status).toBe(200) + expect(res.headers.get('content-type')).toBe('application/x-ndjson') + const lines = (await res.text()) + .trim() + .split('\n') + .map((l) => JSON.parse(l)) + expect(lines).toMatchInlineSnapshot(` + [ + { + "data": { + "progress": 2, + }, + "type": "chunk", + }, + { + "data": { + "progress": 5, + }, + "type": "chunk", + }, + { + "meta": { + "command": "stream", + }, + "ok": true, + "type": "done", + }, + ] + `) + }) + + test('POST /_incur/rpc stream preserves returned ok CTA', async () => { + const cli = Cli.create('test') + cli.command('stream', { + async *run(c) { + yield { progress: 1 } + return c.ok(undefined, { cta: { commands: ['status'] } }) + }, + }) + const res = await cli.fetch( + new Request('http://localhost/_incur/rpc', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ command: 'stream', args: {}, options: {} }), + }), + ) + + const lines = (await res.text()) + .trim() + .split('\n') + .map((l) => JSON.parse(l)) + expect(lines).toMatchInlineSnapshot(` + [ + { + "data": { + "progress": 1, + }, + "type": "chunk", + }, + { + "meta": { + "command": "stream", + "cta": { + "commands": [ + { + "command": "test status", + }, + ], + "description": "Suggested command:", + }, + }, + "ok": true, + "type": "done", + }, + ] + `) + }) + + test('POST /_incur/rpc stream preserves returned errors', async () => { + const cli = Cli.create('test') + cli.command('stream', { + async *run(c) { + yield { progress: 1 } + return c.error({ code: 'STREAM_FAIL', message: 'broke' }) + }, + }) + const res = await cli.fetch( + new Request('http://localhost/_incur/rpc', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ command: 'stream', args: {}, options: {} }), + }), + ) + + const lines = (await res.text()) + .trim() + .split('\n') + .map((l) => JSON.parse(l)) + expect(lines).toMatchInlineSnapshot(` + [ + { + "data": { + "progress": 1, + }, + "type": "chunk", + }, + { + "error": { + "code": "STREAM_FAIL", + "message": "broke", + }, + "ok": false, + "type": "error", + }, + ] + `) + }) + test('trailing path segments → positional args', async () => { const cli = Cli.create('test') cli.command('users', { @@ -4496,6 +4637,79 @@ describe('fetch', () => { `) }) + test('async generator RPC stream preserves returned ok CTA', async () => { + const cli = Cli.create('test') + cli.command('stream', { + async *run(c) { + yield { progress: 1 } + return c.ok(undefined, { cta: { commands: ['status'] } }) + }, + }) + const res = await cli.fetch(new Request('http://localhost/stream')) + const lines = (await res.text()) + .trim() + .split('\n') + .map((l) => JSON.parse(l)) + expect(lines).toMatchInlineSnapshot(` + [ + { + "data": { + "progress": 1, + }, + "type": "chunk", + }, + { + "meta": { + "command": "stream", + "cta": { + "commands": [ + { + "command": "test status", + }, + ], + "description": "Suggested command:", + }, + }, + "ok": true, + "type": "done", + }, + ] + `) + }) + + test('async generator RPC stream preserves returned errors', async () => { + const cli = Cli.create('test') + cli.command('stream', { + async *run(c) { + yield { progress: 1 } + return c.error({ code: 'STREAM_FAIL', message: 'broke' }) + }, + }) + const res = await cli.fetch(new Request('http://localhost/stream')) + const lines = (await res.text()) + .trim() + .split('\n') + .map((l) => JSON.parse(l)) + expect(lines).toMatchInlineSnapshot(` + [ + { + "data": { + "progress": 1, + }, + "type": "chunk", + }, + { + "error": { + "code": "STREAM_FAIL", + "message": "broke", + }, + "ok": false, + "type": "error", + }, + ] + `) + }) + test('middleware sets var → command sees it', async () => { const cli = Cli.create('test', { vars: z.object({ user: z.string().default('anonymous') }), diff --git a/src/Cli.ts b/src/Cli.ts index a53e0dd..b658aea 100644 --- a/src/Cli.ts +++ b/src/Cli.ts @@ -220,7 +220,8 @@ export function create( ): Cli | Root { const name = typeof nameOrDefinition === 'string' ? nameOrDefinition : nameOrDefinition.name const def = typeof nameOrDefinition === 'string' ? (definition ?? {}) : nameOrDefinition - const rootDef = 'run' in def ? (def as CommandDefinition) : undefined + const rootDef = + 'run' in def ? annotateStreamingCommand(def as CommandDefinition) : undefined const rootFetch = 'fetch' in def ? (def.fetch as FetchHandler) : undefined const commands = new Map() @@ -258,7 +259,7 @@ export function create( } as InternalFetchGateway) return cli } - commands.set(nameOrCli, def) + commands.set(nameOrCli, annotateStreamingCommand(def)) if (def.aliases) for (const a of def.aliases) commands.set(a, { _alias: true, target: nameOrCli }) return cli @@ -1904,34 +1905,68 @@ async function executeCommand( const stream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder() + const write = (value: unknown) => { + controller.enqueue(encoder.encode(JSON.stringify(value) + '\n')) + } try { - for await (const value of result.stream) { - controller.enqueue( - encoder.encode(JSON.stringify({ type: 'chunk', data: value }) + '\n'), - ) - } - controller.enqueue( - encoder.encode( - JSON.stringify({ - type: 'done', - ok: true, - meta: { command: path }, - }) + '\n', - ), - ) - } catch (error) { - controller.enqueue( - encoder.encode( - JSON.stringify({ + let returnValue: unknown + while (true) { + const { value, done } = await result.stream.next() + if (done) { + returnValue = value + break + } + if (isSentinel(value) && value[sentinel] === 'error') { + const err = value as ErrorResult + write({ type: 'error', ok: false, error: { - code: 'UNKNOWN', - message: error instanceof Error ? error.message : String(error), + code: err.code, + message: err.message, + ...(err.retryable !== undefined ? { retryable: err.retryable } : undefined), }, - }) + '\n', - ), - ) + }) + controller.close() + return + } + write({ type: 'chunk', data: value }) + } + if (isSentinel(returnValue) && returnValue[sentinel] === 'error') { + const err = returnValue as ErrorResult + write({ + type: 'error', + ok: false, + error: { + code: err.code, + message: err.message, + ...(err.retryable !== undefined ? { retryable: err.retryable } : undefined), + }, + }) + controller.close() + return + } + const cta = + isSentinel(returnValue) && returnValue[sentinel] === 'ok' + ? formatCtaBlock(options.name ?? path, (returnValue as OkResult).cta) + : undefined + write({ + type: 'done', + ok: true, + meta: { command: path, ...(cta ? { cta } : undefined) }, + }) + } catch (error) { + write({ + type: 'error', + ok: false, + error: { + code: error instanceof IncurError ? error.code : 'UNKNOWN', + message: error instanceof Error ? error.message : String(error), + ...(error instanceof IncurError && error.retryable !== undefined + ? { retryable: error.retryable } + : undefined), + }, + }) } controller.close() }, @@ -2498,6 +2533,8 @@ export type CommandsMap = Record< options: Record /** Command output shape. */ output?: unknown | undefined + /** Whether the command streams output chunks. */ + stream?: true | undefined } > @@ -2554,6 +2591,19 @@ function isAlias(entry: CommandEntry): entry is InternalAlias { return '_alias' in entry } +const AsyncGeneratorFunction = async function* () {}.constructor + +function isAsyncGeneratorFunction(value: unknown): boolean { + return typeof value === 'function' && value.constructor === AsyncGeneratorFunction +} + +function annotateStreamingCommand>( + command: command, +): command { + if (isAsyncGeneratorFunction(command.run)) return { ...command, _stream: true } as command + return command +} + /** @internal Follows an alias entry to its canonical target. Returns the entry unchanged if not an alias. */ function resolveAlias( commands: Map, @@ -3160,6 +3210,8 @@ type CommandDefinition< vars extends z.ZodObject | undefined = undefined, cliEnv extends z.ZodObject | undefined = undefined, > = CommandMeta & { + /** @internal Whether this command's handler is an async generator function. */ + _stream?: true | undefined /** Alternative names for this command (e.g. `['extensions', 'ext']` for an `extension` command). */ aliases?: string[] | undefined /** Zod schema for positional arguments. */ @@ -3188,42 +3240,51 @@ type CommandDefinition< /** Alternative usage patterns shown in help output. */ usage?: Usage[] | undefined /** The command handler. Return a value for single-return, or use `async *run` to stream chunks. */ - run(context: { - /** Whether the consumer is an agent (stdout is not a TTY). */ - agent: boolean - /** Positional arguments. */ - args: InferOutput - /** The binary name the user invoked (e.g. an alias). Falls back to `name` when not resolvable. */ - displayName: string - /** Parsed environment variables. */ - env: InferOutput - /** Return an error result with optional CTAs. */ - error: (options: { - code: string - cta?: CtaBlock | undefined - exitCode?: number | undefined - message: string - retryable?: boolean | undefined - }) => never - /** The resolved output format (e.g. `'toon'`, `'json'`, `'jsonl'`). */ - format: Formatter.Format - /** Whether the user explicitly passed `--format` or `--json`. */ - formatExplicit: boolean - /** The CLI name. */ - name: string - /** Return a success result with optional metadata (e.g. CTAs). */ - ok: (data: InferReturn, meta?: { cta?: CtaBlock | undefined }) => never - options: InferOutput - /** Variables set by middleware. */ - var: InferVars - /** The CLI version string. */ - version: string | undefined - }): - | InferReturn - | Promise> - | AsyncGenerator, unknown, unknown> + run: CommandRun } +type CommandRun< + args extends z.ZodObject | undefined = undefined, + env extends z.ZodObject | undefined = undefined, + options extends z.ZodObject | undefined = undefined, + output extends z.ZodType | undefined = undefined, + vars extends z.ZodObject | undefined = undefined, + cliEnv extends z.ZodObject | undefined = undefined, +> = (context: { + /** Whether the consumer is an agent (stdout is not a TTY). */ + agent: boolean + /** Positional arguments. */ + args: InferOutput + /** The binary name the user invoked (e.g. an alias). Falls back to `name` when not resolvable. */ + displayName: string + /** Parsed environment variables. */ + env: InferOutput + /** Return an error result with optional CTAs. */ + error: (options: { + code: string + cta?: CtaBlock | undefined + exitCode?: number | undefined + message: string + retryable?: boolean | undefined + }) => never + /** The resolved output format (e.g. `'toon'`, `'jsonl'`). */ + format: Formatter.Format + /** Whether the user explicitly passed `--format` or `--json`. */ + formatExplicit: boolean + /** The CLI name. */ + name: string + /** Return a success result with optional metadata (e.g. CTAs). */ + ok: (data: InferReturn, meta?: { cta?: CtaBlock | undefined }) => never + options: InferOutput + /** Variables set by middleware. */ + var: InferVars + /** The CLI version string. */ + version: string | undefined +}) => + | InferReturn + | Promise> + | AsyncGenerator, unknown, unknown> + /** @internal A formatted CTA block as it appears in the output envelope. */ type FormattedCtaBlock = { /** Formatted command suggestions. */ diff --git a/src/Client.test-d.ts b/src/Client.test-d.ts index d36f9f9..3af033f 100644 --- a/src/Client.test-d.ts +++ b/src/Client.test-d.ts @@ -19,6 +19,13 @@ export type Commands = { } /** Generated command "auth". */ auth: { args: {}; options: { token: string }; output: void } + /** Generated command "logs". */ + logs: { + args: {} + options: {} + output: { line: string } + stream: true + } /** Generated command "project deploy". */ 'project deploy': { args: { id: string } @@ -210,6 +217,19 @@ test('createClient can be made permissive with an explicit unknown command map', call({ args: { any: 'value' }, options: ['also accepted'] }) }) +test('createClient returns async iterables for streaming commands', () => { + const client = createClient({ baseUrl: 'https://api.example.com' }) + const logs = client('logs') + + expectTypeOf>>().toEqualTypeOf>() + + async function read() { + const stream = await logs() + for await (const chunk of stream) expectTypeOf(chunk).toEqualTypeOf<{ line: string }>() + } + void read +}) + test('ClientError can be imported and RPC payloads can be narrowed', () => { const error = new ClientError('Invalid input', { error: { diff --git a/src/Client.test.ts b/src/Client.test.ts index 298bd7f..a4e5213 100644 --- a/src/Client.test.ts +++ b/src/Client.test.ts @@ -1,4 +1,11 @@ -import { ClientError, createClient, isClientRpcError, isClientRpcErrorEnvelope } from 'incur' +import { + Cli, + ClientError, + createClient, + isClientRpcError, + isClientRpcErrorEnvelope, + z, +} from 'incur' type RuntimeCommands = Record @@ -28,7 +35,7 @@ describe('createClient', () => { expect(calls[0]?.url).toBe('https://api.example.com/_incur/rpc') expect(calls[0]?.init?.method).toBe('POST') expect(calls[0]?.init?.headers).toEqual({ - accept: 'application/json', + accept: 'application/json, application/x-ndjson', 'content-type': 'application/json', }) expect(JSON.parse(String(calls[0]?.init?.body))).toEqual({ @@ -159,6 +166,278 @@ describe('createClient', () => { } }) + test('returns async iterable for streaming RPC responses', async () => { + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response( + [ + JSON.stringify({ type: 'chunk', data: { line: 'one' } }), + JSON.stringify({ type: 'chunk', data: { line: 'two' } }), + JSON.stringify({ type: 'done', ok: true, meta: { command: 'logs' } }), + ].join('\n') + '\n', + { headers: { 'content-type': 'application/x-ndjson' } }, + ), + }) + + const stream = await client('logs')() + const chunks = [] + for await (const chunk of stream) chunks.push(chunk) + + expect(chunks).toEqual([{ line: 'one' }, { line: 'two' }]) + }) + + test('calls a real CLI RPC server and unwraps non-streaming responses', async () => { + const cli = Cli.create('test').command('sum', { + args: z.object({ left: z.number() }), + options: z.object({ right: z.number() }), + run: (c) => ({ value: c.args.left + c.options.right }), + }) + const client = createClient<{ + sum: { args: { left: number }; options: { right: number }; output: { value: number } } + }>({ + baseUrl: 'http://localhost', + fetch: (input, init) => cli.fetch(new Request(input, init)), + }) + + await expect( + client('sum')({ + args: { left: 1 }, + options: { right: 2 }, + }), + ).resolves.toEqual({ value: 3 }) + }) + + test('calls a real CLI RPC server and iterates streaming responses', async () => { + const cli = Cli.create('test').command('logs', { + args: z.object({ prefix: z.string() }), + options: z.object({ count: z.number() }), + output: z.object({ line: z.string() }), + async *run(c) { + yield { line: `${c.args.prefix}-1` } + yield { line: `${c.args.prefix}-${c.options.count}` } + }, + }) + const client = createClient<{ + logs: { + args: { prefix: string } + options: { count: number } + output: { line: string } + stream: true + } + }>({ + baseUrl: 'http://localhost', + fetch: (input, init) => cli.fetch(new Request(input, init)), + }) + + const stream = await client('logs')({ + args: { prefix: 'line' }, + options: { count: 2 }, + }) + const chunks = [] + for await (const chunk of stream) chunks.push(chunk) + + expect(chunks).toEqual([{ line: 'line-1' }, { line: 'line-2' }]) + }) + + test('throws failed streaming RPC records', async () => { + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response( + JSON.stringify({ + type: 'error', + ok: false, + error: { code: 'NOPE', message: 'Nope' }, + }) + '\n', + { headers: { 'content-type': 'application/x-ndjson' }, status: 500 }, + ), + }) + + const stream = await client('logs')() + await expect(async () => { + for await (const chunk of stream) void chunk + }).rejects.toMatchObject({ + name: 'Incur.ClientError', + message: 'Nope', + error: { code: 'NOPE', message: 'Nope' }, + status: 500, + }) + }) + + test('throws invalid JSON streaming RPC records', async () => { + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response('{bad json}\n', { + headers: { 'content-type': 'application/x-ndjson' }, + status: 502, + }), + }) + + const stream = await client('logs')() + await expect(async () => { + for await (const chunk of stream) void chunk + }).rejects.toMatchObject({ + name: 'Incur.ClientError', + message: 'Expected a JSON RPC stream record', + data: '{bad json}', + status: 502, + }) + }) + + test('parses streaming RPC records split across chunks', async () => { + const encoder = new TextEncoder() + const body = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('{"type":"chunk","data":{"line":"')) + controller.enqueue(encoder.encode('one"}}\n{"type":"chunk","data":{"line":"two"}}\n')) + controller.enqueue(encoder.encode('{"type":"done","ok":true}\n')) + controller.close() + }, + }) + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response(body, { + headers: { 'content-type': 'application/x-ndjson' }, + }), + }) + + const stream = await client('logs')() + const chunks = [] + for await (const chunk of stream) chunks.push(chunk) + + expect(chunks).toEqual([{ line: 'one' }, { line: 'two' }]) + }) + + test('ignores blank lines in streaming RPC responses', async () => { + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response( + [ + '', + ' ', + JSON.stringify({ type: 'chunk', data: { line: 'one' } }), + '', + JSON.stringify({ type: 'done', ok: true }), + ].join('\n') + '\n', + { headers: { 'content-type': 'application/x-ndjson' } }, + ), + }) + + const stream = await client('logs')() + const chunks = [] + for await (const chunk of stream) chunks.push(chunk) + + expect(chunks).toEqual([{ line: 'one' }]) + }) + + test('throws when streaming RPC responses have no body', async () => { + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response(null, { + headers: { 'content-type': 'application/x-ndjson' }, + status: 204, + }), + }) + + const stream = await client('logs')() + await expect(async () => { + for await (const chunk of stream) void chunk + }).rejects.toMatchObject({ + name: 'Incur.ClientError', + message: 'Expected an RPC stream body', + status: 204, + }) + }) + + test('throws malformed streaming RPC records', async () => { + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response(JSON.stringify({ type: 'done', ok: false }) + '\n', { + headers: { 'content-type': 'application/x-ndjson' }, + }), + }) + + const stream = await client('logs')() + await expect(async () => { + for await (const chunk of stream) void chunk + }).rejects.toMatchObject({ + name: 'Incur.ClientError', + message: 'Malformed RPC stream record', + data: { type: 'done', ok: false }, + }) + }) + + test('throws when streaming RPC responses end before done', async () => { + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response(JSON.stringify({ type: 'chunk', data: { line: 'one' } }) + '\n', { + headers: { 'content-type': 'application/x-ndjson' }, + }), + }) + + const stream = await client('logs')() + await expect(async () => { + for await (const chunk of stream) void chunk + }).rejects.toMatchObject({ + name: 'Incur.ClientError', + message: 'RPC stream ended before done', + }) + }) + + test('cancels streaming RPC responses when consumers stop early', async () => { + let cancelled = false + const body = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode(JSON.stringify({ type: 'chunk', data: { line: 'one' } }) + '\n'), + ) + }, + cancel() { + cancelled = true + }, + }) + const client = createClient<{ + logs: { args: {}; options: {}; output: { line: string }; stream: true } + }>({ + baseUrl: 'https://api.example.com', + fetch: async () => + new Response(body, { + headers: { 'content-type': 'application/x-ndjson' }, + }), + }) + + const stream = await client('logs')() + for await (const chunk of stream) { + void chunk + break + } + + expect(cancelled).toBe(true) + }) + test('uses a fallback message for failed RPC envelopes without error messages', async () => { const client = createClient({ baseUrl: 'https://api.example.com', diff --git a/src/Client.ts b/src/Client.ts index 6390535..6ccff33 100644 --- a/src/Client.ts +++ b/src/Client.ts @@ -29,11 +29,14 @@ type Field = value extends object : { [field in key]?: value | undefined } type Input = Field<'args', Args> & Field<'options', Options> +type Result = command extends { stream: true } + ? AsyncIterable> + : Output type Caller = RequiredKeys> extends never - ? (input?: Input) => Promise> - : (input: Input) => Promise> + ? (input?: Input) => Promise> + : (input: Input) => Promise> type RuntimeInput = { args?: unknown | undefined @@ -77,7 +80,7 @@ export function createClient(options: ClientOptions): options: input.options ?? {}, }), headers: { - accept: 'application/json', + accept: 'application/json, application/x-ndjson', 'content-type': 'application/json', }, method: 'POST', @@ -86,13 +89,12 @@ export function createClient(options: ClientOptions): throw new ClientError('RPC request failed', { cause: error }) } + if (isStreamingResponse(response)) return parseStreamingResponse(response) + const envelope = await parseResponse(response) if (envelope.ok) return envelope.data - const message = - isRecord(envelope.error) && typeof envelope.error.message === 'string' - ? envelope.error.message - : 'RPC command failed' + const message = errorMessage(envelope.error, 'RPC command failed') throw new ClientError(message, { data: envelope, error: envelope.error, @@ -107,6 +109,10 @@ function endpoint(base: string | URL): URL { return new URL('_incur/rpc', url) } +function isStreamingResponse(response: Response): boolean { + return response.headers.get('content-type')?.includes('application/x-ndjson') ?? false +} + async function parseResponse(response: Response): Promise { const text = await response.text() let value: unknown @@ -127,3 +133,99 @@ async function parseResponse(response: Response): Promise { }) return value as Envelope } + +async function* parseStreamingResponse(response: Response): AsyncGenerator { + if (!response.body) + throw new ClientError('Expected an RPC stream body', { + status: response.status, + }) + + const reader = response.body.getReader() + const decoder = new TextDecoder() + let buffer = '' + let completed = false + let eof = false + + try { + while (true) { + const { value, done } = await reader.read() + if (done) { + eof = true + break + } + buffer += decoder.decode(value, { stream: true }) + + let newline: number + while ((newline = buffer.indexOf('\n')) !== -1) { + const line = buffer.slice(0, newline).trim() + buffer = buffer.slice(newline + 1) + if (line) { + const result = readStreamRecord(line, response.status) + if (result.done) { + completed = true + return + } + yield result.data + } + } + } + + const remaining = buffer.trim() + if (remaining) { + const result = readStreamRecord(remaining, response.status) + if (result.done) { + completed = true + return + } + yield result.data + } + } finally { + if (!completed && !eof) await reader.cancel() + reader.releaseLock() + } + + throw new ClientError('RPC stream ended before done', { + status: response.status, + }) +} + +function readStreamRecord( + line: string, + status: number, +): { data: unknown; done?: false | undefined } | { done: true } { + let value: unknown + try { + value = JSON.parse(line) + } catch (error) { + throw new ClientError('Expected a JSON RPC stream record', { + cause: error, + data: line, + status, + }) + } + + if (!isRecord(value) || typeof value.type !== 'string') + throw new ClientError('Malformed RPC stream record', { + data: value, + status, + }) + + if (value.type === 'chunk') return { data: value.data } + if (value.type === 'done' && value.ok === true) return { done: true } + if (value.type === 'error' && value.ok === false) { + throw new ClientError(errorMessage(value.error, 'RPC stream failed'), { + data: value, + error: value.error, + status, + }) + } + + throw new ClientError('Malformed RPC stream record', { + data: value, + status, + }) +} + +function errorMessage(error: unknown, fallback: string): string { + return isRecord(error) && typeof error.message === 'string' ? error.message : fallback +} diff --git a/src/Typegen.test.ts b/src/Typegen.test.ts index 0d53a62..86dadf3 100644 --- a/src/Typegen.test.ts +++ b/src/Typegen.test.ts @@ -355,6 +355,20 @@ describe('fromCli', () => { ) }) + test('streaming command', () => { + const cli = Cli.create('test').command('logs', { + output: z.object({ line: z.string() }), + async *run() { + yield { line: 'one' } + }, + }) + + const output = Typegen.fromCli(cli) + expect(output).toContain( + '"logs": { args: {}; options: {}; output: { line: string }; stream: true }', + ) + }) + test('skips commands that cannot be called by RPC client', () => { const cli = Cli.create('test') .command('deploy', { @@ -559,6 +573,12 @@ function createClientRoundTripCli() { output: z.void(), run: () => undefined, }) + .command('logs', { + output: z.object({ line: z.string() }), + async *run() { + yield { line: 'one' } + }, + }) .command('api', { fetch: () => new Response(), openapi: spec, diff --git a/src/Typegen.ts b/src/Typegen.ts index 3ac6887..0113e2a 100644 --- a/src/Typegen.ts +++ b/src/Typegen.ts @@ -22,11 +22,12 @@ export function fromCli(cli: Cli.Cli): string { 'export type Commands = {', ] - for (const { name, args, options, output } of entries) { + for (const { name, args, options, output, stream } of entries) { const outputType = output ? `; output: ${schemaToType(output, 'unknown')}` : '' + const streamType = stream ? '; stream: true' : '' lines.push( ` /** Generated command ${commentText(JSON.stringify(name))}. */`, - ` ${JSON.stringify(name)}: { args: ${schemaToType(args)}; options: ${schemaToType(options)}${outputType} }`, + ` ${JSON.stringify(name)}: { args: ${schemaToType(args)}; options: ${schemaToType(options)}${outputType}${streamType} }`, ) } @@ -56,6 +57,7 @@ function collectEntries(commands: Map, prefix: string[]): Entry[] { args: entry.args, options: entry.options, output: entry.output, + stream: entry._stream, }) } return result.sort((a, b) => a.name.localeCompare(b.name)) @@ -66,6 +68,7 @@ type Entry = { name: string options?: z.ZodObject | undefined output?: z.ZodType | undefined + stream?: true | undefined } /** Converts a Zod schema to a TypeScript type string. Returns `fallback` for undefined schemas. */ diff --git a/src/e2e.test.ts b/src/e2e.test.ts index e4be415..ee65415 100644 --- a/src/e2e.test.ts +++ b/src/e2e.test.ts @@ -1637,15 +1637,15 @@ describe('typegen', () => { /** Generated command "slow". */ "slow": { args: {}; options: {} } /** Generated command "stream". */ - "stream": { args: {}; options: {} } + "stream": { args: {}; options: {}; stream: true } /** Generated command "stream-error". */ - "stream-error": { args: {}; options: {} } + "stream-error": { args: {}; options: {}; stream: true } /** Generated command "stream-ok". */ - "stream-ok": { args: {}; options: {} } + "stream-ok": { args: {}; options: {}; stream: true } /** Generated command "stream-text". */ - "stream-text": { args: {}; options: {} } + "stream-text": { args: {}; options: {}; stream: true } /** Generated command "stream-throw". */ - "stream-throw": { args: {}; options: {} } + "stream-throw": { args: {}; options: {}; stream: true } /** Generated command "validate-fail". */ "validate-fail": { args: { email: string; age: number }; options: {} } }