Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions apps/code/src/renderer/features/sessions/service/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2512,6 +2512,112 @@ describe("SessionService", () => {
// unbounded growth on long-running cloud runs.
expect(mockSessionStoreSetters.appendEvents).not.toHaveBeenCalled();
});

const setupReconcileLoopTest = (logContent: string) => {
const service = getSessionService();
const existingSession = createMockSession({
taskRunId: "run-123",
taskId: "task-123",
status: "connected",
isCloud: true,
logUrl: "https://logs.example.com/run-123",
processedLineCount: 5,
events: [],
});
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
existingSession,
);
mockSessionStoreSetters.getSessions.mockReturnValue({
"run-123": existingSession,
});
mockTrpcLogs.readLocalLogs.query.mockResolvedValue(logContent);
mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(logContent);
service.watchCloudTask(
"task-123",
"run-123",
"https://api.anthropic.com",
123,
);
const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock
.calls[0][1] as {
onData: (update: unknown) => void;
};
return { subscribeOptions };
};

const newEntry = {
type: "notification",
timestamp: "2024-01-01T00:00:01Z",
notification: { method: "session/update" },
};
const validLine = JSON.stringify({
type: "notification",
timestamp: "2024-01-01T00:00:00Z",
notification: { method: "session/update" },
});

it("breaks the reconcile loop on first observation when parse failures are present", async () => {
const { subscribeOptions } = setupReconcileLoopTest(
[
...Array.from({ length: 8 }, () => validLine),
"}}not-json{{",
"{broken",
].join("\n"),
);

subscribeOptions.onData({
kind: "logs",
taskId: "task-123",
runId: "run-123",
totalEntryCount: 20,
newEntries: [newEntry],
});

await vi.waitFor(() => {
expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith(
"run-123",
expect.objectContaining({ processedLineCount: 20 }),
);
});
});

it("breaks the reconcile loop after a repeated stable deficiency", async () => {
const { subscribeOptions } = setupReconcileLoopTest(
Array.from({ length: 8 }, () => validLine).join("\n"),
);

subscribeOptions.onData({
kind: "logs",
taskId: "task-123",
runId: "run-123",
totalEntryCount: 14,
newEntries: [newEntry],
});
await vi.waitFor(() => {
expect(mockTrpcLogs.fetchS3Logs.query).toHaveBeenCalledTimes(1);
});

expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith(
"run-123",
expect.objectContaining({ processedLineCount: 14 }),
);

subscribeOptions.onData({
kind: "logs",
taskId: "task-123",
runId: "run-123",
totalEntryCount: 14,
newEntries: [newEntry],
});

await vi.waitFor(() => {
expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith(
"run-123",
expect.objectContaining({ processedLineCount: 14 }),
);
});
});

