diff --git a/.github/ISSUE_TEMPLATE/page.yml b/.github/ISSUE_TEMPLATE/page.yml new file mode 100644 index 00000000..03d9cefb --- /dev/null +++ b/.github/ISSUE_TEMPLATE/page.yml @@ -0,0 +1,101 @@ +name: Page Fetch Issue +description: Report a problem fetching or narrowing a specific page with curl.md. +labels: ['bug'] +body: + - type: markdown + attributes: + value: | + Thanks for reporting this. + + For page-specific issues, the most helpful details are the exact page URL and the exact request options you used. + + - type: checkboxes + id: existing_issues + attributes: + label: Check existing issues + description: By submitting this issue, you checked there is not already an issue for this problem. + options: + - label: I checked existing issues and did not find a duplicate. + required: true + + - type: input + id: page_url + attributes: + label: Page URL + description: The exact page you tried to fetch. + placeholder: https://developers.cloudflare.com/workers + validations: + required: true + + - type: textarea + id: issue + attributes: + label: What is the issue? + description: What did you expect, and what happened instead? + placeholder: I expected curl.md to return the installation steps, but it omitted that section / returned unrelated content / failed to fetch the page. + validations: + required: true + + - type: input + id: objective + attributes: + label: Objective + description: If you used an objective, paste it exactly. + placeholder: find the install steps + validations: + required: false + + - type: input + id: keywords + attributes: + label: Keywords + description: If you used keywords, paste them exactly. + placeholder: install,worker,binding + validations: + required: false + + - type: dropdown + id: mode + attributes: + label: Mode + description: If you used an objective, which mode did you use? + options: + - I did not use an objective + - smart + - rush + validations: + required: true + + - type: checkboxes + id: request_options + attributes: + label: Request options + options: + - label: I used `fresh` / bypassed cache. + + - type: textarea + id: exact_request + attributes: + label: Exact request + description: Paste the exact CLI command, browser URL, SDK call, or agent tool input if possible. + placeholder: | + curl.md developers.cloudflare.com/workers --objective "find the install steps" --keywords install,worker --fresh + render: shell + validations: + required: false + + - type: textarea + id: response + attributes: + label: Response or error output + description: Paste the returned markdown, error message, or a representative excerpt. + validations: + required: false + + - type: textarea + id: context + attributes: + label: Anything else? + description: Extra context. Public/private page, auth requirements, browser vs CLI, screenshots, etc. + validations: + required: false diff --git a/.github/README.md b/.github/README.md index 6d431879..9883c2df 100644 --- a/.github/README.md +++ b/.github/README.md @@ -31,7 +31,8 @@ curl.md example.com md example.com # Add to your agent -claude plugin install curl-md@claude-plugins-official +claude plugin marketplace add wevm/curl.md +claude plugin install curl-md@curl-md opencode plugin -g @curl.md/opencode pi install npm:@curl.md/pi npx @curl.md/amp install diff --git a/src/api.ts b/src/api.ts index 477cdf7c..d4ad0e3f 100644 --- a/src/api.ts +++ b/src/api.ts @@ -1049,15 +1049,19 @@ export const api = new Hono<{ let paymentIntent: Stripe.PaymentIntent try { - paymentIntent = await stripe.paymentIntents.create({ - amount, - confirm: true, - currency: 'usd', - customer: stripeCustomerId, - metadata: { entity_type: entityType, entity_id: entityId }, - off_session: true, - payment_method: defaultPaymentMethod.id, - }) + const idempotencyKey = c.req.header('idempotency-key') + paymentIntent = await stripe.paymentIntents.create( + { + amount, + confirm: true, + currency: 'usd', + customer: stripeCustomerId, + metadata: { entity_type: entityType, entity_id: entityId }, + off_session: true, + payment_method: defaultPaymentMethod.id, + }, + idempotencyKey ? { idempotencyKey } : undefined, + ) } catch (error) { if ( error instanceof Stripe.errors.StripeCardError && @@ -1733,7 +1737,7 @@ export const api = new Hono<{ switch (event.type) { case 'payment_intent.succeeded': { - const paymentIntent = event.data.object as import('stripe').Stripe.PaymentIntent + const paymentIntent = event.data.object const customer = typeof paymentIntent.customer === 'string' ? paymentIntent.customer : null if (customer && paymentIntent.amount) await c.env.STRIPE_WEBHOOK_QUEUE.send({ @@ -1748,7 +1752,7 @@ export const api = new Hono<{ } case 'charge.dispute.created': { // TODO: send chargeback alert (email/Slack) - const dispute = event.data.object as import('stripe').Stripe.Dispute + const dispute = event.data.object const charge = typeof dispute.charge === 'string' ? await stripe.charges.retrieve(dispute.charge) @@ -1766,7 +1770,7 @@ export const api = new Hono<{ break } case 'refund.created': { - const refund = event.data.object as import('stripe').Stripe.Refund + const refund = event.data.object const charge = typeof refund.charge === 'string' ? await stripe.charges.retrieve(refund.charge) @@ -2045,7 +2049,6 @@ export const api = new Hono<{ vary: 'Accept', }) - // Rate limit: three tiers (anon, authed/free, paid) const identity = c.var.session ? c.var.session.account_id : (c.req.header('cf-connecting-ip') ?? 'unknown') @@ -2059,23 +2062,22 @@ export const api = new Hono<{ if (balanceMills > 0) billable = true } - const limit = (() => { - if (query.objective) + let rateLimitHeaders: Record = {} + if (!billable) { + const limit = (() => { + if (query.objective) + return { + key: `query:${identity}` as const, + max: c.var.session ? 10 : 3, + window: 3600, + } return { - key: `query:${identity}` as const, - max: c.var.session ? 10 : 3, + key: `fetch:${identity}` as const, + max: c.var.session ? 1000 : 100, window: 3600, } - return { - key: `fetch:${identity}` as const, - max: c.var.session ? 1000 : 100, - window: 3600, - } - })() + })() - // Paid users skip rate limiting - let rateLimitHeaders: Record = {} - if (!billable) { const kvKey = `ratelimit:${limit.key}` as const const now = Math.floor(Date.now() / 1000) const record = await c.env.KV.get(kvKey, 'json') @@ -2251,7 +2253,18 @@ export const api = new Hono<{ } const chunks = Md.chunk(filteredContent) - const results = await Promise.all(chunks.map(extractChunk)) + const results: Awaited>[] = [] + let chunkIndex = 0 + // Bound Workers AI fan-out so one large objective request can't saturate inference capacity. + await Promise.all( + Array.from({ length: Math.min(2, chunks.length) }, async () => { + while (chunkIndex < chunks.length) { + const currentIndex = chunkIndex + chunkIndex += 1 + results[currentIndex] = await extractChunk(chunks[currentIndex]!) + } + }), + ) const filtered = results .filter((r) => r.response && r.response.trim() !== Constants.sentinelValue) .map((r) => r.response) diff --git a/src/api.workers.test.ts b/src/api.workers.test.ts index 2a274dc1..c46c3fb4 100644 --- a/src/api.workers.test.ts +++ b/src/api.workers.test.ts @@ -1476,6 +1476,47 @@ describe('POST /api/credits/charge', () => { }) }) + test('forwards stripe idempotency key for off-session charges', async () => { + const account = await factory.account.insert({}) + await db + .updateTable('account') + .set({ stripe_customer_id: `cus_${Nanoid.generate()}` }) + .where('id', '=', account.id) + .execute() + const session = await factory.session.insert({ account_id: account.id }) + + let idempotencyKey: string | null = null + + server.use( + http.get('https://api.stripe.com/v1/payment_methods', () => + HttpResponse.json({ + data: [{ id: 'pm_test', card: { brand: 'visa', last4: '4242' } }], + object: 'list', + }), + ), + http.post('https://api.stripe.com/v1/payment_intents', ({ request }) => { + idempotencyKey = request.headers.get('idempotency-key') + return HttpResponse.json({ + id: 'pi_charge_test', + status: 'succeeded', + object: 'payment_intent', + }) + }), + ) + + const res = await client.api.credits.charge.$post( + { json: { amount: '1000' } }, + { + headers: { + Cookie: await Cookie.generateSigned('curl.session', session.id, env.COOKIE_SECRET), + 'Idempotency-Key': 'charge_attempt_test', + }, + }, + ) + expect(res.status).toBe(200) + expect(idempotencyKey).toBe('charge_attempt_test') + }) + test('returns payment_failed when saved card is declined', async () => { const account = await factory.account.insert({}) await db @@ -2949,6 +2990,61 @@ test('GET /api/:url retries transient AI timeouts and normalizes HTML error page }) }) +test('GET /api/:url bounds objective chunk inference concurrency', async () => { + const url = 'https://ai-chunk-pool.example.com/' + await env.KV.put( + `page:${url}`, + JSON.stringify({ + content: [ + `## Alpha\n\n${'a'.repeat(40_000)}`, + `## Beta\n\n${'b'.repeat(40_000)}`, + `## Gamma\n\n${'c'.repeat(40_000)}`, + ].join('\n\n'), + extras: {}, + meta: { + site: 'ai-chunk-pool.example.com', + url, + }, + }), + ) + + let activeCalls = 0 + let maxActiveCalls = 0 + const aiRun = vi.fn(async () => { + activeCalls += 1 + maxActiveCalls = Math.max(maxActiveCalls, activeCalls) + await new Promise((resolve) => setTimeout(resolve, 10)) + activeCalls -= 1 + return { + response: '## Matched\n\nRelevant content', + usage: { completion_tokens: 1, prompt_tokens: 1 }, + } + }) + + const localClient = testClient( + api, + { + ...env, + AI: { + run: aiRun, + } as unknown as typeof env.AI, + } as unknown as typeof env, + executionCtx, + ) + + const res = await localClient.api[':url{.+}'].$get( + { + param: { url: 'ai-chunk-pool.example.com' }, + query: { objective: 'find the relevant content' }, + }, + { headers: { 'cf-connecting-ip': '10.0.0.41' } }, + ) + + expect(res.status).toBe(200) + expect(aiRun).toHaveBeenCalledTimes(3) + expect(maxActiveCalls).toBeLessThanOrEqual(2) +}) + test('GET /api/:url reports upstream 5xx fetch failures to sentry', async () => { const captureException = vi.spyOn(Sentry, 'captureException').mockImplementation(() => '') server.use( @@ -3040,14 +3136,12 @@ test('GET /api/:url authed 429 includes credits message', async () => { }) }) -test('GET /api/:url paid user skips rate limits', async () => { +test('GET /api/:url paid user skips auth rate limits', async () => { const account = await factory.account.insert({}) const session = await factory.session.insert({ account_id: account.id }) - // Seed balance cache await env.KV.put(`balance:${account.id}`, '1000') - // Seed rate limit to already exceeded await env.KV.put( `ratelimit:fetch:${account.id}`, JSON.stringify({ @@ -3075,10 +3169,9 @@ test('GET /api/:url paid user skips rate limits', async () => { }, }, ) - // Paid user should NOT get 429 even though rate limit is exceeded expect(res.status).toBe(200) - // Should not have rate limit headers - expect(res.headers.get('x-ratelimit-limit')).toBeNull() + expect(res.headers.get('x-credits-remaining')).toBe('999') + await expect(res.text()).resolves.toContain('ok') }) test('GET /api/:url zero balance user gets authed rate limits', async () => { @@ -4968,22 +5061,7 @@ describe('POST /api/stripe/webhook', () => { }, }) - // Compute HMAC signature (async-safe for Workers) - const timestamp = Math.floor(Date.now() / 1000) - const key = await crypto.subtle.importKey( - 'raw', - new TextEncoder().encode(env.STRIPE_WEBHOOK_SECRET), - { name: 'HMAC', hash: 'SHA-256' }, - false, - ['sign'], - ) - const sig = await crypto.subtle.sign( - 'HMAC', - key, - new TextEncoder().encode(`${timestamp}.${payload}`), - ) - const hex = [...new Uint8Array(sig)].map((b) => b.toString(16).padStart(2, '0')).join('') - const header = `t=${timestamp},v1=${hex}` + const header = await createStripeWebhookSignature(payload) const res = await api.request( '/api/stripe/webhook', @@ -5000,6 +5078,112 @@ describe('POST /api/stripe/webhook', () => { expect(res.status).toBe(200) await expect(res.json()).resolves.toEqual({ received: true }) }) + + test('queues charge disputes after expanding the charge customer inline', async () => { + const sendSpy = vi.spyOn(env.STRIPE_WEBHOOK_QUEUE, 'send').mockResolvedValue({ + metadata: { metrics: { backlogBytes: 0, backlogCount: 0 } }, + }) + server.use( + http.get('https://api.stripe.com/v1/charges/ch_test_dispute', () => + HttpResponse.json({ + customer: 'cus_test_dispute', + id: 'ch_test_dispute', + object: 'charge', + }), + ), + ) + const payload = JSON.stringify({ + id: 'evt_test_dispute', + type: 'charge.dispute.created', + data: { + object: { + amount: 500, + charge: 'ch_test_dispute', + id: 'dp_test_dispute', + }, + }, + }) + + try { + const res = await api.request( + '/api/stripe/webhook', + { + method: 'POST', + body: payload, + headers: { + 'content-type': 'application/json', + 'stripe-signature': await createStripeWebhookSignature(payload), + }, + }, + env, + ) + + expect(res.status).toBe(200) + expect(sendSpy).toHaveBeenCalledWith({ + type: 'charge.dispute.created', + data: { + amount_total: 500, + customer: 'cus_test_dispute', + id: 'dp_test_dispute', + }, + }) + } finally { + sendSpy.mockRestore() + } + }) + + test('queues refunds after expanding the charge customer inline', async () => { + const sendSpy = vi.spyOn(env.STRIPE_WEBHOOK_QUEUE, 'send').mockResolvedValue({ + metadata: { metrics: { backlogBytes: 0, backlogCount: 0 } }, + }) + server.use( + http.get('https://api.stripe.com/v1/charges/ch_test_refund', () => + HttpResponse.json({ + customer: 'cus_test_refund', + id: 'ch_test_refund', + object: 'charge', + }), + ), + ) + const payload = JSON.stringify({ + id: 'evt_test_refund', + type: 'refund.created', + data: { + object: { + amount: 700, + charge: 'ch_test_refund', + id: 're_test_refund', + }, + }, + }) + + try { + const res = await api.request( + '/api/stripe/webhook', + { + method: 'POST', + body: payload, + headers: { + 'content-type': 'application/json', + 'stripe-signature': await createStripeWebhookSignature(payload), + }, + }, + env, + ) + + expect(res.status).toBe(200) + expect(sendSpy).toHaveBeenCalledWith({ + type: 'refund.created', + data: { + amount_total: 700, + customer: 'cus_test_refund', + id: 're_test_refund', + }, + }) + } finally { + sendSpy.mockRestore() + } + }) }) describe('POST /api/tunnel', () => { @@ -5070,3 +5254,21 @@ function toSearchParams(formData: FormData) { Array.from(formData.entries()).map(([key, value]) => [key, String(value)]), ) } + +async function createStripeWebhookSignature(payload: string) { + const timestamp = Math.floor(Date.now() / 1000) + const key = await crypto.subtle.importKey( + 'raw', + new TextEncoder().encode(env.STRIPE_WEBHOOK_SECRET), + { name: 'HMAC', hash: 'SHA-256' }, + false, + ['sign'], + ) + const sig = await crypto.subtle.sign( + 'HMAC', + key, + new TextEncoder().encode(`${timestamp}.${payload}`), + ) + const hex = [...new Uint8Array(sig)].map((b) => b.toString(16).padStart(2, '0')).join('') + return `t=${timestamp},v1=${hex}` +} diff --git a/src/entry-server.ts b/src/entry-server.ts index d7e13e61..6fd2a1ad 100644 --- a/src/entry-server.ts +++ b/src/entry-server.ts @@ -7,6 +7,7 @@ import { api } from '#api.ts' import { cleanupExpired } from '#crons/cleanup.ts' import { createClient } from '#db/client.ts' import { appendVaryAccept, negotiateAccept } from '#lib/accept.ts' +import { processRequestEnrichmentMessage } from '#queues/request-enrichment.ts' import { processRequestMessage } from '#queues/request.ts' import { processStripeWebhookMessage } from '#queues/stripe-webhook.ts' @@ -117,15 +118,6 @@ export default Sentry.withSentry( return response }, async queue(batch, env) { - if (batch.queue.endsWith('-dlq')) { - for (const message of batch.messages) { - const { ack: _, retry: __, ...rest } = message - console.error(`DLQ message [${batch.queue}]`, rest) - message.ack() - } - return - } - const queueName = (() => { // Preview queues have a suffix (e.g. `stripe-webhook-pr25`), strip it const previewApex = env.HOST.replace('.curl.md', '') @@ -133,10 +125,15 @@ export default Sentry.withSentry( return batch.queue })() const queue = z.parse( - z.enum([processRequestMessage.queueName, processStripeWebhookMessage.queueName]), + z.enum([ + processRequestEnrichmentMessage.queueName, + processRequestMessage.queueName, + processStripeWebhookMessage.queueName, + ]), queueName, ) const handler = { + [processRequestEnrichmentMessage.queueName]: processRequestEnrichmentMessage, [processRequestMessage.queueName]: processRequestMessage, [processStripeWebhookMessage.queueName]: processStripeWebhookMessage, }[queue] @@ -147,6 +144,20 @@ export default Sentry.withSentry( message.ack() } catch (error) { console.error(`Queue message ${message.id} failed:`, error) + // Emit alert when next retry will move the message into DLQ + if (message.attempts >= 3) + Sentry.captureException(error, { + extra: { + queue: { + attempts: message.attempts, + batch_queue: batch.queue, + body: message.body, + logical_queue: queueName, + message_id: message.id, + }, + }, + tags: { queue: queueName, queue_outcome: 'dead_letter' }, + }) message.retry() } } @@ -163,7 +174,10 @@ export default Sentry.withSentry( }, ) -type QueueHandlerMessage = processRequestMessage.Body | processStripeWebhookMessage.Body +type QueueHandlerMessage = + | processRequestEnrichmentMessage.Body + | processRequestMessage.Body + | processStripeWebhookMessage.Body declare module '@tanstack/react-start' { interface Register { diff --git a/src/env.d.ts b/src/env.d.ts index aee6f9fa..8f2a92d9 100644 --- a/src/env.d.ts +++ b/src/env.d.ts @@ -5,6 +5,9 @@ declare namespace Cloudflare { CLOUDFLARE_ACCOUNT_ID: string CLOUDFLARE_API_TOKEN: string KV: KV + REQUEST_ENRICH_QUEUE: Queue< + import('#queues/request-enrichment.ts').processRequestEnrichmentMessage.Body + > REQUEST_QUEUE: Queue STRIPE_PUBLISHABLE_KEY: string STRIPE_WEBHOOK_QUEUE: Queue< diff --git a/src/lib/github.ts b/src/lib/github.ts index e528f713..98bda8cf 100644 --- a/src/lib/github.ts +++ b/src/lib/github.ts @@ -60,6 +60,7 @@ export async function resolveToken( const res = await fetch(tokenUrl.toString(), { method: 'POST', headers: { Accept: 'application/json' }, + signal: AbortSignal.timeout(10_000), }) const parsed = z.safeParse(tokenSchema, await res.json()) if (!parsed.success) return undefined diff --git a/src/lib/stripe.ts b/src/lib/stripe.ts index 221f311d..06f6691b 100644 --- a/src/lib/stripe.ts +++ b/src/lib/stripe.ts @@ -111,5 +111,6 @@ export function stripeOptions(apiUrl: string) { host: url.hostname, port: Number(url.port) || (url.protocol === 'https:' ? 443 : 80), protocol: url.protocol.replace(':', '') as 'http' | 'https', + timeout: 15_000, } } diff --git a/src/md/fallback.smoke.test.ts b/src/md/fallback.smoke.test.ts new file mode 100644 index 00000000..337fd590 --- /dev/null +++ b/src/md/fallback.smoke.test.ts @@ -0,0 +1,70 @@ +import { expect, test } from 'vitest' +import { create } from './mod.ts' +import * as profiles from './profiles.ts' +import * as rules from './rules.ts' + +// Smoke tests that fetch live no-rule docs/reference pages and verify +// generic HTML extraction still returns non-empty markdown. +// Run via: pnpm test --project md:smoke + +const checks = [ + { + contains: ['Getting Started', 'Hello, world!'], + minLength: 200, + title: 'Getting Started', + url: 'https://doc.rust-lang.org/book/ch01-00-getting-started.html', + }, + { + contains: ['Django at a glance'], + minLength: 500, + title: 'Django at a glance', + url: 'https://docs.djangoproject.com/en/5.2/intro/overview/', + }, + { + contains: ['Crate serde'], + minLength: 500, + title: 'serde - Rust', + url: 'https://docs.rs/serde/latest/serde/', + }, + { + contains: ['Tutorial - User Guide', 'FastAPI'], + minLength: 1_000, + title: 'Tutorial - User Guide - FastAPI', + url: 'https://fastapi.tiangolo.com/tutorial/', + }, + { + contains: ['class String', 'arbitrary sequence of bytes'], + minLength: 1_000, + title: 'class String', + url: 'https://docs.ruby-lang.org/en/master/String.html', + }, + { + contains: ['Tutorial: Get started with Go', 'brief introduction to Go programming'], + minLength: 1_000, + title: 'Tutorial: Get started with Go', + url: 'https://go.dev/doc/tutorial/getting-started', + }, +] as const + +const md = create({ profiles }) + +for (const check of checks) { + test(`fallback: ${check.url}`, async () => { + expect(hasRule(check.url)).toBe(false) + + const result = await md.fetch(check.url) + expect(result.ok).toBe(true) + if (!result.ok) return + + expect(result.content.trim().length).toBeGreaterThan(check.minLength) + expect(result.meta.title).toContain(check.title) + for (const s of check.contains) expect(result.content).toContain(s) + }) +} + +function hasRule(url: string) { + return Object.values(rules).some( + (factory) => + typeof factory === 'function' && factory().patterns.some((pattern) => pattern.test(url)), + ) +} diff --git a/src/md/fromHtml.test.ts b/src/md/fromHtml.test.ts index 67400dc4..8494d2f8 100644 --- a/src/md/fromHtml.test.ts +++ b/src/md/fromHtml.test.ts @@ -267,6 +267,24 @@ describe('strips noise elements', () => { expect(result).toContain('Item') }) + test('preserves wrappers with sidebar classes when they contain main content', async () => { + const { content: result } = await fromHtml(` + + + + + + + `) + expect(result).toContain('# Title') + expect(result).toContain('Paragraph') + }) + test('strips nested noise', async () => { const { content: result } = await fromHtml( html({ diff --git a/src/md/fromHtml.ts b/src/md/fromHtml.ts index 16090bfb..caa286d8 100644 --- a/src/md/fromHtml.ts +++ b/src/md/fromHtml.ts @@ -181,7 +181,8 @@ function strip( if (isHidden(child)) return false if (isDecorativeHashLink(child)) return false if (isSkipLink(child)) return false - if (!knownContentRoot && matchesNoiseClassId(child)) return false + if (!knownContentRoot && !containsContentContainer(child) && matchesNoiseClassId(child)) + return false if (!knownContentRoot && isHighLinkDensity(child)) return false strip(child, inSectioning || sectioningTags.has(child.tagName), profile) @@ -189,6 +190,12 @@ function strip( }) } +function containsContentContainer(node: Element): boolean { + return ( + node.tagName === 'article' || node.tagName === 'main' || hasDescendantContentContainer(node) + ) +} + function isKnownContentRoot(node: Element, profile?: Profile>): boolean { if (!profile) return false return profile.contentRootSelectors.some((selector) => matchesSelector(node, selector)) diff --git a/src/md/mod.test.ts b/src/md/mod.test.ts index af9825fe..ca5da178 100644 --- a/src/md/mod.test.ts +++ b/src/md/mod.test.ts @@ -169,6 +169,47 @@ test('requests markdown directly for mintlify docs after profile detection', asy expect(result.extras.source_tokens_method).toBe('html') }) +test('requests markdown directly for rspress docs after profile detection', async () => { + const body = 'Normalized docs body.' + const requests: Array<{ accept: string | null; url: string }> = [] + const md = create({ + fetch: async (input, init) => { + const url = input instanceof URL ? input.href : input instanceof Request ? input.url : input + const accept = init?.headers ? new Headers(init.headers).get('accept') : null + requests.push({ accept, url }) + + if (url === 'https://rspress.rs/guide/start/introduction' && accept === 'text/markdown') + return new Response(`# Introduction\n\n${body}\n`, { + headers: { 'content-type': 'text/markdown; charset=utf-8' }, + status: 200, + }) + + if (url === 'https://rspress.rs/guide/start/introduction') + return new Response( + '
Copy Markdown

