From 60cbca646dd4b0608a3a64ffb46cf7a83261b4e3 Mon Sep 17 00:00:00 2001 From: tmm Date: Sun, 3 May 2026 12:50:17 -0400 Subject: [PATCH 1/7] feat: launch tweaks --- .github/ISSUE_TEMPLATE/page.yml | 101 +++++++ .github/README.md | 3 +- src/api.ts | 77 ++--- src/api.workers.test.ts | 69 ++++- src/entry-server.ts | 13 +- src/env.d.ts | 3 + src/queues/request-enrichment.ts | 41 +++ src/queues/request-enrichment.workers.test.ts | 121 ++++++++ src/queues/request.ts | 34 +-- src/queues/request.workers.test.ts | 263 ++++++++++-------- src/worker-configuration.d.ts | 5 +- test/vitest.config.ts | 1 + wrangler.jsonc | 45 +++ 13 files changed, 588 insertions(+), 188 deletions(-) create mode 100644 .github/ISSUE_TEMPLATE/page.yml create mode 100644 src/queues/request-enrichment.ts create mode 100644 src/queues/request-enrichment.workers.test.ts diff --git a/.github/ISSUE_TEMPLATE/page.yml b/.github/ISSUE_TEMPLATE/page.yml new file mode 100644 index 00000000..03d9cefb --- /dev/null +++ b/.github/ISSUE_TEMPLATE/page.yml @@ -0,0 +1,101 @@ +name: Page Fetch Issue +description: Report a problem fetching or narrowing a specific page with curl.md. +labels: ['bug'] +body: + - type: markdown + attributes: + value: | + Thanks for reporting this. + + For page-specific issues, the most helpful details are the exact page URL and the exact request options you used. + + - type: checkboxes + id: existing_issues + attributes: + label: Check existing issues + description: By submitting this issue, you checked there is not already an issue for this problem. + options: + - label: I checked existing issues and did not find a duplicate. + required: true + + - type: input + id: page_url + attributes: + label: Page URL + description: The exact page you tried to fetch. + placeholder: https://developers.cloudflare.com/workers + validations: + required: true + + - type: textarea + id: issue + attributes: + label: What is the issue? + description: What did you expect, and what happened instead? + placeholder: I expected curl.md to return the installation steps, but it omitted that section / returned unrelated content / failed to fetch the page. + validations: + required: true + + - type: input + id: objective + attributes: + label: Objective + description: If you used an objective, paste it exactly. + placeholder: find the install steps + validations: + required: false + + - type: input + id: keywords + attributes: + label: Keywords + description: If you used keywords, paste them exactly. + placeholder: install,worker,binding + validations: + required: false + + - type: dropdown + id: mode + attributes: + label: Mode + description: If you used an objective, which mode did you use? + options: + - I did not use an objective + - smart + - rush + validations: + required: true + + - type: checkboxes + id: request_options + attributes: + label: Request options + options: + - label: I used `fresh` / bypassed cache. + + - type: textarea + id: exact_request + attributes: + label: Exact request + description: Paste the exact CLI command, browser URL, SDK call, or agent tool input if possible. + placeholder: | + curl.md developers.cloudflare.com/workers --objective "find the install steps" --keywords install,worker --fresh + render: shell + validations: + required: false + + - type: textarea + id: response + attributes: + label: Response or error output + description: Paste the returned markdown, error message, or a representative excerpt. + validations: + required: false + + - type: textarea + id: context + attributes: + label: Anything else? + description: Extra context. Public/private page, auth requirements, browser vs CLI, screenshots, etc. + validations: + required: false diff --git a/.github/README.md b/.github/README.md index 6d431879..9883c2df 100644 --- a/.github/README.md +++ b/.github/README.md @@ -31,7 +31,8 @@ curl.md example.com md example.com # Add to your agent -claude plugin install curl-md@claude-plugins-official +claude plugin marketplace add wevm/curl.md +claude plugin install curl-md@curl-md opencode plugin -g @curl.md/opencode pi install npm:@curl.md/pi npx @curl.md/amp install diff --git a/src/api.ts b/src/api.ts index 477cdf7c..cf493fcb 100644 --- a/src/api.ts +++ b/src/api.ts @@ -2045,7 +2045,7 @@ export const api = new Hono<{ vary: 'Accept', }) - // Rate limit: three tiers (anon, authed/free, paid) + // Keep throttling tied to auth state so stale balance cache never opens an unlimited fast path. const identity = c.var.session ? c.var.session.account_id : (c.req.header('cf-connecting-ip') ?? 'unknown') @@ -2073,42 +2073,42 @@ export const api = new Hono<{ } })() - // Paid users skip rate limiting let rateLimitHeaders: Record = {} - if (!billable) { - const kvKey = `ratelimit:${limit.key}` as const - const now = Math.floor(Date.now() / 1000) - const record = await c.env.KV.get(kvKey, 'json') - - const reset = record && record.reset > now ? record.reset : now + limit.window - const count = record && record.reset > now ? record.count + 1 : 1 - - rateLimitHeaders = { - 'x-ratelimit-limit': String(limit.max), - 'x-ratelimit-remaining': String(Math.max(0, limit.max - count)), - 'x-ratelimit-reset': String(reset), - } + const kvKey = `ratelimit:${limit.key}` as const + const now = Math.floor(Date.now() / 1000) + const record = await c.env.KV.get(kvKey, 'json') - if (count > limit.max) - return c.json( - { - code: 'rate_limit_exceeded' as const, - message: c.var.session ? 'Add credits to remove rate limits' : 'Rate limit exceeded', - }, - 429, - { - ...rateLimitHeaders, - 'retry-after': String(reset - now), - }, - ) + const reset = record && record.reset > now ? record.reset : now + limit.window + const count = record && record.reset > now ? record.count + 1 : 1 - c.executionCtx.waitUntil( - c.env.KV.put(kvKey, JSON.stringify({ count, reset }), { - expirationTtl: limit.window, - }), - ) + rateLimitHeaders = { + 'x-ratelimit-limit': String(limit.max), + 'x-ratelimit-remaining': String(Math.max(0, limit.max - count)), + 'x-ratelimit-reset': String(reset), } + if (count > limit.max) + return c.json( + { + code: 'rate_limit_exceeded' as const, + message: + !billable && c.var.session + ? 'Add credits to remove rate limits' + : 'Rate limit exceeded', + }, + 429, + { + ...rateLimitHeaders, + 'retry-after': String(reset - now), + }, + ) + + c.executionCtx.waitUntil( + c.env.KV.put(kvKey, JSON.stringify({ count, reset }), { + expirationTtl: limit.window, + }), + ) + const md = Md.create({ headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${c.env.HOST}/1.0; +https://${c.env.HOST})`, @@ -2251,7 +2251,18 @@ export const api = new Hono<{ } const chunks = Md.chunk(filteredContent) - const results = await Promise.all(chunks.map(extractChunk)) + const results: Awaited>[] = [] + let chunkIndex = 0 + // Bound Workers AI fan-out so one large objective request can't saturate inference capacity. + await Promise.all( + Array.from({ length: Math.min(2, chunks.length) }, async () => { + while (chunkIndex < chunks.length) { + const currentIndex = chunkIndex + chunkIndex += 1 + results[currentIndex] = await extractChunk(chunks[currentIndex]!) + } + }), + ) const filtered = results .filter((r) => r.response && r.response.trim() !== Constants.sentinelValue) .map((r) => r.response) diff --git a/src/api.workers.test.ts b/src/api.workers.test.ts index 2a274dc1..98bc7370 100644 --- a/src/api.workers.test.ts +++ b/src/api.workers.test.ts @@ -2949,6 +2949,61 @@ test('GET /api/:url retries transient AI timeouts and normalizes HTML error page }) }) +test('GET /api/:url bounds objective chunk inference concurrency', async () => { + const url = 'https://ai-chunk-pool.example.com/' + await env.KV.put( + `page:${url}`, + JSON.stringify({ + content: [ + `## Alpha\n\n${'a'.repeat(40_000)}`, + `## Beta\n\n${'b'.repeat(40_000)}`, + `## Gamma\n\n${'c'.repeat(40_000)}`, + ].join('\n\n'), + extras: {}, + meta: { + site: 'ai-chunk-pool.example.com', + url, + }, + }), + ) + + let activeCalls = 0 + let maxActiveCalls = 0 + const aiRun = vi.fn(async () => { + activeCalls += 1 + maxActiveCalls = Math.max(maxActiveCalls, activeCalls) + await new Promise((resolve) => setTimeout(resolve, 10)) + activeCalls -= 1 + return { + response: '## Matched\n\nRelevant content', + usage: { completion_tokens: 1, prompt_tokens: 1 }, + } + }) + + const localClient = testClient( + api, + { + ...env, + AI: { + run: aiRun, + } as unknown as typeof env.AI, + } as unknown as typeof env, + executionCtx, + ) + + const res = await localClient.api[':url{.+}'].$get( + { + param: { url: 'ai-chunk-pool.example.com' }, + query: { objective: 'find the relevant content' }, + }, + { headers: { 'cf-connecting-ip': '10.0.0.41' } }, + ) + + expect(res.status).toBe(200) + expect(aiRun).toHaveBeenCalledTimes(3) + expect(maxActiveCalls).toBeLessThanOrEqual(2) +}) + test('GET /api/:url reports upstream 5xx fetch failures to sentry', async () => { const captureException = vi.spyOn(Sentry, 'captureException').mockImplementation(() => '') server.use( @@ -3040,14 +3095,12 @@ test('GET /api/:url authed 429 includes credits message', async () => { }) }) -test('GET /api/:url paid user skips rate limits', async () => { +test('GET /api/:url paid user still hits auth rate limits', async () => { const account = await factory.account.insert({}) const session = await factory.session.insert({ account_id: account.id }) - // Seed balance cache await env.KV.put(`balance:${account.id}`, '1000') - // Seed rate limit to already exceeded await env.KV.put( `ratelimit:fetch:${account.id}`, JSON.stringify({ @@ -3075,10 +3128,12 @@ test('GET /api/:url paid user skips rate limits', async () => { }, }, ) - // Paid user should NOT get 429 even though rate limit is exceeded - expect(res.status).toBe(200) - // Should not have rate limit headers - expect(res.headers.get('x-ratelimit-limit')).toBeNull() + expect(res.status).toBe(429) + expect(res.headers.get('x-ratelimit-limit')).toBe('1000') + await expect(res.json()).resolves.toEqual({ + code: 'rate_limit_exceeded', + message: 'Rate limit exceeded', + }) }) test('GET /api/:url zero balance user gets authed rate limits', async () => { diff --git a/src/entry-server.ts b/src/entry-server.ts index d7e13e61..6fa0714e 100644 --- a/src/entry-server.ts +++ b/src/entry-server.ts @@ -7,6 +7,7 @@ import { api } from '#api.ts' import { cleanupExpired } from '#crons/cleanup.ts' import { createClient } from '#db/client.ts' import { appendVaryAccept, negotiateAccept } from '#lib/accept.ts' +import { processRequestEnrichmentMessage } from '#queues/request-enrichment.ts' import { processRequestMessage } from '#queues/request.ts' import { processStripeWebhookMessage } from '#queues/stripe-webhook.ts' @@ -133,10 +134,15 @@ export default Sentry.withSentry( return batch.queue })() const queue = z.parse( - z.enum([processRequestMessage.queueName, processStripeWebhookMessage.queueName]), + z.enum([ + processRequestEnrichmentMessage.queueName, + processRequestMessage.queueName, + processStripeWebhookMessage.queueName, + ]), queueName, ) const handler = { + [processRequestEnrichmentMessage.queueName]: processRequestEnrichmentMessage, [processRequestMessage.queueName]: processRequestMessage, [processStripeWebhookMessage.queueName]: processStripeWebhookMessage, }[queue] @@ -163,7 +169,10 @@ export default Sentry.withSentry( }, ) -type QueueHandlerMessage = processRequestMessage.Body | processStripeWebhookMessage.Body +type QueueHandlerMessage = + | processRequestEnrichmentMessage.Body + | processRequestMessage.Body + | processStripeWebhookMessage.Body declare module '@tanstack/react-start' { interface Register { diff --git a/src/env.d.ts b/src/env.d.ts index aee6f9fa..8f2a92d9 100644 --- a/src/env.d.ts +++ b/src/env.d.ts @@ -5,6 +5,9 @@ declare namespace Cloudflare { CLOUDFLARE_ACCOUNT_ID: string CLOUDFLARE_API_TOKEN: string KV: KV + REQUEST_ENRICH_QUEUE: Queue< + import('#queues/request-enrichment.ts').processRequestEnrichmentMessage.Body + > REQUEST_QUEUE: Queue STRIPE_PUBLISHABLE_KEY: string STRIPE_WEBHOOK_QUEUE: Queue< diff --git a/src/queues/request-enrichment.ts b/src/queues/request-enrichment.ts new file mode 100644 index 00000000..bd18d1fd --- /dev/null +++ b/src/queues/request-enrichment.ts @@ -0,0 +1,41 @@ +import { env } from 'cloudflare:workers' +import { estimateTokenCount } from 'tokenx' +import type { Database } from '#db/client.ts' + +export async function processRequestEnrichmentMessage( + message: Message, + db: Database, +) { + const body = message.body + + try { + const response = await fetch(body.url, { + headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${env.HOST}/1.0; +https://${env.HOST})` }, + redirect: 'follow', + }) + if (!response.ok) return + + const contentType = (response.headers.get('content-type') ?? '').toLowerCase() + if (!contentType.includes('text/html') && !contentType.includes('application/xhtml+xml')) return + + const sourceTokens = estimateTokenCount(await response.text()) + await db + .updateTable('request') + .set({ source_tokens: sourceTokens, source_tokens_method: 'html' }) + .where('id', '=', body.request_id) + .where('source_tokens', '<', sourceTokens) + .where('source_tokens_method', '=', 'estimated') + .executeTakeFirst() + } catch { + // Best-effort enrichment only; keep the fallback when the follow-up fetch fails. + } +} + +processRequestEnrichmentMessage.queueName = 'curl-request-enrichment' as const + +export namespace processRequestEnrichmentMessage { + export type Body = { + request_id: string + url: string + } +} diff --git a/src/queues/request-enrichment.workers.test.ts b/src/queues/request-enrichment.workers.test.ts new file mode 100644 index 00000000..2d9218c5 --- /dev/null +++ b/src/queues/request-enrichment.workers.test.ts @@ -0,0 +1,121 @@ +import { createMessageBatch } from 'cloudflare:test' +import { env } from 'cloudflare:workers' +import { HttpResponse, http } from 'msw' +import { estimateTokenCount } from 'tokenx' +import { afterAll, expect, test } from 'vitest' +import { createClient } from '#db/client.ts' +import { processRequestEnrichmentMessage } from '#queues/request-enrichment.ts' +import { server } from '#test/workers.server.ts' + +const db = createClient(env.DB.connectionString, { max: 1 }) + +afterAll(() => db.destroy()) + +test('upgrades estimated rows with html source tokens when fetch succeeds', async () => { + const html = '

