diff --git a/CHANGELOG.md b/CHANGELOG.md index c1a866b..0731e01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,8 @@ - Added `--include-dirty` to review, CI, and revalidation file filters for auditing uncommitted worktree changes, thanks @AsishKumarDalal. - Fixed Bun package-manager detection to recognize the text `bun.lock` lockfile, thanks @austinm911. - Fixed review-output schema to tolerate optional `reproduction` and `minimumFixScope` fields and zero-valued evidence line numbers (normalized to `null`), recovering 4 of 28 zod issue patterns observed in run `20260517T190759-3c9e9e` (78 errors over 1000 features) that previously dropped whole-feature output instead of the affected finding. +- Changed `clawpatch review --jobs` and `clawpatch ci --jobs` defaults from a fixed `10` to `floor(cpuCores / 2)` clamped to `[1, 10]`, thanks @coletebou. +- Added `--rate-limit-per-minute <n>` to `clawpatch review` and `clawpatch ci` (also settable via `CLAWPATCH_RPM`) to cap how many provider calls may start within any rolling 60s window across jobs, thanks @coletebou. 
## 0.3.0 - 2026-05-18 diff --git a/README.md b/README.md index 7d88c1e..930994a 100644 --- a/README.md +++ b/README.md @@ -149,7 +149,8 @@ Useful flags: - `--json` - `--plain` - `--limit ` -- `--jobs ` +- `--jobs ` (default: half of CPU cores, max 10) +- `--rate-limit-per-minute ` - `--source ` - `--feature ` - `--project ` diff --git a/docs/code-review.md b/docs/code-review.md index e1ce7f0..3ee65a4 100644 --- a/docs/code-review.md +++ b/docs/code-review.md @@ -20,7 +20,8 @@ Current behavior: - selects pending features unless `--feature` is set - claims each feature with an atomic lock file plus the feature run lock -- reviews with a bounded worker pool; default `--jobs` is `10` +- reviews with a bounded worker pool; default `--jobs` is half of CPU cores, max 10 +- caps provider call starts with `--rate-limit-per-minute ` or `CLAWPATCH_RPM` - emits progress to stderr unless `--quiet` is set - builds bounded prompt context from owned files, context files, and tests - includes a prompt context manifest with included files, omitted files, byte diff --git a/src/app.test.ts b/src/app.test.ts index 7829cf7..e57e44b 100644 --- a/src/app.test.ts +++ b/src/app.test.ts @@ -177,6 +177,31 @@ describe("runProviderReviewWithRetry", () => { expect(review).toHaveBeenCalledTimes(2); }); + it("acquires the RPM limiter before each retry attempt", async () => { + delete process.env["CLAWPATCH_REVIEW_RETRIES"]; + const review = vi + .fn() + .mockRejectedValueOnce(new ClawpatchError("garbled", 8, "malformed-output")) + .mockResolvedValueOnce(emptyReview()); + const acquire = vi.fn().mockResolvedValue(undefined); + const provider = fakeProvider(review); + await runProviderReviewWithRetry({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + provider: provider as any, + root: "/tmp", + prompt: "hi", + // eslint-disable-next-line @typescript-eslint/no-explicit-any + options: {} as any, + context: QUIET_CONTEXT, + featureId: "feat_x", + index: 0, + total: 1, + limiter: { 
acquire }, + }); + expect(review).toHaveBeenCalledTimes(2); + expect(acquire).toHaveBeenCalledTimes(2); + }); + it("does NOT retry on provider-auth", async () => { delete process.env["CLAWPATCH_REVIEW_RETRIES"]; const err = new ClawpatchError("auth", 4, "provider-auth"); diff --git a/src/app.ts b/src/app.ts index a8df9a6..2ac258c 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,6 +1,6 @@ import { appendFile, lstat, readFile, realpath, writeFile } from "node:fs/promises"; import { join, relative, resolve } from "node:path"; -import { hostname } from "node:os"; +import { cpus, hostname } from "node:os"; import { changedPathsBetweenSnapshots, hasSourceDirtyWorktree, @@ -74,6 +74,7 @@ import { reasoningEfforts, } from "./types.js"; import { validationCommandsForFeature } from "./validation.js"; +import { createRpmLimiter, defaultJobs, rpmFromFlag, type RpmLimiter } from "./rpm-limiter.js"; export type AppContext = { root: string; @@ -315,6 +316,9 @@ export async function reviewCommand( error: unknown; }> = []; const jobs = Math.min(reviewJobs(flags), Math.max(features.length, 1)); + const limiter = createRpmLimiter( + rpmFromFlag(stringFlag(flags, "rateLimitPerMinute"), process.env["CLAWPATCH_RPM"]), + ); let cursor = 0; emitProgress(context, "review", "start", { run: currentRunId, @@ -342,6 +346,7 @@ export async function reviewCommand( total: features.length, mode, customPrompt, + limiter, allowNonPendingFeatureReview: stringFlag(flags, "feature") !== undefined, }); findingIds.push(...reviewed.findingIds); @@ -651,6 +656,7 @@ type ReviewFeatureOptions = { total: number; mode: ReviewMode; customPrompt: string | null; + limiter: RpmLimiter; allowNonPendingFeatureReview: boolean; }; @@ -668,6 +674,7 @@ async function reviewFeature( total, mode, customPrompt, + limiter, allowNonPendingFeatureReview, } = options; const started = Date.now(); @@ -705,6 +712,7 @@ async function reviewFeature( featureId: feature.featureId, index, total, + limiter, }); // Layer 1 drops: 
/**
 * Resolves the worker-pool size for a review run.
 *
 * An explicit `--jobs` value takes precedence: it is floored and capped at 32,
 * and anything non-numeric, non-finite, or below 1 collapses to a single
 * worker. Without the flag, the size is derived from `coreCount` through
 * `defaultJobs`.
 *
 * @param flags - Parsed CLI flags; only `jobs` is read here. Typed off
 *   `stringFlag` so this function accepts exactly what that helper accepts.
 * @param coreCount - Logical core count used for the default; injectable so
 *   tests do not depend on the host machine.
 * @returns The number of concurrent review workers to run.
 */
export function reviewJobs(
  flags: Parameters<typeof stringFlag>[0],
  coreCount: number = cpus().length,
): number {
  const requested = stringFlag(flags, "jobs");
  if (requested === undefined) {
    return defaultJobs(coreCount);
  }
  const numeric = Number(requested);
  const usable = Number.isFinite(numeric) && numeric >= 1;
  return usable ? Math.min(Math.floor(numeric), 32) : 1;
}
"10"); - if (!Number.isFinite(parsed) || parsed < 1) { - return 1; - } - return Math.min(Math.floor(parsed), 32); -} - function reviewMode(flags: Record): ReviewMode { const mode = stringFlag(flags, "mode") ?? "default"; if (mode === "default" || mode === "deslopify") { diff --git a/src/cli.ts b/src/cli.ts index 1a20176..0ea6eee 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -161,6 +161,7 @@ const commandFlags = { "since", "jobs", "mode", + "rateLimitPerMinute", "provider", "model", "reasoningEffort", @@ -174,6 +175,7 @@ const commandFlags = { "limit", "since", "jobs", + "rateLimitPerMinute", "provider", "model", "reasoningEffort", @@ -224,6 +226,7 @@ const valueFlagNames = new Set([ "since", "jobs", "mode", + "rate-limit-per-minute", "source", "provider", "model", @@ -415,8 +418,9 @@ Flags: --limit --since --include-dirty - --jobs default: 10 + --jobs default: ~half of CPU cores, max 10 --mode + --rate-limit-per-minute cap provider calls per 60s window (env: CLAWPATCH_RPM) --provider --model --reasoning-effort @@ -462,7 +466,8 @@ Flags: --since --include-dirty --limit - --jobs default: 10 + --jobs default: ~half of CPU cores, max 10 + --rate-limit-per-minute cap provider calls per 60s window (env: CLAWPATCH_RPM) --provider --model --reasoning-effort diff --git a/src/review-jobs.test.ts b/src/review-jobs.test.ts new file mode 100644 index 0000000..88e8f8a --- /dev/null +++ b/src/review-jobs.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from "vitest"; +import { reviewJobs } from "./app.js"; + +describe("reviewJobs", () => { + it("defaults to floor(cores / 2) capped at 10 when --jobs is not given", () => { + expect(reviewJobs({}, 4)).toBe(2); + expect(reviewJobs({}, 8)).toBe(4); + expect(reviewJobs({}, 32)).toBe(10); + expect(reviewJobs({}, 1)).toBe(1); + }); + + it("honors explicit --jobs value", () => { + expect(reviewJobs({ jobs: "7" }, 32)).toBe(7); + expect(reviewJobs({ jobs: "1" }, 32)).toBe(1); + }); + + it("caps explicit --jobs at 32", () => { + 
type ScheduledTimeout = { handler: () => void; runAt: number };

/**
 * Builds a manually driven clock for limiter tests.
 *
 * `clock` exposes `now`/`setTimeout` backed by an in-memory queue; nothing
 * fires until the test calls `advance`. Timers that are due fire one at a
 * time in the order they were scheduled, and the queue is re-scanned after
 * every handler so timers registered by a handler are picked up too.
 */
function makeFakeClock() {
  let currentMs = 0;
  const queue: ScheduledTimeout[] = [];

  // Removes and returns the first queued timer whose deadline has passed.
  const takeNextDue = (): ScheduledTimeout | undefined => {
    const index = queue.findIndex((timer) => timer.runAt <= currentMs);
    return index === -1 ? undefined : queue.splice(index, 1)[0];
  };

  return {
    clock: {
      now: () => currentMs,
      setTimeout: (handler: () => void, ms: number) => {
        queue.push({ handler, runAt: currentMs + ms });
      },
    },
    /** Moves time forward by `ms`, then fires every timer that is now due. */
    advance(ms: number): void {
      currentMs += ms;
      for (let timer = takeNextDue(); timer !== undefined; timer = takeNextDue()) {
        timer.handler();
      }
    },
    /** Sets the current time without firing anything. */
    setNow(ms: number): void {
      currentMs = ms;
    },
    /** Number of timers still waiting to fire. */
    pending: () => queue.length,
  };
}
}); + + it("returns undefined for invalid values", () => { + expect(rpmFromFlag("abc", undefined)).toBeUndefined(); + expect(rpmFromFlag("0", undefined)).toBeUndefined(); + expect(rpmFromFlag("-5", undefined)).toBeUndefined(); + }); +}); + +describe("createRpmLimiter", () => { + it("is a no-op when limit is undefined", async () => { + const limiter = createRpmLimiter(undefined); + for (let i = 0; i < 100; i += 1) { + await limiter.acquire(); + } + }); + + it("is a no-op when limit is invalid", async () => { + const limiter = createRpmLimiter(0); + await limiter.acquire(); + const limiter2 = createRpmLimiter(Number.NaN); + await limiter2.acquire(); + }); + + it("allows up to N starts in a 60s window without delay", async () => { + const fake = makeFakeClock(); + const limiter = createRpmLimiter(3, fake.clock); + await limiter.acquire(); + await limiter.acquire(); + await limiter.acquire(); + expect(fake.pending()).toBe(0); + }); + + it("delays the (N+1)th call until the oldest slot expires", async () => { + const fake = makeFakeClock(); + const limiter = createRpmLimiter(2, fake.clock); + await limiter.acquire(); + fake.advance(10_000); + await limiter.acquire(); + + let resolved = false; + const pending = limiter.acquire().then(() => { + resolved = true; + }); + + // Allow the chain to evaluate `step` and schedule its setTimeout. + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + // First slot was at t=0, window is 60s, so the next slot opens at t=60_000. 
/** Gate that callers await immediately before starting a rate-limited call. */
export type RpmLimiter = {
  acquire(): Promise<void>;
};

/** Minimal time source; injectable so tests can drive the limiter by hand. */
type Clock = {
  now(): number;
  setTimeout(handler: () => void, ms: number): void;
};

const defaultClock: Clock = {
  now: () => Date.now(),
  setTimeout: (handler, ms) => {
    setTimeout(handler, ms);
  },
};

/** Limiter used when no valid limit is configured: every acquire resolves at once. */
const noopLimiter: RpmLimiter = {
  acquire: async () => {},
};

/**
 * Creates a limiter that lets at most `limit` calls start within any 60s window.
 *
 * Acquirers are served strictly one at a time in call order. Each one either
 * takes a free slot immediately or sleeps until the oldest recorded start
 * leaves the window.
 *
 * @param limit - Maximum starts per window; fractional values are floored.
 *   `undefined`, non-finite, or values below 1 yield a no-op limiter.
 * @param clock - Time source; defaults to `Date.now` and the global `setTimeout`.
 * @returns A limiter whose `acquire()` resolves once the caller may proceed.
 */
export function createRpmLimiter(
  limit: number | undefined,
  clock: Clock = defaultClock,
): RpmLimiter {
  if (limit === undefined || !Number.isFinite(limit) || limit < 1) {
    return noopLimiter;
  }
  const max = Math.floor(limit);
  const windowMs = 60_000;
  // Timestamps of the starts still inside the window, oldest first.
  const starts: number[] = [];
  let chain: Promise<void> = Promise.resolve();

  // Drops every recorded start that is at least one full window older than `reference`.
  function pruneOlder(reference: number): void {
    while (starts.length > 0) {
      const head = starts[0];
      if (head === undefined || reference - head < windowMs) {
        return;
      }
      starts.shift();
    }
  }

  async function step(): Promise<void> {
    // Re-check after every wake instead of assuming the slot is free: timer
    // callbacks can fire slightly early and the wall clock can step backwards,
    // and recording a start in either case would exceed `max` for the window.
    for (;;) {
      const now = clock.now();
      pruneOlder(now);
      const oldest = starts[0];
      if (starts.length < max || oldest === undefined) {
        starts.push(now);
        return;
      }
      const untilFree = windowMs - (now - oldest);
      // Clamp to [1, windowMs]: never spin on a zero-length timer, and never
      // sleep longer than one window if the clock moved backwards.
      const wait = Math.min(Math.max(untilFree, 1), windowMs);
      await new Promise<void>((resolveWait) => {
        clock.setTimeout(resolveWait, wait);
      });
    }
  }

  return {
    acquire(): Promise<void> {
      const next = chain.then(step);
      // Ensure rejections do not poison the chain for subsequent acquirers.
      chain = next.catch(() => undefined);
      return next;
    },
  };
}

/**
 * Resolves the configured requests-per-minute limit.
 *
 * The explicit flag is used whenever it is defined, even if it is empty or
 * invalid; the environment value is consulted only when the flag is
 * `undefined`.
 *
 * @param explicit - Raw `--rate-limit-per-minute` value, if given.
 * @param envValue - Raw `CLAWPATCH_RPM` value, if set.
 * @returns The floored limit, or `undefined` when the chosen value is empty,
 *   non-numeric, non-finite, or below 1 (meaning: no rate limiting).
 */
export function rpmFromFlag(
  explicit: string | undefined,
  envValue: string | undefined,
): number | undefined {
  const raw = explicit ?? envValue;
  if (raw === undefined || raw === "") {
    return undefined;
  }
  const parsed = Number(raw);
  if (!Number.isFinite(parsed) || parsed < 1) {
    return undefined;
  }
  return Math.floor(parsed);
}

/**
 * Default worker count: half the cores, rounded down, clamped to `[1, 10]`.
 *
 * @param coreCount - Logical core count; non-finite or below 1 yields 1.
 * @returns The default number of concurrent jobs.
 */
export function defaultJobs(coreCount: number): number {
  if (!Number.isFinite(coreCount) || coreCount < 1) {
    return 1;
  }
  return Math.min(Math.max(Math.floor(coreCount / 2), 1), 10);
}