Introduction

HTML fallback body.

', + { headers: { 'content-type': 'text/html; charset=utf-8' }, status: 200 }, + ) + + return new Response(null, { status: 404 }) + }, + profiles, + }) + + const result = await md.fetch('https://rspress.rs/guide/start/introduction') + expect(result.ok).toBe(true) + if (!result.ok) return + + expect(requests).toEqual([ + { accept: null, url: 'https://rspress.rs/guide/start/introduction' }, + { accept: 'text/markdown', url: 'https://rspress.rs/guide/start/introduction' }, + ]) + expect(result.content).toContain(body) + expect(result.content).not.toContain('HTML fallback body.') + expect(result.meta.generator).toBe('Rspress v2.0.10') + expect(result.extras.source_tokens).toBeGreaterThan(0) + expect(result.extras.source_tokens_method).toBe('html') +}) + test('fetches markdown from a text/markdown alternate link before converting html', async () => { const requests: string[] = [] const md = create({ diff --git a/src/md/mod.ts b/src/md/mod.ts index f168d040..de01c3c1 100644 --- a/src/md/mod.ts +++ b/src/md/mod.ts @@ -31,6 +31,7 @@ export function create(options: create.Options = {}): create.ReturnType { })() const rewrittenUrl = matched?.rule?.rewrite?.(inputURL, matched.match) ?? inputURL + const signal = AbortSignal.timeout(15_000) // 15 seconds const requestInit = { ...init, headers: { @@ -38,6 +39,7 @@ export function create(options: create.Options = {}): create.ReturnType { ...init?.headers, }, redirect: init?.redirect ?? 'follow', + signal: init?.signal ? AbortSignal.any([init?.signal, signal]) : signal, } satisfies RequestInit const context = { fetch: options.fetch ?? globalThis.fetch.bind(globalThis), diff --git a/src/md/profile.test.ts b/src/md/profile.test.ts index 2d40c8aa..35bade29 100644 --- a/src/md/profile.test.ts +++ b/src/md/profile.test.ts @@ -121,6 +121,30 @@ test('detects read the docs profile before generic sphinx markers', () => { }) }) +test('detects rspress profile from generator and dom markers', () => { + const result = detectPageProfile( + '
Copy Markdown

