Skip to content

feat: configurable stream expiration with activity-based timeout#25

Open
KyleKincer wants to merge 1 commit intoget-convex:mainfrom
KyleKincer:configurable-stream-expiration
Open

feat: configurable stream expiration with activity-based timeout#25
KyleKincer wants to merge 1 commit intoget-convex:mainfrom
KyleKincer:configurable-stream-expiration

Conversation

@KyleKincer
Copy link
Copy Markdown

@KyleKincer KyleKincer commented Apr 4, 2026

Summary

The cleanup cron that marks idle streams as timed-out has two issues that make the component unusable for long-running streams (e.g. AI agent sessions that can run for 30+ minutes):

  1. Bug: expiration based on creation time, not activitycleanupExpiredStreams checks stream._creationTime, so a stream that has been receiving chunks every second is still killed 20 minutes after it was created. The comment says "last chunk added more than 20 minutes ago", but the code doesn't match.

  2. No way to configure or disable expiration — The 20-minute EXPIRATION_TIME constant is hardcoded with no public API to change it.

Changes

  • Track lastActivityTime on each stream document, updated on every addChunk call
  • Fix cleanup logic to use lastActivityTime (falling back to _creationTime for backward compat with streams created before this field existed)
  • New configure mutation that accepts expirationMs: number | null — pass a custom timeout in ms, or null to disable expiration entirely
  • Typed PersistentTextStreamingOptions on the client class — replaces the unused options?: object parameter with { expirationMs?: number | null }
  • Auto-configure on first createStream — if expirationMs is set in options, the client lazily calls configure once
  • New streamConfig table — stores the singleton config document
  • Fully backward-compatible — unconfigured deployments retain the existing 20-minute default behavior

Usage

// Disable expiration (for long-running streams)
const streaming = new PersistentTextStreaming(
  components.persistentTextStreaming,
  { expirationMs: null },
);

// Custom timeout (1 hour)
const streaming = new PersistentTextStreaming(
  components.persistentTextStreaming,
  { expirationMs: 60 * 60 * 1000 },
);

// Or configure explicitly
await streaming.configure(ctx);

Motivation

We run AI agent sessions that stream output for 25+ minutes. The hardcoded 20-minute expiration based on _creationTime causes the stream to be marked as timeout mid-session, and subsequent addChunk calls throw "Stream is not streaming; did it timeout?", crashing the session.

Test plan

  • Verify backward compat: unconfigured component retains 20-minute default
  • Verify { expirationMs: null } disables cleanup entirely
  • Verify { expirationMs: N } uses custom timeout
  • Verify lastActivityTime is updated on each addChunk
  • Verify cleanup uses lastActivityTime over _creationTime
  • Verify streams without lastActivityTime (pre-migration) fall back to _creationTime

Made with Cursor

Summary by CodeRabbit

  • New Features
    • Streams now support configurable idle-expiration with custom timeout settings to control when inactive streams automatically expire
    • New ability to disable stream expiration entirely by setting expiration to null for indefinite stream operation
    • Enhanced activity tracking on streams provides improved monitoring and visibility into stream state and inactivity

Two bugs/limitations fixed:

1. The cleanup cron checked `_creationTime` instead of last activity,
   so actively-streaming sessions were killed after 20 minutes from
   creation regardless of ongoing chunk activity.

2. The 20-minute expiration was hardcoded with no way to configure or
   disable it, making the component unsuitable for long-running
   streams (e.g. AI agent sessions that can run for hours).

Changes:
- Track `lastActivityTime` on each stream, updated on every `addChunk`
- Cleanup now uses `lastActivityTime` (falling back to `_creationTime`
  for streams created before this field existed)
- New `configure` mutation accepts `expirationMs: number | null` to
  override the default 20-minute timeout or disable expiration entirely
- `PersistentTextStreaming` client class accepts typed
  `PersistentTextStreamingOptions` with `expirationMs` and
  auto-configures on first `createStream` call
- Fully backward-compatible: unconfigured deployments retain the
  existing 20-minute default

Made-with: Cursor
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 4, 2026

📝 Walkthrough

Walkthrough

The changes implement configurable stream expiration for PersistentTextStreaming. A new configure mutation persists expiration settings in a singleton table; the client's createStream method lazily applies configuration on first invocation. Server-side expiration logic now uses persisted settings and tracks lastActivityTime instead of fixed constants.

Changes

Cohort / File(s) Summary
Client Configuration
src/client/index.ts
Added PersistentTextStreamingOptions interface with optional expirationMs field. Constructor now accepts this typed option. Added internal _configured flag and configure(ctx) method that persists settings via mutation. Modified createStream to lazily call configure before streaming when expiration was provided.
Server Mutation & Expiration Logic
src/component/lib.ts
Added exported configure mutation to persist expirationMs in streamConfig singleton. Renamed internal EXPIRATION_TIME to DEFAULT_EXPIRATION_TIME. Updated addChunk to track lastActivityTime on every insertion and consolidate all stream updates into a single patch. Refactored expiration cleanup to compute effective expiration from persisted config and use lastActivityTime for inactivity detection.
Database Schema
src/component/schema.ts
Extended streams table with optional lastActivityTime field. Introduced new streamConfig table with optional expirationMs field accepting number or null.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Mutation as Mutation System
    participant Database as Database (ctx.db)
    
    Client->>Client: new PersistentTextStreaming(component, { expirationMs: 5000 })
    Client->>Client: _configured = false
    
    Client->>Client: createStream(ctx) called
    
    alt _configured is false
        Client->>Mutation: ctx.runMutation(configure, { expirationMs: 5000 })
        Mutation->>Database: Persist expirationMs to streamConfig
        Database-->>Mutation: ✓ Configured
        Mutation-->>Client: Configuration applied
        Client->>Client: _configured = true
    end
    
    Client->>Mutation: ctx.runMutation(createStream, ...)
    Mutation->>Database: Create stream with inherited expiration config
    Database-->>Mutation: Stream created
    Mutation-->>Client: Stream reference
    
    Note over Database: On addChunk: updates lastActivityTime<br/>On cleanup: uses lastActivityTime + expirationMs<br/>to determine inactivity
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 A stream that remembers when last it did flow,
With expiration times we can now configure,
Lazy setup blooms on first creating go,
No more fixed hardcoded values to endure!
Config persists—our database's treasure.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: configurable stream expiration with activity-based timeout' accurately summarizes the main change: adding configurable stream expiration with activity-based timeout logic, which is the core feature across all three modified files.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/client/index.ts (1)