it("flips status to connected on _posthog/run_started", async () => {
const service = getSessionService();
Comment on lines 2514 to 2622
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Duplicated test setup across two new cases

The two new tests repeat an identical block (~25 lines) to create service, existingSession, set up all three store-setter mocks, start watchCloudTask, and extract subscribeOptions. Per the OnceAndOnlyOnce simplicity rule (and the project-wide preference for parameterised tests), this shared scaffolding should be extracted — either into a shared beforeEach / helper factory within this describe block, or, since the two scenarios diverge only in the log content and the number of onData invocations, as a single parameterised case using it.each.

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/code/src/renderer/features/sessions/service/service.test.ts
Line: 2514-2660

Comment:
**Duplicated test setup across two new cases**

The two new tests repeat an identical block (~25 lines) to create `service`, `existingSession`, set up all three store-setter mocks, start `watchCloudTask`, and extract `subscribeOptions`. Per the OnceAndOnlyOnce simplicity rule (and the project-wide preference for parameterised tests), this shared scaffolding should be extracted — either into a shared `beforeEach` / helper factory within this `describe` block, or, since the two scenarios diverge only in the log content and the number of `onData` invocations, as a single parameterised case using `it.each`.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

const hydratedSession = createMockSession({
Expand Down
83 changes: 71 additions & 12 deletions apps/code/src/renderer/features/sessions/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ interface CloudLogGapReconcileRequest {
interface ParsedSessionLogs {
rawEntries: StoredLogEntry[];
totalLineCount: number;
parseFailureCount: number;
sessionId?: string;
adapter?: Adapter;
}
Expand All @@ -216,6 +217,11 @@ interface CloudLogGapReconcileState {
pendingRequest?: CloudLogGapReconcileRequest;
}

interface CloudLogReconcileDeficiency {
expectedCount: number;
observedLineCount: number;
}

export interface ConnectParams {
task: Task;
repoPath: string;
Expand Down Expand Up @@ -303,6 +309,11 @@ export class SessionService {
}
>();
private cloudLogGapReconciles = new Map<string, CloudLogGapReconcileState>();
/** Last observed reconcile deficit per taskRunId — see reconcileCloudLogGapOnce. */
private cloudLogReconcileDeficiency = new Map<
string,
CloudLogReconcileDeficiency
>();
/** Maps toolCallId → cloud requestId for routing permission responses */
private cloudPermissionRequestIds = new Map<string, string>();
private idleKilledSubscription: { unsubscribe: () => void } | null = null;
Expand Down Expand Up @@ -739,6 +750,7 @@ export class SessionService {
this.unsubscribeFromChannel(taskRunId);
sessionStoreSetters.removeSession(taskRunId);
this.cloudRunIdleTracker.delete(taskRunId);
this.cloudLogReconcileDeficiency.delete(taskRunId);
if (session) {
this.localRepoPaths.delete(session.taskId);
this.localRecoveryAttempts.delete(session.taskId);
Expand Down Expand Up @@ -1118,6 +1130,7 @@ export class SessionService {
this.localRecoveryAttempts.clear();
this.cloudPermissionRequestIds.clear();
this.cloudLogGapReconciles.clear();
this.cloudLogReconcileDeficiency.clear();
this.dispatchingCloudQueues.clear();
this.scheduledCloudQueueFlushes.clear();
this.cloudRunIdleTracker.clear();
Expand Down Expand Up @@ -2998,6 +3011,7 @@ export class SessionService {

watcher.subscription.unsubscribe();
this.cloudTaskWatchers.delete(taskId);
this.cloudLogReconcileDeficiency.delete(watcher.runId);
}

async preflightToLocal(taskId: string, repoPath: string) {
Expand Down Expand Up @@ -3658,6 +3672,7 @@ export class SessionService {
const rawEntries: StoredLogEntry[] = [];
let sessionId: string | undefined;
let adapter: Adapter | undefined;
let parseFailureCount = 0;
const lines = content.trim().split("\n");

for (const line of lines) {
Expand All @@ -3679,19 +3694,30 @@ export class SessionService {
if (params?.adapter) adapter = params.adapter;
}
} catch {
parseFailureCount += 1;
log.warn("Failed to parse log entry", { line });
}
}

return { rawEntries, totalLineCount: lines.length, sessionId, adapter };
return {
rawEntries,
totalLineCount: lines.length,
parseFailureCount,
sessionId,
adapter,
};
}

private async fetchSessionLogs(
logUrl: string | undefined,
taskRunId?: string,
options: { minEntryCount?: number } = {},
): Promise<ParsedSessionLogs> {
const empty: ParsedSessionLogs = { rawEntries: [], totalLineCount: 0 };
const empty: ParsedSessionLogs = {
rawEntries: [],
totalLineCount: 0,
parseFailureCount: 0,
};
if (!logUrl && !taskRunId) return empty;
let localResult: ParsedSessionLogs | undefined;

Expand Down Expand Up @@ -3811,18 +3837,18 @@ export class SessionService {
newEntries,
logUrl,
}: CloudLogGapReconcileRequest): Promise<void> {
const { rawEntries, totalLineCount } = await this.fetchSessionLogs(
logUrl,
taskRunId,
{ minEntryCount: expectedCount },
);
const { rawEntries, totalLineCount, parseFailureCount } =
await this.fetchSessionLogs(logUrl, taskRunId, {
minEntryCount: expectedCount,
});
const session = sessionStoreSetters.getSessions()[taskRunId];
if (!session || session.taskId !== taskId) {
return;
}

const latestCount = session.processedLineCount ?? 0;
if (latestCount >= expectedCount) {
this.cloudLogReconcileDeficiency.delete(taskRunId);
return;
}

Expand All @@ -3832,6 +3858,7 @@ export class SessionService {
sessionStoreSetters.clearTailOptimisticItems(taskRunId);
}
this.cloudRunIdleTracker.delete(taskRunId);
this.cloudLogReconcileDeficiency.delete(taskRunId);
sessionStoreSetters.updateSession(taskRunId, {
events,
isCloud: true,
Expand All @@ -3842,16 +3869,48 @@ export class SessionService {
return;
}

// The fetched logs lag behind expectedCount and `newEntries` is the latest
// tail slice of the snapshot — appending it here would create duplicates
// and gaps in `session.events` (and bump processedLineCount past entries
// we don't actually have). Skip; the next snapshot/log update will retry
// once the source has caught up.
// Break the reconcile loop on proven corruption (parseFailureCount > 0)
// or on a stable repeat of the same deficit. Otherwise wait — likely lag.
const previous = this.cloudLogReconcileDeficiency.get(taskRunId);
const sameDeficiencyAsBefore =
previous?.expectedCount === expectedCount &&
previous?.observedLineCount === totalLineCount;

if (parseFailureCount > 0 || sameDeficiencyAsBefore) {
log.warn("Cloud task log gap unrecoverable; committing best-effort", {
taskRunId,
expectedCount,
observedLineCount: totalLineCount,
parseFailureCount,
fetchedEntries: rawEntries.length,
reason: parseFailureCount > 0 ? "parse-failure" : "stable-deficit",
});
const events = convertStoredEntriesToEvents(rawEntries);
if (hasSessionPromptEvent(events)) {
sessionStoreSetters.clearTailOptimisticItems(taskRunId);
}
this.cloudRunIdleTracker.delete(taskRunId);
this.cloudLogReconcileDeficiency.delete(taskRunId);
sessionStoreSetters.updateSession(taskRunId, {
events,
isCloud: true,
logUrl: logUrl ?? session.logUrl,
processedLineCount: expectedCount,
});
this.updatePromptStateFromEvents(taskRunId, events);
return;
}

this.cloudLogReconcileDeficiency.set(taskRunId, {
expectedCount,
observedLineCount: totalLineCount,
});
log.warn("Cloud task log count inconsistency", {
taskRunId,
currentCount,
expectedCount,
fetchedCount: rawEntries.length,
parseFailureCount,
entriesReceived: newEntries.length,
});
}
Expand Down
Loading