From 041e81c59b8e22fd241ec78177a4bd6ef9e943f3 Mon Sep 17 00:00:00 2001 From: Suleiman Shahbari Date: Sat, 27 Jun 2026 14:08:22 +0300 Subject: [PATCH] feat(ai-sdk): neutral queue/broadcast adapter seam (configureAiQueue) - drop @rudderjs/queue + @rudderjs/broadcast --- .changeset/ai-sdk-queue-broadcast-seam.md | 5 ++ packages/ai-sdk/src/index.ts | 6 +- packages/ai-sdk/src/queue-adapter.ts | 25 ++++++++ packages/ai-sdk/src/queue-job.test.ts | 46 ++++++++++++++- packages/ai-sdk/src/queue-job.ts | 69 ++++++++++++++--------- 5 files changed, 120 insertions(+), 31 deletions(-) create mode 100644 .changeset/ai-sdk-queue-broadcast-seam.md create mode 100644 packages/ai-sdk/src/queue-adapter.ts diff --git a/.changeset/ai-sdk-queue-broadcast-seam.md b/.changeset/ai-sdk-queue-broadcast-seam.md new file mode 100644 index 0000000..430bae1 --- /dev/null +++ b/.changeset/ai-sdk-queue-broadcast-seam.md @@ -0,0 +1,5 @@ +--- +"@gemstack/ai-sdk": minor +--- + +Make `agent.queue()` / `.broadcast()` framework-agnostic. The engine no longer dynamically imports `@rudderjs/queue` or `@rudderjs/broadcast`; instead register a neutral adapter once at startup with the new `configureAiQueue({ dispatch, broadcast })`. New public exports: `configureAiQueue`, and the `QueueDispatch` / `QueueBroadcast` types. Rudder users get this wired automatically by `@rudderjs/ai`'s provider (no app change). This removes the last `@rudderjs/*` reference from the engine source. diff --git a/packages/ai-sdk/src/index.ts b/packages/ai-sdk/src/index.ts index 67314cf..e5abe22 100644 --- a/packages/ai-sdk/src/index.ts +++ b/packages/ai-sdk/src/index.ts @@ -135,7 +135,8 @@ export type { AsToolStreamingOption, ChunkProjector, } from './agent.js' -export { QueuedPromptBuilder } from './queue-job.js' +export { QueuedPromptBuilder, configureAiQueue } from './queue-job.js' +export type { BroadcastOptions } from './queue-job.js' // Middleware export { runOnConfig, runOnChunk, runOnBeforeToolCall, runOnAfterToolCall, runSequential, runOnUsage, runOnAbort, runOnError } from './middleware.js' @@ -173,6 +174,9 @@ export type { CacheAdapter } from './cache-adapter.js' // Neutral storage contract for ImageGenerator/AudioGenerator .store() (bring your own) export type { StorageAdapter } from './storage-adapter.js' +// Neutral queue/broadcast contract behind agent.queue() / .broadcast() (bring your own) +export type { QueueDispatch, QueueBroadcast } from './queue-adapter.js' + // Sub-agent run store (asTool streaming + suspend) export { InMemorySubAgentRunStore, diff --git a/packages/ai-sdk/src/queue-adapter.ts b/packages/ai-sdk/src/queue-adapter.ts new file mode 100644 index 0000000..94061bc --- /dev/null +++ b/packages/ai-sdk/src/queue-adapter.ts @@ -0,0 +1,25 @@ +/** + * Neutral job-dispatch contract behind `agent.queue('...').send()`. + * + * `@gemstack/ai-sdk` does not bundle or depend on any queue implementation. + * Register one once at startup via {@link configureAiQueue}; Rudder apps get + * this wired automatically by Rudder's `AiProvider`. + * + * `dispatch` enqueues `fn` to run later on a worker. + */ +export type QueueDispatch = ( + fn: () => void | Promise, + options?: { queue?: string; delay?: number }, +) => Promise + +/** + * Neutral broadcast contract behind `.broadcast(channel)`. Optional — only + * needed when a queued job streams progress to a channel. Pushes one `event` + * (`chunk` | `done` | `error`, optionally prefixed) with its `data` payload to + * a named `channel`. + */ +export type QueueBroadcast = ( + channel: string, + event: string, + data: unknown, +) => void | Promise diff --git a/packages/ai-sdk/src/queue-job.test.ts b/packages/ai-sdk/src/queue-job.test.ts index 4ae37df..f10eef8 100644 --- a/packages/ai-sdk/src/queue-job.test.ts +++ b/packages/ai-sdk/src/queue-job.test.ts @@ -1,6 +1,6 @@ import { describe, it, beforeEach, afterEach } from 'node:test' import assert from 'node:assert/strict' -import { QueuedPromptBuilder, _setQueueJobLoadersForTests } from './queue-job.js' +import { QueuedPromptBuilder, _setQueueJobLoadersForTests, configureAiQueue } from './queue-job.js' import type { AgentResponse, AgentStreamResponse, StreamChunk } from './types.js' // ─── Test seam wiring ───────────────────────────────────── @@ -195,7 +195,7 @@ describe('QueuedPromptBuilder — broadcast() path', () => { const { agent } = makeStreamingAgent([{ type: 'finish', finishReason: 'stop' }]) await assert.rejects( () => new QueuedPromptBuilder(agent, 'x').broadcast('chan').send(), - /@rudderjs\/broadcast/, + /needs a broadcast adapter/, ) }) @@ -218,3 +218,45 @@ describe('QueuedPromptBuilder — broadcast() path', () => { assert.strictEqual(received, fakeResponse) }) }) + +// ─── Public configureAiQueue() registration ─────────────── + +describe('configureAiQueue', () => { + it('registers a dispatch + broadcast adapter that backs send()', async () => { + // Drop the per-test fakes so configureAiQueue captures the real defaults. + restoreLoaders() + const localDispatched: DispatchedJob[] = [] + const localBroadcasts: BroadcastCall[] = [] + const restore = configureAiQueue({ + dispatch: async (fn, options) => { + localDispatched.push({ fn, queue: options?.queue, delay: options?.delay }) + await fn() + }, + broadcast: (channel, event, data) => { + localBroadcasts.push({ channel, event, data }) + }, + }) + try { + const { agent, calls } = makePromptOnlyAgent() + await new QueuedPromptBuilder(agent, 'hi').onQueue('ai').send() + assert.strictEqual(localDispatched.length, 1) + assert.strictEqual(localDispatched[0]?.queue, 'ai') + assert.deepStrictEqual(calls, [{ input: 'hi' }], 'the agent prompt ran inside the dispatched job') + assert.strictEqual(localBroadcasts.length, 0) + } finally { + restore() + } + }) + + it('restore() reverts to the default adapter, which rejects with a clear error', async () => { + // Revert to the engine default, then register + immediately restore. + restoreLoaders() + const restore = configureAiQueue({ dispatch: async (fn) => { await fn() } }) + restore() + const { agent } = makePromptOnlyAgent() + await assert.rejects( + () => new QueuedPromptBuilder(agent, 'x').send(), + /needs a queue adapter/, + ) + }) +}) diff --git a/packages/ai-sdk/src/queue-job.ts b/packages/ai-sdk/src/queue-job.ts index 5fb8697..ecfbe43 100644 --- a/packages/ai-sdk/src/queue-job.ts +++ b/packages/ai-sdk/src/queue-job.ts @@ -1,4 +1,5 @@ import type { AgentPromptOptions, AgentResponse, AgentStreamResponse, StreamChunk } from './types.js' +import type { QueueDispatch, QueueBroadcast } from './queue-adapter.js' /** * Optional shape on the agent reference — when set, the queued job uses @@ -85,17 +86,18 @@ export class QueuedPromptBuilder { * Stream the agent's progress to a broadcast channel as the job runs. * * When set, the queued job uses `agent.stream()` instead of `prompt()` and - * pushes each chunk to `channel` via `@rudderjs/broadcast`. Events: + * pushes each chunk to `channel` via the registered broadcast adapter + * (`configureAiQueue({ broadcast })`). Events: * * - `chunk` (per `StreamChunk` from the agent) * - `done` (the final `AgentResponse`) * - `error` (`{ message }` if the run fails) * - * Requires `@rudderjs/broadcast` installed and its WS server running in the - * worker process. In the typical Rudder dev setup (single process running - * both web + queue:work) this works out of the box. If your queue worker is - * a separate process from the broadcast WS server, you'll need a pub/sub - * bridge (Redis, Reverb, etc.) — outside the scope of v1. + * Requires a broadcast adapter registered via `configureAiQueue({ broadcast })` + * and reachable from the worker process. In the typical single-process dev + * setup (one process running both web + queue worker) this works out of the + * box. If your queue worker is a separate process from the broadcast server, + * you'll need a pub/sub bridge (Redis, Reverb, etc.) — outside the scope of v1. */ broadcast(channel: string, opts: BroadcastOptions = {}): this { this._broadcastChannel = channel @@ -142,11 +144,8 @@ export class QueuedPromptBuilder { // ─── Internals ──────────────────────────────────────────── -type DispatchFn = ( - fn: () => void | Promise, - options?: { queue?: string; delay?: number }, -) => Promise -type BroadcastFn = (channel: string, event: string, data: unknown) => void | Promise +type DispatchFn = QueueDispatch +type BroadcastFn = QueueBroadcast let _dispatchLoader: () => Promise = defaultLoadDispatch let _broadcastLoader: () => Promise = defaultLoadBroadcast @@ -155,25 +154,39 @@ async function loadDispatch(): Promise { return _dispatchL async function loadBroadcast(): Promise { return _broadcastLoader() } async function defaultLoadDispatch(): Promise { - try { - const specifier = '@rudderjs/queue' - const mod: Record = await import(/* @vite-ignore */ specifier) - return mod['dispatch'] as DispatchFn - } catch { - throw new Error( - '[ai-sdk] @rudderjs/queue is required for agent.queue(). Install it: pnpm add @rudderjs/queue', - ) - } + throw new Error( + '[ai-sdk] agent.queue() needs a queue adapter. Register one at startup with configureAiQueue({ dispatch }). Rudder apps get this automatically via their AI provider.', + ) } async function defaultLoadBroadcast(): Promise { - try { - const specifier = '@rudderjs/broadcast' - const mod: Record = await import(/* @vite-ignore */ specifier) - const fn = mod['broadcast'] - return typeof fn === 'function' ? fn as BroadcastFn : null - } catch { - return null + return null +} + +/** + * Register the queue (and optional broadcast) adapter that backs + * `agent.queue('...').send()` and `.broadcast(channel)`. + * + * Call once at app startup. Returns a restore function that reinstates the + * previously-registered adapters (handy in tests). + * + * @example + * configureAiQueue({ + * dispatch: (fn, opts) => myQueue.push(fn, opts), + * broadcast: (channel, event, data) => myBus.emit(channel, event, data), + * }) + */ +export function configureAiQueue(config: { + dispatch: QueueDispatch + broadcast?: QueueBroadcast | null +}): () => void { + const prevDispatch = _dispatchLoader + const prevBroadcast = _broadcastLoader + _dispatchLoader = async () => config.dispatch + _broadcastLoader = async () => config.broadcast ?? null + return () => { + _dispatchLoader = prevDispatch + _broadcastLoader = prevBroadcast } } @@ -209,7 +222,7 @@ async function runStreamingAndBroadcast( const broadcastFn = await loadBroadcast() if (broadcastFn === null) { throw new Error( - '[ai-sdk] @rudderjs/broadcast is required for .broadcast(). Install it: pnpm add @rudderjs/broadcast', + '[ai-sdk] .broadcast() needs a broadcast adapter. Register one with configureAiQueue({ broadcast }).', ) } if (typeof agentRef.stream !== 'function') {