diff --git a/src/functions/evict.ts b/src/functions/evict.ts index 46b1845a..626f4ff1 100644 --- a/src/functions/evict.ts +++ b/src/functions/evict.ts @@ -2,6 +2,7 @@ import type { ISdk } from "iii-sdk"; import type { Session, CompressedObservation, + RawObservation, SessionSummary, Memory, } from "../types.js"; @@ -36,6 +37,61 @@ interface EvictionStats { dryRun: boolean; } +function isValidRecoveryResult(result: unknown): boolean { + if (!result || typeof result !== "object") return false; + if (!("success" in result)) return true; + return (result as { success?: unknown }).success !== false; +} + +function isCompressedObservation( + observation: CompressedObservation | RawObservation, +): observation is CompressedObservation { + return ( + "title" in observation && + typeof observation.title === "string" && + observation.title.length > 0 + ); +} + +async function recoverStaleSession( + sdk: ISdk, + sessionId: string, +): Promise { + try { + const result = await sdk.trigger({ + function_id: "event::session::stopped", + payload: { sessionId }, + }); + if (!isValidRecoveryResult(result)) { + logger.warn("Stale session recovery failed", { + sessionId, + result, + }); + return false; + } + return true; + } catch (err) { + logger.warn("Stale session recovery failed", { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + return false; + } +} + +async function runRecoveredSessionConsolidation(sdk: ISdk): Promise { + try { + await sdk.trigger({ + function_id: "mem::consolidate-pipeline", + payload: { tier: "all" }, + }); + } catch (err) { + logger.warn("Recovered session consolidation failed", { + error: err instanceof Error ? err.message : String(err), + }); + } +} + export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { sdk.registerFunction("mem::evict", async (data: { dryRun?: boolean }): Promise => { @@ -57,6 +113,7 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { dryRun, }; + let recoveredStaleSessions = 0; const sessions = await kv.list(KV.sessions).catch(() => []); const summaries = await kv .list(KV.summaries) @@ -71,6 +128,34 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { if (dryRun) { stats.staleSessions++; } else { + const observations = await kv + .list( + KV.observations(session.id), + ) + .catch((err) => { + logger.warn("Stale session observation scan failed", { + sessionId: session.id, + error: err instanceof Error ? err.message : String(err), + }); + return null; + }); + if (!observations) continue; + + let recovered = false; + const hasCompressedObservations = observations.some( + isCompressedObservation, + ); + if (hasCompressedObservations) { + recovered = await recoverStaleSession(sdk, session.id); + if (!recovered) continue; + recoveredStaleSessions++; + } else if (observations.length > 0) { + logger.warn("Stale session has no compressed observations", { + sessionId: session.id, + }); + continue; + } + try { await kv.delete(KV.sessions, session.id); stats.staleSessions++; @@ -84,13 +169,19 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { } await recordAudit(kv, "delete", "mem::evict", [session.id], { resource: "session", - reason: "stale_session_without_summary", + reason: recovered + ? "stale_session_recovered_then_evicted" + : "stale_session_without_summary", dryRun, }); } } } + if (!dryRun && recoveredStaleSessions > 0) { + await runRecoveredSessionConsolidation(sdk); + } + const projectObs = new Map(); for (const session of sessions) { const obs = await kv diff --git a/test/evict.test.ts b/test/evict.test.ts new file mode 100644 index 00000000..d1aedac2 --- /dev/null +++ b/test/evict.test.ts @@ -0,0 +1,240 @@ +import { describe, expect, it, vi } from "vitest"; +import type { + CompressedObservation, + RawObservation, + Session, +} from "../src/types.js"; +import { registerEvictFunction } from "../src/functions/evict.js"; +import { KV } from "../src/state/schema.js"; + +vi.mock("../src/logger.js", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +type Store = Map>; +type Handler = (payload: unknown) => unknown | Promise; + +function daysAgo(days: number): string { + return new Date(Date.now() - days * 24 * 60 * 60 * 1000).toISOString(); +} + +function makeSession(id: string): Session { + return { + id, + project: "agentmemory", + cwd: "/repo/agentmemory", + startedAt: daysAgo(31), + status: "active", + observationCount: 1, + }; +} + +function makeObservation(sessionId: string): CompressedObservation { + return { + id: "obs_1", + sessionId, + timestamp: daysAgo(31), + type: "decision", + title: "Chose sqlite storage", + facts: ["Use sqlite for local state"], + narrative: "The session chose sqlite for local state.", + concepts: ["sqlite"], + files: ["src/state/kv.ts"], + importance: 8, + }; +} + +function makeRawObservation(sessionId: string): RawObservation { + return { + id: "raw_1", + sessionId, + timestamp: daysAgo(31), + hookType: "post_tool_use", + toolName: "Edit", + raw: { file_path: "src/state/kv.ts" }, + }; +} + +function mockKV(store: Store, listFailures: Set = new Set()) { + return { + get: async (scope: string, key: string): Promise => + (store.get(scope)?.get(key) as T) ?? null, + set: async (scope: string, key: string, data: T): Promise => { + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, data); + return data; + }, + delete: async (scope: string, key: string): Promise => { + store.get(scope)?.delete(key); + }, + list: async (scope: string): Promise => { + if (listFailures.has(scope)) { + throw new Error(`list failed for ${scope}`); + } + const entries = store.get(scope); + return entries ? (Array.from(entries.values()) as T[]) : []; + }, + }; +} + +function mockSdk() { + const handlers = new Map(); + const calls: Array<{ function_id: string; payload: unknown }> = []; + return { + calls, + sdk: { + registerFunction: (functionId: string, handler: Handler) => { + handlers.set(functionId, handler); + }, + trigger: async (input: { function_id: string; payload: unknown }) => { + calls.push(input); + const handler = handlers.get(input.function_id); + if (!handler) throw new Error(`missing handler: ${input.function_id}`); + return handler(input.payload); + }, + }, + }; +} + +function storeForObservations( + sessionId: string, + observations: Array, +): Store { + const session = makeSession(sessionId); + return new Map([ + [KV.sessions, new Map([[session.id, session]])], + [KV.summaries, new Map()], + [ + KV.observations(session.id), + new Map(observations.map((observation) => [observation.id, observation])), + ], + [KV.config, new Map()], + [KV.audit, new Map()], + ]); +} + +function storeForObservedSession(sessionId: string): Store { + return storeForObservations(sessionId, [makeObservation(sessionId)]); +} + +describe("mem::evict stale sessions", () => { + it("runs session recovery before deleting a stale observed session", async () => { + const sessionId = "ses_stale"; + const store = storeForObservedSession(sessionId); + const kv = mockKV(store); + const { sdk, calls } = mockSdk(); + + registerEvictFunction(sdk as never, kv as never); + sdk.registerFunction("event::session::stopped", async (payload) => { + expect(payload).toEqual({ sessionId }); + expect(await kv.get(KV.sessions, sessionId)).toMatchObject({ + id: sessionId, + }); + return { success: true }; + }); + sdk.registerFunction("mem::consolidate-pipeline", () => ({ + success: true, + })); + + const result = (await sdk.trigger({ + function_id: "mem::evict", + payload: {}, + })) as { staleSessions: number }; + + expect(result.staleSessions).toBe(1); + expect(await kv.get(KV.sessions, sessionId)).toBeNull(); + const audits = await kv.list<{ + details: { reason: string }; + }>(KV.audit); + expect(audits[0].details.reason).toBe( + "stale_session_recovered_then_evicted", + ); + expect(calls.map((call) => call.function_id)).toContain( + "event::session::stopped", + ); + expect(calls.map((call) => call.function_id)).toContain( + "mem::consolidate-pipeline", + ); + }); + + it("keeps a stale observed session when recovery fails", async () => { + const sessionId = "ses_unrecovered"; + const store = storeForObservedSession(sessionId); + const kv = mockKV(store); + const { sdk, calls } = mockSdk(); + + registerEvictFunction(sdk as never, kv as never); + sdk.registerFunction("event::session::stopped", () => ({ + success: false, + error: "no_provider", + })); + + const result = (await sdk.trigger({ + function_id: "mem::evict", + payload: {}, + })) as { staleSessions: number }; + + expect(result.staleSessions).toBe(0); + expect(await kv.get(KV.sessions, sessionId)).toMatchObject({ + id: sessionId, + }); + expect(calls.map((call) => call.function_id)).toContain( + "event::session::stopped", + ); + expect(calls.map((call) => call.function_id)).not.toContain( + "mem::consolidate-pipeline", + ); + }); + + it("keeps a stale session when observation scanning fails", async () => { + const sessionId = "ses_scan_failed"; + const store = storeForObservedSession(sessionId); + const kv = mockKV(store, new Set([KV.observations(sessionId)])); + const { sdk, calls } = mockSdk(); + + registerEvictFunction(sdk as never, kv as never); + sdk.registerFunction("event::session::stopped", () => ({ + success: true, + })); + + const result = (await sdk.trigger({ + function_id: "mem::evict", + payload: {}, + })) as { staleSessions: number }; + + expect(result.staleSessions).toBe(0); + expect(await kv.get(KV.sessions, sessionId)).toMatchObject({ + id: sessionId, + }); + expect(calls.map((call) => call.function_id)).not.toContain( + "event::session::stopped", + ); + }); + + it("keeps a stale session that only has raw observations", async () => { + const sessionId = "ses_raw_only"; + const store = storeForObservations(sessionId, [ + makeRawObservation(sessionId), + ]); + const kv = mockKV(store); + const { sdk, calls } = mockSdk(); + + registerEvictFunction(sdk as never, kv as never); + sdk.registerFunction("event::session::stopped", () => ({ + success: true, + })); + + const result = (await sdk.trigger({ + function_id: "mem::evict", + payload: {}, + })) as { staleSessions: number }; + + expect(result.staleSessions).toBe(0); + expect(await kv.get(KV.sessions, sessionId)).toMatchObject({ + id: sessionId, + }); + expect(calls.map((call) => call.function_id)).not.toContain( + "event::session::stopped", + ); + }); +});