feat: configurable stream expiration with activity-based timeout#25
feat: configurable stream expiration with activity-based timeout#25KyleKincer wants to merge 1 commit intoget-convex:mainfrom
Conversation
Two bugs/limitations fixed: 1. The cleanup cron checked `_creationTime` instead of last activity, so actively-streaming sessions were killed after 20 minutes from creation regardless of ongoing chunk activity. 2. The 20-minute expiration was hardcoded with no way to configure or disable it, making the component unsuitable for long-running streams (e.g. AI agent sessions that can run for hours). Changes: - Track `lastActivityTime` on each stream, updated on every `addChunk` - Cleanup now uses `lastActivityTime` (falling back to `_creationTime` for streams created before this field existed) - New `configure` mutation accepts `expirationMs: number | null` to override the default 20-minute timeout or disable expiration entirely - `PersistentTextStreaming` client class accepts typed `PersistentTextStreamingOptions` with `expirationMs` and auto-configures on first `createStream` call - Fully backward-compatible: unconfigured deployments retain the existing 20-minute default Made-with: Cursor
📝 WalkthroughWalkthroughThe changes implement configurable stream expiration for Changes
Sequence DiagramsequenceDiagram
participant Client
participant Mutation as Mutation System
participant Database as Database (ctx.db)
Client->>Client: new PersistentTextStreaming(component, { expirationMs: 5000 })
Client->>Client: _configured = false
Client->>Client: createStream(ctx) called
alt _configured is false
Client->>Mutation: ctx.runMutation(configure, { expirationMs: 5000 })
Mutation->>Database: Persist expirationMs to streamConfig
Database-->>Mutation: ✓ Configured
Mutation-->>Client: Configuration applied
Client->>Client: _configured = true
end
Client->>Mutation: ctx.runMutation(createStream, ...)
Mutation->>Database: Create stream with inherited expiration config
Database-->>Mutation: Stream created
Mutation-->>Client: Stream reference
Note over Database: On addChunk: updates lastActivityTime<br/>On cleanup: uses lastActivityTime + expirationMs<br/>to determine inactivity
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/client/index.ts (1)
44-45: Coalesce concurrentconfigurecalls to avoid duplicate mutations.Parallel
createStreamcalls can race before_configuredflips totrue. Consider an in-flight promise guard.Proposed refactor
export class PersistentTextStreaming { private _configured = false; + private _configurePromise: Promise<void> | null = null; async configure(ctx: RunMutationCtx): Promise<void> { const expirationMs = this.options?.expirationMs; if (expirationMs === undefined) return; - await ctx.runMutation(this.component.lib.configure, { - expirationMs, - }); - this._configured = true; + if (this._configured) return; + if (!this._configurePromise) { + this._configurePromise = ctx + .runMutation(this.component.lib.configure, { expirationMs }) + .then(() => { + this._configured = true; + }) + .finally(() => { + this._configurePromise = null; + }); + } + await this._configurePromise; } async createStream(ctx: RunMutationCtx): Promise<StreamId> { - if (!this._configured && this.options?.expirationMs !== undefined) { + if (this.options?.expirationMs !== undefined) { await this.configure(ctx); }Also applies to: 59-66, 87-89
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/index.ts` around lines 44 - 45, Concurrent calls to configure/createStream can race because _configured is a simple boolean; add an in-flight promise guard (e.g., a private _configurePromise: Promise<void> | null) used inside configure() and any callers like createStream() so the first caller sets _configurePromise to the ongoing configure work, other callers await that same promise instead of proceeding, and upon completion set _configured = true and clear _configurePromise; update references to _configured and configure() (and call sites createStream()) to await _configurePromise when present to coalesce concurrent invocations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/component/lib.ts`:
- Around line 126-129: The args schema for expirationMs currently allows any
number (including negatives); update the args validation so expirationMs rejects
negative values (e.g., replace v.union(v.number(), v.null()) with a schema that
only permits null or numbers >= 0) and add a defensive check at the start of the
handler (async (ctx, args) => ...) to throw or normalize when args.expirationMs
is negative to prevent immediate timeouts during cleanup.
---
Nitpick comments:
In `@src/client/index.ts`:
- Around line 44-45: Concurrent calls to configure/createStream can race because
_configured is a simple boolean; add an in-flight promise guard (e.g., a private
_configurePromise: Promise<void> | null) used inside configure() and any callers
like createStream() so the first caller sets _configurePromise to the ongoing
configure work, other callers await that same promise instead of proceeding, and
upon completion set _configured = true and clear _configurePromise; update
references to _configured and configure() (and call sites createStream()) to
await _configurePromise when present to coalesce concurrent invocations.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 369631b0-d440-4ead-8ca1-0a78d050b344
📒 Files selected for processing (3)
src/client/index.tssrc/component/lib.tssrc/component/schema.ts
| args: { | ||
| expirationMs: v.union(v.number(), v.null()), | ||
| }, | ||
| handler: async (ctx, args) => { |
There was a problem hiding this comment.
Reject negative expiration values in configuration.
Line 127 currently accepts any number, including negatives. A negative expirationMs will cause immediate timeouts in cleanup.
Proposed fix
export const configure = mutation({
args: {
expirationMs: v.union(v.number(), v.null()),
},
handler: async (ctx, args) => {
+ if (args.expirationMs !== null && args.expirationMs < 0) {
+ throw new Error("expirationMs must be >= 0 or null");
+ }
const existing = await ctx.db.query("streamConfig").first();
if (existing) {
await ctx.db.patch(existing._id, {
expirationMs: args.expirationMs,
});📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| args: { | |
| expirationMs: v.union(v.number(), v.null()), | |
| }, | |
| handler: async (ctx, args) => { | |
| args: { | |
| expirationMs: v.union(v.number(), v.null()), | |
| }, | |
| handler: async (ctx, args) => { | |
| if (args.expirationMs !== null && args.expirationMs < 0) { | |
| throw new Error("expirationMs must be >= 0 or null"); | |
| } | |
| const existing = await ctx.db.query("streamConfig").first(); | |
| if (existing) { | |
| await ctx.db.patch(existing._id, { | |
| expirationMs: args.expirationMs, | |
| }); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/component/lib.ts` around lines 126 - 129, The args schema for
expirationMs currently allows any number (including negatives); update the args
validation so expirationMs rejects negative values (e.g., replace
v.union(v.number(), v.null()) with a schema that only permits null or numbers >=
0) and add a defensive check at the start of the handler (async (ctx, args) =>
...) to throw or normalize when args.expirationMs is negative to prevent
immediate timeouts during cleanup.
|
Out of curiosity, how are you running something for 25+ minutes and using
the component, since actions currently have a 10 minute timeout?
…On Sat, Apr 4, 2026 at 3:31 AM coderabbitai[bot] ***@***.***> wrote:
***@***.***[bot]* commented on this pull request.
*Actionable comments posted: 1*
🧹 Nitpick comments (1)
src/client/index.ts (1)
44-45: *Coalesce concurrent configure calls to avoid duplicate mutations.*
Parallel createStream calls can race before _configured flips to true.
Consider an in-flight promise guard.
Proposed refactor
export class PersistentTextStreaming {
private _configured = false;+ private _configurePromise: Promise<void> | null = null;
async configure(ctx: RunMutationCtx): Promise<void> {
const expirationMs = this.options?.expirationMs;
if (expirationMs === undefined) return;- await ctx.runMutation(this.component.lib.configure, {- expirationMs,- });- this._configured = true;+ if (this._configured) return;+ if (!this._configurePromise) {+ this._configurePromise = ctx+ .runMutation(this.component.lib.configure, { expirationMs })+ .then(() => {+ this._configured = true;+ })+ .finally(() => {+ this._configurePromise = null;+ });+ }+ await this._configurePromise;
}
async createStream(ctx: RunMutationCtx): Promise<StreamId> {- if (!this._configured && this.options?.expirationMs !== undefined) {+ if (this.options?.expirationMs !== undefined) {
await this.configure(ctx);
}
Also applies to: 59-66, 87-89
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In ***@***.***/client/index.ts` around lines 44 - 45, Concurrent calls to
configure/createStream can race because _configured is a simple boolean; add an
in-flight promise guard (e.g., a private _configurePromise: Promise<void> |
null) used inside configure() and any callers like createStream() so the first
caller sets _configurePromise to the ongoing configure work, other callers await
that same promise instead of proceeding, and upon completion set _configured =
true and clear _configurePromise; update references to _configured and
configure() (and call sites createStream()) to await _configurePromise when
present to coalesce concurrent invocations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In ***@***.***/component/lib.ts`:
- Around line 126-129: The args schema for expirationMs currently allows any
number (including negatives); update the args validation so expirationMs rejects
negative values (e.g., replace v.union(v.number(), v.null()) with a schema that
only permits null or numbers >= 0) and add a defensive check at the start of the
handler (async (ctx, args) => ...) to throw or normalize when args.expirationMs
is negative to prevent immediate timeouts during cleanup.
---
Nitpick comments:
In ***@***.***/client/index.ts`:
- Around line 44-45: Concurrent calls to configure/createStream can race because
_configured is a simple boolean; add an in-flight promise guard (e.g., a private
_configurePromise: Promise<void> | null) used inside configure() and any callers
like createStream() so the first caller sets _configurePromise to the ongoing
configure work, other callers await that same promise instead of proceeding, and
upon completion set _configured = true and clear _configurePromise; update
references to _configured and configure() (and call sites createStream()) to
await _configurePromise when present to coalesce concurrent invocations.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
------------------------------
ℹ️ Review info ⚙️ Run configuration
*Configuration used*: defaults
*Review profile*: CHILL
*Plan*: Pro
*Run ID*: 369631b0-d440-4ead-8ca1-0a78d050b344
📥 Commits
Reviewing files that changed from the base of the PR and between fe93e40
<fe93e40>
and a517a25
<a517a25>
.
📒 Files selected for processing (3)
- src/client/index.ts
- src/component/lib.ts
- src/component/schema.ts
------------------------------
In src/component/lib.ts
<#25 (comment)>
:
> + args: {
+ expirationMs: v.union(v.number(), v.null()),
+ },
+ handler: async (ctx, args) => {
*
|
Summary
The cleanup cron that marks idle streams as timed-out has two issues that make the component unusable for long-running streams (e.g. AI agent sessions that can run for 30+ minutes):
Bug: expiration based on creation time, not activity —
cleanupExpiredStreamschecksstream._creationTime, so a stream that has been receiving chunks every second is still killed 20 minutes after it was created. The comment says "last chunk added more than 20 minutes ago", but the code doesn't match.No way to configure or disable expiration — The 20-minute
EXPIRATION_TIMEconstant is hardcoded with no public API to change it.Changes
lastActivityTimeon each stream document, updated on everyaddChunkcalllastActivityTime(falling back to_creationTimefor backward compat with streams created before this field existed)configuremutation that acceptsexpirationMs: number | null— pass a custom timeout in ms, ornullto disable expiration entirelyPersistentTextStreamingOptionson the client class — replaces the unusedoptions?: objectparameter with{ expirationMs?: number | null }createStream— ifexpirationMsis set in options, the client lazily callsconfigureoncestreamConfigtable — stores the singleton config documentUsage
Motivation
We run AI agent sessions that stream output for 25+ minutes. The hardcoded 20-minute expiration based on
_creationTimecauses the stream to be marked astimeoutmid-session, and subsequentaddChunkcalls throw"Stream is not streaming; did it timeout?", crashing the session.Test plan
{ expirationMs: null }disables cleanup entirely{ expirationMs: N }uses custom timeoutlastActivityTimeis updated on eachaddChunklastActivityTimeover_creationTimelastActivityTime(pre-migration) fall back to_creationTimeMade with Cursor
Summary by CodeRabbit