Docs

', + new URL('https://rspress.rs/guide/start/introduction'), + profiles, + ) + + expect(result).toEqual({ + contentRootSelectors: [ + '.rp-doc-layout__doc-container', + '.rp-home-feature', + '.rp-home-hero', + '.rspress-doc', + ], + generator: 'Rspress v2.0.10', + key: 'rspress', + markdownRequest: { + headers: { Accept: 'text/markdown' }, + url: 'https://rspress.rs/guide/start/introduction', + }, + markers: ['meta:generator=Rspress v2.0.10', 'dom:__rspress_root'], + }) +}) + test('detects sphinx profile from classic theme markers', () => { const result = detectPageProfile( 'Docs

Docs

', diff --git a/src/md/profiles.ts b/src/md/profiles.ts index 5c8d5d08..7e7032d8 100644 --- a/src/md/profiles.ts +++ b/src/md/profiles.ts @@ -125,6 +125,34 @@ export const readTheDocs = defineProfile({ key: 'readTheDocs', }) +export const rspress = defineProfile<{ + markdownRequest: { headers: Record; url: string } +}>({ + checks: [{ url: 'https://rspress.rs/' }, { url: 'https://rspack.rs/guide/start/introduction' }], + contentRootSelectors: [ + '.rp-doc-layout__doc-container', + '.rp-home-feature', + '.rp-home-hero', + '.rspress-doc', + ], + detect: { + generator: /^rspress\b/i, + includesAny: { + marker: 'dom:__rspress_root', + needles: [ + 'id="__rspress_root"', + 'id="__rspress_modal_container"', + 'class="rp-doc rspress-doc"', + 'class="rp-llms-button', + ], + }, + }, + key: 'rspress', + resolve: (url) => ({ + markdownRequest: { headers: { Accept: 'text/markdown' }, url: url.href }, + }), +}) + export const sphinx = defineProfile({ checks: [{ url: 'https://docs.python.org/3/library/functions.html' }], contentRootSelectors: ['.body', '.bodywrapper', '.documentwrapper'], diff --git a/src/md/transports.ts b/src/md/transports.ts index c4560d5a..28948d97 100644 --- a/src/md/transports.ts +++ b/src/md/transports.ts @@ -4,8 +4,9 @@ import { defineTransport } from './mod.ts' export const cfBrowserRendering = defineTransport<{ accountId: string apiToken: string -}>(async (url, _init, context) => { +}>(async (url, init, context) => { if (context.previous?.status !== 403 || !context.options) return null + const signal = AbortSignal.timeout(20_000) const res = await context.fetch( `https://api.cloudflare.com/client/v4/accounts/${context.options.accountId}/browser-rendering/content`, { @@ -19,6 +20,7 @@ export const cfBrowserRendering = defineTransport<{ rejectResourceTypes: ['font', 'image', 'media'], goToOptions: { waitUntil: 'networkidle2' }, }), + signal: init?.signal ? AbortSignal.any([init?.signal, signal]) : signal, }, ) if (!res.ok) return null diff --git a/src/queues/request-enrichment.ts b/src/queues/request-enrichment.ts new file mode 100644 index 00000000..92fac944 --- /dev/null +++ b/src/queues/request-enrichment.ts @@ -0,0 +1,40 @@ +import { env } from 'cloudflare:workers' +import { estimateTokenCount } from 'tokenx' +import type { Database } from '#db/client.ts' + +export async function processRequestEnrichmentMessage( + message: Message, + db: Database, +) { + const response = await fetch(message.body.url, { + headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${env.HOST}/1.0; +https://${env.HOST})` }, + redirect: 'follow', + signal: AbortSignal.timeout(10_000), + }) + if (!response.ok) { + if (response.status >= 500 || response.status === 408 || response.status === 429) + throw new Error(`Request enrichment fetch failed with ${response.status}`) + return + } + + const contentType = (response.headers.get('content-type') ?? '').toLowerCase() + if (!contentType.includes('text/html') && !contentType.includes('application/xhtml+xml')) return + + const sourceTokens = estimateTokenCount(await response.text()) + await db + .updateTable('request') + .set({ source_tokens: sourceTokens, source_tokens_method: 'html' }) + .where('id', '=', message.body.request_id) + .where('source_tokens', '<', sourceTokens) + .where('source_tokens_method', '=', 'estimated') + .executeTakeFirst() +} + +processRequestEnrichmentMessage.queueName = 'curl-request-enrichment' as const + +export namespace processRequestEnrichmentMessage { + export type Body = { + request_id: string + url: string + } +} diff --git a/src/queues/request-enrichment.workers.test.ts b/src/queues/request-enrichment.workers.test.ts new file mode 100644 index 00000000..9af993fe --- /dev/null +++ b/src/queues/request-enrichment.workers.test.ts @@ -0,0 +1,144 @@ +import { createMessageBatch } from 'cloudflare:test' +import { env } from 'cloudflare:workers' +import { HttpResponse, http } from 'msw' +import { estimateTokenCount } from 'tokenx' +import { afterAll, expect, test } from 'vitest' +import { createClient } from '#db/client.ts' +import { processRequestEnrichmentMessage } from '#queues/request-enrichment.ts' +import { server } from '#test/workers.server.ts' + +const db = createClient(env.DB.connectionString, { max: 1 }) + +afterAll(() => db.destroy()) + +test('upgrades estimated rows with html source tokens when fetch succeeds', async () => { + const html = '

