Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tough-clubs-eat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ensnode/ensdb-sdk": patch
---

Made `EnsDbWriter.migrateEnsNodeSchema` race-condition safe.
38 changes: 31 additions & 7 deletions packages/ensdb-sdk/src/client/ensdb-writer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,19 @@ import * as ensDbClientMock from "./ensdb-client.mock";
import { EnsDbWriter } from "./ensdb-writer";
import { EnsNodeMetadataKeys } from "./ensnode-metadata";

const executeMock = vi.fn(async () => undefined);
const onConflictDoUpdateMock = vi.fn(async () => undefined);
const valuesMock = vi.fn(() => ({ onConflictDoUpdate: onConflictDoUpdateMock }));
const insertMock = vi.fn(() => ({ values: valuesMock }));
const drizzleClientMock = { insert: insertMock } as any;
const transactionMock = vi.fn(async (callback: (tx: any) => Promise<void>) => {
const tx = { execute: executeMock, insert: insertMock };
return callback(tx);
});
const drizzleClientMock = {
insert: insertMock,
transaction: transactionMock,
execute: executeMock,
} as any;

vi.mock("drizzle-orm/node-postgres", () => ({
drizzle: vi.fn(() => drizzleClientMock),
Expand All @@ -26,9 +35,11 @@ describe("EnsDbWriter", () => {
new EnsDbWriter(ensDbClientMock.ensDbUrl, ensDbClientMock.ensIndexerSchemaName);

beforeEach(() => {
executeMock.mockClear();
onConflictDoUpdateMock.mockClear();
valuesMock.mockClear();
insertMock.mockClear();
transactionMock.mockClear();
vi.mocked(migrate).mockClear();
});

Expand Down Expand Up @@ -84,18 +95,31 @@ describe("EnsDbWriter", () => {
});

describe("migrateEnsNodeSchema", () => {
it("calls drizzle-orm migrateEnsNodeSchema with the correct parameters", async () => {
it("calls drizzle-orm migrate with the correct parameters inside a transaction", async () => {
const migrationsDirPath = "/path/to/migrations";

await createEnsDbWriter().migrateEnsNodeSchema(migrationsDirPath);

expect(vi.mocked(migrate)).toHaveBeenCalledWith(drizzleClientMock, {
migrationsFolder: migrationsDirPath,
migrationsSchema: "ensnode",
});
expect(transactionMock).toHaveBeenCalled();
Comment thread
tk-o marked this conversation as resolved.
expect(executeMock).toHaveBeenCalledWith(
expect.objectContaining({
queryChunks: expect.arrayContaining([
expect.objectContaining({ value: ["SELECT pg_advisory_xact_lock("] }),
expect.any(BigInt),
expect.objectContaining({ value: [")"] }),
]),
}),
);
expect(vi.mocked(migrate)).toHaveBeenCalledWith(
expect.objectContaining({ execute: executeMock }),
{
migrationsFolder: migrationsDirPath,
migrationsSchema: "ensnode",
},
);
});

it("propagates errors from the migrateEnsNodeSchema function", async () => {
it("propagates errors from the migrate function", async () => {
const migrationsDirPath = "/path/to/migrations";
vi.mocked(migrate).mockRejectedValueOnce(new Error("Migration failed"));

Expand Down
31 changes: 28 additions & 3 deletions packages/ensdb-sdk/src/client/ensdb-writer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { sql } from "drizzle-orm";
import { migrate } from "drizzle-orm/node-postgres/migrator";

import {
Expand All @@ -7,6 +8,7 @@ import {
serializeEnsIndexerPublicConfig,
} from "@ensnode/ensnode-sdk";

import { advisoryLockId } from "../lib/advisory-lock-id";
import { EnsDbReader } from "./ensdb-reader";
import { EnsNodeMetadataKeys } from "./ensnode-metadata";
import type { SerializedEnsNodeMetadata } from "./serialize/ensnode-metadata";
Expand All @@ -19,17 +21,40 @@ import type { SerializedEnsNodeMetadata } from "./serialize/ensnode-metadata";
* - updating ENSNode Metadata records in ENSDb for the given ENSIndexer instance.
*/
export class EnsDbWriter extends EnsDbReader {
/**
* Stable arbitrary lock ID for ENSNode Schema migrations to
* prevent concurrent migration execution across multiple ENSIndexer instances.
*/
private static readonly MIGRATION_LOCK_ID: bigint = advisoryLockId(
"ensnode-schema-migration-lock",
);

/**
* Execute pending database migrations for ENSNode Schema in ENSDb.
*
* This function is:
* - idempotent and can be safely executed multiple times,
* - safe to execute concurrently across multiple ENSIndexer instances,
* as it uses a stable arbitrary advisory lock to prevent concurrent
* execution of migrations.
*
* @param migrationsDirPath - The file path to the directory containing
* database migration files for ENSNode Schema.
* @throws error when migration execution fails.
*/
async migrateEnsNodeSchema(migrationsDirPath: string): Promise<void> {
return migrate(this.drizzleClient, {
migrationsFolder: migrationsDirPath,
migrationsSchema: "ensnode",
// `pg_advisory_xact_lock` is transaction-scoped, and is automatically released
// when the transaction ends, with no explicit unlock needed. Running it inside
// a Drizzle transaction also guarantees that the lock acquisition, all
// migration queries, and the lock release all run on the same physical
// connection — which is required for advisory locks to work correctly with a
// connection pool.
await this.drizzleClient.transaction(async (tx) => {
await tx.execute(sql`SELECT pg_advisory_xact_lock(${EnsDbWriter.MIGRATION_LOCK_ID})`);
await migrate(tx, {
migrationsFolder: migrationsDirPath,
migrationsSchema: "ensnode",
});
});
}

Expand Down
33 changes: 33 additions & 0 deletions packages/ensdb-sdk/src/lib/advisory-lock-id.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { describe, expect, it } from "vitest";

import { advisoryLockId } from "./advisory-lock-id";

describe("advisoryLockId", () => {
it("returns a bigint for any string input", () => {
expect(advisoryLockId("test-name")).toBeTypeOf("bigint");
expect(advisoryLockId("")).toBeTypeOf("bigint");
expect(advisoryLockId("schema-migrations")).toBeTypeOf("bigint");
});

it("returns consistent (deterministic) results for the same input", () => {
expect(advisoryLockId("name")).toBe(advisoryLockId("name"));
expect(advisoryLockId("")).toBe(advisoryLockId(""));
});

it("returns different results for different inputs", () => {
expect(advisoryLockId("name-one")).not.toBe(advisoryLockId("name-two"));
});

it("produces expected lock ID for known input", () => {
// SHA-256 of "hello" -> first 8 bytes as signed 64-bit big-endian
expect(advisoryLockId("hello")).toBe(3238736544897475342n);
});

it("produces values within PostgreSQL bigint range", () => {
// PostgreSQL bigint is signed 64-bit
const result = advisoryLockId("any-name");

expect(result).toBeGreaterThanOrEqual(-9223372036854775808n);
expect(result).toBeLessThanOrEqual(9223372036854775807n);
});
});
15 changes: 15 additions & 0 deletions packages/ensdb-sdk/src/lib/advisory-lock-id.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { createHash } from "node:crypto";

/**
* Generate a stable arbitrary advisory lock ID for the given name.
*
* @param name - The name to derive the advisory lock ID from. This should be
* a fixed string that uniquely identifies the critical section of code that
* requires synchronization, such as "schema-migrations".
* @returns A bigint representing the advisory lock ID to be used with PostgreSQL advisory locks.
*/
export function advisoryLockId(name: string): bigint {
const hash = createHash("sha256").update(name).digest();
// Read the first 8 bytes as a signed 64-bit integer (Postgres bigint range)
return hash.readBigInt64BE(0);
}
Loading