Example

Hello world

' + server.use( + http.get( + 'https://example.com/', + () => + new HttpResponse(html, { + headers: { 'content-type': 'text/html; charset=utf-8' }, + status: 200, + }), + ), + ) + + await db + .insertInto('request') + .values({ + cached: false, + hostname: 'example.com', + id: 'req_enrich_1', + markdown_tokens: 25, + path: '/', + source_tokens: 25, + source_tokens_method: 'estimated', + url: 'https://example.com', + }) + .execute() + + const batch = createMessageBatch( + processRequestEnrichmentMessage.queueName, + [ + { + attempts: 1, + body: { + request_id: 'req_enrich_1', + url: 'https://example.com', + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ], + ) + + await processRequestEnrichmentMessage(batch.messages[0]!, db) + + const row = await db + .selectFrom('request') + .where('id', '=', 'req_enrich_1') + .select(['source_tokens', 'source_tokens_method']) + .executeTakeFirstOrThrow() + + expect(row.source_tokens).toBe(estimateTokenCount(html)) + expect(row.source_tokens_method).toBe('html') +}) + +test('keeps estimated rows when html source tokens are smaller', async () => { + const html = '
' + server.use( + http.get( + 'https://spa.example.com/', + () => + new HttpResponse(html, { + headers: { 'content-type': 'text/html; charset=utf-8' }, + status: 200, + }), + ), + ) + + await db + .insertInto('request') + .values({ + cached: false, + hostname: 'spa.example.com', + id: 'req_fallback_1', + markdown_tokens: 120, + path: '/', + source_tokens: 120, + source_tokens_method: 'estimated', + url: 'https://spa.example.com', + }) + .execute() + + const batch = createMessageBatch( + processRequestEnrichmentMessage.queueName, + [ + { + attempts: 1, + body: { + request_id: 'req_fallback_1', + url: 'https://spa.example.com', + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ], + ) + + await processRequestEnrichmentMessage(batch.messages[0]!, db) + + const row = await db + .selectFrom('request') + .where('id', '=', 'req_fallback_1') + .select(['source_tokens', 'source_tokens_method']) + .executeTakeFirstOrThrow() + + expect(estimateTokenCount(html)).toBeLessThan(120) + expect(row.source_tokens).toBe(120) + expect(row.source_tokens_method).toBe('estimated') +}) diff --git a/src/queues/request.ts b/src/queues/request.ts index 0c748f44..43b83830 100644 --- a/src/queues/request.ts +++ b/src/queues/request.ts @@ -1,5 +1,4 @@ import { env } from 'cloudflare:workers' -import { estimateTokenCount } from 'tokenx' import type { Database } from '#db/client.ts' import type { DB } from '#db/types.gen.ts' @@ -83,7 +82,14 @@ export async function processRequestMessage( await env.KV.put(`balance:${billingEntity}`, String(newBalance.balance_mills)) } - if (body.source_tokens_method === 'estimated') await enrichSourceTokensFromHtml(body, db) + if (body.source_tokens_method !== 'estimated') return + + // Best-effort analytics only; billing/logging should not retry just because enrichment handoff fails. + try { + await env.REQUEST_ENRICH_QUEUE.send({ request_id: body.id, url: body.url }) + } catch (error) { + console.error(`Request enrichment enqueue failed for ${body.id}:`, error) + } } processRequestMessage.queueName = 'curl-request' as const @@ -112,27 +118,3 @@ export namespace processRequestMessage { user_agent: string | undefined } } - -async function enrichSourceTokensFromHtml(body: processRequestMessage.Body, db: Database) { - try { - const response = await fetch(body.url, { - headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${env.HOST}/1.0; +https://${env.HOST})` }, - redirect: 'follow', - }) - if (!response.ok) return - - const contentType = (response.headers.get('content-type') ?? '').toLowerCase() - if (!contentType.includes('text/html') && !contentType.includes('application/xhtml+xml')) return - - const sourceTokens = estimateTokenCount(await response.text()) - await db - .updateTable('request') - .set({ source_tokens: sourceTokens, source_tokens_method: 'html' }) - .where('id', '=', body.id) - .where('source_tokens', '<', sourceTokens) - .where('source_tokens_method', '=', 'estimated') - .executeTakeFirst() - } catch { - // Best-effort enrichment only; keep the markdown fallback when HTML fetch fails. - } -} diff --git a/src/queues/request.workers.test.ts b/src/queues/request.workers.test.ts index 82287b02..26767f09 100644 --- a/src/queues/request.workers.test.ts +++ b/src/queues/request.workers.test.ts @@ -1,16 +1,21 @@ import { createMessageBatch } from 'cloudflare:test' import { env } from 'cloudflare:workers' -import { HttpResponse, http } from 'msw' -import { estimateTokenCount } from 'tokenx' import { afterAll, expect, test, vi } from 'vitest' import { createClient } from '#db/client.ts' import * as Nanoid from '#lib/nanoid.ts' import { processRequestMessage } from '#queues/request.ts' import { createFactory } from '#test/factory.ts' -import { server } from '#test/workers.server.ts' const db = createClient(env.DB.connectionString, { max: 1 }) const factory = createFactory(db) +const queueSendResponse = { + metadata: { + metrics: { + backlogBytes: 0, + backlogCount: 0, + }, + }, +} afterAll(() => db.destroy()) @@ -67,18 +72,6 @@ test('leaves KV stats cache untouched when a request is recorded', async () => { await env.KV.put('stats:tokens_saved', '1000') await env.KV.put('stats:tokens_saved:example.com', '500') - const html = '

Example

Hello world