Example

Hello world

' + server.use( + http.get( + 'https://example.com/', + () => + new HttpResponse(html, { + headers: { 'content-type': 'text/html; charset=utf-8' }, + status: 200, + }), + ), + ) + + await db + .insertInto('request') + .values({ + cached: false, + hostname: 'example.com', + id: 'req_enrich_1', + markdown_tokens: 25, + path: '/', + source_tokens: 25, + source_tokens_method: 'estimated', + url: 'https://example.com', + }) + .execute() + + const batch = createMessageBatch( + processRequestEnrichmentMessage.queueName, + [ + { + attempts: 1, + body: { + request_id: 'req_enrich_1', + url: 'https://example.com', + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ], + ) + + await processRequestEnrichmentMessage(batch.messages[0]!, db) + + const row = await db + .selectFrom('request') + .where('id', '=', 'req_enrich_1') + .select(['source_tokens', 'source_tokens_method']) + .executeTakeFirstOrThrow() + + expect(row.source_tokens).toBe(estimateTokenCount(html)) + expect(row.source_tokens_method).toBe('html') +}) + +test('keeps estimated rows when html source tokens are smaller', async () => { + const html = '
' + server.use( + http.get( + 'https://spa.example.com/', + () => + new HttpResponse(html, { + headers: { 'content-type': 'text/html; charset=utf-8' }, + status: 200, + }), + ), + ) + + await db + .insertInto('request') + .values({ + cached: false, + hostname: 'spa.example.com', + id: 'req_fallback_1', + markdown_tokens: 120, + path: '/', + source_tokens: 120, + source_tokens_method: 'estimated', + url: 'https://spa.example.com', + }) + .execute() + + const batch = createMessageBatch( + processRequestEnrichmentMessage.queueName, + [ + { + attempts: 1, + body: { + request_id: 'req_fallback_1', + url: 'https://spa.example.com', + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ], + ) + + await processRequestEnrichmentMessage(batch.messages[0]!, db) + + const row = await db + .selectFrom('request') + .where('id', '=', 'req_fallback_1') + .select(['source_tokens', 'source_tokens_method']) + .executeTakeFirstOrThrow() + + expect(estimateTokenCount(html)).toBeLessThan(120) + expect(row.source_tokens).toBe(120) + expect(row.source_tokens_method).toBe('estimated') +}) + +test('throws on transient enrichment fetch failures so the queue can retry', async () => { + server.use(http.get('https://retry.example.com/', () => new HttpResponse(null, { status: 503 }))) + + const batch = createMessageBatch( + processRequestEnrichmentMessage.queueName, + [ + { + attempts: 1, + body: { + request_id: 'req_retry_1', + url: 'https://retry.example.com', + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ], + ) + + await expect(processRequestEnrichmentMessage(batch.messages[0]!, db)).rejects.toThrow( + 'Request enrichment fetch failed with 503', + ) +}) diff --git a/src/queues/request.ts b/src/queues/request.ts index 0c748f44..43b83830 100644 --- a/src/queues/request.ts +++ b/src/queues/request.ts @@ -1,5 +1,4 @@ import { env } from 'cloudflare:workers' -import { estimateTokenCount } from 'tokenx' import type { Database } from '#db/client.ts' import type { DB } from '#db/types.gen.ts' @@ -83,7 +82,14 @@ export async function processRequestMessage( await env.KV.put(`balance:${billingEntity}`, String(newBalance.balance_mills)) } - if (body.source_tokens_method === 'estimated') await enrichSourceTokensFromHtml(body, db) + if (body.source_tokens_method !== 'estimated') return + + // Best-effort analytics only; billing/logging should not retry just because enrichment handoff fails. + try { + await env.REQUEST_ENRICH_QUEUE.send({ request_id: body.id, url: body.url }) + } catch (error) { + console.error(`Request enrichment enqueue failed for ${body.id}:`, error) + } } processRequestMessage.queueName = 'curl-request' as const @@ -112,27 +118,3 @@ export namespace processRequestMessage { user_agent: string | undefined } } - -async function enrichSourceTokensFromHtml(body: processRequestMessage.Body, db: Database) { - try { - const response = await fetch(body.url, { - headers: { 'User-Agent': `Mozilla/5.0 (compatible; ${env.HOST}/1.0; +https://${env.HOST})` }, - redirect: 'follow', - }) - if (!response.ok) return - - const contentType = (response.headers.get('content-type') ?? '').toLowerCase() - if (!contentType.includes('text/html') && !contentType.includes('application/xhtml+xml')) return - - const sourceTokens = estimateTokenCount(await response.text()) - await db - .updateTable('request') - .set({ source_tokens: sourceTokens, source_tokens_method: 'html' }) - .where('id', '=', body.id) - .where('source_tokens', '<', sourceTokens) - .where('source_tokens_method', '=', 'estimated') - .executeTakeFirst() - } catch { - // Best-effort enrichment only; keep the markdown fallback when HTML fetch fails. - } -} diff --git a/src/queues/request.workers.test.ts b/src/queues/request.workers.test.ts index 82287b02..26767f09 100644 --- a/src/queues/request.workers.test.ts +++ b/src/queues/request.workers.test.ts @@ -1,16 +1,21 @@ import { createMessageBatch } from 'cloudflare:test' import { env } from 'cloudflare:workers' -import { HttpResponse, http } from 'msw' -import { estimateTokenCount } from 'tokenx' import { afterAll, expect, test, vi } from 'vitest' import { createClient } from '#db/client.ts' import * as Nanoid from '#lib/nanoid.ts' import { processRequestMessage } from '#queues/request.ts' import { createFactory } from '#test/factory.ts' -import { server } from '#test/workers.server.ts' const db = createClient(env.DB.connectionString, { max: 1 }) const factory = createFactory(db) +const queueSendResponse = { + metadata: { + metrics: { + backlogBytes: 0, + backlogCount: 0, + }, + }, +} afterAll(() => db.destroy()) @@ -67,18 +72,6 @@ test('leaves KV stats cache untouched when a request is recorded', async () => { await env.KV.put('stats:tokens_saved', '1000') await env.KV.put('stats:tokens_saved:example.com', '500') - const html = '

