From 21a14d0490a4bbe3bc94533879dfee9ac6548105 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 30 Apr 2026 13:19:39 -0500 Subject: [PATCH 1/4] fix(gastown): point dev GIT_TOKEN_SERVICE binding at git-token-service-dev git-token-service's wrangler env.dev overrides the worker name to 'git-token-service-dev', but gastown's env.dev.services binding was still referencing the base 'git-token-service' name. Wrangler's local dev registry does exact-name matching, so the binding showed as [not connected] whenever both workers were running side by side. Every other consumer in the repo (cloud-agent-next, security-sync, security-auto-analysis) already uses 'git-token-service-dev' in their env.dev block; gastown was the outlier. --- services/gastown/wrangler.jsonc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/gastown/wrangler.jsonc b/services/gastown/wrangler.jsonc index 4cc1410810..e613dc666a 100644 --- a/services/gastown/wrangler.jsonc +++ b/services/gastown/wrangler.jsonc @@ -148,7 +148,7 @@ "services": [ { "binding": "GIT_TOKEN_SERVICE", - "service": "git-token-service", + "service": "git-token-service-dev", "entrypoint": "GitTokenRPCEntrypoint", }, ], From c532f40e5627062a80077b30a2f0d60d0bc8d3b5 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Fri, 1 May 2026 11:22:07 -0500 Subject: [PATCH 2/4] fix(gastown): push new model onto resumed mayor session on hot-swap (#2999) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user changes the mayor's model in town settings, updateAgentModel restarts the SDK server with new KILO_CONFIG_CONTENT and resumes the existing session from kilo.db. Commit 9785570b9 intentionally stopped sending any session.prompt on resume to avoid duplicating the MAYOR_STARTUP_PROMPT, but that also dropped the model param — so the resumed session kept its prior per-session model until the user ran /model manually. Extract the fresh vs. resumed session-prompt logic into applyModelToSession and on resume send a noReply:true prompt carrying only the new model param. This updates the SDK server's per-session model without replaying the startup prompt. Errors on the resume path are swallowed so the hot-swap still succeeds; the SDK server fell back to the config-loaded model at startup, which was already updated. Add container tests covering both fresh and resumed paths. Co-authored-by: John Fawcett --- .../container/src/process-manager.test.ts | 119 ++++++++++++++++++ .../gastown/container/src/process-manager.ts | 91 ++++++++++++-- services/gastown/container/vitest.config.ts | 2 +- 3 files changed, 201 insertions(+), 11 deletions(-) create mode 100644 services/gastown/container/src/process-manager.test.ts diff --git a/services/gastown/container/src/process-manager.test.ts b/services/gastown/container/src/process-manager.test.ts new file mode 100644 index 0000000000..91ffdcce4c --- /dev/null +++ b/services/gastown/container/src/process-manager.test.ts @@ -0,0 +1,119 @@ +import { describe, it, expect, vi } from 'vitest'; + +// Mock heavy imports so the module can be loaded without spinning up +// a real SDK server or hono app. 
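+// vitest hoists vi.mock calls above the rest of the module, and
+// process-manager itself is only pulled in through the dynamic `await
+// import` below, so none of the real dependencies ever load.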
+vi.mock('@kilocode/sdk', () => ({
+  createKilo: vi.fn(),
+}));
+vi.mock('./agent-runner', () => ({
+  runAgent: vi.fn(),
+  buildKiloConfigContent: vi.fn(),
+  resolveGitCredentials: vi.fn(),
+  writeMayorSystemPromptToAgentsMd: vi.fn(),
+}));
+vi.mock('./control-server', () => ({
+  getCurrentTownConfig: vi.fn(() => ({})),
+  getLastAppliedEnvVarKeys: vi.fn(() => new Set()),
+  RESERVED_ENV_KEYS: new Set(),
+}));
+vi.mock('./completion-reporter', () => ({
+  reportAgentCompleted: vi.fn(),
+  reportMayorWaiting: vi.fn(),
+}));
+vi.mock('./token-refresh', () => ({
+  refreshTokenIfNearExpiry: vi.fn(),
+}));
+
+const { applyModelToSession } = await import('./process-manager');
+
+type PromptCall = {
+  path: { id: string };
+  body: {
+    parts: Array<{ type: 'text'; text: string }>;
+    model: { providerID: string; modelID: string };
+    noReply?: boolean;
+  };
+};
+
+function makeClient(impl?: (args: PromptCall) => Promise<unknown>) {
+  const calls: PromptCall[] = [];
+  const prompt = vi.fn(async (args: PromptCall) => {
+    calls.push(args);
+    if (impl) return impl(args);
+    return {};
+  });
+  return { client: { session: { prompt } }, calls, prompt };
+}
+
+describe('applyModelToSession', () => {
+  it('sends the startup prompt with the model for a fresh session', async () => {
+    const { client, calls } = makeClient();
+    await applyModelToSession({
+      client,
+      sessionId: 'sess-new',
+      model: 'anthropic/claude-sonnet-4.6',
+      prompt: 'STARTUP PROMPT',
+      resumedSession: false,
+    });
+    expect(calls).toHaveLength(1);
+    expect(calls[0].path).toEqual({ id: 'sess-new' });
+    expect(calls[0].body.parts).toEqual([{ type: 'text', text: 'STARTUP PROMPT' }]);
+    expect(calls[0].body.model).toEqual({
+      providerID: 'kilo',
+      modelID: 'anthropic/claude-sonnet-4.6',
+    });
+    expect(calls[0].body.noReply).toBeUndefined();
+  });
+
+  it('pushes the new model with noReply:true for a resumed session without replaying the startup prompt', async () => {
+    const { client, calls } = makeClient();
+    await applyModelToSession({
+      client,
+      sessionId: 'sess-resumed',
+      model: 'anthropic/claude-opus-4.7',
+      prompt: 'STARTUP PROMPT (must not be sent)',
+      resumedSession: true,
+    });
+    expect(calls).toHaveLength(1);
+    expect(calls[0].path).toEqual({ id: 'sess-resumed' });
+    expect(calls[0].body.model).toEqual({
+      providerID: 'kilo',
+      modelID: 'anthropic/claude-opus-4.7',
+    });
+    expect(calls[0].body.noReply).toBe(true);
+    expect(calls[0].body.parts).toEqual([{ type: 'text', text: '' }]);
+    // Ensure the MAYOR_STARTUP_PROMPT is NOT replayed on resume.
+    expect(calls[0].body.parts[0].text).not.toContain('STARTUP PROMPT');
+  });
+
+  it('swallows errors from the resumed-session prompt so the hot-swap can continue', async () => {
+    const { client } = makeClient(async () => {
+      throw new Error('simulated SDK failure');
+    });
+    // Should not throw — errors on the noReply path are logged and ignored.
+    await expect(
+      applyModelToSession({
+        client,
+        sessionId: 'sess-resumed',
+        model: 'anthropic/claude-opus-4.7',
+        prompt: 'STARTUP PROMPT',
+        resumedSession: true,
+      })
+    ).resolves.toBeUndefined();
+  });
+
+  it('propagates errors for a fresh session (so the hot-swap can roll back)', async () => {
+    const { client } = makeClient(async () => {
+      throw new Error('simulated SDK failure');
+    });
+    await expect(
+      applyModelToSession({
+        client,
+        sessionId: 'sess-new',
+        model: 'anthropic/claude-sonnet-4.6',
+        prompt: 'STARTUP PROMPT',
+        resumedSession: false,
+      })
+    ).rejects.toThrow('simulated SDK failure');
+  });
+});
diff --git a/services/gastown/container/src/process-manager.ts b/services/gastown/container/src/process-manager.ts
index 2c58efa795..33a37fa29a 100644
--- a/services/gastown/container/src/process-manager.ts
+++ b/services/gastown/container/src/process-manager.ts
@@ -1815,6 +1815,80 @@ export async function refreshTokenForAllAgents(): Promise<
   return Promise.all(snapshot.map(restartAgent));
 }
 
+/**
+ * Minimal shape of `client.session` needed by {@link applyModelToSession}.
+ * Defined structurally so tests can pass a fake without pulling in the
+ * whole KiloClient type.
+ */
+type SessionPromptClient = {
+  session: {
+    prompt: (args: {
+      path: { id: string };
+      body: {
+        parts: Array<{ type: 'text'; text: string }>;
+        model: { providerID: string; modelID: string };
+        noReply?: boolean;
+      };
+    }) => Promise<unknown>;
+  };
+};
+
+/**
+ * Push a model selection onto a mayor session.
+ *
+ * For a freshly created session, sends the startup prompt together with
+ * the model param so the first turn runs the configured model.
+ *
+ * For a resumed session the startup prompt MUST NOT be replayed (it
+ * would recreate the duplicate turn regression fixed by 9785570b9),
+ * but the per-session model on the SDK server still needs to be updated
+ * so the next user turn uses the newly-selected model. We do this by
+ * sending a `noReply: true` prompt that carries only the model param;
+ * the SDK treats this as a state update and does not trigger a model turn.
+ *
+ * Errors on the resumed path are swallowed: if pushing the model fails,
+ * the mayor falls back to whichever model the SDK server loaded from
+ * KILO_CONFIG_CONTENT at startup, which we have already updated.
+ */
+export async function applyModelToSession(params: {
+  client: SessionPromptClient;
+  sessionId: string;
+  model: string;
+  prompt: string;
+  resumedSession: boolean;
+}): Promise<void> {
+  const { client, sessionId, model, prompt, resumedSession } = params;
+  const modelParam = { providerID: 'kilo', modelID: model };
+  if (!resumedSession) {
+    await client.session.prompt({
+      path: { id: sessionId },
+      body: {
+        parts: [{ type: 'text', text: prompt }],
+        model: modelParam,
+      },
+    });
+    return;
+  }
+  try {
+    await client.session.prompt({
+      path: { id: sessionId },
+      body: {
+        parts: [{ type: 'text', text: '' }],
+        model: modelParam,
+        noReply: true,
+      },
+    });
+    console.log(
+      `${MANAGER_LOG} updateAgentModel: pushed model=${model} to resumed session ${sessionId}`
+    );
+  } catch (err) {
+    console.warn(
+      `${MANAGER_LOG} updateAgentModel: failed to push model to resumed session ${sessionId}:`,
+      err
+    );
+  }
+}
+
 /**
  * Update the model for a running agent by restarting its SDK server with
  * new KILO_CONFIG_CONTENT. The kilo serve child process reads the model
@@ -1958,16 +2032,13 @@ export async function updateAgentModel(
   const prompt = conversationHistory ?
`${conversationHistory}\n\n${MAYOR_STARTUP_PROMPT}` : MAYOR_STARTUP_PROMPT; - if (!resumedSession) { - const modelParam = { providerID: 'kilo', modelID: model }; - await client.session.prompt({ - path: { id: agent.sessionId }, - body: { - parts: [{ type: 'text', text: prompt }], - model: modelParam, - }, - }); - } + await applyModelToSession({ + client, + sessionId: agent.sessionId, + model, + prompt, + resumedSession, + }); agent.messageCount = 1; // 6. New server is healthy — now tear down the old one. diff --git a/services/gastown/container/vitest.config.ts b/services/gastown/container/vitest.config.ts index 468ee375fe..32b052a10d 100644 --- a/services/gastown/container/vitest.config.ts +++ b/services/gastown/container/vitest.config.ts @@ -3,6 +3,6 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ test: { globals: false, - include: ['plugin/**/*.test.ts'], + include: ['plugin/**/*.test.ts', 'src/**/*.test.ts'], }, }); From db58406603e6278b53484928f5fe0eae9bb2f76f Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 5 May 2026 12:11:55 -0500 Subject: [PATCH 3/4] fix(gastown): stop reconciler log spam from orphaned bead_cancelled events (#3047) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two independent bugs compose to flood production logs every alarm tick with 'Bead not found' errors: 1. deleteBead / deleteBeads did not clean up the town_events queue, leaving bead_cancelled and container_status rows pointing at deleted beads/agents. 2. applyEvent threw on missing beads and the drain loop never marked the failing event processed — so it retried forever. Fix 1: purge town_events rows (by bead_id OR agent_id, since agents are beads) from deleteBead and the deleteBeads bulk path. Fix 2a: reconciler.applyEvent('bead_cancelled') checks for the target bead up front and returns (with a warn) when it's missing, instead of throwing. Fix 2b: the Town.do.ts drain loop recognises 'Bead/Agent not found' terminal errors, logs them at warn, and marks the offending event processed so it stops retrying. Adds debug RPCs (debugTownEvents, debugInsertTownEvent, debugRecordContainerStatus) and integration coverage in event-cleanup.test.ts. 
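
For reference, a condensed sketch of the drain-loop shape after Fix 2b
(names match the Town.do.ts alarm handler; the event enumeration is
abbreviated here):

    for (const event of pending) {
      try {
        reconciler.applyEvent(this.sql, event, { townConfig });
        events.markProcessed(this.sql, event.event_id);
      } catch (err) {
        // Previously: nothing matched here, the orphan stayed pending,
        // and the same 'Bead <uuid> not found' throw recurred on every
        // alarm tick. Now missing-entity errors are matched, logged at
        // warn, and the event is marked processed.
      }
    }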
Co-authored-by: John Fawcett
---
 services/gastown/src/dos/Town.do.ts           |  87 ++++++++-
 services/gastown/src/dos/town/beads.ts        |  22 +++
 services/gastown/src/dos/town/reconciler.ts   |  11 ++
 .../test/integration/event-cleanup.test.ts    | 183 ++++++++++++++++++
 4 files changed, 293 insertions(+), 10 deletions(-)
 create mode 100644 services/gastown/test/integration/event-cleanup.test.ts

diff --git a/services/gastown/src/dos/Town.do.ts b/services/gastown/src/dos/Town.do.ts
index bdbaf03575..a6be9aecdb 100644
--- a/services/gastown/src/dos/Town.do.ts
+++ b/services/gastown/src/dos/Town.do.ts
@@ -47,7 +47,11 @@ import { agent_metadata } from '../db/tables/agent-metadata.table';
 import { escalation_metadata } from '../db/tables/escalation-metadata.table';
 import { convoy_metadata } from '../db/tables/convoy-metadata.table';
 import { bead_dependencies } from '../db/tables/bead-dependencies.table';
-import { town_events, TownEventRecord } from '../db/tables/town-events.table';
+import {
+  town_events,
+  TownEventRecord,
+  type TownEventType,
+} from '../db/tables/town-events.table';
 import {
   agent_nudges,
   AgentNudgeRecord,
@@ -3896,15 +3900,28 @@ export class TownDO extends DurableObject {
         reconciler.applyEvent(this.sql, event, { townConfig });
         events.markProcessed(this.sql, event.event_id);
       } catch (err) {
-        logger.error('reconciler: applyEvent failed', {
-          eventId: event.event_id,
-          eventType: event.event_type,
-          error: err instanceof Error ? err.message : String(err),
-        });
-        // Event stays unprocessed — will be retried on the next alarm tick.
-        // Mark it processed anyway after 3 consecutive failures to prevent
-        // a poison event from blocking the entire queue forever.
-        // For now, we skip it and let the next tick retry.
+        const message = err instanceof Error ? err.message : String(err);
+        // Terminal errors referencing a missing bead/agent can never
+        // succeed on retry — mark them processed so the drain loop
+        // stops re-running them every alarm tick.
+        const isMissingEntity =
+          err instanceof Error &&
+          /\b(Bead|Agent) [0-9a-f-]{36} not found\b/.test(err.message);
+        if (isMissingEntity) {
+          logger.warn('reconciler: applyEvent skipped (missing entity)', {
+            eventId: event.event_id,
+            eventType: event.event_type,
+            error: message,
+          });
+          events.markProcessed(this.sql, event.event_id);
+        } else {
+          logger.error('reconciler: applyEvent failed', {
+            eventId: event.event_id,
+            eventType: event.event_type,
+            error: message,
+          });
+          // Event stays unprocessed — will be retried on the next alarm tick.
+        }
       }
     }
   } catch (err) {
@@ -5161,6 +5178,56 @@ export class TownDO extends DurableObject {
     ];
   }
 
+  async debugTownEvents(): Promise<unknown[]> {
+    return [
+      ...query(
+        this.sql,
+        /* sql */ `
+          SELECT ${town_events.event_id},
+                 ${town_events.event_type},
+                 ${town_events.agent_id},
+                 ${town_events.bead_id},
+                 ${town_events.processed_at}
+          FROM ${town_events}
+          ORDER BY ${town_events.created_at} ASC
+        `,
+        []
+      ),
+    ];
+  }
+
+  /**
+   * Test-only helper: directly insert a row into the town_events queue
+   * without going through the producer APIs. Used to reproduce orphan
+   * events (referencing deleted beads/agents) in tests.
+   */
+  async debugInsertTownEvent(input: {
+    event_type: TownEventType;
+    agent_id?: string | null;
+    bead_id?: string | null;
+    payload?: Record<string, unknown>;
+  }): Promise<string> {
+    const eventId = events.insertEvent(this.sql, input.event_type, {
+      agent_id: input.agent_id ?? null,
+      bead_id: input.bead_id ?? null,
+      payload: input.payload ?? {},
+    });
+    await this.armAlarmIfNeeded();
+    return eventId;
+  }
+
+  /**
+   * Test-only helper: insert a container_status event for a given agent.
+   * Mirrors the container observer's upsert so tests can verify that
+   * deleteBead sweeps agent-keyed events.
+   */
+  async debugRecordContainerStatus(
+    agentId: string,
+    payload: { status: string; exit_reason?: string | null }
+  ): Promise<void> {
+    events.upsertContainerStatus(this.sql, agentId, payload);
+  }
+
   async destroy(): Promise<void> {
     console.log(`${TOWN_LOG} destroy: clearing all storage and alarms`);
 
diff --git a/services/gastown/src/dos/town/beads.ts b/services/gastown/src/dos/town/beads.ts
index a3faa75437..945679cf1f 100644
--- a/services/gastown/src/dos/town/beads.ts
+++ b/services/gastown/src/dos/town/beads.ts
@@ -41,6 +41,7 @@ import {
   createTableConvoyMetadata,
   migrateConvoyMetadata,
 } from '../../db/tables/convoy-metadata.table';
+import { town_events } from '../../db/tables/town-events.table';
 import { query } from '../../util/query.util';
 import type {
   CreateBeadInput,
@@ -903,6 +904,17 @@ export function deleteBead(sql: SqlStorage, beadId: string, rigId?: string): boo
     beadId,
   ]);
 
+  // Remove any pending/processed reconciler events targeting this bead or
+  // this agent (agents are themselves beads, so deleteBead is used for both).
+  // Without this, bead_cancelled / container_status / … events that reference
+  // a deleted bead make applyEvent throw forever on every alarm tick.
+  query(
+    sql,
+    /* sql */ `DELETE FROM ${town_events}
+      WHERE ${town_events.bead_id} = ? OR ${town_events.agent_id} = ?`,
+    [beadId, beadId]
+  );
+
   query(sql, /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} = ?`, [beadId]);
 
   return true;
 }
@@ -1003,6 +1015,16 @@ export function deleteBeads(sql: SqlStorage, beadIds: string[], rigId?: string):
     ...allIdsArr
   );
 
+  // Remove any reconciler events referencing these beads/agents. See
+  // deleteBead above for rationale.
+  sql.exec(
+    /* sql */ `DELETE FROM ${town_events}
+      WHERE ${town_events.bead_id} IN (${placeholders})
+         OR ${town_events.agent_id} IN (${placeholders})`,
+    ...allIdsArr,
+    ...allIdsArr
+  );
+
   // Delete the beads themselves
   sql.exec(
     /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} IN (${placeholders})`,
diff --git a/services/gastown/src/dos/town/reconciler.ts b/services/gastown/src/dos/town/reconciler.ts
index 8e9b1d4671..f7a62d0cf3 100644
--- a/services/gastown/src/dos/town/reconciler.ts
+++ b/services/gastown/src/dos/town/reconciler.ts
@@ -289,6 +289,17 @@ export function applyEvent(
       console.warn(`${LOG} applyEvent: bead_cancelled missing bead_id`);
       return;
     }
+    // Tolerate the bead having been deleted after the event was enqueued.
+    // Without this guard updateBeadStatus throws `Bead not found`,
+    // the drain loop can't mark the event processed, and the error
+    // recurs on every alarm tick forever.
+    const existing = beadOps.getBead(sql, event.bead_id);
+    if (!existing) {
+      console.warn(
+        `${LOG} applyEvent: bead_cancelled target bead ${event.bead_id} no longer exists — skipping`
+      );
+      return;
+    }
     const cancelStatus =
       payload.cancel_status === 'closed' || payload.cancel_status === 'failed'
         ? payload.cancel_status
diff --git a/services/gastown/test/integration/event-cleanup.test.ts b/services/gastown/test/integration/event-cleanup.test.ts
new file mode 100644
index 0000000000..0d964e44c9
--- /dev/null
+++ b/services/gastown/test/integration/event-cleanup.test.ts
@@ -0,0 +1,183 @@
+/**
+ * Tests for the "orphaned bead_cancelled events retried forever" bug.
+ *
+ * Two independent fixes compose to eliminate the failure:
+ *   Fix 1:  deleteBead / deleteBeads purge town_events rows that reference
+ *           the deleted bead (by bead_id or agent_id), so the drain loop
+ *           never sees them.
+ *   Fix 2a: reconciler.applyEvent('bead_cancelled') tolerates the bead
+ *           being missing (returns early, logs warn — does not throw).
+ *   Fix 2b: the Town.do.ts drain loop recognises "Bead/Agent ... not
+ *           found" terminal errors and marks the offending event
+ *           processed so it is not retried forever.
+ */
+
+import { env, runDurableObjectAlarm } from 'cloudflare:test';
+import { describe, it, expect, beforeEach } from 'vitest';
+
+function getTownStub(name: string) {
+  return env.TOWN.get(env.TOWN.idFromName(name));
+}
+
+describe('town_events cleanup on bead deletion (#fix-1)', () => {
+  let town: ReturnType<typeof getTownStub>;
+  let townName: string;
+
+  beforeEach(async () => {
+    townName = `evcleanup-${crypto.randomUUID()}`;
+    town = getTownStub(townName);
+    await town.setTownId(townName);
+    await town.addRig({
+      rigId: 'rig-1',
+      name: 'main-rig',
+      gitUrl: 'https://github.com/test/repo.git',
+      defaultBranch: 'main',
+    });
+  });
+
+  it('deleteBead removes pending town_events referencing the bead by bead_id', async () => {
+    const bead = await town.createBead({
+      type: 'issue',
+      title: 'To be deleted',
+      rig_id: 'rig-1',
+    });
+
+    // Transitioning to a terminal status enqueues a bead_cancelled event.
+    await town.updateBeadStatus(bead.bead_id, 'failed', 'system');
+
+    const pendingBefore = (await town.debugTownEvents()) as Array<{
+      bead_id: string | null;
+      processed_at: string | null;
+    }>;
+    expect(
+      pendingBefore.filter(e => e.bead_id === bead.bead_id && e.processed_at === null).length
+    ).toBeGreaterThan(0);
+
+    await town.deleteBead(bead.bead_id);
+
+    const pendingAfter = (await town.debugTownEvents()) as Array<{
+      bead_id: string | null;
+      agent_id: string | null;
+    }>;
+    expect(
+      pendingAfter.some(e => e.bead_id === bead.bead_id || e.agent_id === bead.bead_id)
+    ).toBe(false);
+  });
+
+  it('deleteBead also removes events referencing the bead as agent_id (agents are beads)', async () => {
+    const agent = await town.registerAgent({
+      role: 'polecat',
+      name: 'P1',
+      identity: `ev-agent-${townName}`,
+      rig_id: 'rig-1',
+    });
+
+    // Upsert a container_status event keyed by agent_id — this is the shape
+    // of events that hang off an agent's bead row.
+    await town.debugRecordContainerStatus(agent.id, { status: 'running' });
+
+    const beforeRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>;
+    expect(beforeRows.some(e => e.agent_id === agent.id)).toBe(true);
+
+    // deleteBead is used for agents too (agents are beads).
+    await town.deleteBead(agent.id);
+
+    const afterRows = (await town.debugTownEvents()) as Array<{ agent_id: string | null }>;
+    expect(afterRows.some(e => e.agent_id === agent.id)).toBe(false);
+  });
+
+  it('deleteBeads bulk path removes events for every deleted bead', async () => {
+    const a = await town.createBead({ type: 'issue', title: 'A', rig_id: 'rig-1' });
+    const b = await town.createBead({ type: 'issue', title: 'B', rig_id: 'rig-1' });
+
+    await town.updateBeadStatus(a.bead_id, 'failed', 'system');
+    await town.updateBeadStatus(b.bead_id, 'failed', 'system');
+
+    const before = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>;
+    expect(before.filter(e => e.bead_id === a.bead_id).length).toBeGreaterThan(0);
+    expect(before.filter(e => e.bead_id === b.bead_id).length).toBeGreaterThan(0);
+
+    await town.deleteBeads([a.bead_id, b.bead_id]);
+
+    const after = (await town.debugTownEvents()) as Array<{ bead_id: string | null }>;
+    expect(after.some(e => e.bead_id === a.bead_id)).toBe(false);
+    expect(after.some(e => e.bead_id === b.bead_id)).toBe(false);
+  });
+});
+
+describe('applyEvent tolerance + drain loop marks missing-entity events processed (#fix-2)', () => {
+  let town: ReturnType<typeof getTownStub>;
+  let townName: string;
+
+  beforeEach(async () => {
+    townName = `evtolerate-${crypto.randomUUID()}`;
+    town = getTownStub(townName);
+    await town.setTownId(townName);
+    await town.addRig({
+      rigId: 'rig-1',
+      name: 'main-rig',
+      gitUrl: 'https://github.com/test/repo.git',
+      defaultBranch: 'main',
+    });
+  });
+
+  it('drain loop marks a bead_cancelled event processed when the bead is gone', async () => {
+    // Simulate the historical orphan: enqueue a bead_cancelled event whose
+    // bead has been deleted (or never existed). Before Fix 2, applyEvent
+    // would throw `Bead not found` forever on every alarm tick.
+    await town.debugInsertTownEvent({
+      event_type: 'bead_cancelled',
+      bead_id: '00000000-0000-4000-8000-000000000001',
+      payload: { cancel_status: 'failed' },
+    });
+
+    const beforeDrain = (await town.debugTownEvents()) as Array<{
+      event_type: string;
+      bead_id: string | null;
+      processed_at: string | null;
+    }>;
+    const orphan = beforeDrain.find(
+      e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001'
+    );
+    expect(orphan?.processed_at).toBeNull();
+
+    await runDurableObjectAlarm(town);
+
+    // After the alarm, the orphan event should be processed — not retried.
+    const afterDrain = (await town.debugTownEvents()) as Array<{
+      event_type: string;
+      bead_id: string | null;
+      processed_at: string | null;
+    }>;
+    const orphanAfter = afterDrain.find(
+      e => e.event_type === 'bead_cancelled' && e.bead_id === '00000000-0000-4000-8000-000000000001'
+    );
+    // If retention GC already pruned it, that's also acceptable — the key
+    // invariant is that it is no longer pending.
+ if (orphanAfter) { + expect(orphanAfter.processed_at).not.toBeNull(); + } + }); + + it('drain loop marks an agent-missing event processed too', async () => { + await town.debugInsertTownEvent({ + event_type: 'agent_done', + agent_id: '00000000-0000-4000-8000-0000000000aa', + payload: { branch: 'gt/ghost' }, + }); + + await runDurableObjectAlarm(town); + + const after = (await town.debugTownEvents()) as Array<{ + event_type: string; + agent_id: string | null; + processed_at: string | null; + }>; + const orphanAfter = after.find( + e => e.event_type === 'agent_done' && e.agent_id === '00000000-0000-4000-8000-0000000000aa' + ); + if (orphanAfter) { + expect(orphanAfter.processed_at).not.toBeNull(); + } + }); +}); From 47e95a8b83fe750b563ffab9676ce6bed77eda99 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 5 May 2026 15:23:07 -0500 Subject: [PATCH 4/4] feat(gastown-container): add crash visibility + per-agent start mutex (#3055) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(gastown-container): add crash visibility + per-agent start mutex Diagnostic changes to investigate frequent container restarts for town 4d82f099-ccb7-4eaf-8676-73562e0a27eb (~1.5–2 min boot-hydration loops). - main.ts: add unhandledRejection listener that logs full error/stack without exiting (Bun/Node silently drop rejections without a handler, making fire-and-forget failures like void saveDbSnapshot()/void subscribeToEvents() invisible). Include uptime and active-agent count for correlation. - main.ts: improve uncaughtException log with name/uptime/agent count. - main.ts: 30s periodic container.memory_usage log (rss/heap/external) so OOM-class failures (external SIGKILL from Cloudflare Containers runtime when the memory ceiling is hit) become observable — these leave no exception behind. - main.ts: wrap bootHydration() in try/catch so a rare synchronous throw before the first await doesn't crash the process. - process-manager.ts: add per-agentId mutex for startAgent. Production logs show two /agents/start requests for the same agentId logged at the same millisecond; both pass the re-entrancy check before either commits a 'starting' record, then race on startupAbortController, session creation, idle timers, and SDK sessionCount. Serialising per agentId makes the re-entrant path observe a consistent snapshot. - process-manager.test.ts: three tests for the mutex — same-id serialisation, different-id concurrency, lock release on throw. * fix(container): replace Promise.withResolvers with explicit new Promise Promise.withResolvers is a newer API not available on older Bun runtimes. Since process-manager.ts is imported during container startup, a missing global would throw before crash handlers are registered and prevent the control server from starting. Use the same explicit new Promise pattern as the existing sdkServerLock. * feat(gastown/container): include townId in crash and memory logs Per review feedback, attach the container's GASTOWN_TOWN_ID to unhandled_rejection, uncaught_exception, cold_start, memory_usage, and boot_hydration_failed log entries so production crash logs can be correlated with a specific town without needing to also have an agent registered. 
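
The per-agent mutex is a plain promise chain keyed by agentId.
Conceptually (condensed from withStartAgentLock in the diff below; slot
cleanup elided):

    const locks = new Map<string, Promise<void>>();
    async function withLock<T>(id: string, fn: () => Promise<T>): Promise<T> {
      const previous = locks.get(id) ?? Promise.resolve();
      let release!: () => void;
      locks.set(id, new Promise<void>(r => (release = r)));
      await previous.catch(() => {});
      try {
        return await fn();
      } finally {
        release(); // unblock the next queued caller even on throw
      }
    }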
--------- Co-authored-by: John Fawcett --- services/gastown/container/src/main.ts | 88 ++++++++++++++++++- .../container/src/process-manager.test.ts | 63 ++++++++++++- .../gastown/container/src/process-manager.ts | 52 +++++++++++ 3 files changed, 198 insertions(+), 5 deletions(-) diff --git a/services/gastown/container/src/main.ts b/services/gastown/container/src/main.ts index 5b12a1aa19..e351a91731 100644 --- a/services/gastown/container/src/main.ts +++ b/services/gastown/container/src/main.ts @@ -1,11 +1,54 @@ import { startControlServer } from './control-server'; import { log } from './logger'; -import { bootHydration, getUptime } from './process-manager'; +import { activeAgentCount, bootHydration, getUptime, listAgents } from './process-manager'; -log.info('container.cold_start', { uptime: getUptime(), ts: new Date().toISOString() }); +// Container-scoped identifiers for crash/diagnostic logs. The container is +// pinned to a single town for its lifetime (see GASTOWN_TOWN_ID injection in +// the deployer), so reading these once at module init is safe and lets us +// emit them even when no agents are registered yet. +const TOWN_ID = process.env.GASTOWN_TOWN_ID ?? null; + +log.info('container.cold_start', { + uptime: getUptime(), + ts: new Date().toISOString(), + townId: TOWN_ID, +}); + +// Bun (like Node) will ignore unhandled promise rejections unless a handler +// is registered. Without this handler a rejection in a fire-and-forget path +// (e.g. `void saveDbSnapshot(...)`, `void subscribeToEvents(...)`, +// `setInterval(() => void fn())`) is effectively invisible — making the +// root cause of container crashes impossible to diagnose from logs. +// +// We deliberately DO NOT call process.exit here: visibility is the goal. +// If a specific rejection turns out to be fatal state corruption we can +// escalate it individually. +process.on('unhandledRejection', (reason, promise) => { + const err = + reason instanceof Error + ? { message: reason.message, stack: reason.stack, name: reason.name } + : { message: String(reason) }; + log.error('container.unhandled_rejection', { + ...err, + townId: TOWN_ID, + uptimeMs: getUptime(), + activeAgents: activeAgentCount(), + promise: String(promise), + }); +}); process.on('uncaughtException', err => { - log.error('container.uncaught_exception', { error: err.message, stack: err.stack }); + log.error('container.uncaught_exception', { + message: err.message, + stack: err.stack, + name: err.name, + townId: TOWN_ID, + uptimeMs: getUptime(), + activeAgents: activeAgentCount(), + }); + // Keep the existing fatal behaviour for truly uncaught synchronous errors. + // An unhandled rejection is handled separately above without exit so we + // can observe the crash class before deciding whether to remain fatal. process.exit(1); }); @@ -13,8 +56,45 @@ process.on('SIGTERM', () => { console.log('SIGTERM received — starting graceful drain...'); }); +// Periodically log RSS memory so we can correlate OOM-class failures +// (external SIGKILL from Cloudflare Containers runtime when a memory +// ceiling is hit) with steady-state memory growth. 30s cadence matches +// the heartbeat interval and is cheap. 
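+//
+// process.memoryUsage() reports bytes; the log rounds to MB. `rss` is the
+// whole-process resident set (the figure an external memory ceiling is
+// enforced against), while `external` counts ArrayBuffers and other native
+// allocations that live outside the JS heap.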
+const MEMORY_LOG_INTERVAL_MS = 30_000; +setInterval(() => { + try { + const mem = process.memoryUsage(); + log.info('container.memory_usage', { + rssMB: Math.round(mem.rss / 1024 / 1024), + heapUsedMB: Math.round(mem.heapUsed / 1024 / 1024), + heapTotalMB: Math.round(mem.heapTotal / 1024 / 1024), + externalMB: Math.round(mem.external / 1024 / 1024), + townId: TOWN_ID, + uptimeMs: getUptime(), + agents: listAgents().length, + activeAgents: activeAgentCount(), + }); + } catch (err) { + log.warn('container.memory_usage_failed', { + error: err instanceof Error ? err.message : String(err), + }); + } +}, MEMORY_LOG_INTERVAL_MS); + startControlServer(); void (async () => { - await bootHydration(); + try { + await bootHydration(); + } catch (err) { + // bootHydration has its own try/catch for the registry fetch path but + // the inner startAgent loop can still throw on rare synchronous errors + // before its first await. Log rather than crash so the next /agents/start + // request can recover. + log.error('container.boot_hydration_failed', { + message: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined, + townId: TOWN_ID, + }); + } })(); diff --git a/services/gastown/container/src/process-manager.test.ts b/services/gastown/container/src/process-manager.test.ts index 91ffdcce4c..d2c8906c2a 100644 --- a/services/gastown/container/src/process-manager.test.ts +++ b/services/gastown/container/src/process-manager.test.ts @@ -24,7 +24,7 @@ vi.mock('./token-refresh', () => ({ refreshTokenIfNearExpiry: vi.fn(), })); -const { applyModelToSession } = await import('./process-manager'); +const { applyModelToSession, withStartAgentLock } = await import('./process-manager'); type PromptCall = { path: { id: string }; @@ -117,3 +117,64 @@ describe('applyModelToSession', () => { ).rejects.toThrow('simulated SDK failure'); }); }); + +describe('withStartAgentLock', () => { + it('serialises concurrent callers for the same agentId', async () => { + const order: string[] = []; + let secondStartedBeforeFirstFinished = false; + + // Fire both in the same microtask so they race on the lock. + const first = withStartAgentLock('agent-1', async () => { + order.push('first:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('first:end'); + return 1; + }); + const second = withStartAgentLock('agent-1', async () => { + // If the lock works, `first:end` has already been pushed. + if (!order.includes('first:end')) { + secondStartedBeforeFirstFinished = true; + } + order.push('second:start'); + order.push('second:end'); + return 2; + }); + + const [r1, r2] = await Promise.all([first, second]); + expect(r1).toBe(1); + expect(r2).toBe(2); + expect(secondStartedBeforeFirstFinished).toBe(false); + expect(order).toEqual(['first:start', 'first:end', 'second:start', 'second:end']); + }); + + it('runs concurrently for different agentIds', async () => { + const order: string[] = []; + + const a = withStartAgentLock('agent-a', async () => { + order.push('a:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('a:end'); + }); + const b = withStartAgentLock('agent-b', async () => { + order.push('b:start'); + await new Promise(r => setTimeout(r, 20)); + order.push('b:end'); + }); + + await Promise.all([a, b]); + + // Both should have started before either ended (no serialisation across ids). 
+    expect(order.indexOf('b:start')).toBeLessThan(order.indexOf('a:end'));
+  });
+
+  it('releases the lock when the fn throws so subsequent callers can proceed', async () => {
+    await expect(
+      withStartAgentLock('agent-err', async () => {
+        throw new Error('boom');
+      })
+    ).rejects.toThrow('boom');
+
+    const result = await withStartAgentLock('agent-err', async () => 'ok');
+    expect(result).toBe('ok');
+  });
+});
diff --git a/services/gastown/container/src/process-manager.ts b/services/gastown/container/src/process-manager.ts
index 33a37fa29a..02eb63a0f7 100644
--- a/services/gastown/container/src/process-manager.ts
+++ b/services/gastown/container/src/process-manager.ts
@@ -71,6 +71,47 @@ export function isDraining(): boolean {
 // once created, the SDK instance is reused without locking.
 let sdkServerLock: Promise<void> = Promise.resolve();
 
+// Per-agentId mutex for startAgent. Without this, two concurrent POST
+// /agents/start calls for the same agentId (observed in production: two
+// `[control-server] /agents/start:` log lines at the same millisecond)
+// both pass the re-entrancy check at the top of startAgent before either
+// has committed a 'starting' record. The second invocation aborts the
+// first's startupAbortController and both paths race on session creation,
+// idle timers, and SDK instance reference counts — leaving the agent in
+// an inconsistent state (orphaned sessions, leaked sessionCount, etc).
+//
+// Serialising per agentId means the second caller waits for the first to
+// complete (or abort) before proceeding, and then observes a consistent
+// snapshot in `agents.get(agentId)`.
+const startAgentLocks = new Map<string, Promise<void>>();
+
+// Exported for tests that exercise the locking behaviour directly without
+// bringing up the whole SDK/process harness. Production callers should use
+// `startAgent` (which wraps `startAgentImpl` with this lock).
+export async function withStartAgentLock<T>(agentId: string, fn: () => Promise<T>): Promise<T> {
+  const previous = startAgentLocks.get(agentId) ?? Promise.resolve();
+  // Use the same explicit `new Promise` pattern as `sdkServerLock` above
+  // instead of `Promise.withResolvers`, which is not available on older
+  // Bun runtimes. This module is imported during container startup, so a
+  // missing global here would throw before the crash handlers are
+  // registered and prevent the control server from starting.
+  let releaseLock!: () => void;
+  const lockPromise = new Promise<void>(resolve => {
+    releaseLock = resolve;
+  });
+  startAgentLocks.set(agentId, lockPromise);
+  try {
+    await previous.catch(() => {});
+    return await fn();
+  } finally {
+    releaseLock();
+    // Only clear the slot if no newer caller has queued behind us.
+    if (startAgentLocks.get(agentId) === lockPromise) {
+      startAgentLocks.delete(agentId);
+    }
+  }
+}
+
 export function getUptime(): number {
   return Date.now() - startTime;
 }
@@ -1005,11 +1046,22 @@ async function subscribeToEvents(
 /**
  * Start an agent: ensure SDK server, create session, subscribe to events,
  * send initial prompt.
+ *
+ * Serialises concurrent callers for the same agentId so the re-entrancy
+ * handling inside `startAgentImpl` observes a consistent snapshot.
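+ *
+ * The lock is keyed by agentId, so starts for different agents never
+ * contend, and `withStartAgentLock` releases its slot in a `finally`,
+ * so a throw from the wrapped start cannot wedge later callers.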
*/ export async function startAgent( request: StartAgentRequest, workdir: string, env: Record +): Promise { + return withStartAgentLock(request.agentId, () => startAgentImpl(request, workdir, env)); +} + +async function startAgentImpl( + request: StartAgentRequest, + workdir: string, + env: Record ): Promise { const existing = agents.get(request.agentId); if (existing && (existing.status === 'running' || existing.status === 'starting')) {