From 5dbd49a610c94b945e2cd431fa8b245fe4fb93d2 Mon Sep 17 00:00:00 2001 From: laplace young Date: Fri, 15 May 2026 13:47:25 +0800 Subject: [PATCH 1/2] fix: recover stale sessions before eviction Constraint: preserve recoverable session insights during stale cleanup Confidence: high Scope-risk: narrow Tested: npm test -- test/evict.test.ts --reporter=dot; npx tsdown; git diff --check Tested: npm test -- --reporter=dot (fails on existing Windows path-sensitive compress-file and obsidian-export tests) Signed-off-by: laplace young --- src/functions/evict.ts | 68 ++++++++++++++++++ test/evict.test.ts | 155 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 test/evict.test.ts diff --git a/src/functions/evict.ts b/src/functions/evict.ts index 46b1845a..778edb2c 100644 --- a/src/functions/evict.ts +++ b/src/functions/evict.ts @@ -36,6 +36,51 @@ interface EvictionStats { dryRun: boolean; } +function triggerRecoveredSession(result: unknown): boolean { + if (!result || typeof result !== "object") return false; + if (!("success" in result)) return true; + return (result as { success?: unknown }).success !== false; +} + +async function recoverStaleSession( + sdk: ISdk, + sessionId: string, +): Promise { + try { + const result = await sdk.trigger({ + function_id: "event::session::stopped", + payload: { sessionId }, + }); + if (!triggerRecoveredSession(result)) { + logger.warn("Stale session recovery failed", { + sessionId, + result, + }); + return false; + } + return true; + } catch (err) { + logger.warn("Stale session recovery failed", { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + return false; + } +} + +async function runRecoveredSessionConsolidation(sdk: ISdk): Promise { + try { + await sdk.trigger({ + function_id: "mem::consolidate-pipeline", + payload: { tier: "all" }, + }); + } catch (err) { + logger.warn("Recovered session consolidation failed", { + error: err instanceof Error ? err.message : String(err), + }); + } +} + export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { sdk.registerFunction("mem::evict", async (data: { dryRun?: boolean }): Promise => { @@ -57,6 +102,7 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { dryRun, }; + let recoveredStaleSessions = 0; const sessions = await kv.list(KV.sessions).catch(() => []); const summaries = await kv .list(KV.summaries) @@ -71,6 +117,24 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { if (dryRun) { stats.staleSessions++; } else { + const observations = await kv + .list(KV.observations(session.id)) + .catch((err) => { + logger.warn("Stale session observation scan failed", { + sessionId: session.id, + error: err instanceof Error ? err.message : String(err), + }); + return null; + }); + if (!observations) continue; + + const hasCompressedObservations = observations.some((o) => o.title); + if (hasCompressedObservations) { + const recovered = await recoverStaleSession(sdk, session.id); + if (!recovered) continue; + recoveredStaleSessions++; + } + try { await kv.delete(KV.sessions, session.id); stats.staleSessions++; @@ -91,6 +155,10 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { } } + if (!dryRun && recoveredStaleSessions > 0) { + await runRecoveredSessionConsolidation(sdk); + } + const projectObs = new Map(); for (const session of sessions) { const obs = await kv diff --git a/test/evict.test.ts b/test/evict.test.ts new file mode 100644 index 00000000..152b3ba2 --- /dev/null +++ b/test/evict.test.ts @@ -0,0 +1,155 @@ +import { describe, expect, it, vi } from "vitest"; +import type { CompressedObservation, Session } from "../src/types.js"; +import { registerEvictFunction } from "../src/functions/evict.js"; +import { KV } from "../src/state/schema.js"; + +vi.mock("../src/logger.js", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +type Store = Map>; +type Handler = (payload: unknown) => unknown | Promise; + +function daysAgo(days: number): string { + return new Date(Date.now() - days * 24 * 60 * 60 * 1000).toISOString(); +} + +function makeSession(id: string): Session { + return { + id, + project: "agentmemory", + cwd: "/repo/agentmemory", + startedAt: daysAgo(31), + status: "active", + observationCount: 1, + }; +} + +function makeObservation(sessionId: string): CompressedObservation { + return { + id: "obs_1", + sessionId, + timestamp: daysAgo(31), + type: "decision", + title: "Chose sqlite storage", + facts: ["Use sqlite for local state"], + narrative: "The session chose sqlite for local state.", + concepts: ["sqlite"], + files: ["src/state/kv.ts"], + importance: 8, + }; +} + +function mockKV(store: Store) { + return { + get: async (scope: string, key: string): Promise => + (store.get(scope)?.get(key) as T) ?? null, + set: async (scope: string, key: string, data: T): Promise => { + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, data); + return data; + }, + delete: async (scope: string, key: string): Promise => { + store.get(scope)?.delete(key); + }, + list: async (scope: string): Promise => { + const entries = store.get(scope); + return entries ? (Array.from(entries.values()) as T[]) : []; + }, + }; +} + +function mockSdk() { + const handlers = new Map(); + const calls: Array<{ function_id: string; payload: unknown }> = []; + return { + calls, + sdk: { + registerFunction: (functionId: string, handler: Handler) => { + handlers.set(functionId, handler); + }, + trigger: async (input: { function_id: string; payload: unknown }) => { + calls.push(input); + const handler = handlers.get(input.function_id); + if (!handler) throw new Error(`missing handler: ${input.function_id}`); + return handler(input.payload); + }, + }, + }; +} + +function storeForObservedSession(sessionId: string): Store { + const session = makeSession(sessionId); + const observation = makeObservation(sessionId); + return new Map([ + [KV.sessions, new Map([[session.id, session]])], + [KV.summaries, new Map()], + [KV.observations(session.id), new Map([[observation.id, observation]])], + [KV.config, new Map()], + [KV.audit, new Map()], + ]); +} + +describe("mem::evict stale sessions", () => { + it("runs session recovery before deleting a stale observed session", async () => { + const sessionId = "ses_stale"; + const store = storeForObservedSession(sessionId); + const kv = mockKV(store); + const { sdk, calls } = mockSdk(); + + registerEvictFunction(sdk as never, kv as never); + sdk.registerFunction("event::session::stopped", async (payload) => { + expect(payload).toEqual({ sessionId }); + expect(await kv.get(KV.sessions, sessionId)).toMatchObject({ + id: sessionId, + }); + return { success: true }; + }); + sdk.registerFunction("mem::consolidate-pipeline", () => ({ + success: true, + })); + + const result = (await sdk.trigger({ + function_id: "mem::evict", + payload: {}, + })) as { staleSessions: number }; + + expect(result.staleSessions).toBe(1); + expect(await kv.get(KV.sessions, sessionId)).toBeNull(); + expect(calls.map((call) => call.function_id)).toContain( + "event::session::stopped", + ); + expect(calls.map((call) => call.function_id)).toContain( + "mem::consolidate-pipeline", + ); + }); + + it("keeps a stale observed session when recovery fails", async () => { + const sessionId = "ses_unrecovered"; + const store = storeForObservedSession(sessionId); + const kv = mockKV(store); + const { sdk, calls } = mockSdk(); + + registerEvictFunction(sdk as never, kv as never); + sdk.registerFunction("event::session::stopped", () => ({ + success: false, + error: "no_provider", + })); + + const result = (await sdk.trigger({ + function_id: "mem::evict", + payload: {}, + })) as { staleSessions: number }; + + expect(result.staleSessions).toBe(0); + expect(await kv.get(KV.sessions, sessionId)).toMatchObject({ + id: sessionId, + }); + expect(calls.map((call) => call.function_id)).toContain( + "event::session::stopped", + ); + expect(calls.map((call) => call.function_id)).not.toContain( + "mem::consolidate-pipeline", + ); + }); +}); From d2ff913650aa0d3cf4c7d929e5deec45475ddfe0 Mon Sep 17 00:00:00 2001 From: laplace young Date: Fri, 15 May 2026 13:55:18 +0800 Subject: [PATCH 2/2] fix: refine stale session recovery guard Constraint: preserve raw-only stale sessions until a compression recovery path exists Confidence: high Scope-risk: narrow Tested: npm test -- test/evict.test.ts --reporter=dot; npx tsdown; git diff --check Signed-off-by: laplace young --- src/functions/evict.ts | 35 +++++++++++++--- test/evict.test.ts | 95 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 119 insertions(+), 11 deletions(-) diff --git a/src/functions/evict.ts b/src/functions/evict.ts index 778edb2c..626f4ff1 100644 --- a/src/functions/evict.ts +++ b/src/functions/evict.ts @@ -2,6 +2,7 @@ import type { ISdk } from "iii-sdk"; import type { Session, CompressedObservation, + RawObservation, SessionSummary, Memory, } from "../types.js"; @@ -36,12 +37,22 @@ interface EvictionStats { dryRun: boolean; } -function triggerRecoveredSession(result: unknown): boolean { +function isValidRecoveryResult(result: unknown): boolean { if (!result || typeof result !== "object") return false; if (!("success" in result)) return true; return (result as { success?: unknown }).success !== false; } +function isCompressedObservation( + observation: CompressedObservation | RawObservation, +): observation is CompressedObservation { + return ( + "title" in observation && + typeof observation.title === "string" && + observation.title.length > 0 + ); +} + async function recoverStaleSession( sdk: ISdk, sessionId: string, @@ -51,7 +62,7 @@ async function recoverStaleSession( function_id: "event::session::stopped", payload: { sessionId }, }); - if (!triggerRecoveredSession(result)) { + if (!isValidRecoveryResult(result)) { logger.warn("Stale session recovery failed", { sessionId, result, @@ -118,7 +129,9 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { stats.staleSessions++; } else { const observations = await kv - .list(KV.observations(session.id)) + .list( + KV.observations(session.id), + ) .catch((err) => { logger.warn("Stale session observation scan failed", { sessionId: session.id, @@ -128,11 +141,19 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { }); if (!observations) continue; - const hasCompressedObservations = observations.some((o) => o.title); + let recovered = false; + const hasCompressedObservations = observations.some( + isCompressedObservation, + ); if (hasCompressedObservations) { - const recovered = await recoverStaleSession(sdk, session.id); + recovered = await recoverStaleSession(sdk, session.id); if (!recovered) continue; recoveredStaleSessions++; + } else if (observations.length > 0) { + logger.warn("Stale session has no compressed observations", { + sessionId: session.id, + }); + continue; } try { @@ -148,7 +169,9 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void { } await recordAudit(kv, "delete", "mem::evict", [session.id], { resource: "session", - reason: "stale_session_without_summary", + reason: recovered + ? "stale_session_recovered_then_evicted" + : "stale_session_without_summary", dryRun, }); } diff --git a/test/evict.test.ts b/test/evict.test.ts index 152b3ba2..d1aedac2 100644 --- a/test/evict.test.ts +++ b/test/evict.test.ts @@ -1,5 +1,9 @@ import { describe, expect, it, vi } from "vitest"; -import type { CompressedObservation, Session } from "../src/types.js"; +import type { + CompressedObservation, + RawObservation, + Session, +} from "../src/types.js"; import { registerEvictFunction } from "../src/functions/evict.js"; import { KV } from "../src/state/schema.js"; @@ -40,7 +44,18 @@ function makeObservation(sessionId: string): CompressedObservation { }; } -function mockKV(store: Store) { +function makeRawObservation(sessionId: string): RawObservation { + return { + id: "raw_1", + sessionId, + timestamp: daysAgo(31), + hookType: "post_tool_use", + toolName: "Edit", + raw: { file_path: "src/state/kv.ts" }, + }; +} + +function mockKV(store: Store, listFailures: Set = new Set()) { return { get: async (scope: string, key: string): Promise => (store.get(scope)?.get(key) as T) ?? null, @@ -53,6 +68,9 @@ function mockKV(store: Store) { store.get(scope)?.delete(key); }, list: async (scope: string): Promise => { + if (listFailures.has(scope)) { + throw new Error(`list failed for ${scope}`); + } const entries = store.get(scope); return entries ? (Array.from(entries.values()) as T[]) : []; }, @@ -78,18 +96,27 @@ function mockSdk() { }; } -function storeForObservedSession(sessionId: string): Store { +function storeForObservations( + sessionId: string, + observations: Array, +): Store { const session = makeSession(sessionId); - const observation = makeObservation(sessionId); return new Map([ [KV.sessions, new Map([[session.id, session]])], [KV.summaries, new Map()], - [KV.observations(session.id), new Map([[observation.id, observation]])], + [ + KV.observations(session.id), + new Map(observations.map((observation) => [observation.id, observation])), + ], [KV.config, new Map()], [KV.audit, new Map()], ]); } +function storeForObservedSession(sessionId: string): Store { + return storeForObservations(sessionId, [makeObservation(sessionId)]); +} + describe("mem::evict stale sessions", () => { it("runs session recovery before deleting a stale observed session", async () => { const sessionId = "ses_stale"; @@ -116,6 +143,12 @@ describe("mem::evict stale sessions", () => { expect(result.staleSessions).toBe(1); expect(await kv.get(KV.sessions, sessionId)).toBeNull(); + const audits = await kv.list<{ + details: { reason: string }; + }>(KV.audit); + expect(audits[0].details.reason).toBe( + "stale_session_recovered_then_evicted", + ); expect(calls.map((call) => call.function_id)).toContain( "event::session::stopped", ); @@ -152,4 +185,56 @@ describe("mem::evict stale sessions", () => { "mem::consolidate-pipeline", ); }); + + it("keeps a stale session when observation scanning fails", async () => { + const sessionId = "ses_scan_failed"; + const store = storeForObservedSession(sessionId); + const kv = mockKV(store, new Set([KV.observations(sessionId)])); + const { sdk, calls } = mockSdk(); + + registerEvictFunction(sdk as never, kv as never); + sdk.registerFunction("event::session::stopped", () => ({ + success: true, + })); + + const result = (await sdk.trigger({ + function_id: "mem::evict", + payload: {}, + })) as { staleSessions: number }; + + expect(result.staleSessions).toBe(0); + expect(await kv.get(KV.sessions, sessionId)).toMatchObject({ + id: sessionId, + }); + expect(calls.map((call) => call.function_id)).not.toContain( + "event::session::stopped", + ); + }); + + it("keeps a stale session that only has raw observations", async () => { + const sessionId = "ses_raw_only"; + const store = storeForObservations(sessionId, [ + makeRawObservation(sessionId), + ]); + const kv = mockKV(store); + const { sdk, calls } = mockSdk(); + + registerEvictFunction(sdk as never, kv as never); + sdk.registerFunction("event::session::stopped", () => ({ + success: true, + })); + + const result = (await sdk.trigger({ + function_id: "mem::evict", + payload: {}, + })) as { staleSessions: number }; + + expect(result.staleSessions).toBe(0); + expect(await kv.get(KV.sessions, sessionId)).toMatchObject({ + id: sessionId, + }); + expect(calls.map((call) => call.function_id)).not.toContain( + "event::session::stopped", + ); + }); });