' - server.use( - http.get( - 'https://example.com/', - () => - new HttpResponse(html, { - headers: { 'content-type': 'text/html; charset=utf-8' }, - status: 200, - }), - ), - ) - const batch = createMessageBatch(processRequestMessage.queueName, [ { attempts: 1, @@ -116,10 +109,11 @@ test('leaves KV stats cache untouched when a request is recorded', async () => { expect(hostCached).toBe('500') }) -test('does not fail when KV delete is rate limited', async () => { - const deleteSpy = vi - .spyOn(env.KV, 'delete') - .mockRejectedValue(new Error('KV DELETE failed: 429 Too Many Requests')) +test('does not fail when enrichment queue send fails', async () => { + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + const sendSpy = vi + .spyOn(env.REQUEST_ENRICH_QUEUE, 'send') + .mockRejectedValue(new Error('queue send failed')) try { const batch = createMessageBatch(processRequestMessage.queueName, [ @@ -142,7 +136,7 @@ test('does not fail when KV delete is rate limited', async () => { organization_id: null, path: '/', source_tokens: 60, - source_tokens_method: 'markdown', + source_tokens_method: 'estimated', url: 'https://ratelimit.example.com', user_agent: 'test-agent', }, @@ -152,16 +146,20 @@ test('does not fail when KV delete is rate limited', async () => { ]) await expect(processRequestMessage(batch.messages[0]!, db)).resolves.toBeUndefined() - expect(deleteSpy).not.toHaveBeenCalled() + expect(sendSpy).toHaveBeenCalledWith({ + request_id: 'req_no_kv_delete', + url: 'https://ratelimit.example.com', + }) const row = await db .selectFrom('request') .where('id', '=', 'req_no_kv_delete') .select(['id', 'source_tokens_method']) .executeTakeFirstOrThrow() - expect(row).toEqual({ id: 'req_no_kv_delete', source_tokens_method: 'markdown' }) + expect(row).toEqual({ id: 'req_no_kv_delete', source_tokens_method: 'estimated' }) } finally { - deleteSpy.mockRestore() + consoleErrorSpy.mockRestore() + sendSpy.mockRestore() } }) @@ -207,113 +205,142 @@ test('stores total savings when stage counts are present', async () => { expect(row.source_tokens_method).toBe('markdown') }) -test('upgrades estimated rows with html source tokens when fetch succeeds', async () => { - const html = '

Example

Hello world

