Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 71 additions & 42 deletions src/lib/ffmpeg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ type WorkerResponse =
| WorkerCancelledResponse;

let ffmpegWorker: Worker | null = null;
let workerReady: Promise<void> | null = null;
let workerReadyResolve: (() => void) | null = null;
let workerReadyReject: ((reason?: any) => void) | null = null;
// True once the worker has reported the WASM runtime finished loading.
let ffmpegLoaded = false;
// Shared in-flight initialization. Concurrent loadFFmpeg() callers await this
// single promise, so the underlying runtime is only ever loaded once even when
// several callers trigger a load at the same time.
let ffmpegLoadingPromise: Promise<void> | null = null;
let resolveLoad: (() => void) | null = null;
let rejectLoad: ((reason?: unknown) => void) | null = null;
let pendingExport: {
id: string;
resolve: (result: ExportResult) => void;
Expand All @@ -68,35 +73,40 @@ function createWorker(): Worker {
ffmpegWorker.onerror = (event) => {
const message = event.message || "FFmpeg worker error";
const error = new FFmpegLoadError(message);
workerReadyReject?.(error);
failLoad(error);
pendingExport?.reject(error);
resetWorker();
};

workerReady = new Promise((resolve, reject) => {
workerReadyResolve = resolve;
workerReadyReject = reject;
});

return ffmpegWorker;
}

function resetWorker() {
ffmpegWorker = null;
workerReady = null;
workerReadyResolve = null;
workerReadyReject = null;
ffmpegLoaded = false;
ffmpegLoadingPromise = null;
resolveLoad = null;
rejectLoad = null;
pendingExport = null;
pendingProgress = null;
}

// Reject the shared loading promise (if one is in flight) and clear its settle
// handlers so the loader is never left stuck in a half-initialized state.
function failLoad(error: unknown) {
rejectLoad?.(error);
resolveLoad = null;
rejectLoad = null;
}

function handleWorkerMessage(event: MessageEvent<WorkerResponse>) {
const data = event.data;

if (data.type === "ready") {
workerReadyResolve?.();
workerReadyResolve = null;
workerReadyReject = null;
ffmpegLoaded = true;
resolveLoad?.();
resolveLoad = null;
rejectLoad = null;
pendingProgress?.(100);
return;
}
Expand Down Expand Up @@ -130,10 +140,7 @@ function handleWorkerMessage(event: MessageEvent<WorkerResponse>) {
return;
}

workerReadyReject?.(new FFmpegLoadError(data.message));
workerReady = null;
workerReadyResolve = null;
workerReadyReject = null;
failLoad(new FFmpegLoadError(data.message));
resetWorker();
return;
}
Expand All @@ -148,54 +155,76 @@ function handleWorkerMessage(event: MessageEvent<WorkerResponse>) {
}
}

