Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 92 additions & 1 deletion src/functions/evict.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ISdk } from "iii-sdk";
import type {
Session,
CompressedObservation,
RawObservation,
SessionSummary,
Memory,
} from "../types.js";
Expand Down Expand Up @@ -36,6 +37,61 @@ interface EvictionStats {
dryRun: boolean;
}

function isValidRecoveryResult(result: unknown): boolean {
if (!result || typeof result !== "object") return false;
if (!("success" in result)) return true;
return (result as { success?: unknown }).success !== false;
}

function isCompressedObservation(
observation: CompressedObservation | RawObservation,
): observation is CompressedObservation {
return (
"title" in observation &&
typeof observation.title === "string" &&
observation.title.length > 0
);
}

async function recoverStaleSession(
sdk: ISdk,
sessionId: string,
): Promise<boolean> {
try {
const result = await sdk.trigger({
function_id: "event::session::stopped",
payload: { sessionId },
});
if (!isValidRecoveryResult(result)) {
logger.warn("Stale session recovery failed", {
sessionId,
result,
});
return false;
}
return true;
} catch (err) {
logger.warn("Stale session recovery failed", {
sessionId,
error: err instanceof Error ? err.message : String(err),
});
return false;
}
}

async function runRecoveredSessionConsolidation(sdk: ISdk): Promise<void> {
try {
await sdk.trigger({
function_id: "mem::consolidate-pipeline",
payload: { tier: "all" },
});
} catch (err) {
logger.warn("Recovered session consolidation failed", {
error: err instanceof Error ? err.message : String(err),
});
}
}

