diff --git a/src/client/index.ts b/src/client/index.ts index 2140955..fdbc682 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -25,6 +25,15 @@ export type StreamWriter> = ( chunkAppender: ChunkAppender, ) => Promise; +export interface PersistentTextStreamingOptions { + /** + * How long (in ms) a stream may be idle before the cleanup cron marks + * it as timed-out. Set to `null` to disable expiration entirely. + * When omitted the component defaults to 20 minutes. + */ + expirationMs?: number | null; +} + // TODO -- make more flexible. # of bytes, etc? const hasDelimeter = (text: string) => { return text.includes(".") || text.includes("!") || text.includes("?"); @@ -32,11 +41,30 @@ const hasDelimeter = (text: string) => { // TODO -- some sort of wrapper with easy ergonomics for working with LLMs? export class PersistentTextStreaming { + private _configured = false; + constructor( public component: UseApi, - public options?: object, + public options?: PersistentTextStreamingOptions, ) {} + /** + * Persist the expiration configuration into the component's database. + * This only needs to be called once (the setting is durable), but is + * safe to call repeatedly — it upserts a singleton config document. + * + * If `options.expirationMs` was provided to the constructor this is + * called automatically on the first `createStream` invocation. + */ + async configure(ctx: RunMutationCtx): Promise { + const expirationMs = this.options?.expirationMs; + if (expirationMs === undefined) return; + await ctx.runMutation(this.component.lib.configure, { + expirationMs, + }); + this._configured = true; + } + /** * Create a new stream. This will return a stream ID that can be used * in an HTTP action to stream data back out to the client while also @@ -56,6 +84,9 @@ export class PersistentTextStreaming { */ async createStream(ctx: RunMutationCtx): Promise { + if (!this._configured && this.options?.expirationMs !== undefined) { + await this.configure(ctx); + } const id = await ctx.runMutation(this.component.lib.createStream); return id as StreamId; } diff --git a/src/component/lib.ts b/src/component/lib.ts index cc01906..6530dc4 100644 --- a/src/component/lib.ts +++ b/src/component/lib.ts @@ -27,22 +27,22 @@ export const addChunk = mutation({ if (!stream) { throw new Error("Stream not found"); } + const patch: Record = { + lastActivityTime: Date.now(), + }; if (stream.status === "pending") { - await ctx.db.patch(args.streamId, { - status: "streaming", - }); + patch.status = "streaming"; } else if (stream.status !== "streaming") { throw new Error("Stream is not streaming; did it timeout?"); } + if (args.final) { + patch.status = "done"; + } + await ctx.db.patch(args.streamId, patch); await ctx.db.insert("chunks", { streamId: args.streamId, text: args.text, }); - if (args.final) { - await ctx.db.patch(args.streamId, { - status: "done", - }); - } }, }); @@ -119,14 +119,45 @@ export const getStreamText = query({ }, }); -const EXPIRATION_TIME = 20 * 60 * 1000; // 20 minutes in milliseconds +// Configure stream expiration behavior. +// expirationMs: number of milliseconds of inactivity before a stream is +// timed out. Set to null to disable expiration entirely. +export const configure = mutation({ + args: { + expirationMs: v.union(v.number(), v.null()), + }, + handler: async (ctx, args) => { + const existing = await ctx.db.query("streamConfig").first(); + if (existing) { + await ctx.db.patch(existing._id, { + expirationMs: args.expirationMs, + }); + } else { + await ctx.db.insert("streamConfig", { + expirationMs: args.expirationMs, + }); + } + }, +}); + +const DEFAULT_EXPIRATION_TIME = 20 * 60 * 1000; // 20 minutes in milliseconds const BATCH_SIZE = 100; -// If the last chunk of a stream was added more than 20 minutes ago, -// set the stream to timeout. The action feeding it has to be dead. +// Clean up streams that have been inactive longer than the configured +// expiration. Uses lastActivityTime (falling back to _creationTime for +// streams created before this field existed). Skipped entirely when +// expiration is configured as null. export const cleanupExpiredStreams = internalMutation({ args: {}, handler: async (ctx) => { + const config = await ctx.db.query("streamConfig").first(); + const expirationMs = config?.expirationMs; + + if (expirationMs === null) return; + + const effectiveExpiration = + typeof expirationMs === "number" ? expirationMs : DEFAULT_EXPIRATION_TIME; + const now = Date.now(); const pendingStreams = await ctx.db .query("streams") @@ -138,7 +169,9 @@ export const cleanupExpiredStreams = internalMutation({ .take(BATCH_SIZE); for (const stream of [...pendingStreams, ...streamingStreams]) { - if (now - stream._creationTime > EXPIRATION_TIME) { + const lastActive = + (stream as any).lastActivityTime ?? stream._creationTime; + if (now - lastActive > effectiveExpiration) { console.log("Cleaning up expired stream", stream._id); await ctx.db.patch(stream._id, { status: "timeout", diff --git a/src/component/schema.ts b/src/component/schema.ts index d7435fc..2d2cc8f 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -13,9 +13,13 @@ export type StreamStatus = Infer; export default defineSchema({ streams: defineTable({ status: streamStatusValidator, + lastActivityTime: v.optional(v.number()), }).index("byStatus", ["status"]), chunks: defineTable({ streamId: v.id("streams"), text: v.string(), }).index("byStream", ["streamId"]), + streamConfig: defineTable({ + expirationMs: v.optional(v.union(v.number(), v.null())), + }), });