From 30854baff9f80c6dc675eb0c85cbb041fc27f1d7 Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Thu, 21 May 2026 13:44:43 +0100 Subject: [PATCH 1/4] fix(code): break unrecoverable cloud log reconcile loop on stable deficiency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the cloud-task gap-reconcile loop fetches and finds `totalLineCount < expectedCount` (corruption — usually concatenated NDJSON records without newlines), it used to warn and skip, leaving processedLineCount unchanged. Every subsequent SSE snapshot then re-triggered another fetch + writeLocalLogs because the snapshot delta stayed positive forever — this is the "can't click cloud task" failure mode that survives layer 1's write coalescer. Track the last observed (expectedCount, observedLineCount) per taskRunId. If a second reconcile produces the same pair, S3 is not catching up: commit the parseable entries best-effort and advance processedLineCount past the gap so the snapshot handler stops re-triggering. New higher expectedCount values get one more retry before being treated as exhausted. Also surface `parseFailureCount` from `parseLogContent` for visibility in the inconsistency-warning log. Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7 --- .../features/sessions/service/service.test.ts | 81 +++++++++++++++++ .../features/sessions/service/service.ts | 91 ++++++++++++++++--- 2 files changed, 160 insertions(+), 12 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 95a3ebfd2..4d45bf22d 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -2512,6 +2512,87 @@ describe("SessionService", () => { // unbounded growth on long-running cloud runs. expect(mockSessionStoreSetters.appendEvents).not.toHaveBeenCalled(); }); + + it("breaks the reconcile loop after a repeated stable deficiency", async () => { + const service = getSessionService(); + const existingSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + logUrl: "https://logs.example.com/run-123", + processedLineCount: 5, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + existingSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": existingSession, + }); + + const storedLine = JSON.stringify({ + type: "notification", + timestamp: "2024-01-01T00:00:00Z", + notification: { method: "session/update" }, + }); + // 8 parseable lines while the server claims 14 — stable corruption. + mockTrpcLogs.readLocalLogs.query.mockResolvedValue( + Array.from({ length: 8 }, () => storedLine).join("\n"), + ); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue( + Array.from({ length: 8 }, () => storedLine).join("\n"), + ); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: unknown) => void; + }; + + const newEntry = { + type: "notification", + timestamp: "2024-01-01T00:00:01Z", + notification: { method: "session/update" }, + }; + + subscribeOptions.onData({ + kind: "logs", + taskId: "task-123", + runId: "run-123", + totalEntryCount: 14, + newEntries: [newEntry], + }); + await vi.waitFor(() => { + expect(mockTrpcLogs.fetchS3Logs.query).toHaveBeenCalledTimes(1); + }); + + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ processedLineCount: 14 }), + ); + + subscribeOptions.onData({ + kind: "logs", + taskId: "task-123", + runId: "run-123", + totalEntryCount: 14, + newEntries: [newEntry], + }); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ processedLineCount: 14 }), + ); + }); + }); + it("flips status to connected on _posthog/run_started", async () => { const service = getSessionService(); const hydratedSession = createMockSession({ diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 930e5f0dc..cfc2a6816 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -208,6 +208,7 @@ interface CloudLogGapReconcileRequest { interface ParsedSessionLogs { rawEntries: StoredLogEntry[]; totalLineCount: number; + parseFailureCount: number; sessionId?: string; adapter?: Adapter; } @@ -216,6 +217,11 @@ interface CloudLogGapReconcileState { pendingRequest?: CloudLogGapReconcileRequest; } +interface CloudLogReconcileDeficiency { + expectedCount: number; + observedLineCount: number; +} + export interface ConnectParams { task: Task; repoPath: string; @@ -303,6 +309,17 @@ export class SessionService { } >(); private cloudLogGapReconciles = new Map(); + /** + * Per-taskRunId record of the last observed reconcile deficiency. Used to + * detect the corruption-amplification loop: if a second reconcile produces + * the same (expectedCount, observedLineCount) pair as the previous one, S3 + * isn't catching up — we commit best-effort and advance processedLineCount + * past the gap so snapshot deltas stop re-triggering reconcile. + */ + private cloudLogReconcileDeficiency = new Map< + string, + CloudLogReconcileDeficiency + >(); /** Maps toolCallId → cloud requestId for routing permission responses */ private cloudPermissionRequestIds = new Map(); private idleKilledSubscription: { unsubscribe: () => void } | null = null; @@ -739,6 +756,7 @@ export class SessionService { this.unsubscribeFromChannel(taskRunId); sessionStoreSetters.removeSession(taskRunId); this.cloudRunIdleTracker.delete(taskRunId); + this.cloudLogReconcileDeficiency.delete(taskRunId); if (session) { this.localRepoPaths.delete(session.taskId); this.localRecoveryAttempts.delete(session.taskId); @@ -1118,6 +1136,7 @@ export class SessionService { this.localRecoveryAttempts.clear(); this.cloudPermissionRequestIds.clear(); this.cloudLogGapReconciles.clear(); + this.cloudLogReconcileDeficiency.clear(); this.dispatchingCloudQueues.clear(); this.scheduledCloudQueueFlushes.clear(); this.cloudRunIdleTracker.clear(); @@ -3658,6 +3677,7 @@ export class SessionService { const rawEntries: StoredLogEntry[] = []; let sessionId: string | undefined; let adapter: Adapter | undefined; + let parseFailureCount = 0; const lines = content.trim().split("\n"); for (const line of lines) { @@ -3679,11 +3699,18 @@ export class SessionService { if (params?.adapter) adapter = params.adapter; } } catch { + parseFailureCount += 1; log.warn("Failed to parse log entry", { line }); } } - return { rawEntries, totalLineCount: lines.length, sessionId, adapter }; + return { + rawEntries, + totalLineCount: lines.length, + parseFailureCount, + sessionId, + adapter, + }; } private async fetchSessionLogs( @@ -3691,7 +3718,11 @@ export class SessionService { taskRunId?: string, options: { minEntryCount?: number } = {}, ): Promise { - const empty: ParsedSessionLogs = { rawEntries: [], totalLineCount: 0 }; + const empty: ParsedSessionLogs = { + rawEntries: [], + totalLineCount: 0, + parseFailureCount: 0, + }; if (!logUrl && !taskRunId) return empty; let localResult: ParsedSessionLogs | undefined; @@ -3811,11 +3842,10 @@ export class SessionService { newEntries, logUrl, }: CloudLogGapReconcileRequest): Promise { - const { rawEntries, totalLineCount } = await this.fetchSessionLogs( - logUrl, - taskRunId, - { minEntryCount: expectedCount }, - ); + const { rawEntries, totalLineCount, parseFailureCount } = + await this.fetchSessionLogs(logUrl, taskRunId, { + minEntryCount: expectedCount, + }); const session = sessionStoreSetters.getSessions()[taskRunId]; if (!session || session.taskId !== taskId) { return; @@ -3823,6 +3853,7 @@ export class SessionService { const latestCount = session.processedLineCount ?? 0; if (latestCount >= expectedCount) { + this.cloudLogReconcileDeficiency.delete(taskRunId); return; } @@ -3832,6 +3863,7 @@ export class SessionService { sessionStoreSetters.clearTailOptimisticItems(taskRunId); } this.cloudRunIdleTracker.delete(taskRunId); + this.cloudLogReconcileDeficiency.delete(taskRunId); sessionStoreSetters.updateSession(taskRunId, { events, isCloud: true, @@ -3842,16 +3874,51 @@ export class SessionService { return; } - // The fetched logs lag behind expectedCount and `newEntries` is the latest - // tail slice of the snapshot — appending it here would create duplicates - // and gaps in `session.events` (and bump processedLineCount past entries - // we don't actually have). Skip; the next snapshot/log update will retry - // once the source has caught up. + // The fetched logs lag behind expectedCount. If we've seen the exact same + // deficiency before (same expectedCount, same observed line count), S3 is + // not catching up — committing what we have and advancing processedLineCount + // past the gap is the only way to break the reconcile loop. Otherwise the + // snapshot delta stays positive forever, firing a fetch + writeLocalLogs + // on every SSE snapshot. + const previous = this.cloudLogReconcileDeficiency.get(taskRunId); + const sameDeficiencyAsBefore = + previous?.expectedCount === expectedCount && + previous?.observedLineCount === totalLineCount; + + if (sameDeficiencyAsBefore) { + log.warn("Cloud task log gap unrecoverable; committing best-effort", { + taskRunId, + expectedCount, + observedLineCount: totalLineCount, + parseFailureCount, + fetchedEntries: rawEntries.length, + }); + const events = convertStoredEntriesToEvents(rawEntries); + if (hasSessionPromptEvent(events)) { + sessionStoreSetters.clearTailOptimisticItems(taskRunId); + } + this.cloudRunIdleTracker.delete(taskRunId); + this.cloudLogReconcileDeficiency.delete(taskRunId); + sessionStoreSetters.updateSession(taskRunId, { + events, + isCloud: true, + logUrl: logUrl ?? session.logUrl, + processedLineCount: expectedCount, + }); + this.updatePromptStateFromEvents(taskRunId, events); + return; + } + + this.cloudLogReconcileDeficiency.set(taskRunId, { + expectedCount, + observedLineCount: totalLineCount, + }); log.warn("Cloud task log count inconsistency", { taskRunId, currentCount, expectedCount, fetchedCount: rawEntries.length, + parseFailureCount, entriesReceived: newEntries.length, }); } From f84db0822e6ab5e17670b75105abdebefc4113fb Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Thu, 21 May 2026 13:48:46 +0100 Subject: [PATCH 2/4] refactor(code): break reconcile loop immediately on proven parse failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the loop-break required two reconciles producing identical (expectedCount, observedLineCount) — brittle when S3 dribbled in a single record between attempts. parseFailureCount > 0 is a stronger signal: lines exist on disk that don't parse, so further fetches won't recover them. Treat it as proof of corruption and commit best-effort on the first observation. Also clean up cloudLogReconcileDeficiency in stopCloudTaskWatch to match the existing watcher teardown pattern. Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7 --- .../features/sessions/service/service.test.ts | 67 +++++++++++++++++++ .../features/sessions/service/service.ts | 19 ++++-- 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 4d45bf22d..ad55e8880 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -2513,6 +2513,73 @@ describe("SessionService", () => { expect(mockSessionStoreSetters.appendEvents).not.toHaveBeenCalled(); }); + it("breaks the reconcile loop on first observation when parse failures are present", async () => { + const service = getSessionService(); + const existingSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + logUrl: "https://logs.example.com/run-123", + processedLineCount: 5, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + existingSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": existingSession, + }); + + const validLine = JSON.stringify({ + type: "notification", + timestamp: "2024-01-01T00:00:00Z", + notification: { method: "session/update" }, + }); + // Mix of valid and malformed JSON — totalLineCount=10, but two lines + // fail JSON.parse so parseFailureCount=2. Even on first observation + // we should commit best-effort to break the loop. + const corruptedContent = [ + ...Array.from({ length: 8 }, () => validLine), + "}}not-json{{", + "{broken", + ].join("\n"); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(corruptedContent); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(corruptedContent); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: unknown) => void; + }; + + subscribeOptions.onData({ + kind: "logs", + taskId: "task-123", + runId: "run-123", + totalEntryCount: 20, + newEntries: [ + { + type: "notification", + timestamp: "2024-01-01T00:00:01Z", + notification: { method: "session/update" }, + }, + ], + }); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ processedLineCount: 20 }), + ); + }); + }); + it("breaks the reconcile loop after a repeated stable deficiency", async () => { const service = getSessionService(); const existingSession = createMockSession({ diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index cfc2a6816..8c137e844 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -3017,6 +3017,7 @@ export class SessionService { watcher.subscription.unsubscribe(); this.cloudTaskWatchers.delete(taskId); + this.cloudLogReconcileDeficiency.delete(watcher.runId); } async preflightToLocal(taskId: string, repoPath: string) { @@ -3874,24 +3875,28 @@ export class SessionService { return; } - // The fetched logs lag behind expectedCount. If we've seen the exact same - // deficiency before (same expectedCount, same observed line count), S3 is - // not catching up — committing what we have and advancing processedLineCount - // past the gap is the only way to break the reconcile loop. Otherwise the - // snapshot delta stays positive forever, firing a fetch + writeLocalLogs - // on every SSE snapshot. + // The fetched logs lag behind expectedCount. Two situations break the + // reconcile loop early by committing best-effort and advancing + // processedLineCount past the gap: + // 1. `parseFailureCount > 0` — proven corruption: lines exist but + // don't parse. S3 will never make them parseable. Break on first + // observation. + // 2. Same deficiency twice in a row (same expectedCount, same + // observedLineCount) — S3 isn't catching up. Break on second. + // Otherwise this is plausibly transient lag — record and wait. const previous = this.cloudLogReconcileDeficiency.get(taskRunId); const sameDeficiencyAsBefore = previous?.expectedCount === expectedCount && previous?.observedLineCount === totalLineCount; - if (sameDeficiencyAsBefore) { + if (parseFailureCount > 0 || sameDeficiencyAsBefore) { log.warn("Cloud task log gap unrecoverable; committing best-effort", { taskRunId, expectedCount, observedLineCount: totalLineCount, parseFailureCount, fetchedEntries: rawEntries.length, + reason: parseFailureCount > 0 ? "parse-failure" : "stable-deficit", }); const events = convertStoredEntriesToEvents(rawEntries); if (hasSessionPromptEvent(events)) { From 53d4dc2acce5fdc25c6245574cf652b2c16a0170 Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Thu, 21 May 2026 14:41:14 +0100 Subject: [PATCH 3/4] chore(code): trim verbose comments on cloud log reconcile fix Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7 --- .../features/sessions/service/service.test.ts | 4 ---- .../features/sessions/service/service.ts | 19 +++---------------- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index ad55e8880..1eca2a1ad 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -2536,9 +2536,6 @@ describe("SessionService", () => { timestamp: "2024-01-01T00:00:00Z", notification: { method: "session/update" }, }); - // Mix of valid and malformed JSON — totalLineCount=10, but two lines - // fail JSON.parse so parseFailureCount=2. Even on first observation - // we should commit best-effort to break the loop. const corruptedContent = [ ...Array.from({ length: 8 }, () => validLine), "}}not-json{{", @@ -2603,7 +2600,6 @@ describe("SessionService", () => { timestamp: "2024-01-01T00:00:00Z", notification: { method: "session/update" }, }); - // 8 parseable lines while the server claims 14 — stable corruption. mockTrpcLogs.readLocalLogs.query.mockResolvedValue( Array.from({ length: 8 }, () => storedLine).join("\n"), ); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 8c137e844..b0b38d4b5 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -309,13 +309,7 @@ export class SessionService { } >(); private cloudLogGapReconciles = new Map(); - /** - * Per-taskRunId record of the last observed reconcile deficiency. Used to - * detect the corruption-amplification loop: if a second reconcile produces - * the same (expectedCount, observedLineCount) pair as the previous one, S3 - * isn't catching up — we commit best-effort and advance processedLineCount - * past the gap so snapshot deltas stop re-triggering reconcile. - */ + /** Last observed reconcile deficit per taskRunId — see reconcileCloudLogGapOnce. */ private cloudLogReconcileDeficiency = new Map< string, CloudLogReconcileDeficiency @@ -3875,15 +3869,8 @@ export class SessionService { return; } - // The fetched logs lag behind expectedCount. Two situations break the - // reconcile loop early by committing best-effort and advancing - // processedLineCount past the gap: - // 1. `parseFailureCount > 0` — proven corruption: lines exist but - // don't parse. S3 will never make them parseable. Break on first - // observation. - // 2. Same deficiency twice in a row (same expectedCount, same - // observedLineCount) — S3 isn't catching up. Break on second. - // Otherwise this is plausibly transient lag — record and wait. + // Break the reconcile loop on proven corruption (parseFailureCount > 0) + // or on a stable repeat of the same deficit. Otherwise wait — likely lag. const previous = this.cloudLogReconcileDeficiency.get(taskRunId); const sameDeficiencyAsBefore = previous?.expectedCount === expectedCount && From 5d2948c05b88970e3304e32fe52bab6f78a6336c Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Thu, 21 May 2026 15:35:15 +0100 Subject: [PATCH 4/4] test(code): extract shared scaffolding for reconcile-loop tests Generated-By: PostHog Code Task-Id: 65171e71-4765-479c-b0c8-e4d0dd5c5bc7 --- .../features/sessions/service/service.test.ts | 94 ++++++------------- 1 file changed, 28 insertions(+), 66 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 1eca2a1ad..1b7f411b4 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -2513,7 +2513,7 @@ describe("SessionService", () => { expect(mockSessionStoreSetters.appendEvents).not.toHaveBeenCalled(); }); - it("breaks the reconcile loop on first observation when parse failures are present", async () => { + const setupReconcileLoopTest = (logContent: string) => { const service = getSessionService(); const existingSession = createMockSession({ taskRunId: "run-123", @@ -2530,20 +2530,8 @@ describe("SessionService", () => { mockSessionStoreSetters.getSessions.mockReturnValue({ "run-123": existingSession, }); - - const validLine = JSON.stringify({ - type: "notification", - timestamp: "2024-01-01T00:00:00Z", - notification: { method: "session/update" }, - }); - const corruptedContent = [ - ...Array.from({ length: 8 }, () => validLine), - "}}not-json{{", - "{broken", - ].join("\n"); - mockTrpcLogs.readLocalLogs.query.mockResolvedValue(corruptedContent); - mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(corruptedContent); - + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(logContent); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(logContent); service.watchCloudTask( "task-123", "run-123", @@ -2554,19 +2542,35 @@ describe("SessionService", () => { .calls[0][1] as { onData: (update: unknown) => void; }; + return { subscribeOptions }; + }; + + const newEntry = { + type: "notification", + timestamp: "2024-01-01T00:00:01Z", + notification: { method: "session/update" }, + }; + const validLine = JSON.stringify({ + type: "notification", + timestamp: "2024-01-01T00:00:00Z", + notification: { method: "session/update" }, + }); + + it("breaks the reconcile loop on first observation when parse failures are present", async () => { + const { subscribeOptions } = setupReconcileLoopTest( + [ + ...Array.from({ length: 8 }, () => validLine), + "}}not-json{{", + "{broken", + ].join("\n"), + ); subscribeOptions.onData({ kind: "logs", taskId: "task-123", runId: "run-123", totalEntryCount: 20, - newEntries: [ - { - type: "notification", - timestamp: "2024-01-01T00:00:01Z", - notification: { method: "session/update" }, - }, - ], + newEntries: [newEntry], }); await vi.waitFor(() => { @@ -2578,51 +2582,9 @@ describe("SessionService", () => { }); it("breaks the reconcile loop after a repeated stable deficiency", async () => { - const service = getSessionService(); - const existingSession = createMockSession({ - taskRunId: "run-123", - taskId: "task-123", - status: "connected", - isCloud: true, - logUrl: "https://logs.example.com/run-123", - processedLineCount: 5, - events: [], - }); - mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( - existingSession, - ); - mockSessionStoreSetters.getSessions.mockReturnValue({ - "run-123": existingSession, - }); - - const storedLine = JSON.stringify({ - type: "notification", - timestamp: "2024-01-01T00:00:00Z", - notification: { method: "session/update" }, - }); - mockTrpcLogs.readLocalLogs.query.mockResolvedValue( - Array.from({ length: 8 }, () => storedLine).join("\n"), - ); - mockTrpcLogs.fetchS3Logs.query.mockResolvedValue( - Array.from({ length: 8 }, () => storedLine).join("\n"), - ); - - service.watchCloudTask( - "task-123", - "run-123", - "https://api.anthropic.com", - 123, + const { subscribeOptions } = setupReconcileLoopTest( + Array.from({ length: 8 }, () => validLine).join("\n"), ); - const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock - .calls[0][1] as { - onData: (update: unknown) => void; - }; - - const newEntry = { - type: "notification", - timestamp: "2024-01-01T00:00:01Z", - notification: { method: "session/update" }, - }; subscribeOptions.onData({ kind: "logs",