From 5974c43272e315f2046ccb3f96de6f5aa9f812fc Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Wed, 25 Mar 2026 21:34:08 +0800 Subject: [PATCH] fix(openclaw-plugin): harden duplicate registration guard Make duplicate plugin registration idempotent across retries and stop/reload flows, and add regression coverage for failure rollback and stale stop handling. --- .../duplicate-registration-948.test.ts | 268 ++++++++++++++++++ examples/openclaw-plugin/index.ts | 185 ++++++++---- 2 files changed, 393 insertions(+), 60 deletions(-) create mode 100644 examples/openclaw-plugin/__tests__/duplicate-registration-948.test.ts diff --git a/examples/openclaw-plugin/__tests__/duplicate-registration-948.test.ts b/examples/openclaw-plugin/__tests__/duplicate-registration-948.test.ts new file mode 100644 index 000000000..f562ea2e7 --- /dev/null +++ b/examples/openclaw-plugin/__tests__/duplicate-registration-948.test.ts @@ -0,0 +1,268 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const DUPLICATE_REGISTRATION_LOG = + "openviking: plugin registration already active, skipping duplicate registration"; + +type MockApi = { + pluginConfig: Record; + logger: { + info: ReturnType; + warn: ReturnType; + error: ReturnType; + debug: ReturnType; + }; + registerTool: ReturnType; + registerService: ReturnType; + registerContextEngine: ReturnType; + on: ReturnType; +}; + +type MockService = { + id: string; + start: (ctx?: unknown) => void | Promise; + stop?: (ctx?: unknown) => void | Promise; +}; + +function createParsedConfig(overrides: Record = {}) { + return { + mode: "remote", + configPath: "/tmp/openviking-test.conf", + port: 1933, + baseUrl: "http://127.0.0.1:8000", + agentId: "test-agent", + apiKey: "test-key", + targetUri: "viking://user/memories", + timeoutMs: 30_000, + autoCapture: true, + captureMode: "semantic", + captureMaxLength: 1_000, + autoRecall: true, + recallLimit: 5, + recallScoreThreshold: 0.7, + recallMaxContentChars: 500, + recallPreferAbstract: true, + recallTokenBudget: 2_000, + ingestReplyAssist: true, + ingestReplyAssistMinSpeakerTurns: 2, + ingestReplyAssistMinChars: 120, + ...overrides, + }; +} + +function createMockApi(): MockApi { + return { + pluginConfig: {}, + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, + registerTool: vi.fn(), + registerService: vi.fn(), + registerContextEngine: vi.fn(), + on: vi.fn(), + }; +} + +async function loadPlugin( + options: { + parseImpl?: (value: unknown) => Record; + } = {}, +) { + vi.resetModules(); + + const parse = vi.fn((value: unknown) => { + if (options.parseImpl) { + return options.parseImpl(value); + } + return createParsedConfig(value as Record); + }); + + const localClientCache = new Map(); + const localClientPendingPromises = new Map< + string, + { + promise: Promise; + resolve: (client: unknown) => void; + reject: (err: unknown) => void; + } + >(); + class MockOpenVikingClient { + healthCheck = vi.fn().mockResolvedValue(undefined); + find = vi.fn().mockResolvedValue({ memories: [] }); + read = vi.fn().mockResolvedValue(""); + addSessionMessage = vi.fn().mockResolvedValue(undefined); + commitSession = vi.fn().mockResolvedValue({ archived: true, memories_extracted: 0 }); + deleteSession = vi.fn().mockResolvedValue(undefined); + deleteUri = vi.fn().mockResolvedValue(undefined); + getSession = vi.fn().mockResolvedValue({ message_count: 0 }); + } + + vi.doMock("../config.js", () => ({ + memoryOpenVikingConfigSchema: { parse }, + })); + vi.doMock("../client.js", () => ({ + OpenVikingClient: MockOpenVikingClient, + localClientCache, + localClientPendingPromises, + isMemoryUri: vi.fn((uri: string) => uri.startsWith("viking://")), + })); + vi.doMock("../process-manager.js", () => ({ + IS_WIN: false, + waitForHealth: vi.fn().mockResolvedValue(undefined), + quickRecallPrecheck: vi.fn().mockResolvedValue({ ok: true }), + withTimeout: vi.fn((promise: Promise) => promise), + resolvePythonCommand: vi.fn().mockReturnValue("python3"), + prepareLocalPort: vi.fn().mockResolvedValue(8000), + })); + vi.doMock("../context-engine.js", () => ({ + createMemoryOpenVikingContextEngine: vi.fn(() => ({ + commitOVSession: vi.fn().mockResolvedValue(undefined), + })), + })); + + const module = await import("../index.js"); + return { + plugin: module.default, + parse, + localClientPendingPromises, + }; +} + +function getRegisteredService(api: MockApi, callIndex = 0): MockService { + return api.registerService.mock.calls[callIndex]![0] as MockService; +} + +describe("duplicate registration guard (issue #948)", () => { + beforeEach(() => { + vi.restoreAllMocks(); + vi.resetModules(); + }); + + it("registers tools, hooks, context engine, and service on first call", async () => { + const { plugin } = await loadPlugin(); + const api = createMockApi(); + + plugin.register(api); + + expect(api.registerTool).toHaveBeenCalledTimes(3); + expect(api.registerService).toHaveBeenCalledTimes(1); + expect(api.registerContextEngine).toHaveBeenCalledTimes(1); + expect(api.on).toHaveBeenCalled(); + expect(api.logger.info).not.toHaveBeenCalledWith(DUPLICATE_REGISTRATION_LOG); + }); + + it("skips duplicate registration on the same module instance", async () => { + const { plugin } = await loadPlugin(); + const api = createMockApi(); + + plugin.register(api); + + api.logger.info.mockClear(); + api.registerTool.mockClear(); + api.registerService.mockClear(); + api.registerContextEngine.mockClear(); + api.on.mockClear(); + + plugin.register(api); + + expect(api.logger.info).toHaveBeenCalledWith(DUPLICATE_REGISTRATION_LOG); + expect(api.registerTool).not.toHaveBeenCalled(); + expect(api.registerService).not.toHaveBeenCalled(); + expect(api.registerContextEngine).not.toHaveBeenCalled(); + expect(api.on).not.toHaveBeenCalled(); + }); + + it("rolls back registration state after config parse failure", async () => { + let parseAttempts = 0; + const { plugin, localClientPendingPromises } = await loadPlugin({ + parseImpl: () => { + parseAttempts += 1; + if (parseAttempts === 1) { + throw new Error("config parse failed"); + } + return createParsedConfig(); + }, + }); + + expect(() => plugin.register(createMockApi())).toThrow("config parse failed"); + expect(localClientPendingPromises.size).toBe(0); + + const retryApi = createMockApi(); + plugin.register(retryApi); + + expect(retryApi.registerService).toHaveBeenCalledTimes(1); + expect(retryApi.registerContextEngine).toHaveBeenCalledTimes(1); + }); + + it("cleans pending local-client state if registration fails after creating it", async () => { + const { plugin, localClientPendingPromises } = await loadPlugin({ + parseImpl: () => createParsedConfig({ mode: "local" }), + }); + + const failingApi = createMockApi(); + failingApi.registerService.mockImplementation(() => { + throw new Error("registerService failed"); + }); + + expect(() => plugin.register(failingApi)).toThrow("registerService failed"); + expect(localClientPendingPromises.size).toBe(0); + + const retryApi = createMockApi(); + plugin.register(retryApi); + + expect(retryApi.registerService).toHaveBeenCalledTimes(1); + expect(localClientPendingPromises.size).toBe(1); + }); + + it("allows clean re-registration after stop", async () => { + const { plugin } = await loadPlugin(); + const api = createMockApi(); + + plugin.register(api); + const service = getRegisteredService(api); + + api.logger.info.mockClear(); + api.registerTool.mockClear(); + api.registerService.mockClear(); + api.registerContextEngine.mockClear(); + api.on.mockClear(); + + service.stop?.(); + plugin.register(api); + + expect(api.logger.info).not.toHaveBeenCalledWith(DUPLICATE_REGISTRATION_LOG); + expect(api.registerTool).toHaveBeenCalledTimes(3); + expect(api.registerService).toHaveBeenCalledTimes(1); + expect(api.registerContextEngine).toHaveBeenCalledTimes(1); + expect(api.on).toHaveBeenCalled(); + }); + + it("ignores stale stop calls from an older registration", async () => { + const { plugin } = await loadPlugin(); + + const firstApi = createMockApi(); + plugin.register(firstApi); + const firstService = getRegisteredService(firstApi); + firstService.stop?.(); + + const secondApi = createMockApi(); + plugin.register(secondApi); + + secondApi.logger.info.mockClear(); + secondApi.registerTool.mockClear(); + secondApi.registerService.mockClear(); + secondApi.registerContextEngine.mockClear(); + secondApi.on.mockClear(); + + firstService.stop?.(); + plugin.register(secondApi); + + expect(secondApi.logger.info).toHaveBeenCalledWith(DUPLICATE_REGISTRATION_LOG); + expect(secondApi.registerTool).not.toHaveBeenCalled(); + expect(secondApi.registerService).not.toHaveBeenCalled(); + expect(secondApi.registerContextEngine).not.toHaveBeenCalled(); + expect(secondApi.on).not.toHaveBeenCalled(); + }); +}); diff --git a/examples/openclaw-plugin/index.ts b/examples/openclaw-plugin/index.ts index 401102b34..4baa91cb8 100644 --- a/examples/openclaw-plugin/index.ts +++ b/examples/openclaw-plugin/index.ts @@ -68,9 +68,49 @@ type OpenClawPluginApi = { ) => void; }; +type PluginRegistrationState = "idle" | "registering" | "registered"; + const MAX_OPENVIKING_STDERR_LINES = 200; const MAX_OPENVIKING_STDERR_CHARS = 256_000; const AUTO_RECALL_TIMEOUT_MS = 5_000; +const DUPLICATE_REGISTRATION_LOG = + "openviking: plugin registration already active, skipping duplicate registration"; + +let pluginRegistrationState: PluginRegistrationState = "idle"; +let activeRegistrationToken: number | null = null; +let nextRegistrationToken = 0; + +function beginPluginRegistration(api: OpenClawPluginApi): number | null { + if (pluginRegistrationState !== "idle") { + api.logger.info(DUPLICATE_REGISTRATION_LOG); + return null; + } + + pluginRegistrationState = "registering"; + const token = ++nextRegistrationToken; + activeRegistrationToken = token; + return token; +} + +function commitPluginRegistration(token: number) { + if (activeRegistrationToken === token && pluginRegistrationState === "registering") { + pluginRegistrationState = "registered"; + } +} + +function rollbackPluginRegistration(token: number) { + if (activeRegistrationToken === token && pluginRegistrationState === "registering") { + pluginRegistrationState = "idle"; + activeRegistrationToken = null; + } +} + +function resetPluginRegistration(token: number) { + if (activeRegistrationToken === token) { + pluginRegistrationState = "idle"; + activeRegistrationToken = null; + } +} const contextEnginePlugin = { id: "openviking", @@ -80,54 +120,66 @@ const contextEnginePlugin = { configSchema: memoryOpenVikingConfigSchema, register(api: OpenClawPluginApi) { - const cfg = memoryOpenVikingConfigSchema.parse(api.pluginConfig); - const localCacheKey = `${cfg.mode}:${cfg.baseUrl}:${cfg.configPath}:${cfg.apiKey}`; - - let clientPromise: Promise; - let localProcess: ReturnType | null = null; - let resolveLocalClient: ((c: OpenVikingClient) => void) | null = null; - let rejectLocalClient: ((err: unknown) => void) | null = null; - let localUnavailableReason: string | null = null; - const markLocalUnavailable = (reason: string, err?: unknown) => { - if (!localUnavailableReason) { - localUnavailableReason = reason; - api.logger.warn( - `openviking: local mode marked unavailable (${reason})${err ? `: ${String(err)}` : ""}`, - ); - } - if (rejectLocalClient) { - rejectLocalClient( - err instanceof Error ? err : new Error(`openviking unavailable: ${reason}`), - ); - rejectLocalClient = null; - } - resolveLocalClient = null; - }; + const registrationToken = beginPluginRegistration(api); + if (registrationToken == null) { + return; + } - if (cfg.mode === "local") { - const cached = localClientCache.get(localCacheKey); - if (cached) { - localProcess = cached.process; - clientPromise = Promise.resolve(cached.client); - } else { - const existingPending = localClientPendingPromises.get(localCacheKey); - if (existingPending) { - clientPromise = existingPending.promise; + let localCacheKey = ""; + let createdPendingClientEntry = false; + + try { + const cfg = memoryOpenVikingConfigSchema.parse(api.pluginConfig); + localCacheKey = `${cfg.mode}:${cfg.baseUrl}:${cfg.configPath}:${cfg.apiKey}`; + + let clientPromise: Promise; + let localProcess: ReturnType | null = null; + let resolveLocalClient: ((c: OpenVikingClient) => void) | null = null; + let rejectLocalClient: ((err: unknown) => void) | null = null; + let localUnavailableReason: string | null = null; + const markLocalUnavailable = (reason: string, err?: unknown) => { + if (!localUnavailableReason) { + localUnavailableReason = reason; + api.logger.warn( + `openviking: local mode marked unavailable (${reason})${err ? `: ${String(err)}` : ""}`, + ); + } + if (rejectLocalClient) { + rejectLocalClient( + err instanceof Error ? err : new Error(`openviking unavailable: ${reason}`), + ); + rejectLocalClient = null; + } + resolveLocalClient = null; + }; + + if (cfg.mode === "local") { + const cached = localClientCache.get(localCacheKey); + if (cached) { + localProcess = cached.process; + clientPromise = Promise.resolve(cached.client); } else { - const entry = {} as PendingClientEntry; - entry.promise = new Promise((resolve, reject) => { - entry.resolve = resolve; - entry.reject = reject; - }); - clientPromise = entry.promise; - localClientPendingPromises.set(localCacheKey, entry); + const existingPending = localClientPendingPromises.get(localCacheKey); + if (existingPending) { + clientPromise = existingPending.promise; + } else { + const entry = {} as PendingClientEntry; + entry.promise = new Promise((resolve, reject) => { + entry.resolve = resolve; + entry.reject = reject; + }); + clientPromise = entry.promise; + localClientPendingPromises.set(localCacheKey, entry); + createdPendingClientEntry = true; + } } + } else { + clientPromise = Promise.resolve( + new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs), + ); } - } else { - clientPromise = Promise.resolve(new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs)); - } - const getClient = (): Promise => clientPromise; + const getClient = (): Promise => clientPromise; api.registerTool( { @@ -597,9 +649,9 @@ const contextEnginePlugin = { ); } - api.registerService({ - id: "openviking", - start: async () => { + api.registerService({ + id: "openviking", + start: async () => { // Claim the pending entry — only the first start() call to claim it spawns the process. // Subsequent start() calls (from other registrations sharing the same promise) fall through. const pendingEntry = localClientPendingPromises.get(localCacheKey); @@ -621,7 +673,7 @@ const contextEnginePlugin = { // Inherit system environment; optionally override Go/Python paths via env vars const pathSep = IS_WIN ? ";" : ":"; - const { ALL_PROXY, all_proxy, HTTP_PROXY, http_proxy, HTTPS_PROXY, https_proxy, ...filteredEnv } = process.env; + const { ALL_PROXY, all_proxy, HTTP_PROXY, http_proxy, HTTPS_PROXY, https_proxy, ...filteredEnv } = process.env; const env = { ...filteredEnv, PYTHONUNBUFFERED: "1", @@ -710,19 +762,32 @@ const contextEnginePlugin = { `openviking: initialized (url: ${cfg.baseUrl}, targetUri: ${cfg.targetUri}, search: hybrid endpoint)`, ); } - }, - stop: () => { - if (localProcess) { - localProcess.kill("SIGTERM"); - localClientCache.delete(localCacheKey); - localClientPendingPromises.delete(localCacheKey); - localProcess = null; - api.logger.info("openviking: local server stopped"); - } else { - api.logger.info("openviking: stopped"); - } - }, - }); + }, + stop: () => { + try { + if (localProcess) { + localProcess.kill("SIGTERM"); + localClientCache.delete(localCacheKey); + localClientPendingPromises.delete(localCacheKey); + localProcess = null; + api.logger.info("openviking: local server stopped"); + } else { + api.logger.info("openviking: stopped"); + } + } finally { + resetPluginRegistration(registrationToken); + } + }, + }); + + commitPluginRegistration(registrationToken); + } catch (err) { + if (createdPendingClientEntry && localCacheKey) { + localClientPendingPromises.delete(localCacheKey); + } + rollbackPluginRegistration(registrationToken); + throw err; + } }, };