' - server.use( - http.get( - 'https://example.com/', - () => - new HttpResponse(html, { - headers: { 'content-type': 'text/html; charset=utf-8' }, - status: 200, - }), - ), - ) +test('enqueues html enrichment for estimated rows', async () => { + const sendSpy = vi.spyOn(env.REQUEST_ENRICH_QUEUE, 'send').mockResolvedValue(queueSendResponse) - const batch = createMessageBatch(processRequestMessage.queueName, [ - { - attempts: 1, - body: { - account_id: null, - api_key_id: null, - billable: false, - cached: false, - cost_mills: 0, - extracted_tokens: null, - filtered_tokens: null, - hostname: 'example.com', - id: 'req_enrich_html', - keywords: null, - markdown_tokens: 25, - mode: null, - objective: null, - organization_id: null, - path: '/', - source_tokens: 25, - source_tokens_method: 'estimated', - url: 'https://example.com', - user_agent: 'test-agent', + try { + const batch = createMessageBatch(processRequestMessage.queueName, [ + { + attempts: 1, + body: { + account_id: null, + api_key_id: null, + billable: false, + cached: false, + cost_mills: 0, + extracted_tokens: null, + filtered_tokens: null, + hostname: 'example.com', + id: 'req_enrich_html', + keywords: null, + markdown_tokens: 25, + mode: null, + objective: null, + organization_id: null, + path: '/', + source_tokens: 25, + source_tokens_method: 'estimated', + url: 'https://example.com', + user_agent: 'test-agent', + }, + id: crypto.randomUUID(), + timestamp: new Date(), }, - id: crypto.randomUUID(), - timestamp: new Date(), - }, - ]) + ]) - await processRequestMessage(batch.messages[0]!, db) + await processRequestMessage(batch.messages[0]!, db) - const row = await db - .selectFrom('request') - .where('id', '=', 'req_enrich_html') - .select(['source_tokens', 'source_tokens_method']) - .executeTakeFirstOrThrow() + expect(sendSpy).toHaveBeenCalledWith({ + request_id: 'req_enrich_html', + url: 'https://example.com', + }) - expect(row.source_tokens).toBe(estimateTokenCount(html)) - expect(row.source_tokens_method).toBe('html') + const row = await db + .selectFrom('request') + .where('id', '=', 'req_enrich_html') + .select(['source_tokens', 'source_tokens_method']) + .executeTakeFirstOrThrow() + + expect(row.source_tokens).toBe(25) + expect(row.source_tokens_method).toBe('estimated') + } finally { + sendSpy.mockRestore() + } }) -test('keeps estimated rows when html source tokens are smaller', async () => { - const html = '
' - server.use( - http.get( - 'https://spa.example.com/', - () => - new HttpResponse(html, { - headers: { 'content-type': 'text/html; charset=utf-8' }, - status: 200, - }), - ), - ) +test('does not enqueue html enrichment for non-estimated rows', async () => { + const sendSpy = vi.spyOn(env.REQUEST_ENRICH_QUEUE, 'send').mockResolvedValue(queueSendResponse) - const batch = createMessageBatch(processRequestMessage.queueName, [ - { - attempts: 1, - body: { - account_id: null, - api_key_id: null, - billable: false, - cached: false, - cost_mills: 0, - extracted_tokens: null, - filtered_tokens: null, - hostname: 'spa.example.com', - id: 'req_keep_fallback', - keywords: null, - markdown_tokens: 120, - mode: null, - objective: null, - organization_id: null, - path: '/', - source_tokens: 120, - source_tokens_method: 'estimated', - url: 'https://spa.example.com', - user_agent: 'test-agent', + try { + const batch = createMessageBatch(processRequestMessage.queueName, [ + { + attempts: 1, + body: { + account_id: null, + api_key_id: null, + billable: false, + cached: false, + cost_mills: 0, + extracted_tokens: null, + filtered_tokens: null, + hostname: 'example.com', + id: 'req_keep_fallback', + keywords: null, + markdown_tokens: 120, + mode: null, + objective: null, + organization_id: null, + path: '/', + source_tokens: 120, + source_tokens_method: 'markdown', + url: 'https://example.com', + user_agent: 'test-agent', + }, + id: crypto.randomUUID(), + timestamp: new Date(), }, - id: crypto.randomUUID(), - timestamp: new Date(), - }, - ]) + ]) - await processRequestMessage(batch.messages[0]!, db) + await processRequestMessage(batch.messages[0]!, db) - const row = await db - .selectFrom('request') - .where('id', '=', 'req_keep_fallback') - .select(['source_tokens', 'source_tokens_method']) - .executeTakeFirstOrThrow() + expect(sendSpy).not.toHaveBeenCalled() + } finally { + sendSpy.mockRestore() + } +}) - expect(estimateTokenCount(html)).toBeLessThan(120) - expect(row.source_tokens).toBe(120) - expect(row.source_tokens_method).toBe('estimated') +test('enqueues html enrichment for cached estimated rows', async () => { + const sendSpy = vi.spyOn(env.REQUEST_ENRICH_QUEUE, 'send').mockResolvedValue(queueSendResponse) + + try { + const batch = createMessageBatch(processRequestMessage.queueName, [ + { + attempts: 1, + body: { + account_id: null, + api_key_id: null, + billable: false, + cached: true, + cost_mills: 0, + extracted_tokens: null, + filtered_tokens: null, + hostname: 'example.com', + id: 'req_cached_est', + keywords: null, + markdown_tokens: 120, + mode: null, + objective: null, + organization_id: null, + path: '/', + source_tokens: 120, + source_tokens_method: 'estimated', + url: 'https://example.com', + user_agent: 'test-agent', + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ]) + + await processRequestMessage(batch.messages[0]!, db) + + expect(sendSpy).toHaveBeenCalledWith({ + request_id: 'req_cached_est', + url: 'https://example.com', + }) + } finally { + sendSpy.mockRestore() + } }) test('deducts credits when billable', async () => { diff --git a/src/worker-configuration.d.ts b/src/worker-configuration.d.ts index 65aa3681..fb320db5 100644 --- a/src/worker-configuration.d.ts +++ b/src/worker-configuration.d.ts @@ -1,9 +1,10 @@ /* eslint-disable */ -// Generated by Wrangler by running `wrangler types src/worker-configuration.d.ts` (hash: 2b7371d5a52af64e010418c63936e3d0) +// Generated by Wrangler by running `wrangler types src/worker-configuration.d.ts` (hash: ba17e42f37ef51979f866f7ad008fec9) // Runtime types generated with workerd@1.20260426.1 2025-10-30 no_handle_cross_request_promise_resolution,nodejs_compat declare namespace Cloudflare { interface ProductionEnv { DB: Hyperdrive; + REQUEST_ENRICH_QUEUE: Queue; REQUEST_QUEUE: Queue; STRIPE_WEBHOOK_QUEUE: Queue; AI: Ai; @@ -23,6 +24,7 @@ declare namespace Cloudflare { } interface PreviewEnv { DB: Hyperdrive; + REQUEST_ENRICH_QUEUE: Queue; REQUEST_QUEUE: Queue; STRIPE_WEBHOOK_QUEUE: Queue; AI: Ai; @@ -42,6 +44,7 @@ declare namespace Cloudflare { } interface Env { DB: Hyperdrive; + REQUEST_ENRICH_QUEUE: Queue; REQUEST_QUEUE: Queue; STRIPE_WEBHOOK_QUEUE: Queue; AI: Ai; diff --git a/test/vitest.config.ts b/test/vitest.config.ts index 9f194fd5..e60bae19 100644 --- a/test/vitest.config.ts +++ b/test/vitest.config.ts @@ -66,6 +66,7 @@ export default defineConfig({ hyperdrives: { DB: env.DB_URL }, kvNamespaces: ['KV'], queueProducers: { + REQUEST_ENRICH_QUEUE: 'test-request-enrichment-queue', REQUEST_QUEUE: 'test-request-queue', STRIPE_WEBHOOK_QUEUE: 'test-stripe-webhook-queue', }, diff --git a/wrangler.jsonc b/wrangler.jsonc index 6816083a..4410f14e 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -26,10 +26,18 @@ "kv_namespaces": [{ "binding": "KV", "id": "e3a921b8d13741b99e7aa1cbb6144c9f" }], "queues": { "producers": [ + { "binding": "REQUEST_ENRICH_QUEUE", "queue": "curl-request-enrichment" }, { "binding": "REQUEST_QUEUE", "queue": "curl-request" }, { "binding": "STRIPE_WEBHOOK_QUEUE", "queue": "curl-stripe-webhook" }, ], "consumers": [ + { + "queue": "curl-request-enrichment", + "max_batch_size": 10, + "max_batch_timeout": 30, + "max_retries": 3, + "dead_letter_queue": "curl-request-enrichment-dlq", + }, { "queue": "curl-request", "max_batch_size": 10, @@ -44,6 +52,12 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-dlq", }, + { + "queue": "curl-request-enrichment-dlq", + "max_batch_size": 1, + "max_batch_timeout": 5, + "max_retries": 0, + }, { "queue": "curl-request-dlq", "max_batch_size": 1, @@ -107,10 +121,18 @@ "kv_namespaces": [{ "binding": "KV", "id": "e3a921b8d13741b99e7aa1cbb6144c9f" }], "queues": { "producers": [ + { "binding": "REQUEST_ENRICH_QUEUE", "queue": "curl-request-enrichment" }, { "binding": "REQUEST_QUEUE", "queue": "curl-request" }, { "binding": "STRIPE_WEBHOOK_QUEUE", "queue": "curl-stripe-webhook" }, ], "consumers": [ + { + "queue": "curl-request-enrichment", + "max_batch_size": 10, + "max_batch_timeout": 30, + "max_retries": 3, + "dead_letter_queue": "curl-request-enrichment-dlq", + }, { "queue": "curl-request", "max_batch_size": 10, @@ -125,6 +147,12 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-dlq", }, + { + "queue": "curl-request-enrichment-dlq", + "max_batch_size": 1, + "max_batch_timeout": 5, + "max_retries": 0, + }, { "queue": "curl-request-dlq", "max_batch_size": 1, @@ -191,6 +219,10 @@ "kv_namespaces": [{ "binding": "KV", "id": "__PREVIEW_KV_ID__" }], "queues": { "producers": [ + { + "binding": "REQUEST_ENRICH_QUEUE", + "queue": "curl-request-enrichment-__PREVIEW_APEX__", + }, { "binding": "REQUEST_QUEUE", "queue": "curl-request-__PREVIEW_APEX__", @@ -201,6 +233,13 @@ }, ], "consumers": [ + { + "queue": "curl-request-enrichment-__PREVIEW_APEX__", + "max_batch_size": 10, + "max_batch_timeout": 30, + "max_retries": 3, + "dead_letter_queue": "curl-request-enrichment-__PREVIEW_APEX__-dlq", + }, { "queue": "curl-request-__PREVIEW_APEX__", "max_batch_size": 10, @@ -215,6 +254,12 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-__PREVIEW_APEX__-dlq", }, + { + "queue": "curl-request-enrichment-__PREVIEW_APEX__-dlq", + "max_batch_size": 1, + "max_batch_timeout": 5, + "max_retries": 0, + }, { "queue": "curl-request-__PREVIEW_APEX__-dlq", "max_batch_size": 1, From d9398ef3206ff3b3cd92ed08e275042306dae9e7 Mon Sep 17 00:00:00 2001 From: tmm Date: Sun, 3 May 2026 13:11:41 -0400 Subject: [PATCH 2/7] chore: up stripe --- src/api.ts | 50 ++++--- src/api.workers.test.ts | 164 +++++++++++++++++++--- src/queues/stripe-webhook.ts | 44 ++++-- src/queues/stripe-webhook.workers.test.ts | 57 ++++++++ src/routes/_dash.$login/billing.tsx | 15 +- 5 files changed, 272 insertions(+), 58 deletions(-) diff --git a/src/api.ts b/src/api.ts index cf493fcb..17103717 100644 --- a/src/api.ts +++ b/src/api.ts @@ -1049,15 +1049,19 @@ export const api = new Hono<{ let paymentIntent: Stripe.PaymentIntent try { - paymentIntent = await stripe.paymentIntents.create({ - amount, - confirm: true, - currency: 'usd', - customer: stripeCustomerId, - metadata: { entity_type: entityType, entity_id: entityId }, - off_session: true, - payment_method: defaultPaymentMethod.id, - }) + const idempotencyKey = c.req.header('idempotency-key') + paymentIntent = await stripe.paymentIntents.create( + { + amount, + confirm: true, + currency: 'usd', + customer: stripeCustomerId, + metadata: { entity_type: entityType, entity_id: entityId }, + off_session: true, + payment_method: defaultPaymentMethod.id, + }, + idempotencyKey ? { idempotencyKey } : undefined, + ) } catch (error) { if ( error instanceof Stripe.errors.StripeCardError && @@ -1733,7 +1737,7 @@ export const api = new Hono<{ switch (event.type) { case 'payment_intent.succeeded': { - const paymentIntent = event.data.object as import('stripe').Stripe.PaymentIntent + const paymentIntent = event.data.object const customer = typeof paymentIntent.customer === 'string' ? paymentIntent.customer : null if (customer && paymentIntent.amount) await c.env.STRIPE_WEBHOOK_QUEUE.send({ @@ -1748,36 +1752,30 @@ export const api = new Hono<{ } case 'charge.dispute.created': { // TODO: send chargeback alert (email/Slack) - const dispute = event.data.object as import('stripe').Stripe.Dispute - const charge = - typeof dispute.charge === 'string' - ? await stripe.charges.retrieve(dispute.charge) - : dispute.charge - const customer = typeof charge.customer === 'string' ? charge.customer : null - if (customer) + const dispute = event.data.object + const chargeId = + typeof dispute.charge === 'string' ? dispute.charge : (dispute.charge?.id ?? null) + if (chargeId) await c.env.STRIPE_WEBHOOK_QUEUE.send({ type: event.type, data: { amount_total: dispute.amount, - customer, + charge_id: chargeId, id: dispute.id, }, }) break } case 'refund.created': { - const refund = event.data.object as import('stripe').Stripe.Refund - const charge = - typeof refund.charge === 'string' - ? await stripe.charges.retrieve(refund.charge) - : refund.charge - const customer = charge && typeof charge.customer === 'string' ? charge.customer : null - if (customer) + const refund = event.data.object + const chargeId = + typeof refund.charge === 'string' ? refund.charge : (refund.charge?.id ?? null) + if (chargeId) await c.env.STRIPE_WEBHOOK_QUEUE.send({ type: 'refund.created', data: { amount_total: refund.amount, - customer, + charge_id: chargeId, id: refund.id, }, }) diff --git a/src/api.workers.test.ts b/src/api.workers.test.ts index 98bc7370..8de40342 100644 --- a/src/api.workers.test.ts +++ b/src/api.workers.test.ts @@ -1476,6 +1476,47 @@ describe('POST /api/credits/charge', () => { }) }) + test('forwards stripe idempotency key for off-session charges', async () => { + const account = await factory.account.insert({}) + await db + .updateTable('account') + .set({ stripe_customer_id: `cus_${Nanoid.generate()}` }) + .where('id', '=', account.id) + .execute() + const session = await factory.session.insert({ account_id: account.id }) + + let idempotencyKey: string | null = null + + server.use( + http.get('https://api.stripe.com/v1/payment_methods', () => + HttpResponse.json({ + data: [{ id: 'pm_test', card: { brand: 'visa', last4: '4242' } }], + object: 'list', + }), + ), + http.post('https://api.stripe.com/v1/payment_intents', ({ request }) => { + idempotencyKey = request.headers.get('idempotency-key') + return HttpResponse.json({ + id: 'pi_charge_test', + status: 'succeeded', + object: 'payment_intent', + }) + }), + ) + + const res = await client.api.credits.charge.$post( + { json: { amount: '1000' } }, + { + headers: { + Cookie: await Cookie.generateSigned('curl.session', session.id, env.COOKIE_SECRET), + 'Idempotency-Key': 'charge_attempt_test', + }, + }, + ) + expect(res.status).toBe(200) + expect(idempotencyKey).toBe('charge_attempt_test') + }) + test('returns payment_failed when saved card is declined', async () => { const account = await factory.account.insert({}) await db @@ -5023,22 +5064,7 @@ describe('POST /api/stripe/webhook', () => { }, }) - // Compute HMAC signature (async-safe for Workers) - const timestamp = Math.floor(Date.now() / 1000) - const key = await crypto.subtle.importKey( - 'raw', - new TextEncoder().encode(env.STRIPE_WEBHOOK_SECRET), - { name: 'HMAC', hash: 'SHA-256' }, - false, - ['sign'], - ) - const sig = await crypto.subtle.sign( - 'HMAC', - key, - new TextEncoder().encode(`${timestamp}.${payload}`), - ) - const hex = [...new Uint8Array(sig)].map((b) => b.toString(16).padStart(2, '0')).join('') - const header = `t=${timestamp},v1=${hex}` + const header = await createStripeWebhookSignature(payload) const res = await api.request( '/api/stripe/webhook', @@ -5055,6 +5081,94 @@ describe('POST /api/stripe/webhook', () => { expect(res.status).toBe(200) await expect(res.json()).resolves.toEqual({ received: true }) }) + + test('queues charge disputes without expanding charge details inline', async () => { + const sendSpy = vi.spyOn(env.STRIPE_WEBHOOK_QUEUE, 'send').mockResolvedValue({ + metadata: { metrics: { backlogBytes: 0, backlogCount: 0 } }, + }) + const payload = JSON.stringify({ + id: 'evt_test_dispute', + type: 'charge.dispute.created', + data: { + object: { + amount: 500, + charge: 'ch_test_dispute', + id: 'dp_test_dispute', + }, + }, + }) + + try { + const res = await api.request( + '/api/stripe/webhook', + { + method: 'POST', + body: payload, + headers: { + 'content-type': 'application/json', + 'stripe-signature': await createStripeWebhookSignature(payload), + }, + }, + env, + ) + + expect(res.status).toBe(200) + expect(sendSpy).toHaveBeenCalledWith({ + type: 'charge.dispute.created', + data: { + amount_total: 500, + charge_id: 'ch_test_dispute', + id: 'dp_test_dispute', + }, + }) + } finally { + sendSpy.mockRestore() + } + }) + + test('queues refunds without expanding charge details inline', async () => { + const sendSpy = vi.spyOn(env.STRIPE_WEBHOOK_QUEUE, 'send').mockResolvedValue({ + metadata: { metrics: { backlogBytes: 0, backlogCount: 0 } }, + }) + const payload = JSON.stringify({ + id: 'evt_test_refund', + type: 'refund.created', + data: { + object: { + amount: 700, + charge: 'ch_test_refund', + id: 're_test_refund', + }, + }, + }) + + try { + const res = await api.request( + '/api/stripe/webhook', + { + method: 'POST', + body: payload, + headers: { + 'content-type': 'application/json', + 'stripe-signature': await createStripeWebhookSignature(payload), + }, + }, + env, + ) + + expect(res.status).toBe(200) + expect(sendSpy).toHaveBeenCalledWith({ + type: 'refund.created', + data: { + amount_total: 700, + charge_id: 'ch_test_refund', + id: 're_test_refund', + }, + }) + } finally { + sendSpy.mockRestore() + } + }) }) describe('POST /api/tunnel', () => { @@ -5125,3 +5239,21 @@ function toSearchParams(formData: FormData) { Array.from(formData.entries()).map(([key, value]) => [key, String(value)]), ) } + +async function createStripeWebhookSignature(payload: string) { + const timestamp = Math.floor(Date.now() / 1000) + const key = await crypto.subtle.importKey( + 'raw', + new TextEncoder().encode(env.STRIPE_WEBHOOK_SECRET), + { name: 'HMAC', hash: 'SHA-256' }, + false, + ['sign'], + ) + const sig = await crypto.subtle.sign( + 'HMAC', + key, + new TextEncoder().encode(`${timestamp}.${payload}`), + ) + const hex = [...new Uint8Array(sig)].map((b) => b.toString(16).padStart(2, '0')).join('') + return `t=${timestamp},v1=${hex}` +} diff --git a/src/queues/stripe-webhook.ts b/src/queues/stripe-webhook.ts index 3b7dd101..6b2d5db8 100644 --- a/src/queues/stripe-webhook.ts +++ b/src/queues/stripe-webhook.ts @@ -1,5 +1,7 @@ import { env } from 'cloudflare:workers' +import Stripe from 'stripe' import type { Database } from '#db/client.ts' +import * as StripeUtils from '#lib/stripe.ts' export async function processStripeWebhookMessage( message: Message, @@ -82,7 +84,15 @@ async function processReversal( .executeTakeFirst() if (existing) return - const entity = await findEntity(data.customer, db) + const customerId = await (async () => { + if ('customer' in data) return data.customer + const stripe = new Stripe(env.STRIPE_SECRET_KEY, StripeUtils.stripeOptions(env.STRIPE_API_URL)) + const charge = await stripe.charges.retrieve(data.charge_id) + return typeof charge.customer === 'string' ? charge.customer : null + })() + if (!customerId) return + + const entity = await findEntity(customerId, db) if (!entity) return await db.transaction().execute(async (tx) => { @@ -164,18 +174,30 @@ export namespace processStripeWebhookMessage { } | { type: 'charge.dispute.created' - data: { - amount_total: number - customer: string - id: string - } + data: + | { + amount_total: number + customer: string + id: string + } + | { + amount_total: number + charge_id: string + id: string + } } | { type: 'refund.created' - data: { - amount_total: number - customer: string - id: string - } + data: + | { + amount_total: number + customer: string + id: string + } + | { + amount_total: number + charge_id: string + id: string + } } } diff --git a/src/queues/stripe-webhook.workers.test.ts b/src/queues/stripe-webhook.workers.test.ts index 2fad3e13..71582598 100644 --- a/src/queues/stripe-webhook.workers.test.ts +++ b/src/queues/stripe-webhook.workers.test.ts @@ -1,9 +1,11 @@ import { createMessageBatch } from 'cloudflare:test' import { env } from 'cloudflare:workers' +import { HttpResponse, http } from 'msw' import { afterAll, expect, test } from 'vitest' import { createClient } from '#db/client.ts' import { processStripeWebhookMessage } from '#queues/stripe-webhook.ts' import { createFactory } from '#test/factory.ts' +import { server } from '#test/workers.server.ts' const db = createClient(env.DB.connectionString, { max: 1 }) const factory = createFactory(db) @@ -204,6 +206,61 @@ test('processes charge.dispute.created for account', async () => { expect(tx.account_id).toBe(account.id) }) +test('processes charge.dispute.created by expanding charge customer in the queue worker', async () => { + const customerId = `cus_${crypto.randomUUID()}` + const chargeId = `ch_${crypto.randomUUID()}` + const disputeId = `dp_${crypto.randomUUID()}` + const account = await factory.account.insert({}) + await db + .updateTable('account') + .set({ stripe_customer_id: customerId, balance_mills: 20000 }) + .where('id', '=', account.id) + .execute() + + server.use( + http.get(`https://api.stripe.com/v1/charges/${chargeId}`, () => + HttpResponse.json({ + customer: customerId, + id: chargeId, + object: 'charge', + }), + ), + ) + + const batch = createMessageBatch( + processStripeWebhookMessage.queueName, + [ + { + attempts: 1, + body: { + type: 'charge.dispute.created', + data: { amount_total: 2000, charge_id: chargeId, id: disputeId }, + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ], + ) + await processStripeWebhookMessage(batch.messages[0]!, db) + + const updated = await db + .selectFrom('account') + .where('id', '=', account.id) + .select('balance_mills') + .executeTakeFirstOrThrow() + expect(updated.balance_mills).toBe(0) + + const tx = await db + .selectFrom('credit_transaction') + .where('reference_id', '=', disputeId) + .selectAll() + .executeTakeFirstOrThrow() + expect(tx.type).toBe('chargeback') + expect(tx.amount_mills).toBe(-20000) + expect(tx.balance_after_mills).toBe(0) + expect(tx.account_id).toBe(account.id) +}) + test('processes charge.dispute.created for organization', async () => { const customerId = `cus_${crypto.randomUUID()}` const chargeId = `ch_${crypto.randomUUID()}` diff --git a/src/routes/_dash.$login/billing.tsx b/src/routes/_dash.$login/billing.tsx index 5e6d1c45..13c942dc 100644 --- a/src/routes/_dash.$login/billing.tsx +++ b/src/routes/_dash.$login/billing.tsx @@ -119,12 +119,17 @@ function Component() { const addCredits = useMutation({ async mutationFn(amount: (typeof creditAmounts)[number]) { if (data.payment_methods.length > 0) { - const res = await rpc.api.credits.charge.$post({ - json: { - amount, - ...(entity.type === 'organization' ? { organization_id: entity.id } : {}), + const res = await rpc.api.credits.charge.$post( + { + json: { + amount, + ...(entity.type === 'organization' ? { organization_id: entity.id } : {}), + }, }, - }) + { + headers: { 'Idempotency-Key': crypto.randomUUID() }, + }, + ) if (res.status === 200) return { kind: 'charge', result: await res.json() } as const if (res.status === 400) { From 5ab3cf5d3a0a3c262734f1c71a23688ea56cb43d Mon Sep 17 00:00:00 2001 From: tmm Date: Sun, 3 May 2026 13:28:09 -0400 Subject: [PATCH 3/7] chore: up --- src/api.ts | 82 ++++++++++++++++---------------- src/api.workers.test.ts | 11 ++--- src/lib/github.ts | 1 + src/lib/stripe.ts | 1 + src/md/mod.ts | 2 + src/md/transports.ts | 4 +- src/queues/request-enrichment.ts | 1 + 7 files changed, 52 insertions(+), 50 deletions(-) diff --git a/src/api.ts b/src/api.ts index 17103717..de80fb9d 100644 --- a/src/api.ts +++ b/src/api.ts @@ -2043,7 +2043,6 @@ export const api = new Hono<{ vary: 'Accept', }) - // Keep throttling tied to auth state so stale balance cache never opens an unlimited fast path. const identity = c.var.session ? c.var.session.account_id : (c.req.header('cf-connecting-ip') ?? 'unknown') @@ -2057,55 +2056,54 @@ export const api = new Hono<{ if (balanceMills > 0) billable = true } - const limit = (() => { - if (query.objective) + let rateLimitHeaders: Record = {} + if (!billable) { + const limit = (() => { + if (query.objective) + return { + key: `query:${identity}` as const, + max: c.var.session ? 10 : 3, + window: 3600, + } return { - key: `query:${identity}` as const, - max: c.var.session ? 10 : 3, + key: `fetch:${identity}` as const, + max: c.var.session ? 1000 : 100, window: 3600, } - return { - key: `fetch:${identity}` as const, - max: c.var.session ? 1000 : 100, - window: 3600, - } - })() + })() - let rateLimitHeaders: Record = {} - const kvKey = `ratelimit:${limit.key}` as const - const now = Math.floor(Date.now() / 1000) - const record = await c.env.KV.get(kvKey, 'json') + const kvKey = `ratelimit:${limit.key}` as const + const now = Math.floor(Date.now() / 1000) + const record = await c.env.KV.get(kvKey, 'json') - const reset = record && record.reset > now ? record.reset : now + limit.window - const count = record && record.reset > now ? record.count + 1 : 1 + const reset = record && record.reset > now ? record.reset : now + limit.window + const count = record && record.reset > now ? record.count + 1 : 1 - rateLimitHeaders = { - 'x-ratelimit-limit': String(limit.max), - 'x-ratelimit-remaining': String(Math.max(0, limit.max - count)), - 'x-ratelimit-reset': String(reset), - } + rateLimitHeaders = { + 'x-ratelimit-limit': String(limit.max), + 'x-ratelimit-remaining': String(Math.max(0, limit.max - count)), + 'x-ratelimit-reset': String(reset), + } - if (count > limit.max) - return c.json( - { - code: 'rate_limit_exceeded' as const, - message: - !billable && c.var.session - ? 'Add credits to remove rate limits' - : 'Rate limit exceeded', - }, - 429, - { - ...rateLimitHeaders, - 'retry-after': String(reset - now), - }, - ) + if (count > limit.max) + return c.json( + { + code: 'rate_limit_exceeded' as const, + message: c.var.session ? 'Add credits to remove rate limits' : 'Rate limit exceeded', + }, + 429, + { + ...rateLimitHeaders, + 'retry-after': String(reset - now), + }, + ) - c.executionCtx.waitUntil( - c.env.KV.put(kvKey, JSON.stringify({ count, reset }), { - expirationTtl: limit.window, - }), - ) + c.executionCtx.waitUntil( + c.env.KV.put(kvKey, JSON.stringify({ count, reset }), { + expirationTtl: limit.window, + }), + ) + } const md = Md.create({ headers: { diff --git a/src/api.workers.test.ts b/src/api.workers.test.ts index 8de40342..f923b908 100644 --- a/src/api.workers.test.ts +++ b/src/api.workers.test.ts @@ -3136,7 +3136,7 @@ test('GET /api/:url authed 429 includes credits message', async () => { }) }) -test('GET /api/:url paid user still hits auth rate limits', async () => { +test('GET /api/:url paid user skips auth rate limits', async () => { const account = await factory.account.insert({}) const session = await factory.session.insert({ account_id: account.id }) @@ -3169,12 +3169,9 @@ test('GET /api/:url paid user still hits auth rate limits', async () => { }, }, ) - expect(res.status).toBe(429) - expect(res.headers.get('x-ratelimit-limit')).toBe('1000') - await expect(res.json()).resolves.toEqual({ - code: 'rate_limit_exceeded', - message: 'Rate limit exceeded', - }) + expect(res.status).toBe(200) + expect(res.headers.get('x-credits-remaining')).toBe('999') + await expect(res.text()).resolves.toContain('ok') }) test('GET /api/:url zero balance user gets authed rate limits', async () => { diff --git a/src/lib/github.ts b/src/lib/github.ts index e528f713..98bda8cf 100644 --- a/src/lib/github.ts +++ b/src/lib/github.ts @@ -60,6 +60,7 @@ export async function resolveToken( const res = await fetch(tokenUrl.toString(), { method: 'POST', headers: { Accept: 'application/json' }, + signal: AbortSignal.timeout(10_000), }) const parsed = z.safeParse(tokenSchema, await res.json()) if (!parsed.success) return undefined diff --git a/src/lib/stripe.ts b/src/lib/stripe.ts index 221f311d..06f6691b 100644 --- a/src/lib/stripe.ts +++ b/src/lib/stripe.ts @@ -111,5 +111,6 @@ export function stripeOptions(apiUrl: string) { host: url.hostname, port: Number(url.port) || (url.protocol === 'https:' ? 443 : 80), protocol: url.protocol.replace(':', '') as 'http' | 'https', + timeout: 15_000, } } diff --git a/src/md/mod.ts b/src/md/mod.ts index f168d040..de01c3c1 100644 --- a/src/md/mod.ts +++ b/src/md/mod.ts @@ -31,6 +31,7 @@ export function create(options: create.Options = {}): create.ReturnType { })() const rewrittenUrl = matched?.rule?.rewrite?.(inputURL, matched.match) ?? inputURL + const signal = AbortSignal.timeout(15_000) // 15 seconds const requestInit = { ...init, headers: { @@ -38,6 +39,7 @@ export function create(options: create.Options = {}): create.ReturnType { ...init?.headers, }, redirect: init?.redirect ?? 'follow', + signal: init?.signal ? AbortSignal.any([init?.signal, signal]) : signal, } satisfies RequestInit const context = { fetch: options.fetch ?? globalThis.fetch.bind(globalThis), diff --git a/src/md/transports.ts b/src/md/transports.ts index c4560d5a..e44d7f43 100644 --- a/src/md/transports.ts +++ b/src/md/transports.ts @@ -4,8 +4,9 @@ import { defineTransport } from './mod.ts' export const cfBrowserRendering = defineTransport<{ accountId: string apiToken: string -}>(async (url, _init, context) => { +}>(async (url, init, context) => { if (context.previous?.status !== 403 || !context.options) return null + const signal = AbortSignal.timeout(20_00) const res = await context.fetch( `https://api.cloudflare.com/client/v4/accounts/${context.options.accountId}/browser-rendering/content`, { @@ -19,6 +20,7 @@ export const cfBrowserRendering = defineTransport<{ rejectResourceTypes: ['font', 'image', 'media'], goToOptions: { waitUntil: 'networkidle2' }, }), + signal: init?.signal ? AbortSignal.any([init?.signal, signal]) : signal, }, ) if (!res.ok) return null diff --git a/src/queues/request-enrichment.ts b/src/queues/request-enrichment.ts index bd18d1fd..64cac3f1 100644 --- a/src/queues/request-enrichment.ts +++ b/src/queues/request-enrichment.ts @@ -12,6 +12,7 @@ export async function processRequestEnrichmentMessage( const response = await fetch(body.url, { headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${env.HOST}/1.0; +https://${env.HOST})` }, redirect: 'follow', + signal: AbortSignal.timeout(10_000), }) if (!response.ok) return From a79e7b173f8cc64dfc491fead94f79efae749c8a Mon Sep 17 00:00:00 2001 From: tmm Date: Sun, 3 May 2026 13:32:13 -0400 Subject: [PATCH 4/7] fix: dlq --- src/entry-server.ts | 23 +++++++++++------- wrangler.jsonc | 57 +++------------------------------------------ 2 files changed, 17 insertions(+), 63 deletions(-) diff --git a/src/entry-server.ts b/src/entry-server.ts index 6fa0714e..6fd2a1ad 100644 --- a/src/entry-server.ts +++ b/src/entry-server.ts @@ -118,15 +118,6 @@ export default Sentry.withSentry( return response }, async queue(batch, env) { - if (batch.queue.endsWith('-dlq')) { - for (const message of batch.messages) { - const { ack: _, retry: __, ...rest } = message - console.error(`DLQ message [${batch.queue}]`, rest) - message.ack() - } - return - } - const queueName = (() => { // Preview queues have a suffix (e.g. `stripe-webhook-pr25`), strip it const previewApex = env.HOST.replace('.curl.md', '') @@ -153,6 +144,20 @@ export default Sentry.withSentry( message.ack() } catch (error) { console.error(`Queue message ${message.id} failed:`, error) + // Emit alert when next retry will move the message into DLQ + if (message.attempts >= 3) + Sentry.captureException(error, { + extra: { + queue: { + attempts: message.attempts, + batch_queue: batch.queue, + body: message.body, + logical_queue: queueName, + message_id: message.id, + }, + }, + tags: { queue: queueName, queue_outcome: 'dead_letter' }, + }) message.retry() } } diff --git a/wrangler.jsonc b/wrangler.jsonc index 4410f14e..969fca0a 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -52,24 +52,7 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-dlq", }, - { - "queue": "curl-request-enrichment-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-request-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-stripe-webhook-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, + // No DLQ consumers. Dead-lettered messages must remain available for inspection and replay. ], }, "triggers": { @@ -147,24 +130,7 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-dlq", }, - { - "queue": "curl-request-enrichment-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-request-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-stripe-webhook-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, + // No DLQ consumers. Dead-lettered messages must remain available for inspection and replay. ], }, "version_metadata": { @@ -254,24 +220,7 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-__PREVIEW_APEX__-dlq", }, - { - "queue": "curl-request-enrichment-__PREVIEW_APEX__-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-request-__PREVIEW_APEX__-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-stripe-webhook-__PREVIEW_APEX__-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, + // No DLQ consumers. Dead-lettered messages must remain available for inspection and replay. ], }, "version_metadata": { From 8bcac00f6cc4937fa9cedcaabf37b0a65a313dc1 Mon Sep 17 00:00:00 2001 From: tmm Date: Sun, 3 May 2026 13:35:57 -0400 Subject: [PATCH 5/7] chore: up --- src/queues/request-enrichment.ts | 6 ++---- wrangler.jsonc | 3 --- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/queues/request-enrichment.ts b/src/queues/request-enrichment.ts index 64cac3f1..bf47c96f 100644 --- a/src/queues/request-enrichment.ts +++ b/src/queues/request-enrichment.ts @@ -6,10 +6,8 @@ export async function processRequestEnrichmentMessage( message: Message, db: Database, ) { - const body = message.body - try { - const response = await fetch(body.url, { + const response = await fetch(message.body.url, { headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${env.HOST}/1.0; +https://${env.HOST})` }, redirect: 'follow', signal: AbortSignal.timeout(10_000), @@ -23,7 +21,7 @@ export async function processRequestEnrichmentMessage( await db .updateTable('request') .set({ source_tokens: sourceTokens, source_tokens_method: 'html' }) - .where('id', '=', body.request_id) + .where('id', '=', message.body.request_id) .where('source_tokens', '<', sourceTokens) .where('source_tokens_method', '=', 'estimated') .executeTakeFirst() diff --git a/wrangler.jsonc b/wrangler.jsonc index 969fca0a..5edbf2b0 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -52,7 +52,6 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-dlq", }, - // No DLQ consumers. Dead-lettered messages must remain available for inspection and replay. ], }, "triggers": { @@ -130,7 +129,6 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-dlq", }, - // No DLQ consumers. Dead-lettered messages must remain available for inspection and replay. ], }, "version_metadata": { @@ -220,7 +218,6 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-__PREVIEW_APEX__-dlq", }, - // No DLQ consumers. Dead-lettered messages must remain available for inspection and replay. ], }, "version_metadata": { From 46e3262b822081458054d54c512e8c7261d2fe20 Mon Sep 17 00:00:00 2001 From: tmm Date: Sun, 3 May 2026 13:52:15 -0400 Subject: [PATCH 6/7] chore: up --- src/md/fallback.smoke.test.ts | 70 +++++++++++++++++++++++++++++++++++ src/md/fromHtml.test.ts | 18 +++++++++ src/md/fromHtml.ts | 9 ++++- src/md/mod.test.ts | 41 ++++++++++++++++++++ src/md/profile.test.ts | 24 ++++++++++++ src/md/profiles.ts | 28 ++++++++++++++ 6 files changed, 189 insertions(+), 1 deletion(-) create mode 100644 src/md/fallback.smoke.test.ts diff --git a/src/md/fallback.smoke.test.ts b/src/md/fallback.smoke.test.ts new file mode 100644 index 00000000..337fd590 --- /dev/null +++ b/src/md/fallback.smoke.test.ts @@ -0,0 +1,70 @@ +import { expect, test } from 'vitest' +import { create } from './mod.ts' +import * as profiles from './profiles.ts' +import * as rules from './rules.ts' + +// Smoke tests that fetch live no-rule docs/reference pages and verify +// generic HTML extraction still returns non-empty markdown. +// Run via: pnpm test --project md:smoke + +const checks = [ + { + contains: ['Getting Started', 'Hello, world!'], + minLength: 200, + title: 'Getting Started', + url: 'https://doc.rust-lang.org/book/ch01-00-getting-started.html', + }, + { + contains: ['Django at a glance'], + minLength: 500, + title: 'Django at a glance', + url: 'https://docs.djangoproject.com/en/5.2/intro/overview/', + }, + { + contains: ['Crate serde'], + minLength: 500, + title: 'serde - Rust', + url: 'https://docs.rs/serde/latest/serde/', + }, + { + contains: ['Tutorial - User Guide', 'FastAPI'], + minLength: 1_000, + title: 'Tutorial - User Guide - FastAPI', + url: 'https://fastapi.tiangolo.com/tutorial/', + }, + { + contains: ['class String', 'arbitrary sequence of bytes'], + minLength: 1_000, + title: 'class String', + url: 'https://docs.ruby-lang.org/en/master/String.html', + }, + { + contains: ['Tutorial: Get started with Go', 'brief introduction to Go programming'], + minLength: 1_000, + title: 'Tutorial: Get started with Go', + url: 'https://go.dev/doc/tutorial/getting-started', + }, +] as const + +const md = create({ profiles }) + +for (const check of checks) { + test(`fallback: ${check.url}`, async () => { + expect(hasRule(check.url)).toBe(false) + + const result = await md.fetch(check.url) + expect(result.ok).toBe(true) + if (!result.ok) return + + expect(result.content.trim().length).toBeGreaterThan(check.minLength) + expect(result.meta.title).toContain(check.title) + for (const s of check.contains) expect(result.content).toContain(s) + }) +} + +function hasRule(url: string) { + return Object.values(rules).some( + (factory) => + typeof factory === 'function' && factory().patterns.some((pattern) => pattern.test(url)), + ) +} diff --git a/src/md/fromHtml.test.ts b/src/md/fromHtml.test.ts index 67400dc4..8494d2f8 100644 --- a/src/md/fromHtml.test.ts +++ b/src/md/fromHtml.test.ts @@ -267,6 +267,24 @@ describe('strips noise elements', () => { expect(result).toContain('Item') }) + test('preserves wrappers with sidebar classes when they contain main content', async () => { + const { content: result } = await fromHtml(` + + + + + + + `) + expect(result).toContain('# Title') + expect(result).toContain('Paragraph') + }) + test('strips nested noise', async () => { const { content: result } = await fromHtml( html({ diff --git a/src/md/fromHtml.ts b/src/md/fromHtml.ts index 16090bfb..caa286d8 100644 --- a/src/md/fromHtml.ts +++ b/src/md/fromHtml.ts @@ -181,7 +181,8 @@ function strip( if (isHidden(child)) return false if (isDecorativeHashLink(child)) return false if (isSkipLink(child)) return false - if (!knownContentRoot && matchesNoiseClassId(child)) return false + if (!knownContentRoot && !containsContentContainer(child) && matchesNoiseClassId(child)) + return false if (!knownContentRoot && isHighLinkDensity(child)) return false strip(child, inSectioning || sectioningTags.has(child.tagName), profile) @@ -189,6 +190,12 @@ function strip( }) } +function containsContentContainer(node: Element): boolean { + return ( + node.tagName === 'article' || node.tagName === 'main' || hasDescendantContentContainer(node) + ) +} + function isKnownContentRoot(node: Element, profile?: Profile>): boolean { if (!profile) return false return profile.contentRootSelectors.some((selector) => matchesSelector(node, selector)) diff --git a/src/md/mod.test.ts b/src/md/mod.test.ts index af9825fe..ca5da178 100644 --- a/src/md/mod.test.ts +++ b/src/md/mod.test.ts @@ -169,6 +169,47 @@ test('requests markdown directly for mintlify docs after profile detection', asy expect(result.extras.source_tokens_method).toBe('html') }) +test('requests markdown directly for rspress docs after profile detection', async () => { + const body = 'Normalized docs body.' + const requests: Array<{ accept: string | null; url: string }> = [] + const md = create({ + fetch: async (input, init) => { + const url = input instanceof URL ? input.href : input instanceof Request ? input.url : input + const accept = init?.headers ? new Headers(init.headers).get('accept') : null + requests.push({ accept, url }) + + if (url === 'https://rspress.rs/guide/start/introduction' && accept === 'text/markdown') + return new Response(`# Introduction\n\n${body}\n`, { + headers: { 'content-type': 'text/markdown; charset=utf-8' }, + status: 200, + }) + + if (url === 'https://rspress.rs/guide/start/introduction') + return new Response( + '
Copy Markdown