Example

Hello world

' - server.use( - http.get( - 'https://example.com/', - () => - new HttpResponse(html, { - headers: { 'content-type': 'text/html; charset=utf-8' }, - status: 200, - }), - ), - ) - const batch = createMessageBatch(processRequestMessage.queueName, [ { attempts: 1, @@ -116,10 +109,11 @@ test('leaves KV stats cache untouched when a request is recorded', async () => { expect(hostCached).toBe('500') }) -test('does not fail when KV delete is rate limited', async () => { - const deleteSpy = vi - .spyOn(env.KV, 'delete') - .mockRejectedValue(new Error('KV DELETE failed: 429 Too Many Requests')) +test('does not fail when enrichment queue send fails', async () => { + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + const sendSpy = vi + .spyOn(env.REQUEST_ENRICH_QUEUE, 'send') + .mockRejectedValue(new Error('queue send failed')) try { const batch = createMessageBatch(processRequestMessage.queueName, [ @@ -142,7 +136,7 @@ test('does not fail when KV delete is rate limited', async () => { organization_id: null, path: '/', source_tokens: 60, - source_tokens_method: 'markdown', + source_tokens_method: 'estimated', url: 'https://ratelimit.example.com', user_agent: 'test-agent', }, @@ -152,16 +146,20 @@ test('does not fail when KV delete is rate limited', async () => { ]) await expect(processRequestMessage(batch.messages[0]!, db)).resolves.toBeUndefined() - expect(deleteSpy).not.toHaveBeenCalled() + expect(sendSpy).toHaveBeenCalledWith({ + request_id: 'req_no_kv_delete', + url: 'https://ratelimit.example.com', + }) const row = await db .selectFrom('request') .where('id', '=', 'req_no_kv_delete') .select(['id', 'source_tokens_method']) .executeTakeFirstOrThrow() - expect(row).toEqual({ id: 'req_no_kv_delete', source_tokens_method: 'markdown' }) + expect(row).toEqual({ id: 'req_no_kv_delete', source_tokens_method: 'estimated' }) } finally { - deleteSpy.mockRestore() + consoleErrorSpy.mockRestore() + sendSpy.mockRestore() } }) @@ -207,113 +205,142 @@ test('stores total savings when stage counts are present', async () => { expect(row.source_tokens_method).toBe('markdown') }) -test('upgrades estimated rows with html source tokens when fetch succeeds', async () => { - const html = '

