diff --git a/apps/code/src/main/di/container.ts b/apps/code/src/main/di/container.ts index b5b0d0461..5d6a8d508 100644 --- a/apps/code/src/main/di/container.ts +++ b/apps/code/src/main/di/container.ts @@ -49,6 +49,7 @@ import { HandoffService } from "../services/handoff/service"; import { InboxLinkService } from "../services/inbox-link/service"; import { LinearIntegrationService } from "../services/linear-integration/service"; import { LlmGatewayService } from "../services/llm-gateway/service"; +import { LocalLogsService } from "../services/local-logs/service"; import { McpAppsService } from "../services/mcp-apps/service"; import { McpCallbackService } from "../services/mcp-callback/service"; import { McpProxyService } from "../services/mcp-proxy/service"; @@ -134,6 +135,7 @@ container.bind(MAIN_TOKENS.HandoffService).to(HandoffService); container .bind(MAIN_TOKENS.LinearIntegrationService) .to(LinearIntegrationService); +container.bind(MAIN_TOKENS.LocalLogsService).to(LocalLogsService); container.bind(MAIN_TOKENS.McpCallbackService).to(McpCallbackService); container.bind(MAIN_TOKENS.NotificationService).to(NotificationService); container.bind(MAIN_TOKENS.OAuthService).to(OAuthService); diff --git a/apps/code/src/main/di/tokens.ts b/apps/code/src/main/di/tokens.ts index 92d9a1287..aeade0e77 100644 --- a/apps/code/src/main/di/tokens.ts +++ b/apps/code/src/main/di/tokens.ts @@ -64,6 +64,7 @@ export const MAIN_TOKENS = Object.freeze({ GitHubIntegrationService: Symbol.for("Main.GitHubIntegrationService"), LinearIntegrationService: Symbol.for("Main.LinearIntegrationService"), SlackIntegrationService: Symbol.for("Main.SlackIntegrationService"), + LocalLogsService: Symbol.for("Main.LocalLogsService"), DeepLinkService: Symbol.for("Main.DeepLinkService"), NotificationService: Symbol.for("Main.NotificationService"), McpCallbackService: Symbol.for("Main.McpCallbackService"), diff --git a/apps/code/src/main/services/local-logs/service.test.ts b/apps/code/src/main/services/local-logs/service.test.ts new file mode 100644 index 000000000..80b735e73 --- /dev/null +++ b/apps/code/src/main/services/local-logs/service.test.ts @@ -0,0 +1,234 @@ +import os from "node:os"; +import path from "node:path"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { mockMkdir, mockWriteFile, mockReadFile } = vi.hoisted(() => ({ + mockMkdir: vi.fn(), + mockWriteFile: vi.fn(), + mockReadFile: vi.fn(), +})); + +vi.mock("node:fs", () => ({ + default: { + promises: { + mkdir: mockMkdir, + writeFile: mockWriteFile, + readFile: mockReadFile, + }, + }, +})); + +vi.mock("../../utils/logger.js", () => ({ + logger: { + scope: () => ({ + info: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + debug: vi.fn(), + }), + }, +})); + +import { LocalLogsService } from "./service"; + +const RUN_ID = "run-abc"; +const expectedPath = path.join( + os.homedir(), + ".posthog-code", + "sessions", + RUN_ID, + "logs.ndjson", +); + +function deferred(): { + promise: Promise; + resolve: (value: T) => void; + reject: (err: unknown) => void; +} { + let resolve!: (value: T) => void; + let reject!: (err: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +async function flushMicrotasks(): Promise { + for (let i = 0; i < 5; i++) await Promise.resolve(); +} + +describe("LocalLogsService", () => { + beforeEach(() => { + mockMkdir.mockReset().mockResolvedValue(undefined); + mockWriteFile.mockReset().mockResolvedValue(undefined); + mockReadFile.mockReset(); + }); + + describe("readLocalLogs", () => { + it("returns file contents", async () => { + mockReadFile.mockResolvedValue("hello"); + const service = new LocalLogsService(); + await expect(service.readLocalLogs(RUN_ID)).resolves.toBe("hello"); + expect(mockReadFile).toHaveBeenCalledWith(expectedPath, "utf-8"); + }); + + it.each([ + ["file is missing", Object.assign(new Error("nope"), { code: "ENOENT" })], + ["other read errors", new Error("boom")], + ])("returns null when %s", async (_label, err) => { + mockReadFile.mockRejectedValue(err); + const service = new LocalLogsService(); + await expect(service.readLocalLogs(RUN_ID)).resolves.toBeNull(); + }); + }); + + describe("writeLocalLogs", () => { + it("writes content to the run's NDJSON path", async () => { + const service = new LocalLogsService(); + await service.writeLocalLogs(RUN_ID, "line1\n"); + expect(mockMkdir).toHaveBeenCalledWith(path.dirname(expectedPath), { + recursive: true, + }); + expect(mockWriteFile).toHaveBeenCalledWith( + expectedPath, + "line1\n", + "utf-8", + ); + }); + + it("collapses many concurrent writes to one in-flight + one queued", async () => { + const firstWrite = deferred(); + mockWriteFile.mockImplementationOnce(() => firstWrite.promise); + + const service = new LocalLogsService(); + + const a = service.writeLocalLogs(RUN_ID, "A"); + const b = service.writeLocalLogs(RUN_ID, "B"); + const c = service.writeLocalLogs(RUN_ID, "C"); + const d = service.writeLocalLogs(RUN_ID, "D"); + + await flushMicrotasks(); + expect(mockWriteFile).toHaveBeenCalledTimes(1); + expect(mockWriteFile).toHaveBeenCalledWith(expectedPath, "A", "utf-8"); + + firstWrite.resolve(); + await Promise.all([a, b, c, d]); + + expect(mockWriteFile).toHaveBeenCalledTimes(2); + expect(mockWriteFile).toHaveBeenNthCalledWith( + 2, + expectedPath, + "D", + "utf-8", + ); + }); + + it("all coalesced callers see resolution when drain completes", async () => { + const firstWrite = deferred(); + mockWriteFile.mockImplementationOnce(() => firstWrite.promise); + + const service = new LocalLogsService(); + const a = service.writeLocalLogs(RUN_ID, "A"); + const b = service.writeLocalLogs(RUN_ID, "B"); + + let aResolved = false; + let bResolved = false; + void a.then(() => { + aResolved = true; + }); + void b.then(() => { + bResolved = true; + }); + + await Promise.resolve(); + expect(aResolved).toBe(false); + expect(bResolved).toBe(false); + + firstWrite.resolve(); + await Promise.all([a, b]); + expect(aResolved).toBe(true); + expect(bResolved).toBe(true); + }); + + it("keeps writes for different taskRunIds independent", async () => { + const writeA = deferred(); + const writeB = deferred(); + mockWriteFile + .mockImplementationOnce(() => writeA.promise) + .mockImplementationOnce(() => writeB.promise); + + const service = new LocalLogsService(); + const a = service.writeLocalLogs("run-a", "AAA"); + const b = service.writeLocalLogs("run-b", "BBB"); + + await flushMicrotasks(); + expect(mockWriteFile).toHaveBeenCalledTimes(2); + writeA.resolve(); + writeB.resolve(); + await Promise.all([a, b]); + }); + + it("starts fresh after the queue drains", async () => { + const service = new LocalLogsService(); + await service.writeLocalLogs(RUN_ID, "first"); + await service.writeLocalLogs(RUN_ID, "second"); + expect(mockWriteFile).toHaveBeenCalledTimes(2); + expect(mockWriteFile).toHaveBeenNthCalledWith( + 2, + expectedPath, + "second", + "utf-8", + ); + }); + + it("continues draining queued content even if a write rejects", async () => { + const firstWrite = deferred(); + mockWriteFile.mockImplementationOnce(() => firstWrite.promise); + + const service = new LocalLogsService(); + const a = service.writeLocalLogs(RUN_ID, "A"); + const b = service.writeLocalLogs(RUN_ID, "B"); + + firstWrite.reject(new Error("disk full")); + await Promise.all([a, b]); + + expect(mockWriteFile).toHaveBeenCalledTimes(2); + expect(mockWriteFile).toHaveBeenNthCalledWith( + 2, + expectedPath, + "B", + "utf-8", + ); + }); + + it("skips writeFile when coalesced content matches the last write", async () => { + const firstWrite = deferred(); + mockWriteFile.mockImplementationOnce(() => firstWrite.promise); + + const service = new LocalLogsService(); + const a = service.writeLocalLogs(RUN_ID, "SAME"); + const b = service.writeLocalLogs(RUN_ID, "SAME"); + + firstWrite.resolve(); + await Promise.all([a, b]); + + expect(mockWriteFile).toHaveBeenCalledTimes(1); + }); + + it("only mkdirs once per drain", async () => { + const firstWrite = deferred(); + mockWriteFile.mockImplementationOnce(() => firstWrite.promise); + + const service = new LocalLogsService(); + const a = service.writeLocalLogs(RUN_ID, "A"); + const b = service.writeLocalLogs(RUN_ID, "B"); + + firstWrite.resolve(); + await Promise.all([a, b]); + + expect(mockWriteFile).toHaveBeenCalledTimes(2); + expect(mockMkdir).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/apps/code/src/main/services/local-logs/service.ts b/apps/code/src/main/services/local-logs/service.ts new file mode 100644 index 000000000..4c4281bf2 --- /dev/null +++ b/apps/code/src/main/services/local-logs/service.ts @@ -0,0 +1,108 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import { injectable } from "inversify"; +import { DATA_DIR } from "../../../shared/constants"; +import { logger } from "../../utils/logger"; + +const log = logger.scope("local-logs"); + +interface WriteState { + pending: string | undefined; + lastWritten: string | undefined; + dirReady: boolean; +} + +/** + * Single-flight per `taskRunId` with latest-wins coalescing. Prevents the + * gap-reconcile loop from spawning parallel writeFile of the same NDJSON. + */ +@injectable() +export class LocalLogsService { + private writes = new Map< + string, + { state: WriteState; inFlight: Promise } + >(); + + async readLocalLogs(taskRunId: string): Promise { + const logPath = this.getLocalLogPath(taskRunId); + try { + return await fs.promises.readFile(logPath, "utf-8"); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return null; + } + log.warn("Failed to read local logs:", error); + return null; + } + } + + writeLocalLogs(taskRunId: string, content: string): Promise { + const existing = this.writes.get(taskRunId); + if (existing) { + existing.state.pending = content; + return existing.inFlight; + } + + const state: WriteState = { + pending: undefined, + lastWritten: undefined, + dirReady: false, + }; + const inFlight = this.drain(taskRunId, content, state); + this.writes.set(taskRunId, { state, inFlight }); + return inFlight; + } + + private async drain( + taskRunId: string, + initialContent: string, + state: WriteState, + ): Promise { + try { + let next: string | undefined = initialContent; + while (next !== undefined) { + const current = next; + next = undefined; + if (current !== state.lastWritten) { + await this.doWrite(taskRunId, current, state); + state.lastWritten = current; + } + if (state.pending !== undefined) { + next = state.pending; + state.pending = undefined; + } + } + } finally { + this.writes.delete(taskRunId); + } + } + + private async doWrite( + taskRunId: string, + content: string, + state: WriteState, + ): Promise { + const logPath = this.getLocalLogPath(taskRunId); + try { + if (!state.dirReady) { + await fs.promises.mkdir(path.dirname(logPath), { recursive: true }); + state.dirReady = true; + } + await fs.promises.writeFile(logPath, content, "utf-8"); + } catch (error) { + log.warn("Failed to write local logs:", error); + } + } + + private getLocalLogPath(taskRunId: string): string { + return path.join( + os.homedir(), + DATA_DIR, + "sessions", + taskRunId, + "logs.ndjson", + ); + } +} diff --git a/apps/code/src/main/trpc/routers/logs.ts b/apps/code/src/main/trpc/routers/logs.ts index 23b578854..bd0e80a67 100644 --- a/apps/code/src/main/trpc/routers/logs.ts +++ b/apps/code/src/main/trpc/routers/logs.ts @@ -1,22 +1,14 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; - import { z } from "zod"; +import { container } from "../../di/container"; +import { MAIN_TOKENS } from "../../di/tokens"; +import type { LocalLogsService } from "../../services/local-logs/service"; import { logger } from "../../utils/logger"; import { publicProcedure, router } from "../trpc"; const log = logger.scope("logsRouter"); -function getLocalLogPath(taskRunId: string): string { - return path.join( - os.homedir(), - ".posthog-code", - "sessions", - taskRunId, - "logs.ndjson", - ); -} +const getLocalLogsService = (): LocalLogsService => + container.get(MAIN_TOKENS.LocalLogsService); export const logsRouter = router({ fetchS3Logs: publicProcedure @@ -47,30 +39,11 @@ export const logsRouter = router({ readLocalLogs: publicProcedure .input(z.object({ taskRunId: z.string() })) - .query(async ({ input }) => { - const logPath = getLocalLogPath(input.taskRunId); - try { - return await fs.promises.readFile(logPath, "utf-8"); - } catch (error) { - if ((error as NodeJS.ErrnoException).code === "ENOENT") { - return null; - } - log.warn("Failed to read local logs:", error); - return null; - } - }), + .query(({ input }) => getLocalLogsService().readLocalLogs(input.taskRunId)), writeLocalLogs: publicProcedure .input(z.object({ taskRunId: z.string(), content: z.string() })) - .mutation(async ({ input }) => { - const logPath = getLocalLogPath(input.taskRunId); - const logDir = path.dirname(logPath); - - try { - await fs.promises.mkdir(logDir, { recursive: true }); - await fs.promises.writeFile(logPath, input.content, "utf-8"); - } catch (error) { - log.warn("Failed to write local logs:", error); - } - }), + .mutation(({ input }) => + getLocalLogsService().writeLocalLogs(input.taskRunId, input.content), + ), });