diff --git a/packages/protocol/__tests__/event-store.test.ts b/packages/protocol/__tests__/event-store.test.ts index 4b39b0a1..7e742784 100644 --- a/packages/protocol/__tests__/event-store.test.ts +++ b/packages/protocol/__tests__/event-store.test.ts @@ -527,6 +527,121 @@ describe('EventStore', () => { }); }); + describe('session state machine', () => { + const sid = 'state-test-session'; + + beforeEach(() => { + store.upsertSession({ sessionId: sid }); + }); + + it('sets and gets session state', () => { + store.setSessionState(sid, 'CREATED', { clientId: 'c1' }); + expect(store.getSessionState(sid)).toBe('CREATED'); + }); + + it('returns null for unknown session', () => { + expect(store.getSessionState('nonexistent')).toBeNull(); + }); + + it('includes state in session metadata', () => { + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1' }); + const meta = store.getSession(sid); + expect(meta?.state).toBe('ACTIVE'); + expect(meta?.lastStateChange).toBeGreaterThan(0); + }); + + it('defaults to ENDED for existing sessions after migration', () => { + // New sessions get DEFAULT 'ENDED' from migration + const meta = store.getSession(sid); + expect(meta?.state).toBe('ENDED'); + }); + + it('allows valid CREATED → STARTING transition', () => { + store.setSessionState(sid, 'CREATED', { clientId: 'c1', force: true }); + store.setSessionState(sid, 'STARTING', { clientId: 'c1' }); + expect(store.getSessionState(sid)).toBe('STARTING'); + }); + + it('allows valid ACTIVE → DETACHED transition', () => { + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1', force: true }); + store.setSessionState(sid, 'DETACHED', { clientId: 'c1' }); + expect(store.getSessionState(sid)).toBe('DETACHED'); + }); + + it('allows ENDED → CREATED for resume', () => { + store.setSessionState(sid, 'ENDED', { clientId: 'c1', force: true }); + store.setSessionState(sid, 'CREATED', { clientId: 'c1' }); + expect(store.getSessionState(sid)).toBe('CREATED'); + }); + + it('allows CREATED → ENDED for early failure', () => { + store.setSessionState(sid, 'CREATED', { clientId: 'c1', force: true }); + store.setSessionState(sid, 'ENDED', { clientId: 'c1' }); + expect(store.getSessionState(sid)).toBe('ENDED'); + }); + + it('warns but does not block invalid transitions (Phase 1)', () => { + const messages: string[] = []; + const logger = { info: (msg: string) => messages.push(msg) }; + const s = new EventStore(':memory:', logger); + s.upsertSession({ sessionId: 'warn-test' }); + s.setSessionState('warn-test', 'ACTIVE', { clientId: 'c1', force: true }); + // ACTIVE → CREATED is invalid + s.setSessionState('warn-test', 'CREATED', { clientId: 'c1' }); + // Should still succeed (Phase 1 — warn only) + expect(s.getSessionState('warn-test')).toBe('CREATED'); + // Should have logged the invalid transition + expect(messages.some((m) => m.includes('invalid'))).toBe(true); + s.close(); + }); + + it('force flag bypasses validation', () => { + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1', force: true }); + // ACTIVE → CREATED is invalid but force bypasses + store.setSessionState(sid, 'CREATED', { clientId: 'c1', force: true }); + expect(store.getSessionState(sid)).toBe('CREATED'); + }); + + it('tracks full lifecycle: CREATED → STARTING → ACTIVE → ENDED', () => { + store.setSessionState(sid, 'CREATED', { clientId: 'c1', force: true }); + store.setSessionState(sid, 'STARTING', { clientId: 'c1' }); + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1' }); + store.setSessionState(sid, 'ENDED', { clientId: 'c1', reason: 'completed' }); + expect(store.getSessionState(sid)).toBe('ENDED'); + }); + + it('tracks detach/reattach cycle', () => { + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1', force: true }); + store.setSessionState(sid, 'DETACHED', { clientId: 'c1', reason: 'transport_close' }); + expect(store.getSessionState(sid)).toBe('DETACHED'); + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1', reason: 'reattach' }); + expect(store.getSessionState(sid)).toBe('ACTIVE'); + }); + + it('tracks suspend/resume cycle', () => { + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1', force: true }); + store.setSessionState(sid, 'SUSPENDED', { clientId: 'c1', reason: 'ios_background' }); + expect(store.getSessionState(sid)).toBe('SUSPENDED'); + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1', reason: 'resume' }); + expect(store.getSessionState(sid)).toBe('ACTIVE'); + }); + + it('tracks closeout flow', () => { + store.setSessionState(sid, 'ACTIVE', { clientId: 'c1', force: true }); + store.setSessionState(sid, 'CLOSING', { clientId: 'c1', reason: 'detach_ttl' }); + store.setSessionState(sid, 'ENDED', { clientId: 'c1', reason: 'closeout_complete' }); + expect(store.getSessionState(sid)).toBe('ENDED'); + }); + + it('updates lastStateChange timestamp on each transition', () => { + store.setSessionState(sid, 'CREATED', { clientId: 'c1', force: true }); + const meta1 = store.getSession(sid); + store.setSessionState(sid, 'STARTING', { clientId: 'c1' }); + const meta2 = store.getSession(sid); + expect(meta2!.lastStateChange).toBeGreaterThanOrEqual(meta1!.lastStateChange!); + }); + }); + describe('close', () => { it('is safe to call multiple times', () => { store.close(); diff --git a/packages/protocol/src/event-store.ts b/packages/protocol/src/event-store.ts index b7f0e5c2..3735f1e2 100644 --- a/packages/protocol/src/event-store.ts +++ b/packages/protocol/src/event-store.ts @@ -4,11 +4,12 @@ import type { StoredEvent, SessionMeta, SessionSearchResult, + SessionState, EventStoreLogger, } from './types.js'; // Re-export types for consumer convenience -export type { StoredEvent, SessionMeta, SessionSearchResult, EventStoreLogger }; +export type { StoredEvent, SessionMeta, SessionSearchResult, SessionState, EventStoreLogger }; const noopLogger: EventStoreLogger = { info() {} }; @@ -45,6 +46,8 @@ interface SessionRow { closed_by: string | null; last_speaker: string | null; last_speaker_at: number | null; + state: string | null; + last_state_change: number | null; created_at: number; updated_at: number; } @@ -75,6 +78,16 @@ const SCHEMA = ` ); `; +const VALID_TRANSITIONS: Record = { + CREATED: ['STARTING', 'ENDED'], + STARTING: ['ACTIVE', 'ENDED'], + ACTIVE: ['DETACHED', 'SUSPENDED', 'CLOSING', 'ENDED'], + DETACHED: ['ACTIVE', 'SUSPENDED', 'CLOSING', 'ENDED'], + SUSPENDED: ['ACTIVE', 'ENDED'], + CLOSING: ['ENDED'], + ENDED: ['CREATED'], +}; + export class EventStore { private db: Database.Database | null; private log: EventStoreLogger; @@ -91,6 +104,8 @@ export class EventStore { recordUsage: Database.Statement; updateLastSpeaker: Database.Statement; getAttentionSessions: Database.Statement; + setSessionState: Database.Statement; + getSessionState: Database.Statement; }; constructor(dbPath: string, logger?: EventStoreLogger) { @@ -106,6 +121,7 @@ export class EventStore { this.migrateWorktreeTracking(db); this.migrateCloseTracking(db); this.migrateAttentionTracking(db); + this.migrateSessionState(db); this.log.info('EventStore initialized', { dbPath }); @@ -160,6 +176,14 @@ export class EventStore { ORDER BY last_speaker_at DESC LIMIT 10`, ), + setSessionState: db.prepare( + `UPDATE sessions SET + state = ?, + last_state_change = ?, + updated_at = unixepoch('now', 'subsec') * 1000 + WHERE session_id = ?`, + ), + getSessionState: db.prepare('SELECT state FROM sessions WHERE session_id = ?'), }; } @@ -244,6 +268,19 @@ export class EventStore { } } + private migrateSessionState(db: Database.Database): void { + const columns = db.prepare("PRAGMA table_info('sessions')").all() as Array<{ name: string }>; + const columnNames = new Set(columns.map((c) => c.name)); + if (!columnNames.has('state')) { + db.exec("ALTER TABLE sessions ADD COLUMN state TEXT DEFAULT 'ENDED'"); + this.log.info('migrated sessions table: added state'); + } + if (!columnNames.has('last_state_change')) { + db.exec('ALTER TABLE sessions ADD COLUMN last_state_change INTEGER'); + this.log.info('migrated sessions table: added last_state_change'); + } + } + close(): void { if (this.db) { this.db.close(); @@ -488,6 +525,45 @@ export class EventStore { return (rows as SessionRow[]).map(rowToSession); } + /** Set session lifecycle state. Warns on invalid transitions but does not block (Phase 1). */ + setSessionState( + sessionId: string, + newState: SessionState, + opts?: { clientId?: string; reason?: string; force?: boolean }, + ): void { + const current = this.getSession(sessionId); + const fromState = (current?.state as SessionState) ?? null; + const now = Date.now(); + + if (fromState && !opts?.force) { + const allowed = VALID_TRANSITIONS[fromState]; + if (!allowed?.includes(newState)) { + this.log.info('invalid session state transition (warn-only)', { + sessionId, + fromState, + toState: newState, + clientId: opts?.clientId, + reason: opts?.reason, + }); + } + } + + this.stmts.setSessionState.run(newState, now, sessionId); + + this.log.info('session state transition', { + sessionId, + fromState, + toState: newState, + clientId: opts?.clientId, + reason: opts?.reason, + }); + } + + getSessionState(sessionId: string): SessionState | null { + const row = this.stmts.getSessionState.get(sessionId) as { state: string | null } | undefined; + return (row?.state as SessionState) ?? null; + } + recordUsage( sessionId: string, usage: { @@ -575,6 +651,8 @@ function rowToSession(row: SessionRow): SessionMeta { closedBy: (row.closed_by as SessionMeta['closedBy']) ?? null, lastSpeaker: (row.last_speaker as SessionMeta['lastSpeaker']) ?? null, lastSpeakerAt: row.last_speaker_at ?? null, + state: (row.state as SessionMeta['state']) ?? null, + lastStateChange: row.last_state_change ?? null, createdAt: row.created_at, updatedAt: row.updated_at, }; diff --git a/packages/protocol/src/index.ts b/packages/protocol/src/index.ts index 98ab8d02..8ead105e 100644 --- a/packages/protocol/src/index.ts +++ b/packages/protocol/src/index.ts @@ -16,6 +16,7 @@ export type { ImageAttachment, Session, SessionClosedBy, + SessionState, StoredEvent, SessionMeta, SessionSearchResult, diff --git a/packages/protocol/src/types.ts b/packages/protocol/src/types.ts index 70b0fc7c..738714c6 100644 --- a/packages/protocol/src/types.ts +++ b/packages/protocol/src/types.ts @@ -107,6 +107,15 @@ export interface ImageAttachment { export type SessionClosedBy = 'user' | 'auto' | 'abandoned'; +export type SessionState = + | 'CREATED' + | 'STARTING' + | 'ACTIVE' + | 'DETACHED' + | 'SUSPENDED' + | 'CLOSING' + | 'ENDED'; + export interface Session { id: string; summary: string; @@ -206,6 +215,8 @@ export interface SessionMeta { closedBy: SessionClosedBy | null; lastSpeaker: 'user' | 'assistant' | null; lastSpeakerAt: number | null; + state: SessionState | null; + lastStateChange: number | null; createdAt: number; updatedAt: number; } diff --git a/server/__tests__/ws-handler-v2.test.ts b/server/__tests__/ws-handler-v2.test.ts index dd680abc..61ea0584 100644 --- a/server/__tests__/ws-handler-v2.test.ts +++ b/server/__tests__/ws-handler-v2.test.ts @@ -63,6 +63,7 @@ import { isHelloHandshake, dispatchV2Message, getOwnerConnection, + detectStateMismatch, type V2HandlerContext, } from '../ws-handler-v2.js'; import { NativeCommandRegistry } from '../native-commands.js'; @@ -84,6 +85,8 @@ function mockEventStore() { return { getEventsAfter: vi.fn().mockReturnValue([]), getSession: vi.fn().mockReturnValue(null), + getSessionState: vi.fn().mockReturnValue(null), + setSessionState: vi.fn(), }; } @@ -2742,3 +2745,219 @@ describe('dispatchV2Message session_suspend', () => { expect(sessionReg.suspend).toHaveBeenCalledWith('conn-1:sess-1', 5); }); }); + +// ─── detectStateMismatch ──────────────────────────────────────────────────── + +describe('detectStateMismatch', () => { + it('returns no mismatch when registry and store agree (both absent)', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue(null); + store.getSessionState.mockReturnValue(null); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(false); + }); + + it('returns no mismatch when registry and store agree (ENDED + absent)', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue(null); + store.getSessionState.mockReturnValue('ENDED'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(false); + }); + + it('returns no mismatch when ACTIVE + attached', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(true); + store.getSessionState.mockReturnValue('ACTIVE'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(false); + }); + + it('returns no mismatch when DETACHED + not attached', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(false); + store.getSessionState.mockReturnValue('DETACHED'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(false); + }); + + it('returns no mismatch when SUSPENDED + not attached', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(false); + store.getSessionState.mockReturnValue('SUSPENDED'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(false); + }); + + it('detects registry has session but state=ENDED', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + store.getSessionState.mockReturnValue('ENDED'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(true); + expect(result.details).toContain('registry has session but state=ENDED'); + }); + + it('detects registry has session but state=null', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + store.getSessionState.mockReturnValue(null); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(true); + expect(result.details).toContain('registry has session but state=null'); + }); + + it('detects registry missing but state=ACTIVE', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue(null); + store.getSessionState.mockReturnValue('ACTIVE'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(true); + expect(result.details).toContain('registry missing session but state=ACTIVE'); + }); + + it('detects attached transport but DETACHED state', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(true); + store.getSessionState.mockReturnValue('DETACHED'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(true); + expect(result.details).toContain('transport attached but state=DETACHED'); + }); + + it('detects attached transport but SUSPENDED state', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(true); + store.getSessionState.mockReturnValue('SUSPENDED'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(true); + expect(result.details).toContain('transport attached but state=SUSPENDED'); + }); + + it('detects detached transport but ACTIVE state', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(false); + store.getSessionState.mockReturnValue('ACTIVE'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(true); + expect(result.details).toContain('transport detached but state=ACTIVE'); + }); + + it('detects detached transport but CLOSING state', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(false); + store.getSessionState.mockReturnValue('CLOSING'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(true); + expect(result.details).toContain('transport detached but state=CLOSING'); + }); + + it('allows STARTING state in registry regardless of attach state', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(true); + store.getSessionState.mockReturnValue('STARTING'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(false); + }); + + it('allows CREATED state in registry regardless of attach state', () => { + const reg = mockSessionRegistry(); + const store = mockEventStore(); + reg.findBySessionId.mockReturnValue({ clientId: 'c1', session: {} }); + reg.isAttached.mockReturnValue(true); + store.getSessionState.mockReturnValue('CREATED'); + + const result = detectStateMismatch( + 'sess-1', + reg as unknown as V2HandlerContext['sessionRegistry'], + store as unknown as V2HandlerContext['eventStore'], + ); + expect(result.mismatch).toBe(false); + }); +}); diff --git a/server/app.ts b/server/app.ts index 1389d7bc..9e31f4d9 100644 --- a/server/app.ts +++ b/server/app.ts @@ -544,6 +544,10 @@ app.post('/api/sessions/suspend', (req, res) => { if (ownerConnection !== connectionId) continue; registry.suspend(found.clientId, lastSeq); + eventStore.setSessionState(entry.sessionId, 'SUSPENDED', { + clientId: found.clientId, + reason: 'ios_background_rest', + }); log.info('session suspended via REST', { connectionId, sessionId: entry.sessionId, diff --git a/server/chat.ts b/server/chat.ts index 819914f9..ccd6d557 100644 --- a/server/chat.ts +++ b/server/chat.ts @@ -684,6 +684,12 @@ async function _startChatInner( session.inputQueue = inputQueue as { push: (msg: unknown) => void; close: () => void }; _onSessionChange?.(clientId, 'start'); + // Session state machine: mark CREATED (Phase 1 — write only, no behavior change) + const stateSessionId = options.resume ?? session.sessionId; + if (stateSessionId) { + eventStore.setSessionState(stateSessionId, 'CREATED', { clientId }); + } + // Copy all repo worktrees into the session for cleanup tracking for (const [name, info] of repoWorktrees) { session.worktreePaths.set(name, info); @@ -915,6 +921,12 @@ async function _startChatInner( session.queryInstance = q; + // Session state machine: mark STARTING (query allocated, waiting for first SDK event) + const startingSessionId = options.resume ?? session.sessionId; + if (startingSessionId) { + eventStore.setSessionState(startingSessionId, 'STARTING', { clientId }); + } + // For resumed sessions the prompt is sent to the SDK but was never stored // in the event store — making user messages invisible after WS reconnect. // Store and echo it here so the frontend can replay it. @@ -1178,6 +1190,13 @@ function _closeoutSessionInner(clientId: string): void { return; } + if (session.sessionId) { + eventStore.setSessionState(session.sessionId, 'CLOSING', { + clientId, + reason: 'detach_ttl_closeout', + }); + } + log.info('injecting closeout prompt', { clientId, wtId: session.wtId }); // Push the closeout prompt as an interrupt so the agent sees it immediately @@ -1312,12 +1331,27 @@ export function stopChat(clientId: string) { } export function detachChat(clientId: string) { withSpan('session.detach', { 'session.clientId': clientId }, () => { + const session = registry.get(clientId); registry.detach(clientId); + if (session?.sessionId) { + eventStore.setSessionState(session.sessionId, 'DETACHED', { + clientId, + reason: 'transport_close', + }); + } }); } export function reattachChat(clientId: string, transport: SessionTransport): boolean { return withSpan('session.reattach', { 'session.clientId': clientId }, () => { - return registry.reattach(clientId, transport); + const session = registry.get(clientId); + const ok = registry.reattach(clientId, transport); + if (ok && session?.sessionId) { + eventStore.setSessionState(session.sessionId, 'ACTIVE', { + clientId, + reason: 'reattach', + }); + } + return ok; }); } export function rekeyChat(oldClientId: string, newClientId: string): boolean { diff --git a/server/query-loop.ts b/server/query-loop.ts index 9bcd7906..f416306d 100644 --- a/server/query-loop.ts +++ b/server/query-loop.ts @@ -407,6 +407,11 @@ async function _runQueryLoopInner( if (!firstEventReceived) { firstEventReceived = true; clearTimeout(firstEventTimer); + // Session state machine: mark ACTIVE on first SDK event (resume path) + const sid = resolvedSessionId || registry.get(clientId)?.sessionId; + if (store && sid) { + store.setSessionState(sid, 'ACTIVE', { clientId, reason: 'first_sdk_event' }); + } } const currentSession = registry.get(clientId); if (!currentSession) break; @@ -1352,6 +1357,10 @@ async function _runQueryLoopInner( // Mark session as inactive in durable store if (store && resolvedSessionId) { store.markSessionInactive(resolvedSessionId); + store.setSessionState(resolvedSessionId, 'ENDED', { + clientId, + reason: caughtError ? 'error' : 'completed', + }); } // Clean up any open subagent spans (ERROR if catch was entered) const subagentCleanupStatus = caughtError ? SpanStatusCode.ERROR : SpanStatusCode.OK; diff --git a/server/ws-handler-v2.ts b/server/ws-handler-v2.ts index e56969d3..ae34206d 100644 --- a/server/ws-handler-v2.ts +++ b/server/ws-handler-v2.ts @@ -94,6 +94,65 @@ export function getOwnerConnection(clientId: string): string { return colonIdx === -1 ? clientId : clientId.slice(0, colonIdx); } +// ─── State mismatch detection (Phase 2) ───────────────────────────────────── + +export interface StateMismatchResult { + mismatch: boolean; + details?: string; +} + +/** + * Detect inconsistencies between in-memory SessionRegistry and durable + * EventStore session state. Runs on every send/interrupt to surface bugs + * before Phase 3 replaces routing logic. + */ +export function detectStateMismatch( + sessionId: string, + registry: SessionRegistry, + store: EventStore, +): StateMismatchResult { + const found = registry.findBySessionId(sessionId); + const state = store.getSessionState(sessionId); + + const registryHas = !!found; + const shouldHave = state != null && state !== 'ENDED'; + + if (registryHas && !shouldHave) { + return { + mismatch: true, + details: `registry has session but state=${state ?? 'null'}`, + }; + } + + if (!registryHas && shouldHave) { + return { + mismatch: true, + details: `registry missing session but state=${state}`, + }; + } + + if (found && state) { + const attached = registry.isAttached(found.clientId); + const shouldBeAttached = state === 'ACTIVE' || state === 'CLOSING'; + const shouldBeDetached = state === 'DETACHED' || state === 'SUSPENDED'; + + if (attached && shouldBeDetached) { + return { + mismatch: true, + details: `transport attached but state=${state}`, + }; + } + if (!attached && shouldBeAttached) { + return { + mismatch: true, + details: `transport detached but state=${state}`, + }; + } + } + + return { mismatch: false }; +} + // ─── Handlers ──────────────────────────────────────────────────────────────── export function handleHello( @@ -381,6 +440,21 @@ export function handleSendV2( if (sessionId) { const found = ctx.sessionRegistry.findBySessionId(sessionId); + const storeState = ctx.eventStore.getSessionState(sessionId); + + // Phase 2: detect state mismatches (observability only) + const mismatch = detectStateMismatch(sessionId, ctx.sessionRegistry, ctx.eventStore); + if (mismatch.mismatch) { + log.error('session state mismatch detected (send)', { + connectionId, + sessionId, + storeState, + registryHas: !!found, + details: mismatch.details, + }); + span.setAttribute('session.state_mismatch', mismatch.details ?? 'unknown'); + } + if (found && isActive(found.clientId)) { const storeMeta = ctx.eventStore.getSession(sessionId); const staleInMemory = storeMeta && !storeMeta.isActive; @@ -390,6 +464,7 @@ export function handleSendV2( connectionId, sessionId, clientId: found.clientId, + storeState, }); ctx.sessionRegistry.remove(found.clientId); } else { @@ -417,12 +492,14 @@ export function handleSendV2( sessionId, oldOwner: ownerConnection, newClientId: activeClientId, + storeState, }); } else if (isDetached) { reattachChat(found.clientId, transport); log.info('reattached own detached session on send', { connectionId, sessionId, + storeState, }); } applySkillPolicy(activeClientId); @@ -509,6 +586,19 @@ export function handleInterruptV2( if (!found) return; let activeClientId = found.clientId; + const storeState = ctx.eventStore.getSessionState(msg.sessionId); + + // Phase 2: detect state mismatches (observability only) + const mismatch = detectStateMismatch(msg.sessionId, ctx.sessionRegistry, ctx.eventStore); + if (mismatch.mismatch) { + log.error('session state mismatch detected (interrupt)', { + connectionId, + sessionId: msg.sessionId, + storeState, + registryHas: true, + details: mismatch.details, + }); + } if (isActive(found.clientId)) { const storeMeta = ctx.eventStore.getSession(msg.sessionId); @@ -519,6 +609,7 @@ export function handleInterruptV2( connectionId, sessionId: msg.sessionId, clientId: found.clientId, + storeState, }); ctx.sessionRegistry.remove(found.clientId); } else { @@ -545,6 +636,7 @@ export function handleInterruptV2( sessionId: msg.sessionId, oldOwner: ownerConnection, newClientId: activeClientId, + storeState, }); } else if (isDetached) { reattachChat(found.clientId, transport); @@ -658,6 +750,10 @@ export function handleSessionSuspend( } ctx.sessionRegistry.suspend(found.clientId, entry.lastSeq); + ctx.eventStore.setSessionState(entry.sessionId, 'SUSPENDED', { + clientId: found.clientId, + reason: 'ios_background', + }); log.info('session suspended', { connectionId, sessionId: entry.sessionId,