Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions .cursor/rules/convex_rules.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export const listWithExtraArg = query({
handler: async (ctx, args) => {
return await ctx.db
.query("messages")
.filter((q) => q.eq(q.field("author"), args.author))
.withIndex("by_author", (q) => q.eq("author", args.author))
.order("desc")
.paginate(args.paginationOpts);
},
Expand Down Expand Up @@ -198,7 +198,7 @@ export const exampleQuery = query({
handler: async (ctx, args) => {
const idToUsername: Record<Id<"users">, string> = {};
for (const userId of args.userIds) {
const user = await ctx.db.get(userId);
const user = await ctx.db.get("users", userId);
if (user) {
idToUsername[user._id] = user.username;
}
Expand Down Expand Up @@ -236,8 +236,8 @@ const messages = await ctx.db


## Mutation guidelines
- Use `ctx.db.replace` to fully replace an existing document. This method will throw an error if the document does not exist.
- Use `ctx.db.patch` to shallow merge updates into an existing document. This method will throw an error if the document does not exist.
- Use `ctx.db.replace` to fully replace an existing document. This method will throw an error if the document does not exist. Syntax: `await ctx.db.replace('tasks', taskId, { name: 'Buy milk', completed: false })`
- Use `ctx.db.patch` to shallow merge updates into an existing document. This method will throw an error if the document does not exist. Syntax: `await ctx.db.patch('tasks', taskId, { completed: true })`

## Action guidelines
- Always add `"use node";` to the top of files containing actions that use Node.js built-in modules.
Expand Down Expand Up @@ -307,7 +307,7 @@ export const exampleQuery = query({
args: { fileId: v.id("_storage") },
returns: v.null(),
handler: async (ctx, args) => {
const metadata: FileMetadata | null = await ctx.db.system.get(args.fileId);
const metadata: FileMetadata | null = await ctx.db.system.get("_storage", args.fileId);
console.log(metadata);
return null;
},
Expand Down Expand Up @@ -434,7 +434,7 @@ Internal Functions:
"description": "This example shows how to build a chat app without authentication.",
"version": "1.0.0",
"dependencies": {
"convex": "^1.17.4",
"convex": "^1.31.2",
"openai": "^4.79.0"
},
"devDependencies": {
Expand Down Expand Up @@ -667,6 +667,35 @@ export default defineSchema({
});
```

#### convex/tsconfig.json
```typescript
{
/* This TypeScript project config describes the environment that
* Convex functions run in and is used to typecheck them.
* You can modify it, but some settings required to use Convex.
*/
"compilerOptions": {
/* These settings are not required by Convex and can be modified. */
"allowJs": true,
"strict": true,
"moduleResolution": "Bundler",
"jsx": "react-jsx",
"skipLibCheck": true,
"allowSyntheticDefaultImports": true,

/* These compiler options are required by Convex */
"target": "ESNext",
"lib": ["ES2021", "dom"],
"forceConsistentCasingInFileNames": true,
"module": "ESNext",
"isolatedModules": true,
"noEmit": true
},
"include": ["./**/*"],
"exclude": ["./_generated"]
}
```

#### src/App.tsx
```typescript
export default function App() {
Expand Down
6 changes: 6 additions & 0 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ import type {
Options,
RawRequestResponseHandler,
MutationCtx,
SaveMessagesCallbackArgs,
SaveMessagesHandler,
StorageOptions,
StreamingTextArgs,
StreamObjectArgs,
Expand Down Expand Up @@ -123,6 +125,7 @@ export {
vMessageDoc,
vPaginationResult,
vProviderMetadata,
vSaveMessagesArgs,
vSource,
vStorageOptions,
vStreamArgs,
Expand Down Expand Up @@ -186,6 +189,8 @@ export type {
ContextOptions,
ProviderMetadata,
RawRequestResponseHandler,
SaveMessagesCallbackArgs,
SaveMessagesHandler,
StorageOptions,
StreamArgs,
SyncStreamsReturnValue,
Expand Down Expand Up @@ -793,6 +798,7 @@ export class Agent<
...rest,
agentName: this.options.name,
embeddings,
onSaveMessages: this.options.onSaveMessages,
});
}

Expand Down
18 changes: 17 additions & 1 deletion src/client/messages.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { ModelMessage } from "ai";
import type { PaginationOptions, PaginationResult } from "convex/server";
import {
createFunctionHandle,
type PaginationOptions,
type PaginationResult,
} from "convex/server";
import type { MessageDoc } from "../validators.js";
import { validateVectorDimension } from "../component/vector/tables.js";
import {
Expand All @@ -17,6 +21,7 @@ import type {
MutationCtx,
QueryCtx,
ActionCtx,
SaveMessagesHandler,
} from "./types.js";
import { parse } from "convex-helpers/validators";

Expand Down Expand Up @@ -104,6 +109,11 @@ export type SaveMessagesArgs = {
* A pending message ID to replace when adding messages.
*/
pendingMessageId?: string;
/**
* Optional callback mutation to invoke after messages are saved.
* Called within the same transaction as the message save.
*/
onSaveMessages?: SaveMessagesHandler;
Comment on lines +112 to +116
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Propagate onSaveMessages through saveMessage() too.

The hook is only surfaced on saveMessages(). saveMessage() still uses SaveMessageArgs, and the delegation at Line 238 never forwards onSaveMessages, so this public single-message path bypasses the new callback entirely.

💡 Proposed fix
 export type SaveMessageArgs = {
   threadId: string;
   userId?: string | null;
@@
   /**
    * A pending message ID to replace with this message.
    */
   pendingMessageId?: string;
+  /**
+   * Optional callback mutation to invoke after the message is saved.
+   * Called within the same transaction as the message save.
+   */
+  onSaveMessages?: SaveMessagesHandler;
 } & (
@@
   const { messages } = await saveMessages(ctx, component, {
@@
     metadata: args.metadata ? [args.metadata] : undefined,
     embeddings,
+    onSaveMessages: args.onSaveMessages,
   });

Also applies to: 171-172

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/messages.ts` around lines 112 - 116, The single-message API path
bypasses the onSaveMessages hook because SaveMessageArgs and saveMessage do not
accept/forward onSaveMessages to saveMessages; update the SaveMessageArgs type
(and any call sites) to include onSaveMessages?: SaveMessagesHandler, change the
saveMessage function to accept that field and pass it through when delegating to
saveMessages (the function that persists multiple messages), and ensure the
onSaveMessages is invoked within the same transaction in the saveMessages
implementation so single-message saves trigger the callback too.

};

/**
Expand Down Expand Up @@ -131,6 +141,11 @@ export async function saveMessages(
};
}
}
// Convert function reference to handle string for passing to component
const onSaveMessagesHandle = args.onSaveMessages
? await createFunctionHandle(args.onSaveMessages)
: undefined;

const result = await ctx.runMutation(component.messages.addMessages, {
threadId: args.threadId,
userId: args.userId ?? undefined,
Expand All @@ -153,6 +168,7 @@ export async function saveMessages(
}),
),
failPendingSteps: args.failPendingSteps ?? false,
onSaveMessages: onSaveMessagesHandle,
});
return { messages: result.messages };
}
Expand Down
21 changes: 19 additions & 2 deletions src/client/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ import {
serializeObjectResult,
} from "../mapping.js";
import { embedMessages, fetchContextWithPrompt } from "./search.js";
import type { ActionCtx, AgentComponent, Config, Options } from "./types.js";
import type {
ActionCtx,
AgentComponent,
Config,
Options,
SaveMessagesHandler,
} from "./types.js";
import type { Message, MessageDoc } from "../validators.js";
import {
getModelName,
Expand All @@ -25,7 +31,11 @@ import { wrapTools, type ToolCtx } from "./createTool.js";
import type { Agent } from "./index.js";
import { assert, omit } from "convex-helpers";
import { saveInputMessages } from "./saveInputMessages.js";
import type { GenericActionCtx, GenericDataModel } from "convex/server";
import {
createFunctionHandle,
type GenericActionCtx,
type GenericDataModel,
} from "convex/server";

export async function startGeneration<
T,
Expand Down Expand Up @@ -93,6 +103,7 @@ export async function startGeneration<
languageModel?: LanguageModel;
agentName: string;
agentForToolCtx?: Agent;
onSaveMessages?: SaveMessagesHandler;
},
): Promise<{
args: T & {
Expand Down Expand Up @@ -124,6 +135,11 @@ export async function startGeneration<
?.userId) ??
undefined;

// Convert function reference to handle string for passing to component
const onSaveMessagesHandle = opts.onSaveMessages
? await createFunctionHandle(opts.onSaveMessages)
: undefined;

const context = await fetchContextWithPrompt(ctx, component, {
...opts,
userId,
Expand Down Expand Up @@ -282,6 +298,7 @@ export async function startGeneration<
embeddings,
failPendingSteps: false,
finishStreamId,
onSaveMessages: onSaveMessagesHandle,
});
const lastMessage = saved.messages.at(-1)!;
if (createPendingMessage) {
Expand Down
57 changes: 57 additions & 0 deletions src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface Output<_T = any, _P = any, _E = any> {
createElementStreamTransform: any;
}
import type {
FunctionReference,
GenericActionCtx,
GenericDataModel,
GenericMutationCtx,
Expand All @@ -39,6 +40,7 @@ import type {
import type {
MessageDoc,
ProviderMetadata,
SaveMessagesCallbackArgs,
StreamDelta,
StreamMessage,
ThreadDoc,
Expand Down Expand Up @@ -151,6 +153,12 @@ export type Config = {
* log the raw request body or response headers to a table, or logs.
*/
rawRequestResponseHandler?: RawRequestResponseHandler;
/**
* Called whenever messages are saved to the thread. This includes messages
* saved via generateText, streamText, generateObject, streamObject,
* saveMessage, and saveMessages.
*/
onSaveMessages?: SaveMessagesHandler;
/**
* @deprecated Reach out if you use this. Otherwise will be removed soon.
* Default provider options to pass for the LLM calls.
Expand Down Expand Up @@ -348,6 +356,50 @@ export type RawRequestResponseHandler = (
},
) => void | Promise<void>;

export type { SaveMessagesCallbackArgs } from "../validators.js";

/**
* A reference to a mutation function that will be called whenever messages are
* saved to a thread. This callback is invoked **within the same transaction**
* as the message save, making it transactional.
*
* This includes messages saved via generateText, streamText, generateObject,
* streamObject, saveMessage, and saveMessages.
*
* Use this to trigger side effects when messages are saved, such as updating
* counters, creating notifications, or syncing with external systems.
*
* @example
* ```ts
* // In your convex/myModule.ts:
* import { vSaveMessagesArgs } from "@convex-dev/agent";
*
* export const onNewMessages = internalMutation({
* args: vSaveMessagesArgs,
* handler: async (ctx, args) => {
* // This runs in the same transaction as the message save
* await ctx.db.insert("messageEvents", {
* threadId: args.threadId,
* messageCount: args.messages.length,
* timestamp: Date.now(),
* });
* },
* });
*
* // In your agent configuration:
* const agent = new Agent(components.agent, {
* name: "myAgent",
* languageModel: openai.chat("gpt-4o-mini"),
* onSaveMessages: internal.myModule.onNewMessages,
* });
* ```
*/
export type SaveMessagesHandler = FunctionReference<
"mutation",
"internal" | "public",
SaveMessagesCallbackArgs
>;

export type AgentComponent = ComponentApi;

export type TextArgs<
Expand Down Expand Up @@ -621,6 +673,11 @@ export type Options = {
* ordering will not apply. This excludes the system message / instructions.
*/
contextHandler?: ContextHandler;
/**
* Called whenever messages are saved to the thread.
* Overrides the onSaveMessages handler set in the agent constructor.
*/
onSaveMessages?: SaveMessagesHandler;
};

export type SyncStreamsReturnValue =
Expand Down
1 change: 1 addition & 0 deletions src/component/_generated/component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
| { message: string; type: "other" }
>;
}>;
onSaveMessages?: string;
pendingMessageId?: string;
promptMessageId?: string;
threadId: string;
Expand Down
25 changes: 24 additions & 1 deletion src/component/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { assert, omit, pick } from "convex-helpers";
import { mergedStream, stream } from "convex-helpers/server/stream";
import {
paginationOptsValidator,
type FunctionReference,
type WithoutSystemFields,
} from "convex/server";
import type { ObjectType } from "convex/values";
Expand All @@ -19,6 +20,7 @@ import {
vMessageWithMetadataInternal,
vPaginationResult,
type MessageDoc,
type SaveMessagesCallbackArgs,
} from "../validators.js";
import { api, internal } from "./_generated/api.js";
import type { Doc, Id } from "./_generated/dataModel.js";
Expand Down Expand Up @@ -144,6 +146,9 @@ const addMessagesArgs = {
// If provided, finish this stream atomically with the message save.
// This prevents UI flickering from separate mutations (issue #181).
finishStreamId: v.optional(v.id("streamingMessages")),
// Optional callback mutation to invoke after messages are saved.
// Called within the same transaction as the message save.
onSaveMessages: v.optional(v.string()),
};
export const addMessages = mutation({
args: addMessagesArgs,
Expand All @@ -166,6 +171,7 @@ async function addMessagesHandler(
failPendingSteps,
// Destructured separately to exclude from `...rest` (used in addMessages args, not message fields)
finishStreamId,
onSaveMessages,
messages,
promptMessageId,
pendingMessageId,
Expand Down Expand Up @@ -313,7 +319,24 @@ async function addMessagesHandler(
if (finishStreamId) {
await finishHandler(ctx, { streamId: finishStreamId });
}
return { messages: toReturn.map(publicMessage) };
const savedMessages = toReturn.map(publicMessage);
// Call the onSaveMessages callback if provided, within the same transaction
if (onSaveMessages && savedMessages.length > 0) {
const callbackArgs: SaveMessagesCallbackArgs = {
userId,
threadId,
messages: savedMessages,
};
await ctx.runMutation(
onSaveMessages as unknown as FunctionReference<
"mutation",
"public" | "internal",
SaveMessagesCallbackArgs
>,
callbackArgs,
);
}
return { messages: savedMessages };
}

// exported for tests
Expand Down
7 changes: 7 additions & 0 deletions src/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,13 @@ export const vMessageDoc = v.object({
});
export type MessageDoc = Infer<typeof vMessageDoc>; // Public

export const vSaveMessagesArgs = v.object({
userId: v.optional(v.string()),
threadId: v.string(),
messages: v.array(vMessageDoc),
});
export type SaveMessagesCallbackArgs = Infer<typeof vSaveMessagesArgs>;

export const vThreadDoc = v.object({
_id: v.string(),
_creationTime: v.number(),
Expand Down
Loading