diff --git a/package.json b/package.json index 4a69ec0..7e1bf5e 100644 --- a/package.json +++ b/package.json @@ -48,9 +48,7 @@ "type": "git", "url": "https://github.com/AgentWorkforce/trajectories" }, - "files": [ - "dist" - ], + "files": ["dist"], "engines": { "node": ">=20.0.0" }, diff --git a/src/storage/file.ts b/src/storage/file.ts index e26c723..573172d 100644 --- a/src/storage/file.ts +++ b/src/storage/file.ts @@ -5,8 +5,16 @@ * Active trajectories go in active/, completed in completed/YYYY-MM/. */ +import { randomUUID } from "node:crypto"; import { type Dirent, existsSync } from "node:fs"; -import { mkdir, readFile, readdir, unlink, writeFile } from "node:fs/promises"; +import { + mkdir, + readFile, + readdir, + rename, + unlink, + writeFile, +} from "node:fs/promises"; import { join } from "node:path"; import { validateTrajectory } from "../core/schema.js"; import type { @@ -98,6 +106,35 @@ export interface ReconcileSummary { skippedIoError: number; } +/** + * Per-path promise-chain mutex for index.json access. + * + * Keyed by the absolute index path, so multiple FileStorage instances in + * the same process that target the same `.trajectories` directory share + * the same lock. This is an in-process mutex only — it does not protect + * against writers in other processes. Cross-process safety is provided + * by the atomic tmp-file + rename in `saveIndex` (rename is atomic on + * POSIX, so readers never observe a half-written index). + * + * Implementation: store the tail of a promise chain per path. Each new + * critical section chains onto `.then(task)` so it only runs after the + * previous task resolves. We swallow errors on the tail so one failed + * task doesn't poison the chain for subsequent callers. + */ +const indexLocks = new Map<string, Promise<unknown>>(); + +function withIndexLock<T>(path: string, task: () => Promise<T>): Promise<T> { + const prev = indexLocks.get(path) ?? 
Promise.resolve(); + const next = prev.then(task, task); + // Replace the tail with a swallowed-error version so a rejection in + // `task` doesn't propagate to the next queued caller. + indexLocks.set( + path, + next.catch(() => undefined), + ); + return next; +} + /** * File system storage adapter */ @@ -133,12 +170,13 @@ export class FileStorage implements StorageAdapter { await mkdir(this.activeDir, { recursive: true }); await mkdir(this.completedDir, { recursive: true }); - // Create index if it doesn't exist + // Create index if it doesn't exist. Take the lock so a parallel + // initialize() in the same process doesn't race its seed write. if (!existsSync(this.indexPath)) { - await this.saveIndex({ - version: 1, - lastUpdated: new Date().toISOString(), - trajectories: {}, + await withIndexLock(this.indexPath, async () => { + if (!existsSync(this.indexPath)) { + await this.saveIndex(this.emptyIndex()); + } }); } @@ -172,58 +210,60 @@ export class FileStorage implements StorageAdapter { skippedIoError: 0, }; - const index = await this.loadIndex(); - const before = Object.keys(index.trajectories).length; + await withIndexLock(this.indexPath, async () => { + const index = await this.loadIndex(); + const before = Object.keys(index.trajectories).length; - const discovered: string[] = []; + const discovered: string[] = []; - // Walk active/ — intentionally NOT recursive; active trajectories - // always live at the flat root. - try { - const activeFiles = await readdir(this.activeDir); - for (const file of activeFiles) { - if (!file.endsWith(".json")) continue; - discovered.push(join(this.activeDir, file)); + // Walk active/ — intentionally NOT recursive; active trajectories + // always live at the flat root. 
+ try { + const activeFiles = await readdir(this.activeDir); + for (const file of activeFiles) { + if (!file.endsWith(".json")) continue; + discovered.push(join(this.activeDir, file)); + } + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") throw error; } - } catch (error) { - if ((error as NodeJS.ErrnoException).code !== "ENOENT") throw error; - } - // Walk completed/ recursively so we transparently support every - // historical layout without guessing depth. - await this.walkJsonFilesInto(this.completedDir, discovered); - - for (const filePath of discovered) { - summary.scanned += 1; - const result = await this.readTrajectoryFile(filePath); - if (!result.ok) { - if (result.reason === "malformed_json") { - summary.skippedMalformedJson += 1; - } else if (result.reason === "schema_violation") { - summary.skippedSchemaViolation += 1; - } else { - summary.skippedIoError += 1; + // Walk completed/ recursively so we transparently support every + // historical layout without guessing depth. 
+ await this.walkJsonFilesInto(this.completedDir, discovered); + + for (const filePath of discovered) { + summary.scanned += 1; + const result = await this.readTrajectoryFile(filePath); + if (!result.ok) { + if (result.reason === "malformed_json") { + summary.skippedMalformedJson += 1; + } else if (result.reason === "schema_violation") { + summary.skippedSchemaViolation += 1; + } else { + summary.skippedIoError += 1; + } + continue; } - continue; - } - const trajectory = result.trajectory; - if (index.trajectories[trajectory.id]) { - summary.alreadyIndexed += 1; - continue; + const trajectory = result.trajectory; + if (index.trajectories[trajectory.id]) { + summary.alreadyIndexed += 1; + continue; + } + index.trajectories[trajectory.id] = { + title: trajectory.task.title, + status: trajectory.status, + startedAt: trajectory.startedAt, + completedAt: trajectory.completedAt, + path: filePath, + }; + summary.added += 1; } - index.trajectories[trajectory.id] = { - title: trajectory.task.title, - status: trajectory.status, - startedAt: trajectory.startedAt, - completedAt: trajectory.completedAt, - path: filePath, - }; - summary.added += 1; - } - if (Object.keys(index.trajectories).length !== before) { - await this.saveIndex(index); - } + if (Object.keys(index.trajectories).length !== before) { + await this.saveIndex(index); + } + }); // Only log when something interesting happened. Noise is worse than // silence here — the CLI spinner is the user's feedback. 
@@ -490,21 +530,22 @@ export class FileStorage implements StorageAdapter { await unlink(activePath); } - // Remove from completed (search subdirectories) - const index = await this.loadIndex(); - const entry = index.trajectories[id]; - if (entry?.path && existsSync(entry.path)) { - await unlink(entry.path); - // Also remove markdown if exists - const mdPath = entry.path.replace(".json", ".md"); - if (existsSync(mdPath)) { - await unlink(mdPath); + // Read + mutate + write the index under the lock so we can't clobber + // a concurrent save's update. + await withIndexLock(this.indexPath, async () => { + const index = await this.loadIndex(); + const entry = index.trajectories[id]; + if (entry?.path && existsSync(entry.path)) { + await unlink(entry.path); + // Also remove markdown if exists + const mdPath = entry.path.replace(".json", ".md"); + if (existsSync(mdPath)) { + await unlink(mdPath); + } } - } - - // Update index - delete index.trajectories[id]; - await this.saveIndex(index); + delete index.trajectories[id]; + await this.saveIndex(index); + }); } /** @@ -611,43 +652,91 @@ export class FileStorage implements StorageAdapter { return result.ok ? result.trajectory : null; } + /** + * Read and parse the on-disk index. + * + * Tolerances (belt-and-braces against the read/write race): + * - ENOENT: first-run, return an empty index silently. + * - Empty file: a concurrent writer truncated index.json in "w" mode + * right before we read. Return an empty index silently — this is + * not a real corruption, just an interleaving the mutex + atomic + * rename should already prevent. Logging here would be noise. + * - Non-empty but malformed JSON: genuinely corrupted on disk (hand + * edit, disk error, etc). Log it and return an empty index so the + * caller can recover, but keep the log so the problem is visible. 
*/ private async loadIndex(): Promise<TrajectoryIndex> { + let content: string; try { - const content = await readFile(this.indexPath, "utf-8"); - return JSON.parse(content); + content = await readFile(this.indexPath, "utf-8"); } catch (error) { - // ENOENT means index doesn't exist yet - this is expected on first run + // ENOENT means index doesn't exist yet - expected on first run. if ((error as NodeJS.ErrnoException).code !== "ENOENT") { console.error( "Error loading trajectory index, using empty index:", error, ); } - return { - version: 1, - lastUpdated: new Date().toISOString(), - trajectories: {}, - }; + return this.emptyIndex(); } + + // Empty file == treat as empty index. Happens when readFile sneaks + // in between a writer's truncate and its write. Defense in depth + // against the race the in-process mutex already eliminates. + if (content.length === 0) { + return this.emptyIndex(); + } + + try { + return JSON.parse(content) as TrajectoryIndex; + } catch (error) { + console.error( + "Error loading trajectory index, using empty index:", + error, + ); + return this.emptyIndex(); + } + } + + private emptyIndex(): TrajectoryIndex { + return { + version: 1, + lastUpdated: new Date().toISOString(), + trajectories: {}, + }; } + /** + * Atomic write: stage into a process-unique temp path in the same directory + * and then rename over the live file. `rename` is atomic on POSIX, so + * concurrent readers in any process either see the old complete file or + * the new complete file — never a half-written / zero-byte state. + * + * Callers MUST hold `withIndexLock(this.indexPath, ...)` so the in-process + * read-modify-write cycle stays serialized; the unique temp name also keeps + * parallel writers in other processes from colliding on a shared tmp path. 
*/ private async saveIndex(index: TrajectoryIndex): Promise<void> { index.lastUpdated = new Date().toISOString(); - await writeFile(this.indexPath, JSON.stringify(index, null, 2), "utf-8"); + const tmpPath = `${this.indexPath}.${process.pid}.${randomUUID()}.tmp`; + await writeFile(tmpPath, JSON.stringify(index, null, 2), "utf-8"); + await rename(tmpPath, this.indexPath); } private async updateIndex( trajectory: Trajectory, filePath: string, ): Promise<void> { - const index = await this.loadIndex(); - index.trajectories[trajectory.id] = { - title: trajectory.task.title, - status: trajectory.status, - startedAt: trajectory.startedAt, - completedAt: trajectory.completedAt, - path: filePath, - }; - await this.saveIndex(index); + await withIndexLock(this.indexPath, async () => { + const index = await this.loadIndex(); + index.trajectories[trajectory.id] = { + title: trajectory.task.title, + status: trajectory.status, + startedAt: trajectory.startedAt, + completedAt: trajectory.completedAt, + path: filePath, + }; + await this.saveIndex(index); + }); } } diff --git a/tests/storage/concurrent-save.test.ts b/tests/storage/concurrent-save.test.ts new file mode 100644 index 0000000..a1656b4 --- /dev/null +++ b/tests/storage/concurrent-save.test.ts @@ -0,0 +1,135 @@ +import { readFile } from "node:fs/promises"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +/** + * Regression tests for the concurrent index.json read/write race. + * + * Repro shape (seen in relay workflow fan-outs): multiple async callers + * invoke FileStorage.save at roughly the same millisecond. Each call + * runs loadIndex -> mutate -> saveIndex. Without serialization: + * + * - A writer opens index.json in "w" mode, truncating it to 0 bytes. 
+ * - A concurrent reader sees the empty file, JSON.parse("") throws, + * the catch treats it as "no index" and returns an empty one. + * - That reader then writes its one mutation on top of the empty + * object, wiping every trajectory registered before it. + */ +describe("FileStorage concurrent save", () => { + let tempDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), "trail-concurrent-")); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + vi.restoreAllMocks(); + }); + + it("does not lose trajectories when many callers save concurrently", async () => { + const { FileStorage } = await import("../../src/storage/file.js"); + const { createTrajectory } = await import("../../src/core/trajectory.js"); + + const storage = new FileStorage(tempDir); + await storage.initialize(); + + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + // Fan out ~20 concurrent saves. Each one does load -> mutate -> save + // of index.json, which is the exact interleaving that wipes prior + // entries in the buggy code. + const count = 20; + const trajectories = Array.from({ length: count }, (_, i) => + createTrajectory({ title: `Concurrent ${i}` }), + ); + await Promise.all(trajectories.map((t) => storage.save(t))); + + const indexPath = join(tempDir, ".trajectories", "index.json"); + const raw = await readFile(indexPath, "utf-8"); + + // Final file on disk must be valid JSON — never a truncated write. + const parsed = JSON.parse(raw) as { + trajectories: Record<string, unknown>; + }; + + // All saved trajectories must be present in the final index. + // The buggy code would lose most of them to truncated-read wipes. + const ids = Object.keys(parsed.trajectories); + expect(ids).toHaveLength(count); + for (const t of trajectories) { + expect(ids).toContain(t.id); + } + + // No SyntaxError logged — empty reads should be tolerated silently, + // and the mutex should prevent concurrent read/write interleaves. 
const loggedSyntaxError = errorSpy.mock.calls.some((args) => + args.some( + (a) => + a instanceof SyntaxError || + (typeof a === "string" && a.includes("Unexpected end of JSON input")), + ), + ); + expect(loggedSyntaxError).toBe(false); + }); + + it("tolerates an empty index.json on disk without logging", async () => { + // Mimics the "file truncated mid-write" observation: an empty file + // should be treated as an empty index, not a JSON parse error. + const { FileStorage } = await import("../../src/storage/file.js"); + const { createTrajectory } = await import("../../src/core/trajectory.js"); + const { writeFile } = await import("node:fs/promises"); + + const storage = new FileStorage(tempDir); + await storage.initialize(); + + // Force index.json to be empty after initialize. + const indexPath = join(tempDir, ".trajectories", "index.json"); + await writeFile(indexPath, "", "utf-8"); + + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + const trajectory = createTrajectory({ title: "After empty read" }); + await storage.save(trajectory); + + const loggedSyntaxError = errorSpy.mock.calls.some((args) => + args.some( + (a) => + a instanceof SyntaxError || + (typeof a === "string" && a.includes("Unexpected end of JSON input")), + ), + ); + expect(loggedSyntaxError).toBe(false); + + const raw = await readFile(indexPath, "utf-8"); + const parsed = JSON.parse(raw) as { + trajectories: Record<string, unknown>; + }; + expect(parsed.trajectories[trajectory.id]).toBeDefined(); + }); + + it("still surfaces genuinely malformed (non-empty) index.json", async () => { + // If someone hand-edits index.json into invalid garbage, we still + // want the console.error — that's a real problem worth surfacing. 
+ const { FileStorage } = await import("../../src/storage/file.js"); + const { createTrajectory } = await import("../../src/core/trajectory.js"); + const { writeFile } = await import("node:fs/promises"); + + const storage = new FileStorage(tempDir); + await storage.initialize(); + + const indexPath = join(tempDir, ".trajectories", "index.json"); + await writeFile(indexPath, "{not valid json at all", "utf-8"); + + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + // Save still succeeds (we treat the index as empty and rebuild). + const trajectory = createTrajectory({ title: "After garbage index" }); + await storage.save(trajectory); + + // Malformed JSON should still be logged. + expect(errorSpy).toHaveBeenCalled(); + }); +});