Introduction

HTML fallback body.

', + { headers: { 'content-type': 'text/html; charset=utf-8' }, status: 200 }, + ) + + return new Response(null, { status: 404 }) + }, + profiles, + }) + + const result = await md.fetch('https://rspress.rs/guide/start/introduction') + expect(result.ok).toBe(true) + if (!result.ok) return + + expect(requests).toEqual([ + { accept: null, url: 'https://rspress.rs/guide/start/introduction' }, + { accept: 'text/markdown', url: 'https://rspress.rs/guide/start/introduction' }, + ]) + expect(result.content).toContain(body) + expect(result.content).not.toContain('HTML fallback body.') + expect(result.meta.generator).toBe('Rspress v2.0.10') + expect(result.extras.source_tokens).toBeGreaterThan(0) + expect(result.extras.source_tokens_method).toBe('html') +}) + test('fetches markdown from a text/markdown alternate link before converting html', async () => { const requests: string[] = [] const md = create({ diff --git a/src/md/profile.test.ts b/src/md/profile.test.ts index 2d40c8aa..35bade29 100644 --- a/src/md/profile.test.ts +++ b/src/md/profile.test.ts @@ -121,6 +121,30 @@ test('detects read the docs profile before generic sphinx markers', () => { }) }) +test('detects rspress profile from generator and dom markers', () => { + const result = detectPageProfile( + '
Copy Markdown

