From f55f9a46aa6b42e86e3c791a3fc332f8daa453ca Mon Sep 17 00:00:00 2001 From: "CodesBy.Mitch" Date: Thu, 25 Jun 2026 14:58:06 +0000 Subject: [PATCH] Distributed lock TTL expires during long Stellar operations FIXED --- src/utils/lock.ts | 28 +++++++- tests/unit/utils/lock.test.ts | 123 ++++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 tests/unit/utils/lock.test.ts diff --git a/src/utils/lock.ts b/src/utils/lock.ts index 8c2b1c7..3eb8d7f 100644 --- a/src/utils/lock.ts +++ b/src/utils/lock.ts @@ -1,11 +1,12 @@ import { redis } from "../config/redis.js"; import { ConflictError } from "./errors.js"; import crypto from "node:crypto"; +import { logger } from "./logger.js"; export async function withLock( key: string, fn: () => Promise, - ttlMs: number = 10_000 + ttlMs: number = 30_000 ): Promise { const lockKey = `lock:${key}`; const lockValue = crypto.randomUUID(); @@ -21,9 +22,34 @@ export async function withLock( throw new ConflictError("Operation in progress, please retry"); } + let heartbeat: NodeJS.Timeout | undefined; + try { + heartbeat = setInterval(async () => { + try { + const script = ` + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("pexpire", KEYS[1], ARGV[2]) + else + return 0 + end + `; + const result = await redis.eval(script, 1, lockKey, lockValue, ttlMs); + if (result !== 1) { + logger.warn({ lockKey }, "Lock renewal failed: lock lost or changed"); + if (heartbeat) { + clearInterval(heartbeat); + heartbeat = undefined; + } + } + } catch (err) { + logger.error({ err, lockKey }, "Error during lock renewal heartbeat"); + } + }, ttlMs / 2); + return await fn(); } finally { + if (heartbeat) clearInterval(heartbeat); const script = ` if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) diff --git a/tests/unit/utils/lock.test.ts b/tests/unit/utils/lock.test.ts new file mode 100644 index 0000000..d84a68b --- /dev/null +++ b/tests/unit/utils/lock.test.ts @@ -0,0 +1,123 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { withLock } from "../../../src/utils/lock.js"; +import { redis } from "../../../src/config/redis.js"; +import { ConflictError } from "../../../src/utils/errors.js"; + +vi.mock("../../../src/config/redis.js", () => ({ + redis: { + set: vi.fn(), + eval: vi.fn(), + }, +})); + +vi.mock("../../../src/utils/logger.js", () => ({ + logger: { + warn: vi.fn(), + error: vi.fn(), + info: vi.fn(), + }, +})); + +describe("withLock", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("should acquire lock and release it after execution", async () => { + (redis.set as any).mockResolvedValue("OK"); + (redis.eval as any).mockResolvedValue(1); + + const fn = vi.fn().mockResolvedValue("result"); + const result = await withLock("test", fn); + + expect(result).toBe("result"); + expect(redis.set).toHaveBeenCalledWith( + "lock:test", + expect.any(String), + "PX", + 30000, + "NX" + ); + // The release eval call should have happened + expect(redis.eval).toHaveBeenCalled(); + }); + + it("should throw ConflictError if lock cannot be acquired", async () => { + (redis.set as any).mockResolvedValue(null); + + const fn = vi.fn(); + await expect(withLock("test", fn)).rejects.toThrow("Operation in progress, please retry"); + expect(fn).not.toHaveBeenCalled(); + }); + + it("should renew lock via heartbeat", async () => { + (redis.set as any).mockResolvedValue("OK"); + (redis.eval as any).mockResolvedValue(1); + + let resolveFn: any; + const promise = new Promise((resolve) => { + resolveFn = resolve; + }); + + const fn = vi.fn().mockReturnValue(promise); + + const lockPromise = withLock("test", fn, 10000); + + // Advance time to trigger heartbeat (ttlMs / 2 = 5000) + await vi.advanceTimersByTimeAsync(5001); + + // Check if eval was called for renewal + expect(redis.eval).toHaveBeenCalledWith( + expect.stringContaining("pexpire"), + 1, + "lock:test", + expect.any(String), + 10000 + ); + + // Trigger another heartbeat + await vi.advanceTimersByTimeAsync(5001); + // Now it should have been called twice for renewal + expect(redis.eval).toHaveBeenCalledTimes(2); + + resolveFn("done"); + await lockPromise; + + // After completion, it should have called eval one more time for release + // But since release is also an eval call, total should be 3 + expect(redis.eval).toHaveBeenCalledTimes(3); + }); + + it("should stop heartbeat if renewal fails", async () => { + (redis.set as any).mockResolvedValue("OK"); + // First renewal returns 0 (lock lost) + (redis.eval as any).mockResolvedValue(0); + + let resolveFn: any; + const promise = new Promise((resolve) => { + resolveFn = resolve; + }); + + const fn = vi.fn().mockReturnValue(promise); + + const lockPromise = withLock("test", fn, 10000); + + await vi.advanceTimersByTimeAsync(5001); + expect(redis.eval).toHaveBeenCalledTimes(1); + + // Advance more time, should NOT call eval again because heartbeat should be cleared + await vi.advanceTimersByTimeAsync(5001); + expect(redis.eval).toHaveBeenCalledTimes(1); + + resolveFn("done"); + await lockPromise; + + // Release call happens at the end + expect(redis.eval).toHaveBeenCalledTimes(2); + }); +});