Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,46 @@ export type StreamWriter<A extends GenericActionCtx<GenericDataModel>> = (
chunkAppender: ChunkAppender,
) => Promise<void>;

export interface PersistentTextStreamingOptions {
/**
* How long (in ms) a stream may be idle before the cleanup cron marks
* it as timed-out. Set to `null` to disable expiration entirely.
* When omitted the component defaults to 20 minutes.
*/
expirationMs?: number | null;
}

// TODO -- make more flexible. # of bytes, etc?
const hasDelimeter = (text: string) => {
return text.includes(".") || text.includes("!") || text.includes("?");
};

// TODO -- some sort of wrapper with easy ergonomics for working with LLMs?
export class PersistentTextStreaming {
private _configured = false;

constructor(
public component: UseApi<typeof api>,
public options?: object,
public options?: PersistentTextStreamingOptions,
) {}

/**
* Persist the expiration configuration into the component's database.
* This only needs to be called once (the setting is durable), but is
* safe to call repeatedly — it upserts a singleton config document.
*
* If `options.expirationMs` was provided to the constructor this is
* called automatically on the first `createStream` invocation.
*/
async configure(ctx: RunMutationCtx): Promise<void> {
const expirationMs = this.options?.expirationMs;
if (expirationMs === undefined) return;
await ctx.runMutation(this.component.lib.configure, {
expirationMs,
});
this._configured = true;
}

/**
* Create a new stream. This will return a stream ID that can be used
* in an HTTP action to stream data back out to the client while also
Expand All @@ -56,6 +84,9 @@ export class PersistentTextStreaming {
*/

async createStream(ctx: RunMutationCtx): Promise<StreamId> {
if (!this._configured && this.options?.expirationMs !== undefined) {
await this.configure(ctx);
}
const id = await ctx.runMutation(this.component.lib.createStream);
return id as StreamId;
}
Expand Down
57 changes: 45 additions & 12 deletions src/component/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ export const addChunk = mutation({
if (!stream) {
throw new Error("Stream not found");
}
const patch: Record<string, unknown> = {
lastActivityTime: Date.now(),
};
if (stream.status === "pending") {
await ctx.db.patch(args.streamId, {
status: "streaming",
});
patch.status = "streaming";
} else if (stream.status !== "streaming") {
throw new Error("Stream is not streaming; did it timeout?");
}
if (args.final) {
patch.status = "done";
}
await ctx.db.patch(args.streamId, patch);
await ctx.db.insert("chunks", {
streamId: args.streamId,
text: args.text,
});
if (args.final) {
await ctx.db.patch(args.streamId, {
status: "done",
});
}
},
});

Expand Down Expand Up @@ -119,14 +119,45 @@ export const getStreamText = query({
},
});

const EXPIRATION_TIME = 20 * 60 * 1000; // 20 minutes in milliseconds
// Configure stream expiration behavior.
// expirationMs: number of milliseconds of inactivity before a stream is
// timed out. Set to null to disable expiration entirely.
export const configure = mutation({
args: {
expirationMs: v.union(v.number(), v.null()),
},
handler: async (ctx, args) => {
Comment on lines +126 to +129
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject negative expiration values in configuration.

Line 127 currently accepts any number, including negatives. A negative expirationMs will cause immediate timeouts in cleanup.

Proposed fix
 export const configure = mutation({
   args: {
     expirationMs: v.union(v.number(), v.null()),
   },
   handler: async (ctx, args) => {
+    if (args.expirationMs !== null && args.expirationMs < 0) {
+      throw new Error("expirationMs must be >= 0 or null");
+    }
     const existing = await ctx.db.query("streamConfig").first();
     if (existing) {
       await ctx.db.patch(existing._id, {
         expirationMs: args.expirationMs,
       });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
args: {
expirationMs: v.union(v.number(), v.null()),
},
handler: async (ctx, args) => {
args: {
expirationMs: v.union(v.number(), v.null()),
},
handler: async (ctx, args) => {
if (args.expirationMs !== null && args.expirationMs < 0) {
throw new Error("expirationMs must be >= 0 or null");
}
const existing = await ctx.db.query("streamConfig").first();
if (existing) {
await ctx.db.patch(existing._id, {
expirationMs: args.expirationMs,
});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/lib.ts` around lines 126 - 129, The args schema for
expirationMs currently allows any number (including negatives); update the args
validation so expirationMs rejects negative values (e.g., replace
v.union(v.number(), v.null()) with a schema that only permits null or numbers >=
0) and add a defensive check at the start of the handler (async (ctx, args) =>
...) to throw or normalize when args.expirationMs is negative to prevent
immediate timeouts during cleanup.

const existing = await ctx.db.query("streamConfig").first();
if (existing) {
await ctx.db.patch(existing._id, {
expirationMs: args.expirationMs,
});
} else {
await ctx.db.insert("streamConfig", {
expirationMs: args.expirationMs,
});
}
},
});

const DEFAULT_EXPIRATION_TIME = 20 * 60 * 1000; // 20 minutes in milliseconds
const BATCH_SIZE = 100;

// If the last chunk of a stream was added more than 20 minutes ago,
// set the stream to timeout. The action feeding it has to be dead.
// Clean up streams that have been inactive longer than the configured
// expiration. Uses lastActivityTime (falling back to _creationTime for
// streams created before this field existed). Skipped entirely when
// expiration is configured as null.
export const cleanupExpiredStreams = internalMutation({
args: {},
handler: async (ctx) => {
const config = await ctx.db.query("streamConfig").first();
const expirationMs = config?.expirationMs;

if (expirationMs === null) return;

const effectiveExpiration =
typeof expirationMs === "number" ? expirationMs : DEFAULT_EXPIRATION_TIME;

const now = Date.now();
const pendingStreams = await ctx.db
.query("streams")
Expand All @@ -138,7 +169,9 @@ export const cleanupExpiredStreams = internalMutation({
.take(BATCH_SIZE);

for (const stream of [...pendingStreams, ...streamingStreams]) {
if (now - stream._creationTime > EXPIRATION_TIME) {
const lastActive =
(stream as any).lastActivityTime ?? stream._creationTime;
if (now - lastActive > effectiveExpiration) {
console.log("Cleaning up expired stream", stream._id);
await ctx.db.patch(stream._id, {
status: "timeout",
Expand Down
4 changes: 4 additions & 0 deletions src/component/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ export type StreamStatus = Infer<typeof streamStatusValidator>;
export default defineSchema({
streams: defineTable({
status: streamStatusValidator,
lastActivityTime: v.optional(v.number()),
}).index("byStatus", ["status"]),
chunks: defineTable({
streamId: v.id("streams"),
text: v.string(),
}).index("byStream", ["streamId"]),
streamConfig: defineTable({
expirationMs: v.optional(v.union(v.number(), v.null())),
}),
});