From 9d40b5f028d868f6609a0b8ecbe509b61b63750d Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Thu, 18 Jun 2026 10:03:37 -0500 Subject: [PATCH 1/4] performance: Make sure all alarms are being set with allowUnconfirmed so we do not block on storage, and all states don't reach to storage on hot-paths Reaching storage would make container startups slower; let's give in on a more greedy approach where if storage were to fail we get an error later after a container has been potentially started. --- .changeset/quiet-starfishes-start.md | 5 + src/lib/container.ts | 152 ++++++++++++++++++++------- src/tests/container-race.test.ts | 25 ++--- src/tests/container.test.ts | 80 ++++++++++++-- src/tests/fixtures.ts | 12 ++- 5 files changed, 205 insertions(+), 69 deletions(-) create mode 100644 .changeset/quiet-starfishes-start.md diff --git a/.changeset/quiet-starfishes-start.md b/.changeset/quiet-starfishes-start.md new file mode 100644 index 0000000..f215f87 --- /dev/null +++ b/.changeset/quiet-starfishes-start.md @@ -0,0 +1,5 @@ +--- +"@cloudflare/containers": patch +--- + +Defer start-path storage and alarm setup until after the container start call. diff --git a/src/lib/container.ts b/src/lib/container.ts index b10439d..caf4eec 100644 --- a/src/lib/container.ts +++ b/src/lib/container.ts @@ -279,7 +279,7 @@ class ContainerState { async getState(): Promise { if (!this.status) { - const state = await this.storage.get(CONTAINER_STATE_KEY); + const state = await this.getStoredState(); if (!state) { this.status = { status: 'stopped', @@ -294,6 +294,17 @@ class ContainerState { return this.status!; } + async getStoredState(): Promise { + if (!this.status) { + const state = await this.storage.get(CONTAINER_STATE_KEY); + if (state) { + this.status = state; + } + } + + return this.status; + } + private async setStatusAndupdate(status: State['status']) { this.status = { status: status, lastChange: Date.now() }; await this.update(); @@ -301,10 +312,15 @@ class ContainerState { private async update() { if (!this.status) throw new Error('status should be init'); - await this.storage.put(CONTAINER_STATE_KEY, this.status); + await this.storage.put(CONTAINER_STATE_KEY, this.status, { allowUnconfirmed: true }); } } +type PendingStoppedEvent = { + params: StopParams; + state: State; +}; + type ContainerProxyOptions = { enableInternet?: boolean; containerId: string; @@ -539,10 +555,9 @@ export class Container extends DurableObject { const persistedOutboundConfiguration = this.restoreOutboundConfiguration(); this.ctx.blockConcurrencyWhile(async () => { - // First thing, schedule the next alarms. Also yields a microtask - // so subclass class-field initializers (e.g. `sleepAfter = "2h"`) + // Yield so subclass class-field initializers (e.g. `sleepAfter = "2h"`) // run before renewActivityTimeout reads `this.sleepAfter`. - await this.scheduleNextAlarm(); + await Promise.resolve(); this.renewActivityTimeout(); const ctor = this.constructor as typeof Container; @@ -559,6 +574,7 @@ export class Container extends DurableObject { if (this.container.running) { this.applyOutboundInterceptionPromise = this.applyOutboundInterception(); + await this.scheduleNextAlarm(); } }); @@ -571,20 +587,6 @@ export class Container extends DurableObject { if (options.entrypoint !== undefined) this.entrypoint = options.entrypoint; if (options.enableInternet !== undefined) this.enableInternet = options.enableInternet; } - - // Create schedules table if it doesn't exist - this.sql` - CREATE TABLE IF NOT EXISTS container_schedules ( - id TEXT PRIMARY KEY NOT NULL DEFAULT (randomblob(9)), - callback TEXT NOT NULL, - payload TEXT, - type TEXT NOT NULL CHECK(type IN ('scheduled', 'delayed')), - time INTEGER NOT NULL, - delayInSeconds INTEGER, - created_at INTEGER DEFAULT (unixepoch()) - ) - `; - if (this.container.running) { this.monitor = this.container.monitor(); this.setupMonitorCallbacks(); @@ -852,9 +854,6 @@ export class Container extends DurableObject { // Determine which ports to check const portsToCheck = await this.getPortsToCheck(ports); - // trigger all onStop that we didn't do yet - await this.syncPendingStoppedEvents(); - // Prepare to start the container resolvedCancellationOptions ??= {}; const containerGetTimeout = @@ -1084,6 +1083,7 @@ export class Container extends DurableObject { callback: string, payload?: T ): Promise> { + this.ensureSchedulesTable(); const id = generateId(9); // Ensure the callback is a string (method name) @@ -1173,8 +1173,13 @@ export class Container extends DurableObject { portParam ); - const state = await this.state.getState(); - if (!this.container.running || state.status !== 'healthy') { + let isReady = false; + if (this.container.running) { + const state = await this.state.getState(); + isReady = state.status === 'healthy'; + } + + if (!isReady) { try { await this.startAndWaitForPorts(port, { abort: request.signal }); } catch (e) { @@ -1355,6 +1360,7 @@ export class Container extends DurableObject { private onStopCalled = false; private state: ContainerState; private monitor: Promise | undefined; + private schedulesTableInitialized = false; // Coalesces concurrent calls to startContainerIfNotRunning so we never // call `this.container.start()` twice. Without this guard, two requests @@ -1364,6 +1370,7 @@ export class Container extends DurableObject { // throw "start() cannot be called on a container that is already running." // See https://github.com/cloudflare/containers/issues/173. private startInFlight: Promise | undefined; + private pendingStoppedEvent: PendingStoppedEvent | undefined; private monitoredPromise: Promise | undefined; @@ -1528,12 +1535,14 @@ export class Container extends DurableObject { return [...hosts]; } - private async refreshOutboundInterception(): Promise { + private async refreshOutboundInterception( + options: { persistOutboundConfiguration?: boolean } = {} + ): Promise { if (!this.usingInterception) { return; } - this.applyOutboundInterceptionPromise = this.applyOutboundInterception(); + this.applyOutboundInterceptionPromise = this.applyOutboundInterception(options); await this.applyOutboundInterceptionPromise; } @@ -1550,7 +1559,9 @@ export class Container extends DurableObject { * - Intercept-all mode: `interceptOutboundHttps('*', ...)` for all HTTPS traffic * - Per-host mode: `interceptOutboundHttps(host, ...)` for each known host */ - private async applyOutboundInterception(): Promise { + private async applyOutboundInterception( + options: { persistOutboundConfiguration?: boolean } = {} + ): Promise { const ctx = this.ctx as unknown as { exports?: { ContainerProxy?: (params: { props: Record }) => Fetcher }; }; @@ -1573,7 +1584,9 @@ export class Container extends DurableObject { } const outboundConfiguration = this.getOutboundConfiguration(); - this.persistOutboundConfiguration(outboundConfiguration); + if (options.persistOutboundConfiguration !== false) { + this.persistOutboundConfiguration(outboundConfiguration); + } const hosts = this.getHostsToIntercept(); @@ -1625,6 +1638,25 @@ export class Container extends DurableObject { /** * Execute SQL queries against the Container's database */ + private ensureSchedulesTable(): void { + if (this.schedulesTableInitialized) { + return; + } + + this.sql` + CREATE TABLE IF NOT EXISTS container_schedules ( + id TEXT PRIMARY KEY NOT NULL DEFAULT (randomblob(9)), + callback TEXT NOT NULL, + payload TEXT, + type TEXT NOT NULL CHECK(type IN ('scheduled', 'delayed')), + time INTEGER NOT NULL, + delayInSeconds INTEGER, + created_at INTEGER DEFAULT (unixepoch()) + ) + `; + this.schedulesTableInitialized = true; + } + private sql>( strings: TemplateStringsArray, ...values: (string | number | boolean | null)[] @@ -1736,6 +1768,7 @@ export class Container extends DurableObject { if (this.startInFlight === startPromise) { this.startInFlight = undefined; } + await this.drainPendingStoppedEvent(); } } @@ -1807,13 +1840,16 @@ export class Container extends DurableObject { await handleError(); } - await this.scheduleNextAlarm(); - - if (!this.container.running) { - await this.refreshOutboundInterception(); + const containerWasRunning = this.container.running; + if (!containerWasRunning) { + if (this.usingInterception) { + await this.refreshOutboundInterception({ persistOutboundConfiguration: false }); + } this.container.start(startConfig); this.monitor = this.container.monitor(); + await this.capturePendingStoppedEvent(containerWasRunning); await this.state.setRunning(); + await this.scheduleNextAlarm(); } else { await this.scheduleNextAlarm(); } @@ -1944,6 +1980,7 @@ export class Container extends DurableObject { } deleteSchedules(name: string): void { + this.ensureSchedulesTable(); this.sql`DELETE FROM container_schedules WHERE callback = ${name}`; } @@ -1957,6 +1994,8 @@ export class Container extends DurableObject { */ override async alarm(alarmProps?: AlarmInvocationInfo): Promise { + this.ensureSchedulesTable(); + if ( alarmProps !== undefined && alarmProps.isRetry && @@ -1976,8 +2015,7 @@ export class Container extends DurableObject { // 1. The container is not running anymore. // 2. Activity expired and it exits. const prevAlarm = Date.now(); - await this.ctx.storage.setAlarm(prevAlarm); - await this.ctx.storage.sync(); + await this.ctx.storage.setAlarm(prevAlarm, { allowUnconfirmed: true }); // Get all schedules that should be executed now const result = this.sql<{ @@ -2038,9 +2076,9 @@ export class Container extends DurableObject { await this.syncPendingStoppedEvents(); if (resultForMinTime.length == 0) { - await this.ctx.storage.deleteAlarm(); + await this.ctx.storage.deleteAlarm({ allowUnconfirmed: true }); } else { - await this.ctx.storage.setAlarm(minTimeFromSchedules); + await this.ctx.storage.setAlarm(minTimeFromSchedules, { allowUnconfirmed: true }); } return; @@ -2071,7 +2109,7 @@ export class Container extends DurableObject { }, timeout); }); - await this.ctx.storage.setAlarm(Date.now()); + await this.ctx.storage.setAlarm(Date.now(), { allowUnconfirmed: true }); // we exit and we have another alarm, // the next alarm is the one that decides if it should stop the loop. @@ -2094,6 +2132,39 @@ export class Container extends DurableObject { } } + private async capturePendingStoppedEvent(containerWasRunning: boolean): Promise { + if (containerWasRunning) { + return; + } + + const state = await this.state.getStoredState(); + if (!state) { + return; + } + + if (state.status === 'healthy' || state.status === 'running') { + this.pendingStoppedEvent = { params: { exitCode: 0, reason: 'exit' }, state }; + return; + } + + if (state.status === 'stopped_with_code') { + this.pendingStoppedEvent = { + params: { exitCode: state.exitCode ?? 0, reason: 'exit' }, + state, + }; + } + } + + private async drainPendingStoppedEvent(): Promise { + if (!this.pendingStoppedEvent) { + return; + } + + const event = this.pendingStoppedEvent; + this.pendingStoppedEvent = undefined; + await this.callOnStop(event.params, event.state); + } + private async callOnStop(onStopParams: StopParams, stateBeforeOnStop: State) { if (this.onStopCalled) { return; @@ -2124,11 +2195,12 @@ export class Container extends DurableObject { clearTimeout(this.timeout); } - await this.ctx.storage.setAlarm(nextTime); - await this.ctx.storage.sync(); + await this.ctx.storage.setAlarm(nextTime, { allowUnconfirmed: true }); } async listSchedules(name: string): Promise[]> { + this.ensureSchedulesTable(); + const result = this.sql` SELECT * FROM container_schedules WHERE callback = ${name} LIMIT 1 `; @@ -2176,6 +2248,8 @@ export class Container extends DurableObject { * @returns The Schedule object or undefined if not found */ async getSchedule(id: string): Promise | undefined> { + this.ensureSchedulesTable(); + const result = this.sql` SELECT * FROM container_schedules WHERE id = ${id} LIMIT 1 `; diff --git a/src/tests/container-race.test.ts b/src/tests/container-race.test.ts index c410587..ed823e9 100644 --- a/src/tests/container-race.test.ts +++ b/src/tests/container-race.test.ts @@ -5,12 +5,12 @@ // via JSG_REQUIRE(!running, ...). It guards against the JS-visible `running` // flag being true. // -// The race in @cloudflare/containers happens because the readiness path in -// containerFetch -> startAndWaitForPorts -> startContainerIfNotRunning has -// multiple `await` points BEFORE the synchronous `this.container.start(...)` -// call. Each `await` yields the DO input gate, allowing two concurrent -// fetches to both pass the `if (this.container.running) return 0;` early -// exit. Whichever calls start() second hits the workerd JSG_REQUIRE. +// The original race in @cloudflare/containers happened because the readiness +// path had multiple `await` points before the synchronous +// `this.container.start(...)` call. Each `await` yielded the DO input gate, +// allowing two concurrent fetches to both pass the `if (this.container.running) +// return 0;` early exit. Whichever called start() second hit the workerd +// JSG_REQUIRE. // // This test reproduces the race deterministically: it relies on the // natural microtask scheduling that `Promise.all([fetchA, fetchB])` @@ -38,16 +38,9 @@ describe('Container concurrent-start race (issue #173)', () => { const reqA = new Request('https://example.com/a'); const reqB = new Request('https://example.com/b'); - // Fire both concurrently. Both will: - // 1. await this.state.getState() -> microtask yield - // 2. observe container.running === false - // 3. enter startContainerIfNotRunning - // 4. await this.state.setRunning() -> microtask yield - // 5. await this.scheduleNextAlarm() -> microtask yield - // 6. call this.container.start(...) synchronously - // - // The second caller to reach step 6 sees running === true (set by the - // first caller) and the workerd guard throws. + // Fire both concurrently. Both callers enter the readiness path, but the + // in-flight start guard makes the second caller join the first start + // attempt instead of calling `this.container.start(...)` itself. const [resA, resB] = await Promise.all([ container.containerFetch(reqA), container.containerFetch(reqB), diff --git a/src/tests/container.test.ts b/src/tests/container.test.ts index c2e603c..54723b2 100644 --- a/src/tests/container.test.ts +++ b/src/tests/container.test.ts @@ -100,7 +100,8 @@ describe('Container', () => { expect(onErrorSpy).toHaveBeenCalled(); expect(mockCtx.storage.put).toHaveBeenCalledWith( '__CF_CONTAINER_STATE', - expect.objectContaining({ status: 'stopped' }) + expect.objectContaining({ status: 'stopped' }), + { allowUnconfirmed: true } ); }); @@ -129,7 +130,8 @@ describe('Container', () => { expect(mockCtx.abort).toHaveBeenCalled(); expect(mockCtx.storage.put).toHaveBeenCalledWith( '__CF_CONTAINER_STATE', - expect.objectContaining({ status: 'stopped' }) + expect.objectContaining({ status: 'stopped' }), + { allowUnconfirmed: true } ); }); @@ -152,7 +154,8 @@ describe('Container', () => { await vi.waitFor(() => { expect(mockCtx.storage.put).toHaveBeenCalledWith( '__CF_CONTAINER_STATE', - expect.objectContaining({ status: 'stopped' }) + expect.objectContaining({ status: 'stopped' }), + { allowUnconfirmed: true } ); }); }); @@ -175,7 +178,8 @@ describe('Container', () => { await vi.waitFor(() => { expect(mockCtx.storage.put).toHaveBeenCalledWith( '__CF_CONTAINER_STATE', - expect.objectContaining({ status: 'stopped' }) + expect.objectContaining({ status: 'stopped' }), + { allowUnconfirmed: true } ); expect(onErrorSpy).toHaveBeenCalledWith( expect.objectContaining({ message: 'container supervisor failed' }) @@ -205,7 +209,8 @@ describe('Container', () => { expect(mockCtx.storage.put).not.toHaveBeenCalledWith( '__CF_CONTAINER_STATE', - expect.objectContaining({ status: 'stopped_with_code' }) + expect.objectContaining({ status: 'stopped_with_code' }), + { allowUnconfirmed: true } ); }); @@ -221,6 +226,52 @@ describe('Container', () => { expect(mockCtx.container.getTcpPort).toHaveBeenCalledWith(33); }); + test('start should call container.start before storage and alarm writes', async ({ + mockCtx, + container, + }) => { + vi.mocked(mockCtx.storage.get).mockClear(); + vi.mocked(mockCtx.storage.put).mockClear(); + vi.mocked(mockCtx.storage.setAlarm).mockClear(); + vi.mocked(mockCtx.storage.sync).mockClear(); + vi.mocked(mockCtx.storage.sql.exec).mockClear(); + + await container.start(undefined, { portToCheck: 8080, retries: 1, waitInterval: 1 }); + + const startOrder = mockCtx.container.start.mock.invocationCallOrder[0]; + expect(startOrder).toBeDefined(); + expect(mockCtx.storage.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.put.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.setAlarm.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.sql.exec).not.toHaveBeenCalled(); + expect(mockCtx.storage.sync).not.toHaveBeenCalled(); + expect(mockCtx.storage.put).toHaveBeenCalledWith( + '__CF_CONTAINER_STATE', + expect.objectContaining({ status: 'running' }), + { allowUnconfirmed: true } + ); + expect(mockCtx.storage.setAlarm).toHaveBeenCalledWith(expect.any(Number), { + allowUnconfirmed: true, + }); + }); + + test('containerFetch should start before reading state on cold start', async ({ + mockCtx, + container, + }) => { + vi.mocked(mockCtx.storage.get).mockClear(); + vi.mocked(mockCtx.storage.put).mockClear(); + vi.mocked(mockCtx.storage.setAlarm).mockClear(); + + await container.containerFetch(new Request('https://example.com/test')); + + const startOrder = mockCtx.container.start.mock.invocationCallOrder[0]; + expect(startOrder).toBeDefined(); + expect(mockCtx.storage.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.put.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.setAlarm.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + }); + test('alarm should not stop a container while its start loop is in flight', async ({ mockCtx, container, @@ -229,9 +280,17 @@ describe('Container', () => { const startBlockedBeforePhysicalStart = new Promise(resolve => { resumeStart = resolve; }); - vi.spyOn(container, 'scheduleNextAlarm').mockImplementationOnce( - () => startBlockedBeforePhysicalStart - ); + container.usingInterception = true; + const refreshSpy = vi + .spyOn( + container as unknown as { + refreshOutboundInterception(options?: { + persistOutboundConfiguration?: boolean; + }): Promise; + }, + 'refreshOutboundInterception' + ) + .mockImplementationOnce(() => startBlockedBeforePhysicalStart); using onStopSpy = vi.spyOn(container, 'onStop'); const startPromise = container.start(undefined, { @@ -240,7 +299,7 @@ describe('Container', () => { waitInterval: 1, }); await vi.waitFor(() => { - expect(container.scheduleNextAlarm).toHaveBeenCalled(); + expect(refreshSpy).toHaveBeenCalled(); }); expect(mockCtx.container.start).not.toHaveBeenCalled(); @@ -268,7 +327,8 @@ describe('Container', () => { expect(onStopSpy).toHaveBeenCalledWith({ exitCode: 0, reason: 'exit' }); expect(mockCtx.storage.put).toHaveBeenCalledWith( '__CF_CONTAINER_STATE', - expect.objectContaining({ status: 'stopped' }) + expect.objectContaining({ status: 'stopped' }), + { allowUnconfirmed: true } ); }); diff --git a/src/tests/fixtures.ts b/src/tests/fixtures.ts index c84d6d8..13220db 100644 --- a/src/tests/fixtures.ts +++ b/src/tests/fixtures.ts @@ -33,10 +33,14 @@ export function makeMockCtx() { const ctx = { storage: { get: vi.fn<(key: string) => Promise>(), - put: vi.fn<(key: string, value: unknown) => Promise>().mockResolvedValue(undefined), - delete: vi.fn<(key: string) => Promise>().mockResolvedValue(true), - setAlarm: vi.fn<(scheduledTime: number) => Promise>().mockResolvedValue(undefined), - deleteAlarm: vi.fn<() => Promise>().mockResolvedValue(undefined), + put: vi + .fn<(key: string, value: unknown, options?: unknown) => Promise>() + .mockResolvedValue(undefined), + delete: vi.fn<(key: string, options?: unknown) => Promise>().mockResolvedValue(true), + setAlarm: vi + .fn<(scheduledTime: number, options?: unknown) => Promise>() + .mockResolvedValue(undefined), + deleteAlarm: vi.fn<(options?: unknown) => Promise>().mockResolvedValue(undefined), sync: vi.fn<() => Promise>().mockResolvedValue(undefined), kv: { get: vi.fn<(key: string) => Promise>(), From b529c3707cae138396709bdefdba87135aa06323 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Thu, 18 Jun 2026 10:09:32 -0500 Subject: [PATCH 2/4] ci: fix bonk --- .github/workflows/bonk.yml | 2 ++ .github/workflows/new-pr-review.yml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/.github/workflows/bonk.yml b/.github/workflows/bonk.yml index 4346a78..87786e4 100644 --- a/.github/workflows/bonk.yml +++ b/.github/workflows/bonk.yml @@ -36,6 +36,8 @@ jobs: CLOUDFLARE_API_TOKEN: ${{ secrets.CF_AI_GATEWAY_TOKEN }} with: model: 'cloudflare-ai-gateway/anthropic/claude-opus-4-7' + # Creds are broken without pinning opencode. + opencode_version: "1.17.7" mentions: '/bonk,@ask-bonk' permissions: write # token_permissions defaults to WRITE (i.e. Bonk can push commits). diff --git a/.github/workflows/new-pr-review.yml b/.github/workflows/new-pr-review.yml index 78f5274..78c6dd3 100644 --- a/.github/workflows/new-pr-review.yml +++ b/.github/workflows/new-pr-review.yml @@ -51,6 +51,8 @@ jobs: CLOUDFLARE_API_TOKEN: ${{ secrets.CF_AI_GATEWAY_TOKEN }} with: model: 'cloudflare-ai-gateway/anthropic/claude-opus-4-7' + # Creds are broken without pinning opencode. + opencode_version: "1.17.7" forks: 'false' permissions: write # The auto-reviewer must never push to PR branches. The prompt From c39e533e665c9d783f3cf35a5632c0c105d54f3f Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Thu, 18 Jun 2026 10:40:58 -0500 Subject: [PATCH 3/4] perf: get rid of any storage operation before calling start() --- src/lib/container.ts | 78 ++++++++++++++++++++----------------- src/tests/container.test.ts | 61 ++++++++++++++++++++++++++++- src/tests/fixtures.ts | 6 +-- 3 files changed, 105 insertions(+), 40 deletions(-) diff --git a/src/lib/container.ts b/src/lib/container.ts index caf4eec..9297cd3 100644 --- a/src/lib/container.ts +++ b/src/lib/container.ts @@ -551,34 +551,9 @@ export class Container extends DurableObject { ); } + this.container = ctx.container; this.state = new ContainerState(this.ctx.storage); - const persistedOutboundConfiguration = this.restoreOutboundConfiguration(); - this.ctx.blockConcurrencyWhile(async () => { - // Yield so subclass class-field initializers (e.g. `sleepAfter = "2h"`) - // run before renewActivityTimeout reads `this.sleepAfter`. - await Promise.resolve(); - this.renewActivityTimeout(); - - const ctor = this.constructor as typeof Container; - if ( - persistedOutboundConfiguration !== undefined || - ctor.outboundByHost !== undefined || - ctor.outbound !== undefined || - ctor.outboundHandlers !== undefined || - this.effectiveAllowedHosts !== undefined || - this.effectiveDeniedHosts !== undefined - ) { - this.usingInterception = true; - } - - if (this.container.running) { - this.applyOutboundInterceptionPromise = this.applyOutboundInterception(); - await this.scheduleNextAlarm(); - } - }); - - this.container = ctx.container; // Apply options if provided if (options) { if (options.defaultPort !== undefined) this.defaultPort = options.defaultPort; @@ -587,6 +562,10 @@ export class Container extends DurableObject { if (options.entrypoint !== undefined) this.entrypoint = options.entrypoint; if (options.enableInternet !== undefined) this.enableInternet = options.enableInternet; } + + this.renewActivityTimeout(); + this.updateUsingInterception(); + if (this.container.running) { this.monitor = this.container.monitor(); this.setupMonitorCallbacks(); @@ -803,9 +782,7 @@ export class Container extends DurableObject { this.setupMonitorCallbacks(); // TODO: We should consider an onHealthy callback - await this.ctx.blockConcurrencyWhile(async () => { - await this.onStart(); - }); + await this.onStart(); } /** @@ -852,7 +829,7 @@ export class Container extends DurableObject { } // Determine which ports to check - const portsToCheck = await this.getPortsToCheck(ports); + const portsToCheck = this.getPortsToCheck(ports); // Prepare to start the container resolvedCancellationOptions ??= {}; @@ -889,11 +866,9 @@ export class Container extends DurableObject { this.setupMonitorCallbacks(); - await this.ctx.blockConcurrencyWhile(async () => { - // All ports are ready - await this.state.setHealthy(); - await this.onStart(); - }); + // All ports are ready + await this.state.setHealthy(); + await this.onStart(); } /** @@ -1388,6 +1363,7 @@ export class Container extends DurableObject { // The runtime does not expose a way to remove outbound interceptions yet, so // once we promote an instance to intercept-all we must keep using it. private hasInterceptAllRegistration = false; + private outboundConfigurationRestored = false; // ========================== // GENERAL HELPERS @@ -1427,6 +1403,22 @@ export class Container extends DurableObject { }; } + private updateUsingInterception( + persistedOutboundConfiguration?: PersistedOutboundConfiguration + ): void { + const ctor = this.constructor as typeof Container; + if ( + persistedOutboundConfiguration !== undefined || + ctor.outboundByHost !== undefined || + ctor.outbound !== undefined || + ctor.outboundHandlers !== undefined || + this.effectiveAllowedHosts !== undefined || + this.effectiveDeniedHosts !== undefined + ) { + this.usingInterception = true; + } + } + private persistOutboundConfiguration(configuration: PersistedOutboundConfiguration): void { this.ctx.storage.kv.put(OUTBOUND_CONFIGURATION_KEY, { ...configuration, @@ -1435,6 +1427,15 @@ export class Container extends DurableObject { }); } + private restoreOutboundConfigurationOnce(): PersistedOutboundConfiguration | undefined { + if (this.outboundConfigurationRestored) { + return undefined; + } + + this.outboundConfigurationRestored = true; + return this.restoreOutboundConfiguration(); + } + private restoreOutboundConfiguration(): PersistedOutboundConfiguration | undefined { const configuration = this.ctx.storage.kv.get( OUTBOUND_CONFIGURATION_KEY @@ -1714,7 +1715,7 @@ export class Container extends DurableObject { * 3. `defaultPort` (if neither of the above is specified) * 4. Falls back to port 33 if none of the above are set */ - private async getPortsToCheck(overridePorts?: number | number[]) { + private getPortsToCheck(overridePorts?: number | number[]) { if (overridePorts !== undefined) { // Use explicitly provided ports (single port or array) return Array.isArray(overridePorts) ? overridePorts : [overridePorts]; @@ -1847,6 +1848,11 @@ export class Container extends DurableObject { } this.container.start(startConfig); this.monitor = this.container.monitor(); + const persistedOutboundConfiguration = this.restoreOutboundConfigurationOnce(); + this.updateUsingInterception(persistedOutboundConfiguration); + if (this.usingInterception) { + await this.refreshOutboundInterception(); + } await this.capturePendingStoppedEvent(containerWasRunning); await this.state.setRunning(); await this.scheduleNextAlarm(); diff --git a/src/tests/container.test.ts b/src/tests/container.test.ts index 54723b2..5816cd6 100644 --- a/src/tests/container.test.ts +++ b/src/tests/container.test.ts @@ -1,7 +1,7 @@ import { describe, expect, test as baseTest, vi } from 'vitest'; import { Container } from '../lib/container'; import { getRandom } from '../lib/utils'; -import { MockWebSocket, test, webSocketPairSpy } from './fixtures'; +import { makeMockCtx, MockWebSocket, test, webSocketPairSpy } from './fixtures'; describe('Container', () => { test('should initialize with default values', ({ container }) => { @@ -234,6 +234,8 @@ describe('Container', () => { vi.mocked(mockCtx.storage.put).mockClear(); vi.mocked(mockCtx.storage.setAlarm).mockClear(); vi.mocked(mockCtx.storage.sync).mockClear(); + vi.mocked(mockCtx.storage.kv.get).mockClear(); + vi.mocked(mockCtx.storage.kv.put).mockClear(); vi.mocked(mockCtx.storage.sql.exec).mockClear(); await container.start(undefined, { portToCheck: 8080, retries: 1, waitInterval: 1 }); @@ -243,6 +245,8 @@ describe('Container', () => { expect(mockCtx.storage.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); expect(mockCtx.storage.put.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); expect(mockCtx.storage.setAlarm.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.kv.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.kv.put).not.toHaveBeenCalled(); expect(mockCtx.storage.sql.exec).not.toHaveBeenCalled(); expect(mockCtx.storage.sync).not.toHaveBeenCalled(); expect(mockCtx.storage.put).toHaveBeenCalledWith( @@ -262,6 +266,7 @@ describe('Container', () => { vi.mocked(mockCtx.storage.get).mockClear(); vi.mocked(mockCtx.storage.put).mockClear(); vi.mocked(mockCtx.storage.setAlarm).mockClear(); + vi.mocked(mockCtx.storage.kv.get).mockClear(); await container.containerFetch(new Request('https://example.com/test')); @@ -270,6 +275,60 @@ describe('Container', () => { expect(mockCtx.storage.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); expect(mockCtx.storage.put.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); expect(mockCtx.storage.setAlarm.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.kv.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + }); + + test('cold start should not touch storage before container.start', async () => { + const mockCtx = makeMockCtx(); + let startCalled = false; + const assertStarted = () => { + if (!startCalled) { + throw new Error('storage called before container.start'); + } + }; + + mockCtx.container.start.mockImplementation(() => { + startCalled = true; + mockCtx.container.running = true; + }); + mockCtx.storage.get.mockImplementation(async () => { + assertStarted(); + return undefined; + }); + mockCtx.storage.put.mockImplementation(async () => { + assertStarted(); + }); + mockCtx.storage.setAlarm.mockImplementation(async () => { + assertStarted(); + }); + mockCtx.storage.deleteAlarm.mockImplementation(async () => { + assertStarted(); + }); + mockCtx.storage.sync.mockImplementation(async () => { + assertStarted(); + }); + mockCtx.storage.kv.get.mockImplementation(() => { + assertStarted(); + return undefined; + }); + mockCtx.storage.kv.put.mockImplementation(() => { + assertStarted(); + }); + mockCtx.storage.sql.exec.mockImplementation(() => { + assertStarted(); + return []; + }); + mockCtx.blockConcurrencyWhile.mockImplementation(async fn => { + assertStarted(); + return fn(); + }); + + const container = new Container(mockCtx as never, {}); + container.defaultPort = 8080; + + await container.start(undefined, { portToCheck: 8080, retries: 1, waitInterval: 1 }); + + expect(mockCtx.container.start).toHaveBeenCalledOnce(); }); test('alarm should not stop a container while its start loop is in flight', async ({ diff --git a/src/tests/fixtures.ts b/src/tests/fixtures.ts index 13220db..e737469 100644 --- a/src/tests/fixtures.ts +++ b/src/tests/fixtures.ts @@ -43,9 +43,9 @@ export function makeMockCtx() { deleteAlarm: vi.fn<(options?: unknown) => Promise>().mockResolvedValue(undefined), sync: vi.fn<() => Promise>().mockResolvedValue(undefined), kv: { - get: vi.fn<(key: string) => Promise>(), - put: vi.fn<(key: string, value: unknown) => Promise>().mockResolvedValue(undefined), - delete: vi.fn<(key: string) => Promise>().mockResolvedValue(true), + get: vi.fn<(key: string) => unknown>(), + put: vi.fn<(key: string, value: unknown) => void>(), + delete: vi.fn<(key: string) => boolean>().mockReturnValue(true), }, sql: { exec: vi.fn<(query: string) => unknown[]>().mockReturnValue([]), From 0c4b4ed496c9d43308cfa4dbb92f00d1f13f88a8 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Fri, 19 Jun 2026 09:55:06 -0500 Subject: [PATCH 4/4] simplify outbound setups, do not use storage for outboud hooks, document new trade-offs this means we need to bump to 0.4.0 as there is some breaking changes in behaviour. --- .changeset/quiet-starfishes-start.md | 6 +- docs/egress.md | 8 +- src/lib/container.ts | 133 ++------------------------- src/tests/container.test.ts | 29 ++++-- 4 files changed, 42 insertions(+), 134 deletions(-) diff --git a/.changeset/quiet-starfishes-start.md b/.changeset/quiet-starfishes-start.md index f215f87..cd780d4 100644 --- a/.changeset/quiet-starfishes-start.md +++ b/.changeset/quiet-starfishes-start.md @@ -1,5 +1,9 @@ --- -"@cloudflare/containers": patch +"@cloudflare/containers": minor --- Defer start-path storage and alarm setup until after the container start call. + +`onStart()` no longer runs under `blockConcurrencyWhile()`. Previously, `start()` and `startAndWaitForPorts()` queued incoming requests until `onStart()` completed. Now, awaits inside user-supplied `onStart()` handlers can yield the input gate, so other requests may be served before `onStart()` completes. + +Runtime outbound configuration set with methods like `setOutboundByHost()`, `setOutboundHandler()`, `setAllowedHosts()`, and `setDeniedHosts()` is no longer persisted to Durable Object storage or restored after Durable Object hydration/container restart. New container starts use the class's static outbound configuration defaults until runtime configuration is set again. diff --git a/docs/egress.md b/docs/egress.md index 60b93d4..843cf46 100644 --- a/docs/egress.md +++ b/docs/egress.md @@ -188,7 +188,13 @@ MyContainer.outboundHandlers = { ## Runtime methods These methods modify the outbound configuration of a running container -instance. Changes are persisted across Durable Object restarts. +instance. Per-instance runtime egress configuration is not persisted by the +`Container` class. Once configured, it remains active for the current running +container and will not be lost until that container exits. + +If you need runtime egress configuration to survive a container exit, persist +your desired configuration in your own storage and reapply it with the +`onStart()` hook or in the constructor. | Method | Description | | ---------------------------------------------- | ------------------------------------------------------------ | diff --git a/src/lib/container.ts b/src/lib/container.ts index 9297cd3..e128466 100644 --- a/src/lib/container.ts +++ b/src/lib/container.ts @@ -26,7 +26,6 @@ const RUNTIME_SIGNALLED_ERROR = 'runtime signalled the container to exit:'; const UNEXPECTED_EXIT_ERROR = 'container exited with unexpected exit code:'; const NOT_LISTENING_ERROR = 'the container is not listening'; const CONTAINER_STATE_KEY = '__CF_CONTAINER_STATE'; -const OUTBOUND_CONFIGURATION_KEY = 'OUTBOUND_CONFIGURATION'; // maxRetries before scheduling next alarm is purposely set to 3, // as according to DO docs at https://developers.cloudflare.com/durable-objects/api/alarms/ @@ -316,11 +315,6 @@ class ContainerState { } } -type PendingStoppedEvent = { - params: StopParams; - state: State; -}; - type ContainerProxyOptions = { enableInternet?: boolean; containerId: string; @@ -334,7 +328,7 @@ type ContainerProxyOptions = { interceptAll?: boolean; }; -type PersistedOutboundConfiguration = Pick< +type OutboundConfiguration = Pick< ContainerProxyOptions, 'outboundByHostOverrides' | 'outboundHandlerOverride' | 'allowedHosts' | 'deniedHosts' > & { @@ -1345,7 +1339,6 @@ export class Container extends DurableObject { // throw "start() cannot be called on a container that is already running." // See https://github.com/cloudflare/containers/issues/173. private startInFlight: Promise | undefined; - private pendingStoppedEvent: PendingStoppedEvent | undefined; private monitoredPromise: Promise | undefined; @@ -1363,8 +1356,6 @@ export class Container extends DurableObject { // The runtime does not expose a way to remove outbound interceptions yet, so // once we promote an instance to intercept-all we must keep using it. private hasInterceptAllRegistration = false; - private outboundConfigurationRestored = false; - // ========================== // GENERAL HELPERS // ========================== @@ -1390,7 +1381,7 @@ export class Container extends DurableObject { return this.deniedHostsOverride ?? this.deniedHosts; } - private getOutboundConfiguration(): PersistedOutboundConfiguration { + private getOutboundConfiguration(): OutboundConfiguration { return { outboundByHostOverrides: Object.keys(this.outboundByHostOverrides).length > 0 @@ -1404,11 +1395,11 @@ export class Container extends DurableObject { } private updateUsingInterception( - persistedOutboundConfiguration?: PersistedOutboundConfiguration + outboundConfiguration?: OutboundConfiguration ): void { const ctor = this.constructor as typeof Container; if ( - persistedOutboundConfiguration !== undefined || + outboundConfiguration !== undefined || ctor.outboundByHost !== undefined || ctor.outbound !== undefined || ctor.outboundHandlers !== undefined || @@ -1419,67 +1410,6 @@ export class Container extends DurableObject { } } - private persistOutboundConfiguration(configuration: PersistedOutboundConfiguration): void { - this.ctx.storage.kv.put(OUTBOUND_CONFIGURATION_KEY, { - ...configuration, - allowedHosts: this.allowedHostsOverride, - deniedHosts: this.deniedHostsOverride, - }); - } - - private restoreOutboundConfigurationOnce(): PersistedOutboundConfiguration | undefined { - if (this.outboundConfigurationRestored) { - return undefined; - } - - this.outboundConfigurationRestored = true; - return this.restoreOutboundConfiguration(); - } - - private restoreOutboundConfiguration(): PersistedOutboundConfiguration | undefined { - const configuration = this.ctx.storage.kv.get( - OUTBOUND_CONFIGURATION_KEY - ); - - if (!configuration) { - return undefined; - } - - this.outboundHandlerOverride = undefined; - if (configuration.outboundHandlerOverride !== undefined) { - try { - this.validateOutboundHandlerMethodName(configuration.outboundHandlerOverride.method); - this.outboundHandlerOverride = configuration.outboundHandlerOverride; - } catch (error) { - console.warn('Ignoring invalid persisted outbound handler override:', error); - } - } - - this.outboundByHostOverrides = {}; - for (const [hostname, override] of Object.entries( - configuration.outboundByHostOverrides ?? {} - )) { - try { - this.validateOutboundHandlerMethodName(override.method); - this.outboundByHostOverrides[hostname] = override; - } catch (error) { - console.warn(`Ignoring invalid persisted outbound override for ${hostname}:`, error); - } - } - - this.hasInterceptAllRegistration = configuration.hasInterceptAllRegistration === true; - - if (configuration.allowedHosts) { - this.allowedHostsOverride = configuration.allowedHosts; - } - - if (configuration.deniedHosts) { - this.deniedHostsOverride = configuration.deniedHosts; - } - - return this.getOutboundConfiguration(); - } - /** * Returns true if a catch-all outbound HTTP interception is needed. * This is the case when a static `outbound` handler or a runtime @@ -1536,14 +1466,12 @@ export class Container extends DurableObject { return [...hosts]; } - private async refreshOutboundInterception( - options: { persistOutboundConfiguration?: boolean } = {} - ): Promise { + private async refreshOutboundInterception(): Promise { if (!this.usingInterception) { return; } - this.applyOutboundInterceptionPromise = this.applyOutboundInterception(options); + this.applyOutboundInterceptionPromise = this.applyOutboundInterception(); await this.applyOutboundInterceptionPromise; } @@ -1560,9 +1488,7 @@ export class Container extends DurableObject { * - Intercept-all mode: `interceptOutboundHttps('*', ...)` for all HTTPS traffic * - Per-host mode: `interceptOutboundHttps(host, ...)` for each known host */ - private async applyOutboundInterception( - options: { persistOutboundConfiguration?: boolean } = {} - ): Promise { + private async applyOutboundInterception(): Promise { const ctx = this.ctx as unknown as { exports?: { ContainerProxy?: (params: { props: Record }) => Fetcher }; }; @@ -1585,9 +1511,6 @@ export class Container extends DurableObject { } const outboundConfiguration = this.getOutboundConfiguration(); - if (options.persistOutboundConfiguration !== false) { - this.persistOutboundConfiguration(outboundConfiguration); - } const hosts = this.getHostsToIntercept(); @@ -1769,7 +1692,6 @@ export class Container extends DurableObject { if (this.startInFlight === startPromise) { this.startInFlight = undefined; } - await this.drainPendingStoppedEvent(); } } @@ -1844,16 +1766,10 @@ export class Container extends DurableObject { const containerWasRunning = this.container.running; if (!containerWasRunning) { if (this.usingInterception) { - await this.refreshOutboundInterception({ persistOutboundConfiguration: false }); + await this.refreshOutboundInterception(); } this.container.start(startConfig); this.monitor = this.container.monitor(); - const persistedOutboundConfiguration = this.restoreOutboundConfigurationOnce(); - this.updateUsingInterception(persistedOutboundConfiguration); - if (this.usingInterception) { - await this.refreshOutboundInterception(); - } - await this.capturePendingStoppedEvent(containerWasRunning); await this.state.setRunning(); await this.scheduleNextAlarm(); } else { @@ -2138,39 +2054,6 @@ export class Container extends DurableObject { } } - private async capturePendingStoppedEvent(containerWasRunning: boolean): Promise { - if (containerWasRunning) { - return; - } - - const state = await this.state.getStoredState(); - if (!state) { - return; - } - - if (state.status === 'healthy' || state.status === 'running') { - this.pendingStoppedEvent = { params: { exitCode: 0, reason: 'exit' }, state }; - return; - } - - if (state.status === 'stopped_with_code') { - this.pendingStoppedEvent = { - params: { exitCode: state.exitCode ?? 0, reason: 'exit' }, - state, - }; - } - } - - private async drainPendingStoppedEvent(): Promise { - if (!this.pendingStoppedEvent) { - return; - } - - const event = this.pendingStoppedEvent; - this.pendingStoppedEvent = undefined; - await this.callOnStop(event.params, event.state); - } - private async callOnStop(onStopParams: StopParams, stateBeforeOnStop: State) { if (this.onStopCalled) { return; diff --git a/src/tests/container.test.ts b/src/tests/container.test.ts index 5816cd6..9fea8c2 100644 --- a/src/tests/container.test.ts +++ b/src/tests/container.test.ts @@ -242,10 +242,10 @@ describe('Container', () => { const startOrder = mockCtx.container.start.mock.invocationCallOrder[0]; expect(startOrder).toBeDefined(); - expect(mockCtx.storage.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.get).not.toHaveBeenCalled(); expect(mockCtx.storage.put.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); expect(mockCtx.storage.setAlarm.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); - expect(mockCtx.storage.kv.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.kv.get).not.toHaveBeenCalled(); expect(mockCtx.storage.kv.put).not.toHaveBeenCalled(); expect(mockCtx.storage.sql.exec).not.toHaveBeenCalled(); expect(mockCtx.storage.sync).not.toHaveBeenCalled(); @@ -267,15 +267,17 @@ describe('Container', () => { vi.mocked(mockCtx.storage.put).mockClear(); vi.mocked(mockCtx.storage.setAlarm).mockClear(); vi.mocked(mockCtx.storage.kv.get).mockClear(); + vi.mocked(mockCtx.storage.kv.put).mockClear(); await container.containerFetch(new Request('https://example.com/test')); const startOrder = mockCtx.container.start.mock.invocationCallOrder[0]; expect(startOrder).toBeDefined(); - expect(mockCtx.storage.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.get).not.toHaveBeenCalled(); expect(mockCtx.storage.put.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); expect(mockCtx.storage.setAlarm.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); - expect(mockCtx.storage.kv.get.mock.invocationCallOrder[0]).toBeGreaterThan(startOrder); + expect(mockCtx.storage.kv.get).not.toHaveBeenCalled(); + expect(mockCtx.storage.kv.put).not.toHaveBeenCalled(); }); test('cold start should not touch storage before container.start', async () => { @@ -325,10 +327,25 @@ describe('Container', () => { const container = new Container(mockCtx as never, {}); container.defaultPort = 8080; + container.allowedHosts = ['example.com']; + container.usingInterception = true; await container.start(undefined, { portToCheck: 8080, retries: 1, waitInterval: 1 }); expect(mockCtx.container.start).toHaveBeenCalledOnce(); + expect(mockCtx.container.interceptAllOutboundHttp).toHaveBeenCalled(); + }); + + test('cold start should not call onStop for a prior stored run', async ({ + mockCtx, + container, + }) => { + mockCtx.storage.get.mockResolvedValue({ status: 'running', lastChange: Date.now() }); + using onStopSpy = vi.spyOn(container, 'onStop'); + + await container.startAndWaitForPorts(8080); + + expect(onStopSpy).not.toHaveBeenCalled(); }); test('alarm should not stop a container while its start loop is in flight', async ({ @@ -343,9 +360,7 @@ describe('Container', () => { const refreshSpy = vi .spyOn( container as unknown as { - refreshOutboundInterception(options?: { - persistOutboundConfiguration?: boolean; - }): Promise; + refreshOutboundInterception(): Promise; }, 'refreshOutboundInterception' )