Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 105 additions & 15 deletions apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ function makeSnapshot(input: {
readonly threadId: ThreadId;
readonly workspaceRoot: string;
readonly worktreePath: string | null;
readonly checkpointTurnCount: number;
readonly checkpointRef: CheckpointRef;
readonly checkpoints: ReadonlyArray<{
readonly turnId: TurnId;
readonly checkpointTurnCount: number;
readonly checkpointRef: CheckpointRef;
}>;
}): OrchestrationReadModel {
return {
snapshotSequence: 0,
Expand Down Expand Up @@ -62,17 +65,15 @@ function makeSnapshot(input: {
messages: [],
activities: [],
proposedPlans: [],
checkpoints: [
{
turnId: TurnId.makeUnsafe("turn-1"),
checkpointTurnCount: input.checkpointTurnCount,
checkpointRef: input.checkpointRef,
status: "ready",
files: [],
assistantMessageId: null,
completedAt: "2026-01-01T00:00:00.000Z",
},
],
checkpoints: input.checkpoints.map((checkpoint) => ({
turnId: checkpoint.turnId,
checkpointTurnCount: checkpoint.checkpointTurnCount,
checkpointRef: checkpoint.checkpointRef,
status: "ready" as const,
files: [],
assistantMessageId: null,
completedAt: "2026-01-01T00:00:00.000Z",
})),
session: null,
},
],
Expand All @@ -96,8 +97,13 @@ describe("CheckpointDiffQueryLive", () => {
threadId,
workspaceRoot: "/tmp/workspace",
worktreePath: null,
checkpointTurnCount: 1,
checkpointRef: toCheckpointRef,
checkpoints: [
{
turnId: TurnId.makeUnsafe("turn-1"),
checkpointTurnCount: 1,
checkpointRef: toCheckpointRef,
},
],
});

const checkpointStore: CheckpointStoreShape = {
Expand Down Expand Up @@ -194,4 +200,88 @@ describe("CheckpointDiffQueryLive", () => {
),
).rejects.toThrow("Thread 'thread-missing' not found.");
});

it("falls back to the nearest earlier checkpoint when an exact turn checkpoint is missing", async () => {
const projectId = ProjectId.makeUnsafe("project-1");
const threadId = ThreadId.makeUnsafe("thread-1");
const checkpointRef2 = checkpointRefForThreadTurn(threadId, 2);
const checkpointRef4 = checkpointRefForThreadTurn(threadId, 4);
const hasCheckpointRefCalls: Array<CheckpointRef> = [];
const diffCheckpointsCalls: Array<{
readonly fromCheckpointRef: CheckpointRef;
readonly toCheckpointRef: CheckpointRef;
readonly cwd: string;
}> = [];

const snapshot = makeSnapshot({
projectId,
threadId,
workspaceRoot: "/tmp/workspace",
worktreePath: null,
checkpoints: [
{
turnId: TurnId.makeUnsafe("turn-2"),
checkpointTurnCount: 2,
checkpointRef: checkpointRef2,
},
{
turnId: TurnId.makeUnsafe("turn-4"),
checkpointTurnCount: 4,
checkpointRef: checkpointRef4,
},
],
});

const checkpointStore: CheckpointStoreShape = {
isGitRepository: () => Effect.succeed(true),
captureCheckpoint: () => Effect.void,
hasCheckpointRef: ({ checkpointRef }) =>
Effect.sync(() => {
hasCheckpointRefCalls.push(checkpointRef);
return true;
}),
restoreCheckpoint: () => Effect.succeed(true),
diffCheckpoints: ({ fromCheckpointRef, toCheckpointRef, cwd }) =>
Effect.sync(() => {
diffCheckpointsCalls.push({ fromCheckpointRef, toCheckpointRef, cwd });
return "";
}),
deleteCheckpointRefs: () => Effect.void,
};

const layer = CheckpointDiffQueryLive.pipe(
Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)),
Layer.provideMerge(
Layer.succeed(ProjectionSnapshotQuery, {
getSnapshot: () => Effect.succeed(snapshot),
}),
),
);

const result = await Effect.runPromise(
Effect.gen(function* () {
const query = yield* CheckpointDiffQuery;
return yield* query.getTurnDiff({
threadId,
fromTurnCount: 2,
toTurnCount: 3,
});
}).pipe(Effect.provide(layer)),
);

expect(hasCheckpointRefCalls).toEqual([checkpointRef2, checkpointRef2]);
expect(diffCheckpointsCalls).toEqual([
{
cwd: "/tmp/workspace",
fromCheckpointRef: checkpointRef2,
toCheckpointRef: checkpointRef2,
},
]);
expect(result).toEqual({
threadId,
fromTurnCount: 2,
toTurnCount: 3,
diff: "",
});
});
});
64 changes: 40 additions & 24 deletions apps/server/src/checkpointing/Layers/CheckpointDiffQuery.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {
CheckpointRef,
OrchestrationGetTurnDiffResult,
ThreadId,
type OrchestrationGetFullThreadDiffInput,
type OrchestrationGetFullThreadDiffResult,
type OrchestrationGetTurnDiffResult as OrchestrationGetTurnDiffResultType,
Expand All @@ -21,6 +23,34 @@ const make = Effect.gen(function* () {
const projectionSnapshotQuery = yield* ProjectionSnapshotQuery;
const checkpointStore = yield* CheckpointStore;

const resolveCheckpointRefAtOrBeforeTurnCount = (input: {
readonly threadId: ThreadId;
readonly requestedTurnCount: number;
readonly checkpoints: ReadonlyArray<{
readonly checkpointTurnCount: number;
readonly checkpointRef: CheckpointRef;
}>;
}): CheckpointRef => {
if (input.requestedTurnCount <= 0) {
return checkpointRefForThreadTurn(input.threadId, 0);
}

const matchingCheckpoint = input.checkpoints.reduce<{
readonly checkpointTurnCount: number;
readonly checkpointRef: CheckpointRef;
} | null>((resolved, checkpoint) => {
if (checkpoint.checkpointTurnCount > input.requestedTurnCount) {
return resolved;
}
if (resolved && resolved.checkpointTurnCount >= checkpoint.checkpointTurnCount) {
return resolved;
}
return checkpoint;
}, null);

return matchingCheckpoint?.checkpointRef ?? checkpointRefForThreadTurn(input.threadId, 0);
};

const getTurnDiff: CheckpointDiffQueryShape["getTurnDiff"] = (input) =>
Effect.gen(function* () {
const operation = "CheckpointDiffQuery.getTurnDiff";
Expand Down Expand Up @@ -73,30 +103,16 @@ const make = Effect.gen(function* () {
});
}

const fromCheckpointRef =
input.fromTurnCount === 0
? checkpointRefForThreadTurn(input.threadId, 0)
: thread.checkpoints.find(
(checkpoint) => checkpoint.checkpointTurnCount === input.fromTurnCount,
)?.checkpointRef;
if (!fromCheckpointRef) {
return yield* new CheckpointUnavailableError({
threadId: input.threadId,
turnCount: input.fromTurnCount,
detail: `Checkpoint ref is unavailable for turn ${input.fromTurnCount}.`,
});
}

const toCheckpointRef = thread.checkpoints.find(
(checkpoint) => checkpoint.checkpointTurnCount === input.toTurnCount,
)?.checkpointRef;
if (!toCheckpointRef) {
return yield* new CheckpointUnavailableError({
threadId: input.threadId,
turnCount: input.toTurnCount,
detail: `Checkpoint ref is unavailable for turn ${input.toTurnCount}.`,
});
}
const fromCheckpointRef = resolveCheckpointRefAtOrBeforeTurnCount({
threadId: input.threadId,
requestedTurnCount: input.fromTurnCount,
checkpoints: thread.checkpoints,
});
const toCheckpointRef = resolveCheckpointRefAtOrBeforeTurnCount({
threadId: input.threadId,
requestedTurnCount: input.toTurnCount,
checkpoints: thread.checkpoints,
});

const [fromExists, toExists] = yield* Effect.all(
[
Expand Down
27 changes: 17 additions & 10 deletions apps/server/src/orchestration/Layers/CheckpointReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ async function waitForThread(
engine: OrchestrationEngineShape,
predicate: (thread: {
latestTurn: { turnId: string } | null;
checkpoints: ReadonlyArray<{ checkpointTurnCount: number }>;
checkpoints: ReadonlyArray<{ checkpointTurnCount: number; turnId: string }>;
activities: ReadonlyArray<{ kind: string }>;
}) => boolean,
timeoutMs = 5000,
) {
const deadline = Date.now() + timeoutMs;
const poll = async (): Promise<{
latestTurn: { turnId: string } | null;
checkpoints: ReadonlyArray<{ checkpointTurnCount: number }>;
checkpoints: ReadonlyArray<{ checkpointTurnCount: number; turnId: string }>;
activities: ReadonlyArray<{ kind: string }>;
}> => {
const readModel = await Effect.runPromise(engine.getReadModel());
Expand Down Expand Up @@ -403,7 +403,7 @@ describe("CheckpointReactor", () => {
).toBe("v2\n");
});

it("ignores auxiliary thread turn completion while primary turn is active", async () => {
it("captures completed turns even when session activeTurnId points at a different turn", async () => {
const harness = await createHarness({ seedFilesystemCheckpoints: false });
const createdAt = new Date().toISOString();

Expand Down Expand Up @@ -453,11 +453,12 @@ describe("CheckpointReactor", () => {
});

await harness.drain();
const midReadModel = await Effect.runPromise(harness.engine.getReadModel());
const midThread = midReadModel.threads.find(
(entry) => entry.id === ThreadId.makeUnsafe("thread-1"),
const midThread = await waitForThread(
harness.engine,
(entry) =>
entry.checkpoints.length === 1 && entry.checkpoints[0]?.turnId === asTurnId("turn-aux"),
);
expect(midThread?.checkpoints).toHaveLength(0);
expect(midThread.checkpoints[0]?.checkpointTurnCount).toBe(1);

harness.provider.emit({
type: "turn.completed",
Expand All @@ -472,9 +473,13 @@ describe("CheckpointReactor", () => {

const thread = await waitForThread(
harness.engine,
(entry) => entry.latestTurn?.turnId === "turn-main" && entry.checkpoints.length === 1,
(entry) => entry.latestTurn?.turnId === "turn-main" && entry.checkpoints.length === 2,
);
expect(thread.checkpoints[0]?.checkpointTurnCount).toBe(1);
expect(thread.checkpoints.map((checkpoint) => checkpoint.checkpointTurnCount)).toEqual([1, 2]);
expect(thread.checkpoints.map((checkpoint) => checkpoint.turnId)).toEqual([
asTurnId("turn-aux"),
asTurnId("turn-main"),
]);
});

it("appends capture failure activity when turn diff summary cannot be derived", async () => {
Expand Down Expand Up @@ -786,7 +791,9 @@ describe("CheckpointReactor", () => {
threadId: ThreadId.makeUnsafe("thread-1"),
numTurns: 1,
});
expect(fs.readFileSync(path.join(harness.cwd, "README.md"), "utf8")).toBe("v2\n");
expect(
fs.readFileSync(path.join(harness.cwd, "README.md"), "utf8").replaceAll("\r\n", "\n"),
).toBe("v2\n");
expect(
gitRefExists(harness.cwd, checkpointRefForThreadTurn(ThreadId.makeUnsafe("thread-1"), 2)),
).toBe(false);
Expand Down
12 changes: 0 additions & 12 deletions apps/server/src/orchestration/Layers/CheckpointReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,6 @@ function toTurnId(value: string | undefined): TurnId | null {
return value === undefined ? null : TurnId.makeUnsafe(String(value));
}

function sameId(left: string | null | undefined, right: string | null | undefined): boolean {
if (left === null || left === undefined || right === null || right === undefined) {
return false;
}
return left === right;
}

function checkpointStatusFromRuntime(status: string | undefined): "ready" | "missing" | "error" {
switch (status) {
case "failed":
Expand Down Expand Up @@ -334,11 +327,6 @@ const make = Effect.gen(function* () {
return;
}

// When a primary turn is active, only that turn may produce completion checkpoints.
if (thread.session?.activeTurnId && !sameId(thread.session.activeTurnId, turnId)) {
return;
}

// Only skip if a real (non-placeholder) checkpoint already exists for this turn.
// ProviderRuntimeIngestion may insert placeholder entries with status "missing"
// before this reactor runs; those must not prevent real git capture.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,62 @@ describe("ProviderRuntimeIngestion", () => {
expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated");
});

it("dedupes repeated turn diff placeholder updates for the same turn", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "turn.diff.updated",
eventId: asEventId("evt-turn-diff-updated-1"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-p1"),
itemId: asItemId("item-p1-assistant"),
payload: {
unifiedDiff: "diff --git a/file.txt b/file.txt\n+hello\n",
},
});

harness.emit({
type: "turn.diff.updated",
eventId: asEventId("evt-turn-diff-updated-2"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-p1"),
itemId: asItemId("item-p1-assistant"),
payload: {
unifiedDiff: "diff --git a/file.txt b/file.txt\n+hello again\n",
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.checkpoints.some(
(checkpoint: ProviderRuntimeTestCheckpoint) => checkpoint.turnId === "turn-p1",
),
);

await harness.drain();
const readModel = await Effect.runPromise(harness.engine.getReadModel());
const updatedThread = readModel.threads.find(
(entry) => entry.id === ThreadId.makeUnsafe("thread-1"),
);

expect(updatedThread).toBeDefined();
const matchingCheckpoints = updatedThread?.checkpoints.filter(
(checkpoint: ProviderRuntimeTestCheckpoint) => checkpoint.turnId === "turn-p1",
);
expect(matchingCheckpoints).toHaveLength(1);
expect(matchingCheckpoints?.[0]?.checkpointTurnCount).toBe(1);
expect(matchingCheckpoints?.[0]?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated-1");
expect(
thread.checkpoints.filter(
(checkpoint: ProviderRuntimeTestCheckpoint) => checkpoint.turnId === "turn-p1",
),
).toHaveLength(1);
});

it("projects Codex task lifecycle chunks into thread activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
Loading