From 361407543741e328a107679b73da40c66795e1f5 Mon Sep 17 00:00:00 2001 From: smirk-dev Date: Sun, 7 Jun 2026 09:39:24 +0530 Subject: [PATCH] fix: guard concurrent ffmpeg load calls with shared promise Concurrent loadFFmpeg() calls could each trigger the worker initialization flow, double-loading the same WASM runtime and registering duplicate listeners. Replace the brittle isFirstLoad/workerReady sentinels with an explicit shared in-flight promise (ffmpegLoadingPromise) plus an ffmpegLoaded flag so: - The worker is created and {type:"load"} is posted at most once per init. - Concurrent callers join the same promise instead of starting a second load. - A failed load resets state (resetWorker) so later calls can retry cleanly. - Aborting one caller rejects only that caller and leaves the shared load intact for others. Adds regression coverage for concurrent init, retry-after-failure, and abort. Closes #1014 Co-Authored-By: Claude Opus 4.8 (1M context) --- src/lib/ffmpeg.ts | 113 +++++++++++++-------- src/lib/tests/ffmpegLoader.test.ts | 155 +++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+), 42 deletions(-) create mode 100644 src/lib/tests/ffmpegLoader.test.ts diff --git a/src/lib/ffmpeg.ts b/src/lib/ffmpeg.ts index 625387d2..092fbff5 100644 --- a/src/lib/ffmpeg.ts +++ b/src/lib/ffmpeg.ts @@ -46,9 +46,14 @@ type WorkerResponse = | WorkerCancelledResponse; let ffmpegWorker: Worker | null = null; -let workerReady: Promise | null = null; -let workerReadyResolve: (() => void) | null = null; -let workerReadyReject: ((reason?: any) => void) | null = null; +// True once the worker has reported the WASM runtime finished loading. +let ffmpegLoaded = false; +// Shared in-flight initialization. Concurrent loadFFmpeg() callers await this +// single promise, so the underlying runtime is only ever loaded once even when +// several callers trigger a load at the same time. +let ffmpegLoadingPromise: Promise | null = null; +let resolveLoad: (() => void) | null = null; +let rejectLoad: ((reason?: unknown) => void) | null = null; let pendingExport: { id: string; resolve: (result: ExportResult) => void; @@ -68,35 +73,40 @@ function createWorker(): Worker { ffmpegWorker.onerror = (event) => { const message = event.message || "FFmpeg worker error"; const error = new FFmpegLoadError(message); - workerReadyReject?.(error); + failLoad(error); pendingExport?.reject(error); resetWorker(); }; - workerReady = new Promise((resolve, reject) => { - workerReadyResolve = resolve; - workerReadyReject = reject; - }); - return ffmpegWorker; } function resetWorker() { ffmpegWorker = null; - workerReady = null; - workerReadyResolve = null; - workerReadyReject = null; + ffmpegLoaded = false; + ffmpegLoadingPromise = null; + resolveLoad = null; + rejectLoad = null; pendingExport = null; pendingProgress = null; } +// Reject the shared loading promise (if one is in flight) and clear its settle +// handlers so the loader is never left stuck in a half-initialized state. +function failLoad(error: unknown) { + rejectLoad?.(error); + resolveLoad = null; + rejectLoad = null; +} + function handleWorkerMessage(event: MessageEvent) { const data = event.data; if (data.type === "ready") { - workerReadyResolve?.(); - workerReadyResolve = null; - workerReadyReject = null; + ffmpegLoaded = true; + resolveLoad?.(); + resolveLoad = null; + rejectLoad = null; pendingProgress?.(100); return; } @@ -130,10 +140,7 @@ function handleWorkerMessage(event: MessageEvent) { return; } - workerReadyReject?.(new FFmpegLoadError(data.message)); - workerReady = null; - workerReadyResolve = null; - workerReadyReject = null; + failLoad(new FFmpegLoadError(data.message)); resetWorker(); return; } @@ -148,54 +155,76 @@ function handleWorkerMessage(event: MessageEvent) { } } -async function ensureWorker() { +function ensureWorker(): Worker { if (!ffmpegWorker) { createWorker(); } + return ffmpegWorker!; +} + +// Begin a single worker initialization cycle: create the worker, register the +// shared settle handlers, and trigger the worker's one-time load. The returned +// promise resolves on "ready" and rejects (after resetting state) on failure. +function startLoad(): Promise { + const worker = ensureWorker(); + const promise = new Promise((resolve, reject) => { + resolveLoad = resolve; + rejectLoad = reject; + }); + worker.postMessage({ type: "load" }); + return promise; } export async function loadFFmpeg( signal?: AbortSignal, onProgress?: (percent: number) => void ): Promise { - // 1. Capture if the worker is uninitialized before ensureWorker runs - const isFirstLoad = !ffmpegWorker; - - await ensureWorker(); - - if (workerReady && workerReadyResolve === null) { + // Fast path: the runtime is already initialized — never load twice. + if (ffmpegLoaded) { onProgress?.(100); return; } - // 2. Use the captured flag to securely trigger the worker's internal load phase - if (isFirstLoad) { - ffmpegWorker!.postMessage({ type: "load" }); + if (signal?.aborted) { + throw new DOMException("Aborted", "AbortError"); } - pendingProgress = onProgress ?? null; + // Latest caller's progress callback wins (mirrors prior behavior). + if (onProgress) { + pendingProgress = onProgress; + } - if (signal?.aborted) { - ffmpegWorker?.postMessage({ type: "cancel" }); - throw new DOMException("Aborted", "AbortError"); + // Start the initialization exactly once. Any caller that arrives while a load + // is in flight joins the same promise instead of kicking off a second + // ffmpeg.load() (and a second set of listeners) on the same runtime. + if (!ffmpegLoadingPromise) { + ffmpegLoadingPromise = startLoad(); } + const loadingPromise = ffmpegLoadingPromise; - const cleanup = () => { - signal?.removeEventListener("abort", onAbort); - }; + if (!signal) { + await loadingPromise; + onProgress?.(100); + return; + } + // Per-caller abort: rejects only this caller's wait while leaving the shared + // load intact for other in-flight callers. + let abortReject: ((reason: unknown) => void) | null = null; + const abortPromise = new Promise((_, reject) => { + abortReject = reject; + }); const onAbort = () => { ffmpegWorker?.postMessage({ type: "cancel" }); - workerReadyReject?.(new DOMException("Aborted", "AbortError")); - cleanup(); + abortReject?.(new DOMException("Aborted", "AbortError")); }; - - signal?.addEventListener("abort", onAbort, { once: true }); + signal.addEventListener("abort", onAbort, { once: true }); try { - await workerReady; + await Promise.race([loadingPromise, abortPromise]); + onProgress?.(100); } finally { - cleanup(); + signal.removeEventListener("abort", onAbort); } } diff --git a/src/lib/tests/ffmpegLoader.test.ts b/src/lib/tests/ffmpegLoader.test.ts new file mode 100644 index 00000000..35ce8b0a --- /dev/null +++ b/src/lib/tests/ffmpegLoader.test.ts @@ -0,0 +1,155 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +/** + * Regression coverage for the concurrency-safe FFmpeg loader (issue #1014). + * + * The real runtime lives inside a Web Worker, so these tests stub the global + * `Worker` and assert on the messages the loader posts to it. The key invariant + * is that concurrent `loadFFmpeg()` calls share a single initialization: the + * worker is created once and the one-time `{ type: "load" }` command — which is + * what triggers `ffmpeg.load()` and registers the worker's listeners — is posted + * at most once per initialization. + */ + +type WorkerMessage = { type: string; [key: string]: unknown }; + +class MockWorker { + static instances: MockWorker[] = []; + + onmessage: ((event: { data: unknown }) => void) | null = null; + onerror: ((event: { message?: string }) => void) | null = null; + postMessage = vi.fn<(message: WorkerMessage) => void>(); + terminate = vi.fn(); + + constructor(public url: unknown, public options: unknown) { + MockWorker.instances.push(this); + } + + /** Simulate a message coming back from the worker thread. */ + emit(data: WorkerMessage) { + this.onmessage?.({ data }); + } + + /** Number of `{ type: "load" }` commands this worker received. */ + loadCount() { + return this.postMessage.mock.calls.filter(([m]) => m?.type === "load").length; + } +} + +function loadMessages() { + return MockWorker.instances.reduce((sum, w) => sum + w.loadCount(), 0); +} + +async function importLoader() { + // Fresh module instance per test so the module-level loader state is reset. + vi.resetModules(); + return import("../ffmpeg"); +} + +beforeEach(() => { + MockWorker.instances = []; + vi.stubGlobal("Worker", MockWorker as unknown as typeof Worker); +}); + +afterEach(() => { + vi.unstubAllGlobals(); + vi.restoreAllMocks(); +}); + +describe("loadFFmpeg concurrency guard", () => { + it("shares a single initialization across concurrent callers", async () => { + const { loadFFmpeg } = await importLoader(); + + // Fire several loads without awaiting in between. + const pending = [loadFFmpeg(), loadFFmpeg(), loadFFmpeg()]; + + // Exactly one worker is created and only one load command is posted, even + // though three callers requested initialization simultaneously. + expect(MockWorker.instances).toHaveLength(1); + expect(loadMessages()).toBe(1); + + // Resolve the shared load — every caller settles together. + MockWorker.instances[0].emit({ type: "ready" }); + await expect(Promise.all(pending)).resolves.toEqual([undefined, undefined, undefined]); + }); + + it("does not re-load once the runtime is ready", async () => { + const { loadFFmpeg } = await importLoader(); + + const first = loadFFmpeg(); + const worker = MockWorker.instances[0]; + worker.emit({ type: "ready" }); + await first; + + const postsBefore = worker.postMessage.mock.calls.length; + + // Subsequent calls take the fast path: no new worker, no new load command. + await loadFFmpeg(); + await loadFFmpeg(); + + expect(MockWorker.instances).toHaveLength(1); + expect(worker.postMessage.mock.calls.length).toBe(postsBefore); + expect(loadMessages()).toBe(1); + }); + + it("reports progress to the latest caller exactly once on ready", async () => { + const { loadFFmpeg } = await importLoader(); + + const onProgress = vi.fn(); + const pending = loadFFmpeg(undefined, onProgress); + + MockWorker.instances[0].emit({ type: "ready" }); + await pending; + + // `ready` triggers a single 100% notification (no duplicate listeners firing). + expect(onProgress).toHaveBeenCalledWith(100); + }); + + it("resets state after a failed load so callers can retry cleanly", async () => { + const { loadFFmpeg } = await importLoader(); + + // First attempt fails during load. + const first = loadFFmpeg(); + MockWorker.instances[0].emit({ type: "error", message: "boom" }); + await expect(first).rejects.toThrow("boom"); + + // A retry starts a fresh worker and a fresh single load command... + const second = loadFFmpeg(); + expect(MockWorker.instances).toHaveLength(2); + expect(MockWorker.instances[1].loadCount()).toBe(1); + + // ...and succeeds. + MockWorker.instances[1].emit({ type: "ready" }); + await expect(second).resolves.toBeUndefined(); + }); + + it("rejects an aborted caller without tearing down the shared load", async () => { + const { loadFFmpeg } = await importLoader(); + + const controller = new AbortController(); + const aborted = loadFFmpeg(controller.signal); + const joiner = loadFFmpeg(); + + expect(MockWorker.instances).toHaveLength(1); + expect(loadMessages()).toBe(1); + + controller.abort(); + await expect(aborted).rejects.toThrow(/abort/i); + + // The non-aborted caller still resolves once the shared load completes. + MockWorker.instances[0].emit({ type: "ready" }); + await expect(joiner).resolves.toBeUndefined(); + expect(loadMessages()).toBe(1); + }); + + it("rejects immediately when called with an already-aborted signal", async () => { + const { loadFFmpeg } = await importLoader(); + + const controller = new AbortController(); + controller.abort(); + + await expect(loadFFmpeg(controller.signal)).rejects.toThrow(/abort/i); + // No worker should have been spun up for an already-aborted request. + expect(MockWorker.instances).toHaveLength(0); + }); +});