Example

Hello world

' - server.use( - http.get( - 'https://example.com/', - () => - new HttpResponse(html, { - headers: { 'content-type': 'text/html; charset=utf-8' }, - status: 200, - }), - ), - ) +test('enqueues html enrichment for estimated rows', async () => { + const sendSpy = vi.spyOn(env.REQUEST_ENRICH_QUEUE, 'send').mockResolvedValue(queueSendResponse) - const batch = createMessageBatch(processRequestMessage.queueName, [ - { - attempts: 1, - body: { - account_id: null, - api_key_id: null, - billable: false, - cached: false, - cost_mills: 0, - extracted_tokens: null, - filtered_tokens: null, - hostname: 'example.com', - id: 'req_enrich_html', - keywords: null, - markdown_tokens: 25, - mode: null, - objective: null, - organization_id: null, - path: '/', - source_tokens: 25, - source_tokens_method: 'estimated', - url: 'https://example.com', - user_agent: 'test-agent', + try { + const batch = createMessageBatch(processRequestMessage.queueName, [ + { + attempts: 1, + body: { + account_id: null, + api_key_id: null, + billable: false, + cached: false, + cost_mills: 0, + extracted_tokens: null, + filtered_tokens: null, + hostname: 'example.com', + id: 'req_enrich_html', + keywords: null, + markdown_tokens: 25, + mode: null, + objective: null, + organization_id: null, + path: '/', + source_tokens: 25, + source_tokens_method: 'estimated', + url: 'https://example.com', + user_agent: 'test-agent', + }, + id: crypto.randomUUID(), + timestamp: new Date(), }, - id: crypto.randomUUID(), - timestamp: new Date(), - }, - ]) + ]) - await processRequestMessage(batch.messages[0]!, db) + await processRequestMessage(batch.messages[0]!, db) - const row = await db - .selectFrom('request') - .where('id', '=', 'req_enrich_html') - .select(['source_tokens', 'source_tokens_method']) - .executeTakeFirstOrThrow() + expect(sendSpy).toHaveBeenCalledWith({ + request_id: 'req_enrich_html', + url: 'https://example.com', + }) - expect(row.source_tokens).toBe(estimateTokenCount(html)) - expect(row.source_tokens_method).toBe('html') + const row = await db + .selectFrom('request') + .where('id', '=', 'req_enrich_html') + .select(['source_tokens', 'source_tokens_method']) + .executeTakeFirstOrThrow() + + expect(row.source_tokens).toBe(25) + expect(row.source_tokens_method).toBe('estimated') + } finally { + sendSpy.mockRestore() + } }) -test('keeps estimated rows when html source tokens are smaller', async () => { - const html = '
' - server.use( - http.get( - 'https://spa.example.com/', - () => - new HttpResponse(html, { - headers: { 'content-type': 'text/html; charset=utf-8' }, - status: 200, - }), - ), - ) +test('does not enqueue html enrichment for non-estimated rows', async () => { + const sendSpy = vi.spyOn(env.REQUEST_ENRICH_QUEUE, 'send').mockResolvedValue(queueSendResponse) - const batch = createMessageBatch(processRequestMessage.queueName, [ - { - attempts: 1, - body: { - account_id: null, - api_key_id: null, - billable: false, - cached: false, - cost_mills: 0, - extracted_tokens: null, - filtered_tokens: null, - hostname: 'spa.example.com', - id: 'req_keep_fallback', - keywords: null, - markdown_tokens: 120, - mode: null, - objective: null, - organization_id: null, - path: '/', - source_tokens: 120, - source_tokens_method: 'estimated', - url: 'https://spa.example.com', - user_agent: 'test-agent', + try { + const batch = createMessageBatch(processRequestMessage.queueName, [ + { + attempts: 1, + body: { + account_id: null, + api_key_id: null, + billable: false, + cached: false, + cost_mills: 0, + extracted_tokens: null, + filtered_tokens: null, + hostname: 'example.com', + id: 'req_keep_fallback', + keywords: null, + markdown_tokens: 120, + mode: null, + objective: null, + organization_id: null, + path: '/', + source_tokens: 120, + source_tokens_method: 'markdown', + url: 'https://example.com', + user_agent: 'test-agent', + }, + id: crypto.randomUUID(), + timestamp: new Date(), }, - id: crypto.randomUUID(), - timestamp: new Date(), - }, - ]) + ]) - await processRequestMessage(batch.messages[0]!, db) + await processRequestMessage(batch.messages[0]!, db) - const row = await db - .selectFrom('request') - .where('id', '=', 'req_keep_fallback') - .select(['source_tokens', 'source_tokens_method']) - .executeTakeFirstOrThrow() + expect(sendSpy).not.toHaveBeenCalled() + } finally { + sendSpy.mockRestore() + } +}) - expect(estimateTokenCount(html)).toBeLessThan(120) - expect(row.source_tokens).toBe(120) - expect(row.source_tokens_method).toBe('estimated') +test('enqueues html enrichment for cached estimated rows', async () => { + const sendSpy = vi.spyOn(env.REQUEST_ENRICH_QUEUE, 'send').mockResolvedValue(queueSendResponse) + + try { + const batch = createMessageBatch(processRequestMessage.queueName, [ + { + attempts: 1, + body: { + account_id: null, + api_key_id: null, + billable: false, + cached: true, + cost_mills: 0, + extracted_tokens: null, + filtered_tokens: null, + hostname: 'example.com', + id: 'req_cached_est', + keywords: null, + markdown_tokens: 120, + mode: null, + objective: null, + organization_id: null, + path: '/', + source_tokens: 120, + source_tokens_method: 'estimated', + url: 'https://example.com', + user_agent: 'test-agent', + }, + id: crypto.randomUUID(), + timestamp: new Date(), + }, + ]) + + await processRequestMessage(batch.messages[0]!, db) + + expect(sendSpy).toHaveBeenCalledWith({ + request_id: 'req_cached_est', + url: 'https://example.com', + }) + } finally { + sendSpy.mockRestore() + } }) test('deducts credits when billable', async () => { diff --git a/src/routes/_dash.$login/billing.tsx b/src/routes/_dash.$login/billing.tsx index 5e6d1c45..13c942dc 100644 --- a/src/routes/_dash.$login/billing.tsx +++ b/src/routes/_dash.$login/billing.tsx @@ -119,12 +119,17 @@ function Component() { const addCredits = useMutation({ async mutationFn(amount: (typeof creditAmounts)[number]) { if (data.payment_methods.length > 0) { - const res = await rpc.api.credits.charge.$post({ - json: { - amount, - ...(entity.type === 'organization' ? { organization_id: entity.id } : {}), + const res = await rpc.api.credits.charge.$post( + { + json: { + amount, + ...(entity.type === 'organization' ? { organization_id: entity.id } : {}), + }, }, - }) + { + headers: { 'Idempotency-Key': crypto.randomUUID() }, + }, + ) if (res.status === 200) return { kind: 'charge', result: await res.json() } as const if (res.status === 400) { diff --git a/src/worker-configuration.d.ts b/src/worker-configuration.d.ts index 65aa3681..fb320db5 100644 --- a/src/worker-configuration.d.ts +++ b/src/worker-configuration.d.ts @@ -1,9 +1,10 @@ /* eslint-disable */ -// Generated by Wrangler by running `wrangler types src/worker-configuration.d.ts` (hash: 2b7371d5a52af64e010418c63936e3d0) +// Generated by Wrangler by running `wrangler types src/worker-configuration.d.ts` (hash: ba17e42f37ef51979f866f7ad008fec9) // Runtime types generated with workerd@1.20260426.1 2025-10-30 no_handle_cross_request_promise_resolution,nodejs_compat declare namespace Cloudflare { interface ProductionEnv { DB: Hyperdrive; + REQUEST_ENRICH_QUEUE: Queue; REQUEST_QUEUE: Queue; STRIPE_WEBHOOK_QUEUE: Queue; AI: Ai; @@ -23,6 +24,7 @@ declare namespace Cloudflare { } interface PreviewEnv { DB: Hyperdrive; + REQUEST_ENRICH_QUEUE: Queue; REQUEST_QUEUE: Queue; STRIPE_WEBHOOK_QUEUE: Queue; AI: Ai; @@ -42,6 +44,7 @@ declare namespace Cloudflare { } interface Env { DB: Hyperdrive; + REQUEST_ENRICH_QUEUE: Queue; REQUEST_QUEUE: Queue; STRIPE_WEBHOOK_QUEUE: Queue; AI: Ai; diff --git a/test/vitest.config.ts b/test/vitest.config.ts index 9f194fd5..e60bae19 100644 --- a/test/vitest.config.ts +++ b/test/vitest.config.ts @@ -66,6 +66,7 @@ export default defineConfig({ hyperdrives: { DB: env.DB_URL }, kvNamespaces: ['KV'], queueProducers: { + REQUEST_ENRICH_QUEUE: 'test-request-enrichment-queue', REQUEST_QUEUE: 'test-request-queue', STRIPE_WEBHOOK_QUEUE: 'test-stripe-webhook-queue', }, diff --git a/wrangler.jsonc b/wrangler.jsonc index 6816083a..5edbf2b0 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -26,10 +26,18 @@ "kv_namespaces": [{ "binding": "KV", "id": "e3a921b8d13741b99e7aa1cbb6144c9f" }], "queues": { "producers": [ + { "binding": "REQUEST_ENRICH_QUEUE", "queue": "curl-request-enrichment" }, { "binding": "REQUEST_QUEUE", "queue": "curl-request" }, { "binding": "STRIPE_WEBHOOK_QUEUE", "queue": "curl-stripe-webhook" }, ], "consumers": [ + { + "queue": "curl-request-enrichment", + "max_batch_size": 10, + "max_batch_timeout": 30, + "max_retries": 3, + "dead_letter_queue": "curl-request-enrichment-dlq", + }, { "queue": "curl-request", "max_batch_size": 10, @@ -44,18 +52,6 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-dlq", }, - { - "queue": "curl-request-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-stripe-webhook-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, ], }, "triggers": { @@ -107,10 +103,18 @@ "kv_namespaces": [{ "binding": "KV", "id": "e3a921b8d13741b99e7aa1cbb6144c9f" }], "queues": { "producers": [ + { "binding": "REQUEST_ENRICH_QUEUE", "queue": "curl-request-enrichment" }, { "binding": "REQUEST_QUEUE", "queue": "curl-request" }, { "binding": "STRIPE_WEBHOOK_QUEUE", "queue": "curl-stripe-webhook" }, ], "consumers": [ + { + "queue": "curl-request-enrichment", + "max_batch_size": 10, + "max_batch_timeout": 30, + "max_retries": 3, + "dead_letter_queue": "curl-request-enrichment-dlq", + }, { "queue": "curl-request", "max_batch_size": 10, @@ -125,18 +129,6 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-dlq", }, - { - "queue": "curl-request-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-stripe-webhook-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, ], }, "version_metadata": { @@ -191,6 +183,10 @@ "kv_namespaces": [{ "binding": "KV", "id": "__PREVIEW_KV_ID__" }], "queues": { "producers": [ + { + "binding": "REQUEST_ENRICH_QUEUE", + "queue": "curl-request-enrichment-__PREVIEW_APEX__", + }, { "binding": "REQUEST_QUEUE", "queue": "curl-request-__PREVIEW_APEX__", @@ -201,6 +197,13 @@ }, ], "consumers": [ + { + "queue": "curl-request-enrichment-__PREVIEW_APEX__", + "max_batch_size": 10, + "max_batch_timeout": 30, + "max_retries": 3, + "dead_letter_queue": "curl-request-enrichment-__PREVIEW_APEX__-dlq", + }, { "queue": "curl-request-__PREVIEW_APEX__", "max_batch_size": 10, @@ -215,18 +218,6 @@ "max_retries": 3, "dead_letter_queue": "curl-stripe-webhook-__PREVIEW_APEX__-dlq", }, - { - "queue": "curl-request-__PREVIEW_APEX__-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, - { - "queue": "curl-stripe-webhook-__PREVIEW_APEX__-dlq", - "max_batch_size": 1, - "max_batch_timeout": 5, - "max_retries": 0, - }, ], }, "version_metadata": {