async function ensureWorker() {
function ensureWorker(): Worker {
if (!ffmpegWorker) {
createWorker();
}
return ffmpegWorker!;
}

// Begin a single worker initialization cycle: create the worker, register the
// shared settle handlers, and trigger the worker's one-time load. The returned
// promise resolves on "ready" and rejects (after resetting state) on failure.
function startLoad(): Promise<void> {
const worker = ensureWorker();
const promise = new Promise<void>((resolve, reject) => {
resolveLoad = resolve;
rejectLoad = reject;
});
worker.postMessage({ type: "load" });
return promise;
}

export async function loadFFmpeg(
signal?: AbortSignal,
onProgress?: (percent: number) => void
): Promise<void> {
// 1. Capture if the worker is uninitialized before ensureWorker runs
const isFirstLoad = !ffmpegWorker;

await ensureWorker();

if (workerReady && workerReadyResolve === null) {
// Fast path: the runtime is already initialized — never load twice.
if (ffmpegLoaded) {
onProgress?.(100);
return;
}

// 2. Use the captured flag to securely trigger the worker's internal load phase
if (isFirstLoad) {
ffmpegWorker!.postMessage({ type: "load" });
if (signal?.aborted) {
throw new DOMException("Aborted", "AbortError");
}

pendingProgress = onProgress ?? null;
// Latest caller's progress callback wins (mirrors prior behavior).
if (onProgress) {
pendingProgress = onProgress;
}

if (signal?.aborted) {
ffmpegWorker?.postMessage({ type: "cancel" });
throw new DOMException("Aborted", "AbortError");
// Start the initialization exactly once. Any caller that arrives while a load
// is in flight joins the same promise instead of kicking off a second
// ffmpeg.load() (and a second set of listeners) on the same runtime.
if (!ffmpegLoadingPromise) {
ffmpegLoadingPromise = startLoad();
}
const loadingPromise = ffmpegLoadingPromise;

const cleanup = () => {
signal?.removeEventListener("abort", onAbort);
};
if (!signal) {
await loadingPromise;
onProgress?.(100);
return;
}

// Per-caller abort: rejects only this caller's wait while leaving the shared
// load intact for other in-flight callers.
let abortReject: ((reason: unknown) => void) | null = null;
const abortPromise = new Promise<never>((_, reject) => {
abortReject = reject;
});
const onAbort = () => {
ffmpegWorker?.postMessage({ type: "cancel" });
workerReadyReject?.(new DOMException("Aborted", "AbortError"));
cleanup();
abortReject?.(new DOMException("Aborted", "AbortError"));
};

signal?.addEventListener("abort", onAbort, { once: true });
signal.addEventListener("abort", onAbort, { once: true });

try {
await workerReady;
await Promise.race([loadingPromise, abortPromise]);
onProgress?.(100);
} finally {
cleanup();
signal.removeEventListener("abort", onAbort);
}
}

Expand Down
155 changes: 155 additions & 0 deletions src/lib/tests/ffmpegLoader.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";

/**
* Regression coverage for the concurrency-safe FFmpeg loader (issue #1014).
*
* The real runtime lives inside a Web Worker, so these tests stub the global
* `Worker` and assert on the messages the loader posts to it. The key invariant
* is that concurrent `loadFFmpeg()` calls share a single initialization: the
* worker is created once and the one-time `{ type: "load" }` command — which is
* what triggers `ffmpeg.load()` and registers the worker's listeners — is posted
* at most once per initialization.
*/

type WorkerMessage = { type: string; [key: string]: unknown };

class MockWorker {
static instances: MockWorker[] = [];

onmessage: ((event: { data: unknown }) => void) | null = null;
onerror: ((event: { message?: string }) => void) | null = null;
postMessage = vi.fn<(message: WorkerMessage) => void>();
terminate = vi.fn();

constructor(public url: unknown, public options: unknown) {
MockWorker.instances.push(this);
}

/** Simulate a message coming back from the worker thread. */
emit(data: WorkerMessage) {
this.onmessage?.({ data });
}

/** Number of `{ type: "load" }` commands this worker received. */
loadCount() {
return this.postMessage.mock.calls.filter(([m]) => m?.type === "load").length;
}
}

function loadMessages() {
return MockWorker.instances.reduce((sum, w) => sum + w.loadCount(), 0);
}

async function importLoader() {
// Fresh module instance per test so the module-level loader state is reset.
vi.resetModules();
return import("../ffmpeg");
}

beforeEach(() => {
MockWorker.instances = [];
vi.stubGlobal("Worker", MockWorker as unknown as typeof Worker);
});

afterEach(() => {
vi.unstubAllGlobals();
vi.restoreAllMocks();
});

describe("loadFFmpeg concurrency guard", () => {
it("shares a single initialization across concurrent callers", async () => {
const { loadFFmpeg } = await importLoader();

// Fire several loads without awaiting in between.
const pending = [loadFFmpeg(), loadFFmpeg(), loadFFmpeg()];

// Exactly one worker is created and only one load command is posted, even
// though three callers requested initialization simultaneously.
expect(MockWorker.instances).toHaveLength(1);
expect(loadMessages()).toBe(1);

// Resolve the shared load — every caller settles together.
MockWorker.instances[0].emit({ type: "ready" });
await expect(Promise.all(pending)).resolves.toEqual([undefined, undefined, undefined]);
});

it("does not re-load once the runtime is ready", async () => {
const { loadFFmpeg } = await importLoader();

const first = loadFFmpeg();
const worker = MockWorker.instances[0];
worker.emit({ type: "ready" });
await first;

const postsBefore = worker.postMessage.mock.calls.length;

// Subsequent calls take the fast path: no new worker, no new load command.
await loadFFmpeg();
await loadFFmpeg();

expect(MockWorker.instances).toHaveLength(1);
expect(worker.postMessage.mock.calls.length).toBe(postsBefore);
expect(loadMessages()).toBe(1);
});

it("reports progress to the latest caller exactly once on ready", async () => {
const { loadFFmpeg } = await importLoader();

const onProgress = vi.fn();
const pending = loadFFmpeg(undefined, onProgress);

MockWorker.instances[0].emit({ type: "ready" });
await pending;

// `ready` triggers a single 100% notification (no duplicate listeners firing).
expect(onProgress).toHaveBeenCalledWith(100);
});

it("resets state after a failed load so callers can retry cleanly", async () => {
const { loadFFmpeg } = await importLoader();

// First attempt fails during load.
const first = loadFFmpeg();
MockWorker.instances[0].emit({ type: "error", message: "boom" });
await expect(first).rejects.toThrow("boom");

// A retry starts a fresh worker and a fresh single load command...
const second = loadFFmpeg();
expect(MockWorker.instances).toHaveLength(2);
expect(MockWorker.instances[1].loadCount()).toBe(1);

// ...and succeeds.
MockWorker.instances[1].emit({ type: "ready" });
await expect(second).resolves.toBeUndefined();
});

it("rejects an aborted caller without tearing down the shared load", async () => {
const { loadFFmpeg } = await importLoader();

const controller = new AbortController();
const aborted = loadFFmpeg(controller.signal);
const joiner = loadFFmpeg();

expect(MockWorker.instances).toHaveLength(1);
expect(loadMessages()).toBe(1);

controller.abort();
await expect(aborted).rejects.toThrow(/abort/i);

// The non-aborted caller still resolves once the shared load completes.
MockWorker.instances[0].emit({ type: "ready" });
await expect(joiner).resolves.toBeUndefined();
expect(loadMessages()).toBe(1);
});

it("rejects immediately when called with an already-aborted signal", async () => {
const { loadFFmpeg } = await importLoader();

const controller = new AbortController();
controller.abort();

await expect(loadFFmpeg(controller.signal)).rejects.toThrow(/abort/i);
// No worker should have been spun up for an already-aborted request.
expect(MockWorker.instances).toHaveLength(0);
});
});