diff --git a/.changeset/tough-clubs-eat.md b/.changeset/tough-clubs-eat.md new file mode 100644 index 000000000..3575a0f02 --- /dev/null +++ b/.changeset/tough-clubs-eat.md @@ -0,0 +1,5 @@ +--- +"@ensnode/ensdb-sdk": patch +--- + +Made `EnsDbWriter.migrateEnsNodeSchema` race-condition safe. diff --git a/packages/ensdb-sdk/src/client/ensdb-writer.test.ts b/packages/ensdb-sdk/src/client/ensdb-writer.test.ts index 51e7ccf75..0ec99aaa3 100644 --- a/packages/ensdb-sdk/src/client/ensdb-writer.test.ts +++ b/packages/ensdb-sdk/src/client/ensdb-writer.test.ts @@ -11,10 +11,19 @@ import * as ensDbClientMock from "./ensdb-client.mock"; import { EnsDbWriter } from "./ensdb-writer"; import { EnsNodeMetadataKeys } from "./ensnode-metadata"; +const executeMock = vi.fn(async () => undefined); const onConflictDoUpdateMock = vi.fn(async () => undefined); const valuesMock = vi.fn(() => ({ onConflictDoUpdate: onConflictDoUpdateMock })); const insertMock = vi.fn(() => ({ values: valuesMock })); -const drizzleClientMock = { insert: insertMock } as any; +const transactionMock = vi.fn(async (callback: (tx: any) => Promise) => { + const tx = { execute: executeMock, insert: insertMock }; + return callback(tx); +}); +const drizzleClientMock = { + insert: insertMock, + transaction: transactionMock, + execute: executeMock, +} as any; vi.mock("drizzle-orm/node-postgres", () => ({ drizzle: vi.fn(() => drizzleClientMock), @@ -26,9 +35,11 @@ describe("EnsDbWriter", () => { new EnsDbWriter(ensDbClientMock.ensDbUrl, ensDbClientMock.ensIndexerSchemaName); beforeEach(() => { + executeMock.mockClear(); onConflictDoUpdateMock.mockClear(); valuesMock.mockClear(); insertMock.mockClear(); + transactionMock.mockClear(); vi.mocked(migrate).mockClear(); }); @@ -84,18 +95,31 @@ describe("EnsDbWriter", () => { }); describe("migrateEnsNodeSchema", () => { - it("calls drizzle-orm migrateEnsNodeSchema with the correct parameters", async () => { + it("calls drizzle-orm migrate with the correct parameters inside a transaction", async () => { const migrationsDirPath = "/path/to/migrations"; await createEnsDbWriter().migrateEnsNodeSchema(migrationsDirPath); - expect(vi.mocked(migrate)).toHaveBeenCalledWith(drizzleClientMock, { - migrationsFolder: migrationsDirPath, - migrationsSchema: "ensnode", - }); + expect(transactionMock).toHaveBeenCalled(); + expect(executeMock).toHaveBeenCalledWith( + expect.objectContaining({ + queryChunks: expect.arrayContaining([ + expect.objectContaining({ value: ["SELECT pg_advisory_xact_lock("] }), + expect.any(BigInt), + expect.objectContaining({ value: [")"] }), + ]), + }), + ); + expect(vi.mocked(migrate)).toHaveBeenCalledWith( + expect.objectContaining({ execute: executeMock }), + { + migrationsFolder: migrationsDirPath, + migrationsSchema: "ensnode", + }, + ); }); - it("propagates errors from the migrateEnsNodeSchema function", async () => { + it("propagates errors from the migrate function", async () => { const migrationsDirPath = "/path/to/migrations"; vi.mocked(migrate).mockRejectedValueOnce(new Error("Migration failed")); diff --git a/packages/ensdb-sdk/src/client/ensdb-writer.ts b/packages/ensdb-sdk/src/client/ensdb-writer.ts index 2ebbd9e08..c7841ed32 100644 --- a/packages/ensdb-sdk/src/client/ensdb-writer.ts +++ b/packages/ensdb-sdk/src/client/ensdb-writer.ts @@ -1,3 +1,4 @@ +import { sql } from "drizzle-orm"; import { migrate } from "drizzle-orm/node-postgres/migrator"; import { @@ -7,6 +8,7 @@ import { serializeEnsIndexerPublicConfig, } from "@ensnode/ensnode-sdk"; +import { advisoryLockId } from "../lib/advisory-lock-id"; import { EnsDbReader } from "./ensdb-reader"; import { EnsNodeMetadataKeys } from "./ensnode-metadata"; import type { SerializedEnsNodeMetadata } from "./serialize/ensnode-metadata"; @@ -19,17 +21,40 @@ import type { SerializedEnsNodeMetadata } from "./serialize/ensnode-metadata"; * - updating ENSNode Metadata records in ENSDb for the given ENSIndexer instance. */ export class EnsDbWriter extends EnsDbReader { + /** + * Stable arbitrary lock ID for ENSNode Schema migrations to + * prevent concurrent migration execution across multiple ENSIndexer instances. + */ + private static readonly MIGRATION_LOCK_ID: bigint = advisoryLockId( + "ensnode-schema-migration-lock", + ); + /** * Execute pending database migrations for ENSNode Schema in ENSDb. * + * This function is: + * - idempotent and can be safely executed multiple times, + * - safe to execute concurrently across multiple ENSIndexer instances, + * as it uses a stable arbitrary advisory lock to prevent concurrent + * execution of migrations. + * * @param migrationsDirPath - The file path to the directory containing * database migration files for ENSNode Schema. * @throws error when migration execution fails. */ async migrateEnsNodeSchema(migrationsDirPath: string): Promise { - return migrate(this.drizzleClient, { - migrationsFolder: migrationsDirPath, - migrationsSchema: "ensnode", + // `pg_advisory_xact_lock` is transaction-scoped, and is automatically released + // when the transaction ends, with no explicit unlock needed. Running it inside + // a Drizzle transaction also guarantees that the lock acquisition, all + // migration queries, and the lock release all run on the same physical + // connection — which is required for advisory locks to work correctly with a + // connection pool. + await this.drizzleClient.transaction(async (tx) => { + await tx.execute(sql`SELECT pg_advisory_xact_lock(${EnsDbWriter.MIGRATION_LOCK_ID})`); + await migrate(tx, { + migrationsFolder: migrationsDirPath, + migrationsSchema: "ensnode", + }); }); } diff --git a/packages/ensdb-sdk/src/lib/advisory-lock-id.test.ts b/packages/ensdb-sdk/src/lib/advisory-lock-id.test.ts new file mode 100644 index 000000000..4b44c7ef6 --- /dev/null +++ b/packages/ensdb-sdk/src/lib/advisory-lock-id.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, it } from "vitest"; + +import { advisoryLockId } from "./advisory-lock-id"; + +describe("advisoryLockId", () => { + it("returns a bigint for any string input", () => { + expect(advisoryLockId("test-name")).toBeTypeOf("bigint"); + expect(advisoryLockId("")).toBeTypeOf("bigint"); + expect(advisoryLockId("schema-migrations")).toBeTypeOf("bigint"); + }); + + it("returns consistent (deterministic) results for the same input", () => { + expect(advisoryLockId("name")).toBe(advisoryLockId("name")); + expect(advisoryLockId("")).toBe(advisoryLockId("")); + }); + + it("returns different results for different inputs", () => { + expect(advisoryLockId("name-one")).not.toBe(advisoryLockId("name-two")); + }); + + it("produces expected lock ID for known input", () => { + // SHA-256 of "hello" -> first 8 bytes as signed 64-bit big-endian + expect(advisoryLockId("hello")).toBe(3238736544897475342n); + }); + + it("produces values within PostgreSQL bigint range", () => { + // PostgreSQL bigint is signed 64-bit + const result = advisoryLockId("any-name"); + + expect(result).toBeGreaterThanOrEqual(-9223372036854775808n); + expect(result).toBeLessThanOrEqual(9223372036854775807n); + }); +}); diff --git a/packages/ensdb-sdk/src/lib/advisory-lock-id.ts b/packages/ensdb-sdk/src/lib/advisory-lock-id.ts new file mode 100644 index 000000000..f9e31c134 --- /dev/null +++ b/packages/ensdb-sdk/src/lib/advisory-lock-id.ts @@ -0,0 +1,15 @@ +import { createHash } from "node:crypto"; + +/** + * Generate a stable arbitrary advisory lock ID for the given name. + * + * @param name - The name to derive the advisory lock ID from. This should be + * a fixed string that uniquely identifies the critical section of code that + * requires synchronization, such as "schema-migrations". + * @returns A bigint representing the advisory lock ID to be used with PostgreSQL advisory locks. + */ +export function advisoryLockId(name: string): bigint { + const hash = createHash("sha256").update(name).digest(); + // Read the first 8 bytes as a signed 64-bit integer (Postgres bigint range) + return hash.readBigInt64BE(0); +}