export function registerEvictFunction(sdk: ISdk, kv: StateKV): void {
sdk.registerFunction("mem::evict",
async (data: { dryRun?: boolean }): Promise<EvictionStats> => {
Expand All @@ -57,6 +113,7 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void {
dryRun,
};

let recoveredStaleSessions = 0;
const sessions = await kv.list<Session>(KV.sessions).catch(() => []);
const summaries = await kv
.list<SessionSummary>(KV.summaries)
Expand All @@ -71,6 +128,34 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void {
if (dryRun) {
stats.staleSessions++;
} else {
const observations = await kv
.list<CompressedObservation | RawObservation>(
KV.observations(session.id),
)
.catch((err) => {
logger.warn("Stale session observation scan failed", {
sessionId: session.id,
error: err instanceof Error ? err.message : String(err),
});
return null;
});
if (!observations) continue;

let recovered = false;
const hasCompressedObservations = observations.some(
isCompressedObservation,
);
if (hasCompressedObservations) {
recovered = await recoverStaleSession(sdk, session.id);
if (!recovered) continue;
recoveredStaleSessions++;
} else if (observations.length > 0) {
logger.warn("Stale session has no compressed observations", {
sessionId: session.id,
});
continue;
}
Comment on lines +142 to +157
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect CompressedObservation/observation shapes and any place that constructs them without a title.
rg -nP --type=ts -C3 '\btitle\s*[:?]\s*' src/types.ts src/state || true
ast-grep --pattern $'interface CompressedObservation {
  $$$
}'
rg -nP --type=ts -C2 'title\s*:' -g 'src/functions/**' -g '!**/*.test.ts'
# Also check how observations are produced before vs after session::stopped.
rg -nP --type=ts -C3 'observations?\s*\(' -g 'src/functions/**'

Repository: rohitg00/agentmemory

Length of output: 38755


🏁 Script executed:

#!/bin/bash
# Find RawObservation interface definition
rg -nP --type=ts 'interface RawObservation' src/
# Also check for type RawObservation
rg -nP --type=ts 'type RawObservation' src/
# Look for places where RawObservation is created
rg -nP --type=ts 'RawObservation' src/types.ts -A 15 -B 2

Repository: rohitg00/agentmemory

Length of output: 527


Reverse the compressed-observation recovery gate—it deletes exactly the sessions that need recovery most.

The KV store can contain both RawObservation and CompressedObservation objects (confirmed by observe.ts storing both, and replay.ts typing the same location as RawObservation | CompressedObservation). RawObservation has no title field, while CompressedObservation requires it.

The current check observations.some((o) => o.title) therefore:

  • Returns false for sessions containing only raw observations (where session::stopped never fired—exactly the crash/disconnect cases that need recovery per issue #308)
  • Returns true for sessions that already have compressed observations (which mean session::stopped likely already ran)

This is backwards. Sessions with only raw observations are silently deleted without any recovery attempt. Invert the condition to recover sessions without compressed observations:

const hasOnlyRawObservations = !observations.some((o) => o.title);
if (hasOnlyRawObservations) {
  const recovered = await recoverStaleSession(sdk, session.id);
  if (!recovered) continue;
  recoveredStaleSessions++;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/functions/evict.ts` around lines 129 - 136, The recovery gate is
inverted: currently you call recoverStaleSession only when observations.some((o)
=> o.title) is true, but that detects compressed observations; instead detect
sessions missing compressed observations (i.e., only RawObservation) and call
recoverStaleSession for those. Change the logic around observations and
hasCompressedObservations so you compute whether there are no compressed
observations (e.g., !observations.some(o => o.title)) and only then invoke
recoverStaleSession(sdk, session.id) and increment recoveredStaleSessions when
it returns true.


try {
await kv.delete(KV.sessions, session.id);
stats.staleSessions++;
Expand All @@ -84,13 +169,19 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void {
}
await recordAudit(kv, "delete", "mem::evict", [session.id], {
resource: "session",
reason: "stale_session_without_summary",
reason: recovered
? "stale_session_recovered_then_evicted"
: "stale_session_without_summary",
dryRun,
});
}
}
}

if (!dryRun && recoveredStaleSessions > 0) {
await runRecoveredSessionConsolidation(sdk);
}

const projectObs = new Map<string, CompressedObservation[]>();
for (const session of sessions) {
const obs = await kv
Expand Down
240 changes: 240 additions & 0 deletions test/evict.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
import { describe, expect, it, vi } from "vitest";
import type {
CompressedObservation,
RawObservation,
Session,
} from "../src/types.js";
import { registerEvictFunction } from "../src/functions/evict.js";
import { KV } from "../src/state/schema.js";

vi.mock("../src/logger.js", () => ({
logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() },
}));

type Store = Map<string, Map<string, unknown>>;
type Handler = (payload: unknown) => unknown | Promise<unknown>;

function daysAgo(days: number): string {
return new Date(Date.now() - days * 24 * 60 * 60 * 1000).toISOString();
}

function makeSession(id: string): Session {
return {
id,
project: "agentmemory",
cwd: "/repo/agentmemory",
startedAt: daysAgo(31),
status: "active",
observationCount: 1,
};
}

function makeObservation(sessionId: string): CompressedObservation {
return {
id: "obs_1",
sessionId,
timestamp: daysAgo(31),
type: "decision",
title: "Chose sqlite storage",
facts: ["Use sqlite for local state"],
narrative: "The session chose sqlite for local state.",
concepts: ["sqlite"],
files: ["src/state/kv.ts"],
importance: 8,
};
}

function makeRawObservation(sessionId: string): RawObservation {
return {
id: "raw_1",
sessionId,
timestamp: daysAgo(31),
hookType: "post_tool_use",
toolName: "Edit",
raw: { file_path: "src/state/kv.ts" },
};
}

function mockKV(store: Store, listFailures: Set<string> = new Set()) {
return {
get: async <T>(scope: string, key: string): Promise<T | null> =>
(store.get(scope)?.get(key) as T) ?? null,
set: async <T>(scope: string, key: string, data: T): Promise<T> => {
if (!store.has(scope)) store.set(scope, new Map());
store.get(scope)!.set(key, data);
return data;
},
delete: async (scope: string, key: string): Promise<void> => {
store.get(scope)?.delete(key);
},
list: async <T>(scope: string): Promise<T[]> => {
if (listFailures.has(scope)) {
throw new Error(`list failed for ${scope}`);
}
const entries = store.get(scope);
return entries ? (Array.from(entries.values()) as T[]) : [];
},
};
}

function mockSdk() {
const handlers = new Map<string, Handler>();
const calls: Array<{ function_id: string; payload: unknown }> = [];
return {
calls,
sdk: {
registerFunction: (functionId: string, handler: Handler) => {
handlers.set(functionId, handler);
},
trigger: async (input: { function_id: string; payload: unknown }) => {
calls.push(input);
const handler = handlers.get(input.function_id);
if (!handler) throw new Error(`missing handler: ${input.function_id}`);
return handler(input.payload);
},
},
};
}

function storeForObservations(
sessionId: string,
observations: Array<CompressedObservation | RawObservation>,
): Store {
const session = makeSession(sessionId);
return new Map([
[KV.sessions, new Map([[session.id, session]])],
[KV.summaries, new Map()],
[
KV.observations(session.id),
new Map(observations.map((observation) => [observation.id, observation])),
],
[KV.config, new Map()],
[KV.audit, new Map()],
]);
}

function storeForObservedSession(sessionId: string): Store {
return storeForObservations(sessionId, [makeObservation(sessionId)]);
}

describe("mem::evict stale sessions", () => {
it("runs session recovery before deleting a stale observed session", async () => {
const sessionId = "ses_stale";
const store = storeForObservedSession(sessionId);
const kv = mockKV(store);
const { sdk, calls } = mockSdk();

registerEvictFunction(sdk as never, kv as never);
sdk.registerFunction("event::session::stopped", async (payload) => {
expect(payload).toEqual({ sessionId });
expect(await kv.get(KV.sessions, sessionId)).toMatchObject({
id: sessionId,
});
return { success: true };
});
sdk.registerFunction("mem::consolidate-pipeline", () => ({
success: true,
}));

const result = (await sdk.trigger({
function_id: "mem::evict",
payload: {},
})) as { staleSessions: number };

expect(result.staleSessions).toBe(1);
expect(await kv.get(KV.sessions, sessionId)).toBeNull();
const audits = await kv.list<{
details: { reason: string };
}>(KV.audit);
expect(audits[0].details.reason).toBe(
"stale_session_recovered_then_evicted",
);
expect(calls.map((call) => call.function_id)).toContain(
"event::session::stopped",
);
expect(calls.map((call) => call.function_id)).toContain(
"mem::consolidate-pipeline",
);
});

it("keeps a stale observed session when recovery fails", async () => {
const sessionId = "ses_unrecovered";
const store = storeForObservedSession(sessionId);
const kv = mockKV(store);
const { sdk, calls } = mockSdk();

registerEvictFunction(sdk as never, kv as never);
sdk.registerFunction("event::session::stopped", () => ({
success: false,
error: "no_provider",
}));

const result = (await sdk.trigger({
function_id: "mem::evict",
payload: {},
})) as { staleSessions: number };

expect(result.staleSessions).toBe(0);
expect(await kv.get(KV.sessions, sessionId)).toMatchObject({
id: sessionId,
});
expect(calls.map((call) => call.function_id)).toContain(
"event::session::stopped",
);
expect(calls.map((call) => call.function_id)).not.toContain(
"mem::consolidate-pipeline",
);
});

it("keeps a stale session when observation scanning fails", async () => {
const sessionId = "ses_scan_failed";
const store = storeForObservedSession(sessionId);
const kv = mockKV(store, new Set([KV.observations(sessionId)]));
const { sdk, calls } = mockSdk();

registerEvictFunction(sdk as never, kv as never);
sdk.registerFunction("event::session::stopped", () => ({
success: true,
}));

const result = (await sdk.trigger({
function_id: "mem::evict",
payload: {},
})) as { staleSessions: number };

expect(result.staleSessions).toBe(0);
expect(await kv.get(KV.sessions, sessionId)).toMatchObject({
id: sessionId,
});
expect(calls.map((call) => call.function_id)).not.toContain(
"event::session::stopped",
);
});

it("keeps a stale session that only has raw observations", async () => {
const sessionId = "ses_raw_only";
const store = storeForObservations(sessionId, [
makeRawObservation(sessionId),
]);
const kv = mockKV(store);
const { sdk, calls } = mockSdk();

registerEvictFunction(sdk as never, kv as never);
sdk.registerFunction("event::session::stopped", () => ({
success: true,
}));

const result = (await sdk.trigger({
function_id: "mem::evict",
payload: {},
})) as { staleSessions: number };

expect(result.staleSessions).toBe(0);
expect(await kv.get(KV.sessions, sessionId)).toMatchObject({
id: sessionId,
});
expect(calls.map((call) => call.function_id)).not.toContain(
"event::session::stopped",
);
});
});