Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions services/gastown/container/src/process-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { describe, it, expect, vi } from 'vitest';

// Mock heavy imports so the module can be loaded without spinning up
// a real SDK server or hono app.
vi.mock('@kilocode/sdk', () => ({
createKilo: vi.fn(),
}));
vi.mock('./agent-runner', () => ({
runAgent: vi.fn(),
buildKiloConfigContent: vi.fn(),
resolveGitCredentials: vi.fn(),
writeMayorSystemPromptToAgentsMd: vi.fn(),
}));
vi.mock('./control-server', () => ({
getCurrentTownConfig: vi.fn(() => ({})),
getLastAppliedEnvVarKeys: vi.fn(() => new Set<string>()),
RESERVED_ENV_KEYS: new Set<string>(),
}));
vi.mock('./completion-reporter', () => ({
reportAgentCompleted: vi.fn(),
reportMayorWaiting: vi.fn(),
}));
vi.mock('./token-refresh', () => ({
refreshTokenIfNearExpiry: vi.fn(),
}));

const { applyModelToSession } = await import('./process-manager');

type PromptCall = {
path: { id: string };
body: {
parts: Array<{ type: 'text'; text: string }>;
model: { providerID: string; modelID: string };
noReply?: boolean;
};
};

function makeClient(impl?: (args: PromptCall) => Promise<unknown>) {
const calls: PromptCall[] = [];
const prompt = vi.fn(async (args: PromptCall) => {
calls.push(args);
if (impl) return impl(args);
return {};
});
return { client: { session: { prompt } }, calls, prompt };
}

describe('applyModelToSession', () => {
it('sends the startup prompt with the model for a fresh session', async () => {
const { client, calls } = makeClient();
await applyModelToSession({
client,
sessionId: 'sess-new',
model: 'anthropic/claude-sonnet-4.6',
prompt: 'STARTUP PROMPT',
resumedSession: false,
});
expect(calls).toHaveLength(1);
expect(calls[0].path).toEqual({ id: 'sess-new' });
expect(calls[0].body.parts).toEqual([{ type: 'text', text: 'STARTUP PROMPT' }]);
expect(calls[0].body.model).toEqual({
providerID: 'kilo',
modelID: 'anthropic/claude-sonnet-4.6',
});
expect(calls[0].body.noReply).toBeUndefined();
});

it('pushes the new model with noReply:true for a resumed session without replaying the startup prompt', async () => {
const { client, calls } = makeClient();
await applyModelToSession({
client,
sessionId: 'sess-resumed',
model: 'anthropic/claude-opus-4.7',
prompt: 'STARTUP PROMPT (must not be sent)',
resumedSession: true,
});
expect(calls).toHaveLength(1);
expect(calls[0].path).toEqual({ id: 'sess-resumed' });
expect(calls[0].body.model).toEqual({
providerID: 'kilo',
modelID: 'anthropic/claude-opus-4.7',
});
expect(calls[0].body.noReply).toBe(true);
expect(calls[0].body.parts).toEqual([{ type: 'text', text: '' }]);
// Ensure the MAYOR_STARTUP_PROMPT is NOT replayed on resume.
expect(calls[0].body.parts[0].text).not.toContain('STARTUP PROMPT');
});

it('swallows errors from the resumed-session prompt so the hot-swap can continue', async () => {
const { client } = makeClient(async () => {
throw new Error('simulated SDK failure');
});
// Should not throw — errors on the noReply path are logged and ignored.
await expect(
applyModelToSession({
client,
sessionId: 'sess-resumed',
model: 'anthropic/claude-opus-4.7',
prompt: 'STARTUP PROMPT',
resumedSession: true,
})
).resolves.toBeUndefined();
});

it('propagates errors for a fresh session (so the hot-swap can roll back)', async () => {
const { client } = makeClient(async () => {
throw new Error('simulated SDK failure');
});
await expect(
applyModelToSession({
client,
sessionId: 'sess-new',
model: 'anthropic/claude-sonnet-4.6',
prompt: 'STARTUP PROMPT',
resumedSession: false,
})
).rejects.toThrow('simulated SDK failure');
});
});
91 changes: 81 additions & 10 deletions services/gastown/container/src/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,80 @@ export async function refreshTokenForAllAgents(): Promise<
return Promise.all(snapshot.map(restartAgent));
}