Docs

', + new URL('https://rspress.rs/guide/start/introduction'), + profiles, + ) + + expect(result).toEqual({ + contentRootSelectors: [ + '.rp-doc-layout__doc-container', + '.rp-home-feature', + '.rp-home-hero', + '.rspress-doc', + ], + generator: 'Rspress v2.0.10', + key: 'rspress', + markdownRequest: { + headers: { Accept: 'text/markdown' }, + url: 'https://rspress.rs/guide/start/introduction', + }, + markers: ['meta:generator=Rspress v2.0.10', 'dom:__rspress_root'], + }) +}) + test('detects sphinx profile from classic theme markers', () => { const result = detectPageProfile( 'Docs

Docs

', diff --git a/src/md/profiles.ts b/src/md/profiles.ts index 5c8d5d08..7e7032d8 100644 --- a/src/md/profiles.ts +++ b/src/md/profiles.ts @@ -125,6 +125,34 @@ export const readTheDocs = defineProfile({ key: 'readTheDocs', }) +export const rspress = defineProfile<{ + markdownRequest: { headers: Record; url: string } +}>({ + checks: [{ url: 'https://rspress.rs/' }, { url: 'https://rspack.rs/guide/start/introduction' }], + contentRootSelectors: [ + '.rp-doc-layout__doc-container', + '.rp-home-feature', + '.rp-home-hero', + '.rspress-doc', + ], + detect: { + generator: /^rspress\b/i, + includesAny: { + marker: 'dom:__rspress_root', + needles: [ + 'id="__rspress_root"', + 'id="__rspress_modal_container"', + 'class="rp-doc rspress-doc"', + 'class="rp-llms-button', + ], + }, + }, + key: 'rspress', + resolve: (url) => ({ + markdownRequest: { headers: { Accept: 'text/markdown' }, url: url.href }, + }), +}) + export const sphinx = defineProfile({ checks: [{ url: 'https://docs.python.org/3/library/functions.html' }], contentRootSelectors: ['.body', '.bodywrapper', '.documentwrapper'], From 7eec81f51b3b8c4b175de2f3edf3f6a0c6de496e Mon Sep 17 00:00:00 2001 From: tmm Date: Sun, 3 May 2026 14:07:56 -0400 Subject: [PATCH 7/7] chore: up --- src/api.ts | 22 ++++--- src/api.workers.test.ts | 26 +++++++-- src/md/transports.ts | 2 +- src/queues/request-enrichment.ts | 40 ++++++------- src/queues/request-enrichment.workers.test.ts | 23 ++++++++ src/queues/stripe-webhook.ts | 44 ++++---------- src/queues/stripe-webhook.workers.test.ts | 57 ------------------- 7 files changed, 91 insertions(+), 123 deletions(-) diff --git a/src/api.ts b/src/api.ts index de80fb9d..d4ad0e3f 100644 --- a/src/api.ts +++ b/src/api.ts @@ -1753,14 +1753,17 @@ export const api = new Hono<{ case 'charge.dispute.created': { // TODO: send chargeback alert (email/Slack) const dispute = event.data.object - const chargeId = - typeof dispute.charge === 'string' ? dispute.charge : (dispute.charge?.id ?? null) - if (chargeId) + const charge = + typeof dispute.charge === 'string' + ? await stripe.charges.retrieve(dispute.charge) + : dispute.charge + const customer = typeof charge.customer === 'string' ? charge.customer : null + if (customer) await c.env.STRIPE_WEBHOOK_QUEUE.send({ type: event.type, data: { amount_total: dispute.amount, - charge_id: chargeId, + customer, id: dispute.id, }, }) @@ -1768,14 +1771,17 @@ export const api = new Hono<{ } case 'refund.created': { const refund = event.data.object - const chargeId = - typeof refund.charge === 'string' ? refund.charge : (refund.charge?.id ?? null) - if (chargeId) + const charge = + typeof refund.charge === 'string' + ? await stripe.charges.retrieve(refund.charge) + : refund.charge + const customer = charge && typeof charge.customer === 'string' ? charge.customer : null + if (customer) await c.env.STRIPE_WEBHOOK_QUEUE.send({ type: 'refund.created', data: { amount_total: refund.amount, - charge_id: chargeId, + customer, id: refund.id, }, }) diff --git a/src/api.workers.test.ts b/src/api.workers.test.ts index f923b908..c46c3fb4 100644 --- a/src/api.workers.test.ts +++ b/src/api.workers.test.ts @@ -5079,10 +5079,19 @@ describe('POST /api/stripe/webhook', () => { await expect(res.json()).resolves.toEqual({ received: true }) }) - test('queues charge disputes without expanding charge details inline', async () => { + test('queues charge disputes after expanding the charge customer inline', async () => { const sendSpy = vi.spyOn(env.STRIPE_WEBHOOK_QUEUE, 'send').mockResolvedValue({ metadata: { metrics: { backlogBytes: 0, backlogCount: 0 } }, }) + server.use( + http.get('https://api.stripe.com/v1/charges/ch_test_dispute', () => + HttpResponse.json({ + customer: 'cus_test_dispute', + id: 'ch_test_dispute', + object: 'charge', + }), + ), + ) const payload = JSON.stringify({ id: 'evt_test_dispute', type: 'charge.dispute.created', @@ -5114,7 +5123,7 @@ describe('POST /api/stripe/webhook', () => { type: 'charge.dispute.created', data: { amount_total: 500, - charge_id: 'ch_test_dispute', + customer: 'cus_test_dispute', id: 'dp_test_dispute', }, }) @@ -5123,10 +5132,19 @@ describe('POST /api/stripe/webhook', () => { } }) - test('queues refunds without expanding charge details inline', async () => { + test('queues refunds after expanding the charge customer inline', async () => { const sendSpy = vi.spyOn(env.STRIPE_WEBHOOK_QUEUE, 'send').mockResolvedValue({ metadata: { metrics: { backlogBytes: 0, backlogCount: 0 } }, }) + server.use( + http.get('https://api.stripe.com/v1/charges/ch_test_refund', () => + HttpResponse.json({ + customer: 'cus_test_refund', + id: 'ch_test_refund', + object: 'charge', + }), + ), + ) const payload = JSON.stringify({ id: 'evt_test_refund', type: 'refund.created', @@ -5158,7 +5176,7 @@ describe('POST /api/stripe/webhook', () => { type: 'refund.created', data: { amount_total: 700, - charge_id: 'ch_test_refund', + customer: 'cus_test_refund', id: 're_test_refund', }, }) diff --git a/src/md/transports.ts b/src/md/transports.ts index e44d7f43..28948d97 100644 --- a/src/md/transports.ts +++ b/src/md/transports.ts @@ -6,7 +6,7 @@ export const cfBrowserRendering = defineTransport<{ apiToken: string }>(async (url, init, context) => { if (context.previous?.status !== 403 || !context.options) return null - const signal = AbortSignal.timeout(20_00) + const signal = AbortSignal.timeout(20_000) const res = await context.fetch( `https://api.cloudflare.com/client/v4/accounts/${context.options.accountId}/browser-rendering/content`, { diff --git a/src/queues/request-enrichment.ts b/src/queues/request-enrichment.ts index bf47c96f..92fac944 100644 --- a/src/queues/request-enrichment.ts +++ b/src/queues/request-enrichment.ts @@ -6,28 +6,28 @@ export async function processRequestEnrichmentMessage( message: Message, db: Database, ) { - try { - const response = await fetch(message.body.url, { - headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${env.HOST}/1.0; +https://${env.HOST})` }, - redirect: 'follow', - signal: AbortSignal.timeout(10_000), - }) - if (!response.ok) return + const response = await fetch(message.body.url, { + headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${env.HOST}/1.0; +https://${env.HOST})` }, + redirect: 'follow', + signal: AbortSignal.timeout(10_000), + }) + if (!response.ok) { + if (response.status >= 500 || response.status === 408 || response.status === 429) + throw new Error(`Request enrichment fetch failed with ${response.status}`) + return + } - const contentType = (response.headers.get('content-type') ?? '').toLowerCase() - if (!contentType.includes('text/html') && !contentType.includes('application/xhtml+xml')) return + const contentType = (response.headers.get('content-type') ?? '').toLowerCase() + if (!contentType.includes('text/html') && !contentType.includes('application/xhtml+xml')) return - const sourceTokens = estimateTokenCount(await response.text()) - await db - .updateTable('request') - .set({ source_tokens: sourceTokens, source_tokens_method: 'html' }) - .where('id', '=', message.body.request_id) - .where('source_tokens', '<', sourceTokens) - .where('source_tokens_method', '=', 'estimated') - .executeTakeFirst() - } catch { - // Best-effort enrichment only; keep the fallback when the follow-up fetch fails. - } + const sourceTokens = estimateTokenCount(await response.text()) + await db + .updateTable('request') + .set({ source_tokens: sourceTokens, source_tokens_method: 'html' }) + .where('id', '=', message.body.request_id) + .where('source_tokens', '<', sourceTokens) + .where('source_tokens_method', '=', 'estimated') + .executeTakeFirst() } processRequestEnrichmentMessage.queueName = 'curl-request-enrichment' as const diff --git a/src/queues/request-enrichment.workers.test.ts b/src/queues/request-enrichment.workers.test.ts index 2d9218c5..9af993fe 100644 --- a/src/queues/request-enrichment.workers.test.ts +++ b/src/queues/request-enrichment.workers.test.ts @@ -119,3 +119,26 @@ test('keeps estimated rows when html source tokens are smaller', async () => { expect(row.source_tokens).toBe(120) expect(row.source_tokens_method).toBe('estimated') }) + +test('throws on transient enrichment fetch failures so the queue can retry', async () => { + server.use(http.get('https://retry.example.com/', () => new HttpResponse(null, { status: 503 }))) + + const batch = createMessageBatch( + processRequestEnrichmentMessage.queueName, + [ + { + attempts: 1, + body: { + request_id: 'req_retry_1', + url: 'https://retry.example.com', + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ], + ) + + await expect(processRequestEnrichmentMessage(batch.messages[0]!, db)).rejects.toThrow( + 'Request enrichment fetch failed with 503', + ) +}) diff --git a/src/queues/stripe-webhook.ts b/src/queues/stripe-webhook.ts index 6b2d5db8..3b7dd101 100644 --- a/src/queues/stripe-webhook.ts +++ b/src/queues/stripe-webhook.ts @@ -1,7 +1,5 @@ import { env } from 'cloudflare:workers' -import Stripe from 'stripe' import type { Database } from '#db/client.ts' -import * as StripeUtils from '#lib/stripe.ts' export async function processStripeWebhookMessage( message: Message, @@ -84,15 +82,7 @@ async function processReversal( .executeTakeFirst() if (existing) return - const customerId = await (async () => { - if ('customer' in data) return data.customer - const stripe = new Stripe(env.STRIPE_SECRET_KEY, StripeUtils.stripeOptions(env.STRIPE_API_URL)) - const charge = await stripe.charges.retrieve(data.charge_id) - return typeof charge.customer === 'string' ? charge.customer : null - })() - if (!customerId) return - - const entity = await findEntity(customerId, db) + const entity = await findEntity(data.customer, db) if (!entity) return await db.transaction().execute(async (tx) => { @@ -174,30 +164,18 @@ export namespace processStripeWebhookMessage { } | { type: 'charge.dispute.created' - data: - | { - amount_total: number - customer: string - id: string - } - | { - amount_total: number - charge_id: string - id: string - } + data: { + amount_total: number + customer: string + id: string + } } | { type: 'refund.created' - data: - | { - amount_total: number - customer: string - id: string - } - | { - amount_total: number - charge_id: string - id: string - } + data: { + amount_total: number + customer: string + id: string + } } } diff --git a/src/queues/stripe-webhook.workers.test.ts b/src/queues/stripe-webhook.workers.test.ts index 71582598..2fad3e13 100644 --- a/src/queues/stripe-webhook.workers.test.ts +++ b/src/queues/stripe-webhook.workers.test.ts @@ -1,11 +1,9 @@ import { createMessageBatch } from 'cloudflare:test' import { env } from 'cloudflare:workers' -import { HttpResponse, http } from 'msw' import { afterAll, expect, test } from 'vitest' import { createClient } from '#db/client.ts' import { processStripeWebhookMessage } from '#queues/stripe-webhook.ts' import { createFactory } from '#test/factory.ts' -import { server } from '#test/workers.server.ts' const db = createClient(env.DB.connectionString, { max: 1 }) const factory = createFactory(db) @@ -206,61 +204,6 @@ test('processes charge.dispute.created for account', async () => { expect(tx.account_id).toBe(account.id) }) -test('processes charge.dispute.created by expanding charge customer in the queue worker', async () => { - const customerId = `cus_${crypto.randomUUID()}` - const chargeId = `ch_${crypto.randomUUID()}` - const disputeId = `dp_${crypto.randomUUID()}` - const account = await factory.account.insert({}) - await db - .updateTable('account') - .set({ stripe_customer_id: customerId, balance_mills: 20000 }) - .where('id', '=', account.id) - .execute() - - server.use( - http.get(`https://api.stripe.com/v1/charges/${chargeId}`, () => - HttpResponse.json({ - customer: customerId, - id: chargeId, - object: 'charge', - }), - ), - ) - - const batch = createMessageBatch( - processStripeWebhookMessage.queueName, - [ - { - attempts: 1, - body: { - type: 'charge.dispute.created', - data: { amount_total: 2000, charge_id: chargeId, id: disputeId }, - }, - id: crypto.randomUUID(), - timestamp: new Date(), - }, - ], - ) - await processStripeWebhookMessage(batch.messages[0]!, db) - - const updated = await db - .selectFrom('account') - .where('id', '=', account.id) - .select('balance_mills') - .executeTakeFirstOrThrow() - expect(updated.balance_mills).toBe(0) - - const tx = await db - .selectFrom('credit_transaction') - .where('reference_id', '=', disputeId) - .selectAll() - .executeTakeFirstOrThrow() - expect(tx.type).toBe('chargeback') - expect(tx.amount_mills).toBe(-20000) - expect(tx.balance_after_mills).toBe(0) - expect(tx.account_id).toBe(account.id) -}) - test('processes charge.dispute.created for organization', async () => { const customerId = `cus_${crypto.randomUUID()}` const chargeId = `ch_${crypto.randomUUID()}`