diff --git a/src/__tests__/errors.test.ts b/src/__tests__/errors.test.ts new file mode 100644 index 0000000..4c2666d --- /dev/null +++ b/src/__tests__/errors.test.ts @@ -0,0 +1,78 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; +import { AgentNotFoundError, RateLimitError } from '../core/errors'; +import { toRateLimitError as discordToRateLimit } from '../providers/discord/adapter'; +import { toRateLimitError as slackToRateLimit } from '../providers/slack/adapter'; + +test('RateLimitError carries retryAfterMs', () => { + const err = new RateLimitError(2500, 'test'); + assert.equal(err.name, 'RateLimitError'); + assert.equal(err.retryAfterMs, 2500); + assert.equal(err.message, 'test'); + assert.ok(err instanceof Error); + assert.ok(err instanceof RateLimitError); +}); + +test('RateLimitError has a default message when context is omitted', () => { + const err = new RateLimitError(500); + assert.match(err.message, /Rate limited; retry after 500ms/); +}); + +test('AgentNotFoundError carries the agentId', () => { + const err = new AgentNotFoundError('agent-7'); + assert.equal(err.name, 'AgentNotFoundError'); + assert.equal(err.agentId, 'agent-7'); + assert.match(err.message, /Agent not found: agent-7/); + assert.ok(err instanceof Error); + assert.ok(err instanceof AgentNotFoundError); +}); + +test('RateLimitError is distinguishable from AgentNotFoundError', () => { + const rl = new RateLimitError(1000); + const nf = new AgentNotFoundError('x'); + assert.ok(!(rl instanceof AgentNotFoundError)); + assert.ok(!(nf instanceof RateLimitError)); +}); + +test('discord toRateLimitError maps status=429 with no retryAfter to a 1s backoff', () => { + const err = Object.assign(new Error('rate limited'), { status: 429 }); + const result = discordToRateLimit(err); + assert.ok(result instanceof RateLimitError); + assert.equal(result!.retryAfterMs, 1000); +}); + +test('discord toRateLimitError preserves the platform retryAfter (ms)', () => { + const err = 
Object.assign(new Error('rate limited'), { + status: 429, + retryAfter: 4321, + }); + const result = discordToRateLimit(err); + assert.ok(result instanceof RateLimitError); + assert.equal(result!.retryAfterMs, 4321); +}); + +test('discord toRateLimitError returns null for unrelated errors', () => { + assert.equal(discordToRateLimit(new Error('boom')), null); + assert.equal(discordToRateLimit(null), null); + assert.equal(discordToRateLimit(undefined), null); + assert.equal(discordToRateLimit('string'), null); +}); + +test('slack toRateLimitError converts seconds to ms', () => { + const err = Object.assign(new Error('rate limited'), { + code: 'slack_webapi_rate_limited_error', + retryAfter: 30, + }); + const result = slackToRateLimit(err); + assert.ok(result instanceof RateLimitError); + assert.equal(result!.retryAfterMs, 30000); +}); + +test('slack toRateLimitError ignores non-rate-limit errors', () => { + const platform = Object.assign(new Error('channel_not_found'), { + code: 'slack_webapi_platform_error', + data: { error: 'channel_not_found' }, + }); + assert.equal(slackToRateLimit(platform), null); + assert.equal(slackToRateLimit(new Error('network down')), null); +}); diff --git a/src/__tests__/server.test.ts b/src/__tests__/server.test.ts index 15dd33a..accc178 100644 --- a/src/__tests__/server.test.ts +++ b/src/__tests__/server.test.ts @@ -3,6 +3,7 @@ import assert from 'node:assert/strict'; import http from 'http'; import type { ApiDeps } from '../core/api'; import type { BridgeProvider } from '../core/types'; +import { AgentNotFoundError, RateLimitError } from '../core/errors'; const mod: { createServerHandler?: typeof import('../core/api').createServerHandler; @@ -62,7 +63,7 @@ function makeDeps(overrides: Partial = {}): ApiDeps { function request( server: http.Server, options: { method: string; path: string; body?: object; contentType?: string }, -): Promise<{ status: number; body: any }> { +): Promise<{ status: number; body: any; headers: 
http.IncomingHttpHeaders }> { return new Promise((resolve, reject) => { const addr = server.address() as { port: number }; const payload = options.body ? JSON.stringify(options.body) : undefined; @@ -82,11 +83,13 @@ function request( let data = ''; res.on('data', (chunk) => (data += chunk)); res.on('end', () => { + let body: unknown; try { - resolve({ status: res.statusCode!, body: JSON.parse(data) }); + body = JSON.parse(data); } catch { - resolve({ status: res.statusCode!, body: data }); + body = data; } + resolve({ status: res.statusCode!, body, headers: res.headers }); }); }, ); @@ -294,7 +297,7 @@ test('POST /api/send returns 404 for unknown agent', async () => { providers: new Map([ [ 'discord', - makeProvider('discord', { findThrows: new Error('Agent not found: missing') }), + makeProvider('discord', { findThrows: new AgentNotFoundError('missing') }), ], ]), }), @@ -312,6 +315,158 @@ test('POST /api/send returns 404 for unknown agent', async () => { } }); +test('POST /api/send retries and surfaces 429 on persistent rate limit', async () => { + let calls = 0; + const provider: BridgeProvider = { + name: 'discord', + isReady: () => true, + async start() {}, + async stop() {}, + resolveConversation: () => null, + findOrCreateAgentChannel: async (agentId) => ({ + channelId: 'ch-1', + agentId, + agentName: 'Test', + }), + send: async () => { + calls += 1; + throw new RateLimitError(1, 'platform rate limited'); + }, + }; + const server = await startTestServer(makeDeps({ providers: new Map([['discord', provider]]) })); + try { + const res = await request(server, { + method: 'POST', + path: '/api/send', + body: { agentId: 'a-1', message: 'hi' }, + }); + assert.equal(res.status, 429); + assert.match(res.body.error, /Rate limited/); + assert.equal(res.headers['retry-after'], '1', 'should advertise a Retry-After header'); + assert.equal(calls, 3, 'should attempt exactly 3 sends before giving up'); + } finally { + server.close(); + } +}); + +test('POST /api/send 
Retry-After header rounds sub-second backoffs up to 1s', async () => { + const provider: BridgeProvider = { + name: 'discord', + isReady: () => true, + async start() {}, + async stop() {}, + resolveConversation: () => null, + findOrCreateAgentChannel: async (agentId) => ({ + channelId: 'ch-1', + agentId, + agentName: 'Test', + }), + send: async () => { + throw new RateLimitError(250, 'sub-second backoff'); + }, + }; + const server = await startTestServer(makeDeps({ providers: new Map([['discord', provider]]) })); + try { + const res = await request(server, { + method: 'POST', + path: '/api/send', + body: { agentId: 'a-1', message: 'hi' }, + }); + assert.equal(res.status, 429); + assert.equal(res.headers['retry-after'], '1'); + } finally { + server.close(); + } +}); + +test('POST /api/send Retry-After header rounds fractional seconds up', async () => { + // 1500ms is small enough to keep the test fast (3 × 1.5s = ~4.5s) + // but still exercises the round-up branch (Math.ceil(1500 / 1000) = 2). 
+ const provider: BridgeProvider = { + name: 'discord', + isReady: () => true, + async start() {}, + async stop() {}, + resolveConversation: () => null, + findOrCreateAgentChannel: async (agentId) => ({ + channelId: 'ch-1', + agentId, + agentName: 'Test', + }), + send: async () => { + throw new RateLimitError(1500, 'fractional backoff'); + }, + }; + const server = await startTestServer(makeDeps({ providers: new Map([['discord', provider]]) })); + try { + const res = await request(server, { + method: 'POST', + path: '/api/send', + body: { agentId: 'a-1', message: 'hi' }, + }); + assert.equal(res.status, 429); + assert.equal(res.headers['retry-after'], '2', '1500ms rounds up to 2s'); + } finally { + server.close(); + } +}); + +test('POST /api/send succeeds on retry after a transient rate limit', async () => { + let calls = 0; + const sent: string[] = []; + const provider: BridgeProvider = { + name: 'discord', + isReady: () => true, + async start() {}, + async stop() {}, + resolveConversation: () => null, + findOrCreateAgentChannel: async (agentId) => ({ + channelId: 'ch-1', + agentId, + agentName: 'Test', + }), + send: async (_target, msg) => { + calls += 1; + if (calls === 1) throw new RateLimitError(1, 'transient'); + sent.push(msg.text); + }, + }; + const server = await startTestServer(makeDeps({ providers: new Map([['discord', provider]]) })); + try { + const res = await request(server, { + method: 'POST', + path: '/api/send', + body: { agentId: 'a-1', message: 'hi' }, + }); + assert.equal(res.status, 200); + assert.equal(calls, 2); + assert.deepEqual(sent, ['hi']); + } finally { + server.close(); + } +}); + +test('POST /api/send returns 500 for non-typed errors from findOrCreateAgentChannel', async () => { + const server = await startTestServer( + makeDeps({ + providers: new Map([ + ['discord', makeProvider('discord', { findThrows: new Error('DB is down') })], + ]), + }), + ); + try { + const res = await request(server, { + method: 'POST', + path: '/api/send', + 
body: { agentId: 'a-1', message: 'hi' }, + }); + assert.equal(res.status, 500); + assert.match(res.body.error, /DB is down/); + } finally { + server.close(); + } +}); + test('POST /api/send returns 415 for wrong content type', async () => { const server = await startTestServer(makeDeps()); try { diff --git a/src/core/api.ts b/src/core/api.ts index 933ceb5..9a21ba8 100644 --- a/src/core/api.ts +++ b/src/core/api.ts @@ -3,6 +3,7 @@ import type { BridgeProvider } from './types'; import { config } from './config'; import { logger } from './logger'; import { splitMessage as defaultSplit } from './splitMessage'; +import { AgentNotFoundError, RateLimitError } from './errors'; export interface SendRequest { agentId: string; @@ -49,8 +50,13 @@ export function parseBody(req: http.IncomingMessage): Promise { }); } -function sendJson(res: http.ServerResponse, status: number, data: object) { - res.writeHead(status, { 'Content-Type': 'application/json' }); +function sendJson( + res: http.ServerResponse, + status: number, + data: object, + headers?: Record, +) { + res.writeHead(status, { 'Content-Type': 'application/json', ...(headers ?? 
{}) }); res.end(JSON.stringify(data)); } @@ -113,10 +119,10 @@ export function createServerHandler(deps: ApiDeps) { try { info = await provider.findOrCreateAgentChannel(body.agentId); } catch (err) { - const msg = (err as Error).message; - if (msg.startsWith('Agent not found:')) { - sendJson(res, 404, { success: false, error: msg }); + if (err instanceof AgentNotFoundError) { + sendJson(res, 404, { success: false, error: err.message }); } else { + const msg = (err as Error).message; await log.error('api/findOrCreateAgentChannel', msg); sendJson(res, 500, { success: false, error: msg }); } @@ -137,22 +143,30 @@ export function createServerHandler(deps: ApiDeps) { break; } catch (err) { lastError = err as Error; - const discordErr = err as { status?: number; retryAfter?: number }; - const isRateLimited = discordErr.status === 429 || discordErr.retryAfter != null; - if (isRateLimited) { - const delay = discordErr.retryAfter ?? 1000; - await new Promise((r) => setTimeout(r, delay)); + if (err instanceof RateLimitError) { + // Clamp the in-request backoff: never spin with a zero delay, and + // never tie up the HTTP connection for more than a few seconds. + // Larger backoffs are surfaced to the caller via Retry-After below. + const waitMs = Math.min(Math.max(err.retryAfterMs, 100), 5000); + await new Promise((r) => setTimeout(r, waitMs)); } else { break; } } } if (lastError) { - const discordErr = lastError as Error & { status?: number; retryAfter?: number }; - const isRateLimited = discordErr.status === 429 || discordErr.retryAfter != null; - if (isRateLimited) { + if (lastError instanceof RateLimitError) { await log.error('api', 'Rate limited by provider after 3 retries'); - sendJson(res, 429, { success: false, error: 'Rate limited, retry later' }); + // Round up to whole seconds; clamp to a minimum of 1 so we never + // advertise a zero-second backoff that the kernel already waited + // through and still hit the limit. 
+ const retryAfterSec = Math.max(1, Math.ceil(lastError.retryAfterMs / 1000)); + sendJson( + res, + 429, + { success: false, error: 'Rate limited, retry later' }, + { 'Retry-After': retryAfterSec }, + ); } else { await log.error('api', lastError.message); sendJson(res, 500, { success: false, error: lastError.message }); diff --git a/src/core/errors.ts b/src/core/errors.ts new file mode 100644 index 0000000..ffa05d1 --- /dev/null +++ b/src/core/errors.ts @@ -0,0 +1,42 @@ +/** + * Provider-agnostic bridge errors. + * + * Adapters must translate platform-specific error shapes (Discord's + * `DiscordAPIError` / `@discordjs/rest` `RateLimitError`, Slack's + * `WebAPIRateLimitedError`, etc.) into these classes so the kernel + * can react without leaking platform types. + */ + +/** + * The chat platform asked us to slow down. The kernel uses + * `retryAfterMs` to schedule a backoff before retrying; adapters + * convert their platform's unit (Discord returns ms, Slack returns + * seconds) into a single milliseconds value here. + */ +export class RateLimitError extends Error { + readonly retryAfterMs: number; + + constructor(retryAfterMs: number, context?: string) { + super(context ?? `Rate limited; retry after ${retryAfterMs}ms`); + // Restore the prototype chain so `instanceof` works when this class is + // subclassed or compiled to older targets / crosses module boundaries. + Object.setPrototypeOf(this, new.target.prototype); + this.name = 'RateLimitError'; + this.retryAfterMs = retryAfterMs; + } +} + +/** + * The requested agent does not exist in the maestro registry. + * The kernel surfaces this to API callers as 404. 
+ */ +export class AgentNotFoundError extends Error { + readonly agentId: string; + + constructor(agentId: string) { + super(`Agent not found: ${agentId}`); + Object.setPrototypeOf(this, new.target.prototype); + this.name = 'AgentNotFoundError'; + this.agentId = agentId; + } +} diff --git a/src/providers/discord/adapter.ts b/src/providers/discord/adapter.ts index e928abc..8213f48 100644 --- a/src/providers/discord/adapter.ts +++ b/src/providers/discord/adapter.ts @@ -23,6 +23,7 @@ import type { import { maestro } from '../../core/maestro'; import { logger } from '../../core/logger'; import { checkTranscriptionDependencies } from '../../core/transcription'; +import { AgentNotFoundError, RateLimitError } from '../../core/errors'; import { discordConfig } from './config'; import { channelDb } from './channelsDb'; import { threadDb } from './threadsDb'; @@ -178,7 +179,13 @@ export class DiscordProvider implements BridgeProvider { if (msg.mention && discordConfig.mentionUserId) { text = `<@${discordConfig.mentionUserId}> ${text}`; } - await channel.send(text); + try { + await channel.send(text); + } catch (err) { + const rl = toRateLimitError(err); + if (rl) throw rl; + throw err; + } } async react(target: MessageTarget, emoji: string): Promise { @@ -221,7 +228,7 @@ export class DiscordProvider implements BridgeProvider { if (!this.client) throw new Error('Discord client not initialised'); const allAgents = await maestro.listAgents(); const agent = allAgents.find((a) => a.id === agentId); - if (!agent) throw new Error(`Agent not found: ${agentId}`); + if (!agent) throw new AgentNotFoundError(agentId); const guild = await this.client.guilds.fetch(discordConfig.guildId); @@ -274,3 +281,26 @@ export class DiscordProvider implements BridgeProvider { return fetched; } } + +/** + * Translate a discord.js error into the kernel-level `RateLimitError`. 
+ * + * discord.js surfaces rate limits through two shapes: + * - `@discordjs/rest` `RateLimitError` with a numeric `retryAfter` in ms and no `status` + * - `DiscordAPIError` with `status: 429` and no `retryAfter` (we fall back + * to a 1000ms backoff in that case) + * + * Returns `null` when the error is not a rate-limit; the caller rethrows + * the original error in that case. + */ +export function toRateLimitError(err: unknown): RateLimitError | null { + if (!err || typeof err !== 'object') return null; + const e = err as { status?: number; retryAfter?: number; name?: string }; + // `@discordjs/rest`'s RateLimitError carries a numeric `retryAfter` without a + // `status`, so we accept either signal. Requiring `retryAfter` to be a number + // avoids promoting unrelated errors that happen to carry a truthy property. + if (e.status === 429 || typeof e.retryAfter === 'number') { + return new RateLimitError(e.retryAfter ?? 1000, `Discord rate limited`); + } + return null; +} diff --git a/src/providers/slack/adapter.ts b/src/providers/slack/adapter.ts index 7230004..0455d04 100644 --- a/src/providers/slack/adapter.ts +++ b/src/providers/slack/adapter.ts @@ -13,6 +13,7 @@ import type { } from '../../core/types'; import { maestro } from '../../core/maestro'; import { logger } from '../../core/logger'; +import { AgentNotFoundError, RateLimitError } from '../../core/errors'; import { slackConfig } from './config'; import { channelDb } from './channelsDb'; import { conversationDb } from './conversationsDb'; @@ -321,24 +322,30 @@ export class SlackProvider implements BridgeProvider { text = `<@${slackConfig.mentionUserId}> ${text}`; } - if (isThreadTs(target.channelId)) { - // target is a thread_ts — look up parent channel - const convo = conversationDb.get(target.channelId); - if (!convo) { - // The thread is orphaned — its row was likely removed when the - // bound channel was disconnected, or the DB was reset.
Log the - // mismatch specifically so operators can distinguish it from - // generic Slack/network errors before surfacing to the kernel. - void logger.error('slack/send:orphan-thread', `thread_ts=${target.channelId}`); - throw new Error(`No conversation found for thread_ts ${target.channelId}`); + try { + if (isThreadTs(target.channelId)) { + // target is a thread_ts — look up parent channel + const convo = conversationDb.get(target.channelId); + if (!convo) { + // The thread is orphaned — its row was likely removed when the + // bound channel was disconnected, or the DB was reset. Log the + // mismatch specifically so operators can distinguish it from + // generic Slack/network errors before surfacing to the kernel. + void logger.error('slack/send:orphan-thread', `thread_ts=${target.channelId}`); + throw new Error(`No conversation found for thread_ts ${target.channelId}`); + } + await this.client.chat.postMessage({ + channel: convo.channel_id, + thread_ts: target.channelId, + text, + }); + } else { + await this.client.chat.postMessage({ channel: target.channelId, text }); } - await this.client.chat.postMessage({ - channel: convo.channel_id, - thread_ts: target.channelId, - text, - }); - } else { - await this.client.chat.postMessage({ channel: target.channelId, text }); + } catch (err) { + const rl = toRateLimitError(err); + if (rl) throw rl; + throw err; } } @@ -396,7 +403,7 @@ export class SlackProvider implements BridgeProvider { const allAgents = await maestro.listAgents(); const agent = allAgents.find((a) => a.id === agentId); - if (!agent) throw new Error(`Agent not found: ${agentId}`); + if (!agent) throw new AgentNotFoundError(agentId); const { channelId } = await findOrCreateSlackChannel(this.client, agent); channelDb.register(channelId, agent.id, agent.name); @@ -411,3 +418,20 @@ export class SlackProvider implements BridgeProvider { } } } + +/** + * Translate a `@slack/web-api` `WebAPIRateLimitedError` into the kernel-level + * `RateLimitError`. 
Slack's `retryAfter` is in seconds — we convert to ms + * so the kernel deals in a single unit. + * + * Returns `null` when the error is not a rate-limit; the caller rethrows + * the original error in that case. + */ +export function toRateLimitError(err: unknown): RateLimitError | null { + if (!err || typeof err !== 'object') return null; + const e = err as { code?: string; retryAfter?: number }; + if (e.code === 'slack_webapi_rate_limited_error' && typeof e.retryAfter === 'number') { + return new RateLimitError(e.retryAfter * 1000, `Slack rate limited; retry after ${e.retryAfter}s`); + } + return null; +}