/**
* Minimal shape of `client.session` needed by {@link applyModelToSession}.
* Defined structurally so tests can pass a fake without pulling in the
* whole KiloClient type.
*/
type SessionPromptClient = {
session: {
prompt: (args: {
path: { id: string };
body: {
parts: Array<{ type: 'text'; text: string }>;
model: { providerID: string; modelID: string };
noReply?: boolean;
};
}) => Promise<unknown>;
};
};

/**
* Push a model selection onto a mayor session.
*
* For a freshly created session, sends the startup prompt together with
* the model param so the first turn runs the configured model.
*
* For a resumed session the startup prompt MUST NOT be replayed (it
* would recreate the duplicate turn regression fixed by 9785570b9),
* but the per-session model on the SDK server still needs to be updated
* so the next user turn uses the newly-selected model. We do this by
* sending a `noReply: true` prompt that carries only the model param;
* the SDK treats this as a state update and does not trigger the model.
*
* Errors on the resumed path are swallowed: if pushing the model fails,
* the mayor falls back to whichever model the SDK server loaded from
* KILO_CONFIG_CONTENT at startup, which we have already updated.
*/
export async function applyModelToSession(params: {
client: SessionPromptClient;
sessionId: string;
model: string;
prompt: string;
resumedSession: boolean;
}): Promise<void> {
const { client, sessionId, model, prompt, resumedSession } = params;
const modelParam = { providerID: 'kilo', modelID: model };
if (!resumedSession) {
await client.session.prompt({
path: { id: sessionId },
body: {
parts: [{ type: 'text', text: prompt }],
model: modelParam,
},
});
return;
}
try {
await client.session.prompt({
path: { id: sessionId },
body: {
parts: [{ type: 'text', text: '' }],
model: modelParam,
noReply: true,
},
});
console.log(
`${MANAGER_LOG} updateAgentModel: pushed model=${model} to resumed session ${sessionId}`
);
} catch (err) {
console.warn(
`${MANAGER_LOG} updateAgentModel: failed to push model to resumed session ${sessionId}:`,
err
);
}
}

/**
* Update the model for a running agent by restarting its SDK server with
* new KILO_CONFIG_CONTENT. The kilo serve child process reads the model
Expand Down Expand Up @@ -1958,16 +2032,13 @@ export async function updateAgentModel(
const prompt = conversationHistory
? `${conversationHistory}\n\n${MAYOR_STARTUP_PROMPT}`
: MAYOR_STARTUP_PROMPT;
if (!resumedSession) {
const modelParam = { providerID: 'kilo', modelID: model };
await client.session.prompt({
path: { id: agent.sessionId },
body: {
parts: [{ type: 'text', text: prompt }],
model: modelParam,
},
});
}
await applyModelToSession({
client,
sessionId: agent.sessionId,
model,
prompt,
resumedSession,
});
agent.messageCount = 1;

// 6. New server is healthy — now tear down the old one.
Expand Down
2 changes: 1 addition & 1 deletion services/gastown/container/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ import { defineConfig } from 'vitest/config';
export default defineConfig({
test: {
globals: false,
include: ['plugin/**/*.test.ts'],
include: ['plugin/**/*.test.ts', 'src/**/*.test.ts'],
},
});
87 changes: 77 additions & 10 deletions services/gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ import { agent_metadata } from '../db/tables/agent-metadata.table';
import { escalation_metadata } from '../db/tables/escalation-metadata.table';
import { convoy_metadata } from '../db/tables/convoy-metadata.table';
import { bead_dependencies } from '../db/tables/bead-dependencies.table';
import { town_events, TownEventRecord } from '../db/tables/town-events.table';
import {
town_events,
TownEventRecord,
type TownEventType,
} from '../db/tables/town-events.table';
import {
agent_nudges,
AgentNudgeRecord,
Expand Down Expand Up @@ -3896,15 +3900,28 @@ export class TownDO extends DurableObject<Env> {
reconciler.applyEvent(this.sql, event, { townConfig });
events.markProcessed(this.sql, event.event_id);
} catch (err) {
logger.error('reconciler: applyEvent failed', {
eventId: event.event_id,
eventType: event.event_type,
error: err instanceof Error ? err.message : String(err),
});
// Event stays unprocessed — will be retried on the next alarm tick.
// Mark it processed anyway after 3 consecutive failures to prevent
// a poison event from blocking the entire queue forever.
// For now, we skip it and let the next tick retry.
const message = err instanceof Error ? err.message : String(err);
// Terminal errors referencing a missing bead/agent can never
// succeed on retry — mark them processed so the drain loop
// stops re-running them every alarm tick.
const isMissingEntity =
err instanceof Error &&
/\b(Bead|Agent) [0-9a-f-]{36} not found\b/.test(err.message);
if (isMissingEntity) {
logger.warn('reconciler: applyEvent skipped (missing entity)', {
eventId: event.event_id,
eventType: event.event_type,
error: message,
});
events.markProcessed(this.sql, event.event_id);
} else {
logger.error('reconciler: applyEvent failed', {
eventId: event.event_id,
eventType: event.event_type,
error: message,
});
// Event stays unprocessed — will be retried on the next alarm tick.
}
}
}
} catch (err) {
Expand Down Expand Up @@ -5161,6 +5178,56 @@ export class TownDO extends DurableObject<Env> {
];
}