44-45: Coalesce concurrent configure calls to avoid duplicate mutations.

Parallel createStream calls can race before _configured flips to true. Consider an in-flight promise guard.

Proposed refactor
 export class PersistentTextStreaming {
   private _configured = false;
+  private _configurePromise: Promise<void> | null = null;

   async configure(ctx: RunMutationCtx): Promise<void> {
     const expirationMs = this.options?.expirationMs;
     if (expirationMs === undefined) return;
-    await ctx.runMutation(this.component.lib.configure, {
-      expirationMs,
-    });
-    this._configured = true;
+    if (this._configured) return;
+    if (!this._configurePromise) {
+      this._configurePromise = ctx
+        .runMutation(this.component.lib.configure, { expirationMs })
+        .then(() => {
+          this._configured = true;
+        })
+        .finally(() => {
+          this._configurePromise = null;
+        });
+    }
+    await this._configurePromise;
   }

   async createStream(ctx: RunMutationCtx): Promise<StreamId> {
-    if (!this._configured && this.options?.expirationMs !== undefined) {
+    if (this.options?.expirationMs !== undefined) {
       await this.configure(ctx);
     }

Also applies to: 59-66, 87-89

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 44 - 45, Concurrent calls to
configure/createStream can race because _configured is a simple boolean; add an
in-flight promise guard (e.g., a private _configurePromise: Promise<void> |
null) used inside configure() and any callers like createStream() so the first
caller sets _configurePromise to the ongoing configure work, other callers await
that same promise instead of proceeding, and upon completion set _configured =
true and clear _configurePromise; update references to _configured and
configure() (and call sites createStream()) to await _configurePromise when
present to coalesce concurrent invocations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/component/lib.ts`:
- Around line 126-129: The args schema for expirationMs currently allows any
number (including negatives); update the args validation so expirationMs rejects
negative values (e.g., replace v.union(v.number(), v.null()) with a schema that
only permits null or numbers >= 0) and add a defensive check at the start of the
handler (async (ctx, args) => ...) to throw or normalize when args.expirationMs
is negative to prevent immediate timeouts during cleanup.

---

Nitpick comments:
In `@src/client/index.ts`:
- Around line 44-45: Concurrent calls to configure/createStream can race because
_configured is a simple boolean; add an in-flight promise guard (e.g., a private
_configurePromise: Promise<void> | null) used inside configure() and any callers
like createStream() so the first caller sets _configurePromise to the ongoing
configure work, other callers await that same promise instead of proceeding, and
upon completion set _configured = true and clear _configurePromise; update
references to _configured and configure() (and call sites createStream()) to
await _configurePromise when present to coalesce concurrent invocations.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 369631b0-d440-4ead-8ca1-0a78d050b344

📥 Commits

Reviewing files that changed from the base of the PR and between fe93e40 and a517a25.

📒 Files selected for processing (3)
  • src/client/index.ts
  • src/component/lib.ts
  • src/component/schema.ts

Comment thread src/component/lib.ts
Comment on lines +126 to +129
args: {
expirationMs: v.union(v.number(), v.null()),
},
handler: async (ctx, args) => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject negative expiration values in configuration.

Line 127 currently accepts any number, including negatives. A negative expirationMs will cause immediate timeouts in cleanup.

Proposed fix
 export const configure = mutation({
   args: {
     expirationMs: v.union(v.number(), v.null()),
   },
   handler: async (ctx, args) => {
+    if (args.expirationMs !== null && args.expirationMs < 0) {
+      throw new Error("expirationMs must be >= 0 or null");
+    }
     const existing = await ctx.db.query("streamConfig").first();
     if (existing) {
       await ctx.db.patch(existing._id, {
         expirationMs: args.expirationMs,
       });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
args: {
expirationMs: v.union(v.number(), v.null()),
},
handler: async (ctx, args) => {
args: {
expirationMs: v.union(v.number(), v.null()),
},
handler: async (ctx, args) => {
if (args.expirationMs !== null && args.expirationMs < 0) {
throw new Error("expirationMs must be >= 0 or null");
}
const existing = await ctx.db.query("streamConfig").first();
if (existing) {
await ctx.db.patch(existing._id, {
expirationMs: args.expirationMs,
});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/component/lib.ts` around lines 126 - 129, The args schema for
expirationMs currently allows any number (including negatives); update the args
validation so expirationMs rejects negative values (e.g., replace
v.union(v.number(), v.null()) with a schema that only permits null or numbers >=
0) and add a defensive check at the start of the handler (async (ctx, args) =>
...) to throw or normalize when args.expirationMs is negative to prevent
immediate timeouts during cleanup.

@ianmacartney
Copy link
Copy Markdown
Member

ianmacartney commented Apr 8, 2026 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants