Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/ai-sdk-queue-broadcast-seam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@gemstack/ai-sdk": minor
---

Make `agent.queue()` / `.broadcast()` framework-agnostic. The engine no longer dynamically imports `@rudderjs/queue` or `@rudderjs/broadcast`; instead register a neutral adapter once at startup with the new `configureAiQueue({ dispatch, broadcast })`. New public exports: `configureAiQueue`, and the `QueueDispatch` / `QueueBroadcast` types. Rudder users get this wired automatically by `@rudderjs/ai`'s provider (no app change). This removes the last `@rudderjs/*` reference from the engine source.
6 changes: 5 additions & 1 deletion packages/ai-sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ export type {
AsToolStreamingOption,
ChunkProjector,
} from './agent.js'
export { QueuedPromptBuilder } from './queue-job.js'
export { QueuedPromptBuilder, configureAiQueue } from './queue-job.js'
export type { BroadcastOptions } from './queue-job.js'

// Middleware
export { runOnConfig, runOnChunk, runOnBeforeToolCall, runOnAfterToolCall, runSequential, runOnUsage, runOnAbort, runOnError } from './middleware.js'
Expand Down Expand Up @@ -173,6 +174,9 @@ export type { CacheAdapter } from './cache-adapter.js'
// Neutral storage contract for ImageGenerator/AudioGenerator .store() (bring your own)
export type { StorageAdapter } from './storage-adapter.js'

// Neutral queue/broadcast contract behind agent.queue() / .broadcast() (bring your own)
export type { QueueDispatch, QueueBroadcast } from './queue-adapter.js'

// Sub-agent run store (asTool streaming + suspend)
export {
InMemorySubAgentRunStore,
Expand Down
25 changes: 25 additions & 0 deletions packages/ai-sdk/src/queue-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Neutral job-dispatch contract behind `agent.queue('...').send()`.
*
* `@gemstack/ai-sdk` does not bundle or depend on any queue implementation.
* Register one once at startup via {@link configureAiQueue}; Rudder apps get
* this wired automatically by Rudder's `AiProvider`.
*
* `dispatch` enqueues `fn` to run later on a worker.
*/
export type QueueDispatch = (
fn: () => void | Promise<void>,
options?: { queue?: string; delay?: number },
) => Promise<void>

/**
* Neutral broadcast contract behind `.broadcast(channel)`. Optional — only
* needed when a queued job streams progress to a channel. Pushes one `event`
* (`chunk` | `done` | `error`, optionally prefixed) with its `data` payload to
* a named `channel`.
*/
export type QueueBroadcast = (
channel: string,
event: string,
data: unknown,
) => void | Promise<void>
46 changes: 44 additions & 2 deletions packages/ai-sdk/src/queue-job.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, beforeEach, afterEach } from 'node:test'
import assert from 'node:assert/strict'
import { QueuedPromptBuilder, _setQueueJobLoadersForTests } from './queue-job.js'
import { QueuedPromptBuilder, _setQueueJobLoadersForTests, configureAiQueue } from './queue-job.js'
import type { AgentResponse, AgentStreamResponse, StreamChunk } from './types.js'

// ─── Test seam wiring ─────────────────────────────────────
Expand Down Expand Up @@ -195,7 +195,7 @@ describe('QueuedPromptBuilder — broadcast() path', () => {
const { agent } = makeStreamingAgent([{ type: 'finish', finishReason: 'stop' }])
await assert.rejects(
() => new QueuedPromptBuilder(agent, 'x').broadcast('chan').send(),
/@rudderjs\/broadcast/,
/needs a broadcast adapter/,
)
})

Expand All @@ -218,3 +218,45 @@ describe('QueuedPromptBuilder — broadcast() path', () => {
assert.strictEqual(received, fakeResponse)
})
})

// ─── Public configureAiQueue() registration ───────────────

describe('configureAiQueue', () => {
it('registers a dispatch + broadcast adapter that backs send()', async () => {
// Drop the per-test fakes so configureAiQueue captures the real defaults.
restoreLoaders()
const localDispatched: DispatchedJob[] = []
const localBroadcasts: BroadcastCall[] = []
const restore = configureAiQueue({
dispatch: async (fn, options) => {
localDispatched.push({ fn, queue: options?.queue, delay: options?.delay })
await fn()
},
broadcast: (channel, event, data) => {
localBroadcasts.push({ channel, event, data })
},
})
try {
const { agent, calls } = makePromptOnlyAgent()
await new QueuedPromptBuilder(agent, 'hi').onQueue('ai').send()
assert.strictEqual(localDispatched.length, 1)
assert.strictEqual(localDispatched[0]?.queue, 'ai')
assert.deepStrictEqual(calls, [{ input: 'hi' }], 'the agent prompt ran inside the dispatched job')
assert.strictEqual(localBroadcasts.length, 0)
} finally {
restore()
}
})

it('restore() reverts to the default adapter, which rejects with a clear error', async () => {
// Revert to the engine default, then register + immediately restore.
restoreLoaders()
const restore = configureAiQueue({ dispatch: async (fn) => { await fn() } })
restore()
const { agent } = makePromptOnlyAgent()
await assert.rejects(
() => new QueuedPromptBuilder(agent, 'x').send(),
/needs a queue adapter/,
)
})
})
69 changes: 41 additions & 28 deletions packages/ai-sdk/src/queue-job.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { AgentPromptOptions, AgentResponse, AgentStreamResponse, StreamChunk } from './types.js'
import type { QueueDispatch, QueueBroadcast } from './queue-adapter.js'

/**
* Optional shape on the agent reference — when set, the queued job uses
Expand Down Expand Up @@ -85,17 +86,18 @@ export class QueuedPromptBuilder {
* Stream the agent's progress to a broadcast channel as the job runs.
*
* When set, the queued job uses `agent.stream()` instead of `prompt()` and
* pushes each chunk to `channel` via `@rudderjs/broadcast`. Events:
* pushes each chunk to `channel` via the registered broadcast adapter
* (`configureAiQueue({ broadcast })`). Events:
*
* - `chunk` (per `StreamChunk` from the agent)
* - `done` (the final `AgentResponse`)
* - `error` (`{ message }` if the run fails)
*
* Requires `@rudderjs/broadcast` installed and its WS server running in the
* worker process. In the typical Rudder dev setup (single process running
* both web + queue:work) this works out of the box. If your queue worker is
* a separate process from the broadcast WS server, you'll need a pub/sub
* bridge (Redis, Reverb, etc.) — outside the scope of v1.
* Requires a broadcast adapter registered via `configureAiQueue({ broadcast })`
* and reachable from the worker process. In the typical single-process dev
* setup (one process running both web + queue worker) this works out of the
* box. If your queue worker is a separate process from the broadcast server,
* you'll need a pub/sub bridge (Redis, Reverb, etc.) — outside the scope of v1.
*/
broadcast(channel: string, opts: BroadcastOptions = {}): this {
this._broadcastChannel = channel
Expand Down Expand Up @@ -142,11 +144,8 @@ export class QueuedPromptBuilder {

// ─── Internals ────────────────────────────────────────────

type DispatchFn = (
fn: () => void | Promise<void>,
options?: { queue?: string; delay?: number },
) => Promise<void>
type BroadcastFn = (channel: string, event: string, data: unknown) => void | Promise<void>
type DispatchFn = QueueDispatch
type BroadcastFn = QueueBroadcast

let _dispatchLoader: () => Promise<DispatchFn> = defaultLoadDispatch
let _broadcastLoader: () => Promise<BroadcastFn | null> = defaultLoadBroadcast
Expand All @@ -155,25 +154,39 @@ async function loadDispatch(): Promise<DispatchFn> { return _dispatchL
async function loadBroadcast(): Promise<BroadcastFn | null> { return _broadcastLoader() }

async function defaultLoadDispatch(): Promise<DispatchFn> {
try {
const specifier = '@rudderjs/queue'
const mod: Record<string, unknown> = await import(/* @vite-ignore */ specifier)
return mod['dispatch'] as DispatchFn
} catch {
throw new Error(
'[ai-sdk] @rudderjs/queue is required for agent.queue(). Install it: pnpm add @rudderjs/queue',
)
}
throw new Error(
'[ai-sdk] agent.queue() needs a queue adapter. Register one at startup with configureAiQueue({ dispatch }). Rudder apps get this automatically via their AI provider.',
)
}

async function defaultLoadBroadcast(): Promise<BroadcastFn | null> {
try {
const specifier = '@rudderjs/broadcast'
const mod: Record<string, unknown> = await import(/* @vite-ignore */ specifier)
const fn = mod['broadcast']
return typeof fn === 'function' ? fn as BroadcastFn : null
} catch {
return null
return null
}

/**
* Register the queue (and optional broadcast) adapter that backs
* `agent.queue('...').send()` and `.broadcast(channel)`.
*
* Call once at app startup. Returns a restore function that reinstates the
* previously-registered adapters (handy in tests).
*
* @example
* configureAiQueue({
* dispatch: (fn, opts) => myQueue.push(fn, opts),
* broadcast: (channel, event, data) => myBus.emit(channel, event, data),
* })
*/
export function configureAiQueue(config: {
dispatch: QueueDispatch
broadcast?: QueueBroadcast | null
}): () => void {
const prevDispatch = _dispatchLoader
const prevBroadcast = _broadcastLoader
_dispatchLoader = async () => config.dispatch
_broadcastLoader = async () => config.broadcast ?? null
return () => {
_dispatchLoader = prevDispatch
_broadcastLoader = prevBroadcast
}
}

Expand Down Expand Up @@ -209,7 +222,7 @@ async function runStreamingAndBroadcast(
const broadcastFn = await loadBroadcast()
if (broadcastFn === null) {
throw new Error(
'[ai-sdk] @rudderjs/broadcast is required for .broadcast(). Install it: pnpm add @rudderjs/broadcast',
'[ai-sdk] .broadcast() needs a broadcast adapter. Register one with configureAiQueue({ broadcast }).',
)
}
if (typeof agentRef.stream !== 'function') {
Expand Down
Loading