async debugTownEvents(): Promise<unknown[]> {
return [
...query(
this.sql,
/* sql */ `
SELECT ${town_events.event_id},
${town_events.event_type},
${town_events.agent_id},
${town_events.bead_id},
${town_events.processed_at}
FROM ${town_events}
ORDER BY ${town_events.created_at} ASC
`,
[]
),
];
}

/**
* Test-only helper: directly insert a row into the town_events queue
* without going through the producer APIs. Used to reproduce orphan
* events (referencing deleted beads/agents) in tests.
*/
async debugInsertTownEvent(input: {
event_type: TownEventType;
agent_id?: string | null;
bead_id?: string | null;
payload?: Record<string, unknown>;
}): Promise<string> {
const eventId = events.insertEvent(this.sql, input.event_type, {
agent_id: input.agent_id ?? null,
bead_id: input.bead_id ?? null,
payload: input.payload ?? {},
});
await this.armAlarmIfNeeded();
return eventId;
}

/**
* Test-only helper: insert a container_status event for a given agent.
* Mirrors the container observer's upsert so tests can verify that
* deleteBead sweeps agent-keyed events.
*/
async debugRecordContainerStatus(
agentId: string,
payload: { status: string; exit_reason?: string | null }
): Promise<void> {
events.upsertContainerStatus(this.sql, agentId, payload);
}

async destroy(): Promise<void> {
console.log(`${TOWN_LOG} destroy: clearing all storage and alarms`);

Expand Down
22 changes: 22 additions & 0 deletions services/gastown/src/dos/town/beads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
createTableConvoyMetadata,
migrateConvoyMetadata,
} from '../../db/tables/convoy-metadata.table';
import { town_events } from '../../db/tables/town-events.table';
import { query } from '../../util/query.util';
import type {
CreateBeadInput,
Expand Down Expand Up @@ -903,6 +904,17 @@ export function deleteBead(sql: SqlStorage, beadId: string, rigId?: string): boo
beadId,
]);

// Remove any pending/processed reconciler events targeting this bead or
// this agent (agents are themselves beads, so deleteBead is used for both).
// Without this, bead_cancelled / container_status / … events that reference
// a deleted bead make applyEvent throw forever on every alarm tick.
query(
sql,
/* sql */ `DELETE FROM ${town_events}
WHERE ${town_events.bead_id} = ? OR ${town_events.agent_id} = ?`,
[beadId, beadId]
);

query(sql, /* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} = ?`, [beadId]);
return true;
}
Expand Down Expand Up @@ -1003,6 +1015,16 @@ export function deleteBeads(sql: SqlStorage, beadIds: string[], rigId?: string):
...allIdsArr
);

// Remove any reconciler events referencing these beads/agents. See
// deleteBead above for rationale.
sql.exec(
/* sql */ `DELETE FROM ${town_events}
WHERE ${town_events.bead_id} IN (${placeholders})
OR ${town_events.agent_id} IN (${placeholders})`,
...allIdsArr,
...allIdsArr
);

// Delete the beads themselves
sql.exec(
/* sql */ `DELETE FROM ${beads} WHERE ${beads.bead_id} IN (${placeholders})`,
Expand Down
Loading
Loading