Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions docs/design/api-v1-redesign.md
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,13 @@ review diligence — a #206-style omission can't merge.
> root cause is adding `default: ErrorEnvelope` to the send/reply/forward Huma
> handlers (then regen) so every generated client parses it. **#5 attachment
> download-URL, #7 idempotency-key on create tools — still PENDING** (not in
> GA). #3 (one pagination shape) is partial: the SDK AutoPager normalizes
> cursors, but MCP tool schemas still expose `token`/`page_size`.
> GA). **#3 (one pagination shape) ✅ done:** the cursor-paginated list tools
> (`list_messages`, `list_conversations`, `list_events`) now take `cursor` +
> `limit` and return `{ <items>, next_cursor }` (one page; pass `next_cursor`
> back for the next) — the dead `token`/`page_size` are gone. Built on a new
> `AutoPager.page(cursor?)` SDK primitive. (Small fixed lists — `list_agents`/
> `list_domains`/`list_webhooks` — stay non-paginated per decision 7;
> `list_webhook_deliveries` is single-page `limit` since the API has no cursor.)
>
> Scope-gating note: gating is a decision-space/UX optimization, not the
> security boundary — the backend enforces scope per-handler (the OAuth/WS
Expand Down
10 changes: 5 additions & 5 deletions internal/httpapi/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ type ConversationDetailView struct {
Messages []MessageSummaryView `json:"messages" nullable:"false"`
}

// ListConversationsInput — since/until + cursor/limit. Conversation cursor
// *continuation* is not yet supported by the store (it takes only a limit,
// no after-key), so next_cursor is always null — faithful to the legacy
// single-page behavior. True cursoring is a tracked follow-up needing a
// store change.
// ListConversationsInput — since/until + cursor/limit. Cursor continuation IS
// supported: the handler fetches limit+1, and when there are more it keyset-
// encodes the last row's (last_message_at, conversation_id) into next_cursor
// (see handleListConversations: hasMore → EncodeCursor(conversationsCursor{...})).
// next_cursor is null only on the last page.
type ListConversationsInput struct {
Address string `path:"email"`
Since string `query:"since" doc:"RFC3339."`
Expand Down
2 changes: 1 addition & 1 deletion mcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ shows the set your scope allows, with per-tool descriptions.
| --- | --- |
| `send_email` | Send a new email. When the agent has HITL enabled, the message is held and returns `status: pending_approval` instead of `sent`. |
| `reply_to_message` | Reply to an inbound message. Preserves In-Reply-To / References for thread continuity. |
| `list_messages` | List inbound mail. Filter by `status` (unread / read / all), paginate with `page_size` + `token`. |
| `list_messages` | List inbound mail. Filter by `read_status` (unread / read / all); cursor-paginated (`cursor` + `limit` in, `next_cursor` out). |
| `get_message` | Fetch full body, headers, and attachment metadata for one message. |

### Human-in-the-loop approval
Expand Down
28 changes: 16 additions & 12 deletions mcp/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type {
CreateWebhookRequest,
UpdateWebhookRequest,
TestWebhookRequest,
Page,
} from "@e2a/sdk/v1";
import type { McpConfig } from "./config.js";
import type { Scope } from "./tools/tiers.js";
Expand Down Expand Up @@ -211,7 +212,9 @@ export class McpClient {
return this.sdk.messages.get(this.resolveAddress(explicitAddress), messageId);
}

async listMessages(params: {
// Cursor pagination (§6a #3): returns ONE page + next_cursor. `limit` is the
// page size; pass a prior response's next_cursor as `cursor` for the next page.
listMessages(params: {
readStatus?: "unread" | "read" | "all";
sort?: "asc" | "desc";
from?: string;
Expand All @@ -220,22 +223,22 @@ export class McpClient {
since?: string;
until?: string;
labels?: Array<string>;
cursor?: string;
limit?: number;
explicitAddress?: string;
}): Promise<MessageSummaryView[]> {
const { explicitAddress, limit, ...rest } = params;
return this.sdk.messages
.list(this.resolveAddress(explicitAddress), rest)
.toArray({ limit: limit ?? DEFAULT_LIST_LIMIT });
}): Promise<Page<MessageSummaryView>> {
const { explicitAddress, cursor, ...rest } = params;
return this.sdk.messages.list(this.resolveAddress(explicitAddress), rest).page(cursor);
}

// ── Conversations ───────────────────────────────────────────────

listConversations(
params: { since?: string; until?: string; limit?: number },
params: { since?: string; until?: string; cursor?: string; limit?: number },
explicitAddress?: string,
): Promise<ConversationSummaryView[]> {
return this.sdk.conversations.list(this.resolveAddress(explicitAddress), params).toArray({ limit: params.limit ?? 200 });
): Promise<Page<ConversationSummaryView>> {
const { cursor, ...rest } = params;
return this.sdk.conversations.list(this.resolveAddress(explicitAddress), rest).page(cursor);
}

getConversation(
Expand Down Expand Up @@ -409,10 +412,11 @@ export class McpClient {
messageId?: string;
since?: string;
until?: string;
cursor?: string;
limit?: number;
}): Promise<EventJSON[]> {
const { limit, ...rest } = params;
return this.sdk.events.list(rest).toArray({ limit: limit ?? DEFAULT_LIST_LIMIT });
}): Promise<Page<EventJSON>> {
const { cursor, ...rest } = params;
return this.sdk.events.list(rest).page(cursor);
}

getEvent(id: string): Promise<EventJSON> {
Expand Down
23 changes: 10 additions & 13 deletions mcp/src/tools/events.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import type { McpClient } from "../client.js";
import { z } from "zod";
import { runTool, strictInputSchema } from "./util.js";
import { runTool, strictInputSchema, paginationInput } from "./util.js";

// Slice 8: MCP tool surfaces for the customer-facing events API.
// list_events — paginated listing with filters
Expand All @@ -19,7 +19,7 @@ export function registerEventTools(server: McpServer, client: McpClient): void {
title: "List webhook events",
annotations: { readOnlyHint: true },
description:
"List the durable webhook event log in reverse-chronological order. Useful for reconciliation (\"did our webhook receiver see this event?\") and for debugging delivery state. Events past the 30-day retention boundary are not returned. Cursor-paginated via `token` / `next_token` — pass the previous response's `next_token` to walk further back. Returns each event's `data` payload plus a `delivery_status` summary of how many subscribers have received it.",
"List the durable webhook event log in reverse-chronological order. Useful for reconciliation (\"did our webhook receiver see this event?\") and for debugging delivery state. Events past the 30-day retention boundary are not returned. **Cursor-paginated:** returns one page in `events` plus a `next_cursor` when more remain — pass it back as `cursor` to walk further back. Returns each event's `data` payload plus a `delivery_status` summary of how many subscribers have received it.",
inputSchema: strictInputSchema({
type: z
.string()
Expand All @@ -35,17 +35,12 @@ export function registerEventTools(server: McpServer, client: McpClient): void {
.optional()
.describe("RFC3339 timestamp; returns events with `created_at >= since`."),
until: z.string().optional().describe("RFC3339; returns events with `created_at < until`."),
page_size: z.number().int().min(1).max(100).optional(),
token: z.string().optional().describe("Opaque cursor from a previous response's `next_token`."),
...paginationInput,
}),
},
async (args) =>
// The v1 client auto-paginates; we collect up to `page_size` rows
// (the SDK walks cursors internally). `token` is accepted in the
// schema for contract stability but no longer needed — the pager
// handles cursoring transparently.
runTool(async () => ({
events: await client.listEvents({
runTool(async () => {
const page = await client.listEvents({
...(args.type !== undefined ? { type: args.type } : {}),
...(args.agent_id !== undefined ? { agentId: args.agent_id } : {}),
...(args.conversation_id !== undefined
Expand All @@ -54,9 +49,11 @@ export function registerEventTools(server: McpServer, client: McpClient): void {
...(args.message_id !== undefined ? { messageId: args.message_id } : {}),
...(args.since !== undefined ? { since: args.since } : {}),
...(args.until !== undefined ? { until: args.until } : {}),
...(args.page_size !== undefined ? { limit: args.page_size } : {}),
}),
})),
...(args.cursor !== undefined ? { cursor: args.cursor } : {}),
...(args.limit !== undefined ? { limit: args.limit } : {}),
});
return { events: page.items, ...(page.next_cursor ? { next_cursor: page.next_cursor } : {}) };
}),
);

server.registerTool(
Expand Down
40 changes: 20 additions & 20 deletions mcp/src/tools/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { simpleParser } from "mailparser";
import type { McpClient, SendOpts } from "../client.js";
import type { MessageView } from "@e2a/sdk/v1";
import { z } from "zod";
import { runTool, strictInputSchema } from "./util.js";
import { runTool, strictInputSchema, paginationInput } from "./util.js";
import { attachmentsArraySchema, type AttachmentInput } from "./attachments.js";

// Map the snake_case attachment wire shape (filename, content_type, data)
Expand Down Expand Up @@ -264,9 +264,9 @@ export function registerMessageTools(server: McpServer, client: McpClient): void
title: "List conversations for the agent",
annotations: { readOnlyHint: true },
description:
"Lists the agent's conversations — groups of messages sharing a `conversation_id` — one row per conversation, sorted by most recent activity. Each row carries `message_count`, `inbound_count`, `outbound_count`, `has_unread`, and the latest message's subject + sender so you can render an inbox without drilling into each thread. The server caps the response at 100. Use this when the user wants to see threads rather than individual messages — e.g. \"what conversations are unread?\" or \"show recent threads with Alice\". To read a single conversation's messages, call `get_conversation`.",
"Lists the agent's conversations — groups of messages sharing a `conversation_id` — one row per conversation, sorted by most recent activity. Each row carries `message_count`, `inbound_count`, `outbound_count`, `has_unread`, and the latest message's subject + sender so you can render an inbox without drilling into each thread. **Cursor-paginated:** returns one page in `conversations` plus a `next_cursor` when more remain — pass it back as `cursor` for the next page. To read a single conversation's messages, call `get_conversation`.",
inputSchema: strictInputSchema({
page_size: z.number().int().positive().max(100).optional(),
...paginationInput,
since: z
.string()
.optional()
Expand All @@ -283,16 +283,18 @@ export function registerMessageTools(server: McpServer, client: McpClient): void
}),
},
async (args) =>
runTool(async () => ({
conversations: await client.listConversations(
runTool(async () => {
const page = await client.listConversations(
{
...(args.page_size !== undefined ? { limit: args.page_size } : {}),
...(args.cursor !== undefined ? { cursor: args.cursor } : {}),
...(args.limit !== undefined ? { limit: args.limit } : {}),
...(args.since !== undefined ? { since: args.since } : {}),
...(args.until !== undefined ? { until: args.until } : {}),
},
args.email,
),
})),
);
return { conversations: page.items, ...(page.next_cursor ? { next_cursor: page.next_cursor } : {}) };
}),
);

server.registerTool(
Expand All @@ -317,15 +319,15 @@ export function registerMessageTools(server: McpServer, client: McpClient): void
title: "List inbound messages",
annotations: { readOnlyHint: true },
description:
"List messages the agent has received, newest first by default. Filter by `read_status` (unread/read/all; default unread) and cap results with `page_size`. Pass `sort: \"asc\"` for FIFO order (oldest unread first) when the caller wants to drain the inbox in arrival order. **Search filters** (`from`, `subject_contains`, `conversation_id`, `since`, `until`) narrow the result set server-side — use them instead of paginating the full inbox client-side. Returns summaries only — use `get_message` for the full body.",
"List messages the agent has received, newest first by default. Filter by `read_status` (unread/read/all; default unread). **Cursor-paginated:** returns one page in `messages` plus a `next_cursor` when more remain — pass it back as `cursor` for the next page (keep the same filters + sort). Pass `sort: \"asc\"` for FIFO order (oldest unread first) to drain the inbox in arrival order. **Search filters** (`from`, `subject_contains`, `conversation_id`, `since`, `until`) narrow server-side — use them instead of paging the whole inbox. Returns summaries only — use `get_message` for the full body.",
inputSchema: strictInputSchema({
read_status: z.enum(["unread", "read", "all"]).optional(),
page_size: z.number().int().positive().max(100).optional(),
...paginationInput,
sort: z
.enum(["asc", "desc"])
.optional()
.describe(
"Sort order by created_at. Defaults to `desc` (newest first). Pass `asc` for FIFO polling — drain the inbox in arrival order. Switching sort mid-pagination rejects the existing token.",
"Sort order by created_at. Defaults to `desc` (newest first). Pass `asc` for FIFO polling — drain the inbox in arrival order. Switching sort mid-pagination rejects the existing cursor.",
),
from: z
.string()
Expand Down Expand Up @@ -364,18 +366,15 @@ export function registerMessageTools(server: McpServer, client: McpClient): void
.describe(
"AND-match filter on labels. A row is returned only if ALL given labels are present. Use lowercase strings matching `[a-z0-9:_-]+`; `e2a:*` system labels can be filtered even though setting them is server-only.",
),
token: z.string().optional().describe("Pagination token from a previous response."),
email: z.string().optional(),
}),
},
async (args) =>
// The v1 client auto-paginates; `token` is accepted in the schema
// for contract stability but unused — the pager walks cursors
// internally up to `page_size` rows.
runTool(async () => ({
messages: await client.listMessages({
runTool(async () => {
const page = await client.listMessages({
...(args.read_status !== undefined ? { readStatus: args.read_status } : {}),
...(args.page_size !== undefined ? { limit: args.page_size } : {}),
...(args.cursor !== undefined ? { cursor: args.cursor } : {}),
...(args.limit !== undefined ? { limit: args.limit } : {}),
...(args.sort !== undefined ? { sort: args.sort } : {}),
...(args.from !== undefined ? { from: args.from } : {}),
...(args.subject_contains !== undefined
Expand All @@ -388,8 +387,9 @@ export function registerMessageTools(server: McpServer, client: McpClient): void
...(args.until !== undefined ? { until: args.until } : {}),
...(args.labels !== undefined ? { labels: args.labels } : {}),
...(args.email !== undefined ? { explicitAddress: args.email } : {}),
}),
})),
});
return { messages: page.items, ...(page.next_cursor ? { next_cursor: page.next_cursor } : {}) };
}),
);

server.registerTool(
Expand Down
20 changes: 20 additions & 0 deletions mcp/src/tools/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,26 @@ export function strictInputSchema<S extends ZodRawShape>(shape: S) {
return z.object(shape).strict();
}

// paginationInput is the ONE pagination shape for every cursor-paginated list
// tool (§6a #3): `cursor` + `limit` in, and the tool returns `{ <items>,
// next_cursor }`. Spread it into the tool's input schema. This replaces the old
// mix of `token` / `page_size` / bare `limit`.
export const paginationInput = {
cursor: z
.string()
.optional()
.describe(
"Pagination cursor. Pass the `next_cursor` from a previous response to fetch the next page; omit for the first page. The response includes `next_cursor` ONLY when more pages remain — when it is absent, you have reached the last page; STOP (do not keep calling).",
),
limit: z
.number()
.int()
.positive()
.max(100)
.optional()
.describe("Max items in this page (1–100). Defaults to a server-chosen page size."),
} as const;

export async function runTool<T>(fn: () => Promise<T>): Promise<ToolResult> {
try {
const result = await fn();
Expand Down
23 changes: 13 additions & 10 deletions mcp/tests/events-streamable-http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ function makeStubClient(): McpClient {
getMessage: vi.fn(async () => ({ messageId: "m" })),
getAgent: vi.fn(async () => ({ id: "x@y", email: "x@y" })),
listAgents: vi.fn(async () => [{ email: "bot@example.com" }]),
listEvents: vi.fn(async () => [
{
id: "evt_http",
type: "email.received",
schemaVersion: 1,
createdAt: "2026-06-01T12:00:00Z",
status: "processed",
data: { from: "alice@example.com" },
},
]),
listEvents: vi.fn(async () => ({
items: [
{
id: "evt_http",
type: "email.received",
schemaVersion: 1,
createdAt: "2026-06-01T12:00:00Z",
status: "processed",
data: { from: "alice@example.com" },
},
],
next_cursor: undefined,
})),
getEvent: vi.fn(async (id: string) => ({
id,
type: "email.received",
Expand Down
Loading
Loading