fix(code): break unrecoverable cloud log reconcile loop on stable deficiency#2284
Conversation
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
7c4e07c to
21d3033
Compare
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
apps/code/src/renderer/features/sessions/service/service.test.ts:2514-2660
**Duplicated test setup across two new cases**
The two new tests repeat an identical block (~25 lines) to create `service`, `existingSession`, set up all three store-setter mocks, start `watchCloudTask`, and extract `subscribeOptions`. Per the OnceAndOnlyOnce simplicity rule (and the project-wide preference for parameterised tests), this shared scaffolding should be extracted — either into a shared `beforeEach` / helper factory within this `describe` block, or, since the two scenarios diverge only in the log content and the number of `onData` invocations, as a single parameterised case using `it.each`.
Reviews (1): Last reviewed commit: "chore(code): trim verbose comments on cl..." | Re-trigger Greptile |
| }); | ||
|
|
||
| it("breaks the reconcile loop on first observation when parse failures are present", async () => { | ||
| const service = getSessionService(); | ||
| const existingSession = createMockSession({ | ||
| taskRunId: "run-123", | ||
| taskId: "task-123", | ||
| status: "connected", | ||
| isCloud: true, | ||
| logUrl: "https://logs.example.com/run-123", | ||
| processedLineCount: 5, | ||
| events: [], | ||
| }); | ||
| mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( | ||
| existingSession, | ||
| ); | ||
| mockSessionStoreSetters.getSessions.mockReturnValue({ | ||
| "run-123": existingSession, | ||
| }); | ||
|
|
||
| const validLine = JSON.stringify({ | ||
| type: "notification", | ||
| timestamp: "2024-01-01T00:00:00Z", | ||
| notification: { method: "session/update" }, | ||
| }); | ||
| const corruptedContent = [ | ||
| ...Array.from({ length: 8 }, () => validLine), | ||
| "}}not-json{{", | ||
| "{broken", | ||
| ].join("\n"); | ||
| mockTrpcLogs.readLocalLogs.query.mockResolvedValue(corruptedContent); | ||
| mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(corruptedContent); | ||
|
|
||
| service.watchCloudTask( | ||
| "task-123", | ||
| "run-123", | ||
| "https://api.anthropic.com", | ||
| 123, | ||
| ); | ||
| const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock | ||
| .calls[0][1] as { | ||
| onData: (update: unknown) => void; | ||
| }; | ||
|
|
||
| subscribeOptions.onData({ | ||
| kind: "logs", | ||
| taskId: "task-123", | ||
| runId: "run-123", | ||
| totalEntryCount: 20, | ||
| newEntries: [ | ||
| { | ||
| type: "notification", | ||
| timestamp: "2024-01-01T00:00:01Z", | ||
| notification: { method: "session/update" }, | ||
| }, | ||
| ], | ||
| }); | ||
|
|
||
| await vi.waitFor(() => { | ||
| expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( | ||
| "run-123", | ||
| expect.objectContaining({ processedLineCount: 20 }), | ||
| ); | ||
| }); | ||
| }); | ||
|
|
||
| it("breaks the reconcile loop after a repeated stable deficiency", async () => { | ||
| const service = getSessionService(); | ||
| const existingSession = createMockSession({ | ||
| taskRunId: "run-123", | ||
| taskId: "task-123", | ||
| status: "connected", | ||
| isCloud: true, | ||
| logUrl: "https://logs.example.com/run-123", | ||
| processedLineCount: 5, | ||
| events: [], | ||
| }); | ||
| mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( | ||
| existingSession, | ||
| ); | ||
| mockSessionStoreSetters.getSessions.mockReturnValue({ | ||
| "run-123": existingSession, | ||
| }); | ||
|
|
||
| const storedLine = JSON.stringify({ | ||
| type: "notification", | ||
| timestamp: "2024-01-01T00:00:00Z", | ||
| notification: { method: "session/update" }, | ||
| }); | ||
| mockTrpcLogs.readLocalLogs.query.mockResolvedValue( | ||
| Array.from({ length: 8 }, () => storedLine).join("\n"), | ||
| ); | ||
| mockTrpcLogs.fetchS3Logs.query.mockResolvedValue( | ||
| Array.from({ length: 8 }, () => storedLine).join("\n"), | ||
| ); | ||
|
|
||
| service.watchCloudTask( | ||
| "task-123", | ||
| "run-123", | ||
| "https://api.anthropic.com", | ||
| 123, | ||
| ); | ||
| const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock | ||
| .calls[0][1] as { | ||
| onData: (update: unknown) => void; | ||
| }; | ||
|
|
||
| const newEntry = { | ||
| type: "notification", | ||
| timestamp: "2024-01-01T00:00:01Z", | ||
| notification: { method: "session/update" }, | ||
| }; | ||
|
|
||
| subscribeOptions.onData({ | ||
| kind: "logs", | ||
| taskId: "task-123", | ||
| runId: "run-123", | ||
| totalEntryCount: 14, | ||
| newEntries: [newEntry], | ||
| }); | ||
| await vi.waitFor(() => { | ||
| expect(mockTrpcLogs.fetchS3Logs.query).toHaveBeenCalledTimes(1); | ||
| }); | ||
|
|
||
| expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( | ||
| "run-123", | ||
| expect.objectContaining({ processedLineCount: 14 }), | ||
| ); | ||
|
|
||
| subscribeOptions.onData({ | ||
| kind: "logs", | ||
| taskId: "task-123", | ||
| runId: "run-123", | ||
| totalEntryCount: 14, | ||
| newEntries: [newEntry], | ||
| }); | ||
|
|
||
| await vi.waitFor(() => { | ||
| expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( | ||
| "run-123", | ||
| expect.objectContaining({ processedLineCount: 14 }), | ||
| ); | ||
| }); | ||
| }); | ||
|
|
||
| it("flips status to connected on _posthog/run_started", async () => { | ||
| const service = getSessionService(); |
There was a problem hiding this comment.
Duplicated test setup across two new cases
The two new tests repeat an identical block (~25 lines) to create service, existingSession, set up all three store-setter mocks, start watchCloudTask, and extract subscribeOptions. Per the OnceAndOnlyOnce simplicity rule (and the project-wide preference for parameterised tests), this shared scaffolding should be extracted — either into a shared beforeEach / helper factory within this describe block, or, since the two scenarios diverge only in the log content and the number of onData invocations, as a single parameterised case using it.each.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/code/src/renderer/features/sessions/service/service.test.ts
Line: 2514-2660
Comment:
**Duplicated test setup across two new cases**
The two new tests repeat an identical block (~25 lines) to create `service`, `existingSession`, set up all three store-setter mocks, start `watchCloudTask`, and extract `subscribeOptions`. Per the OnceAndOnlyOnce simplicity rule (and the project-wide preference for parameterised tests), this shared scaffolding should be extracted — either into a shared `beforeEach` / helper factory within this `describe` block, or, since the two scenarios diverge only in the log content and the number of `onData` invocations, as a single parameterised case using `it.each`.
How can I resolve this? If you propose a fix, please make it concise.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
21d3033 to
419607b
Compare
…iciency When the cloud-task gap-reconcile loop fetches and finds `totalLineCount < expectedCount` (corruption — usually concatenated NDJSON records without newlines), it used to warn and skip, leaving processedLineCount unchanged. Every subsequent SSE snapshot then re-triggered another fetch + writeLocalLogs because the snapshot delta stayed positive forever — this is the "can't click cloud task" failure mode that survives layer 1's write coalescer. Track the last observed (expectedCount, observedLineCount) per taskRunId. If a second reconcile produces the same pair, S3 is not catching up: commit the parseable entries best-effort and advance processedLineCount past the gap so the snapshot handler stops re-triggering. New higher expectedCount values get one more retry before being treated as exhausted. Also surface `parseFailureCount` from `parseLogContent` for visibility in the inconsistency-warning log. Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7
Previously the loop-break required two reconciles producing identical (expectedCount, observedLineCount) — brittle when S3 dribbled in a single record between attempts. parseFailureCount > 0 is a stronger signal: lines exist on disk that don't parse, so further fetches won't recover them. Treat it as proof of corruption and commit best-effort on the first observation. Also clean up cloudLogReconcileDeficiency in stopCloudTaskWatch to match the existing watcher teardown pattern. Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7
Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7
Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7
8913168 to
d7b92a8
Compare
419607b to
5d2948c
Compare

Problem
Cloud log reconciliation could get stuck in an infinite loop when log files contained corrupted or unparseable lines. Because `processedLineCount` was never advanced past the gap, every new snapshot delta would re-trigger a reconcile that could never succeed — either because lines were permanently malformed (proven corruption) or because S3 simply wasn't catching up.
Changes
Introduced two early-exit conditions in the reconcile loop that commit a best-effort state and advance `processedLineCount` past the gap:
- Parse failures present: if any line fails `JSON.parse`, the corruption is permanent and S3 will never fix it. The reconcile breaks immediately rather than waiting.
- Repeated stable deficiency: if a reconcile produces the same `(expectedCount, observedLineCount)` pair as the previous one, S3 is not catching up. The reconcile breaks on the second observation.
In all other cases the deficiency is treated as transient lag and the reconcile waits for the next snapshot update as before.
A `parseFailureCount` field was added to `ParsedSessionLogs` so the reconcile handler can distinguish between "fewer lines than expected" and "lines exist but are corrupt." A `cloudLogReconcileDeficiency` map tracks the last observed deficiency per `taskRunId` and is cleaned up on session removal, watcher teardown, and full reset.
How did you test this?
Two new unit tests were added:
- Parse failures present: `processedLineCount` is advanced to `expectedCount` on the first reconcile attempt.
- Repeated stable deficiency: `processedLineCount` is only advanced after the second (repeated) observation.
Publish to changelog?
No