diff --git a/examples/openclaw-plugin/__tests__/duplicate-registration-948.test.ts b/examples/openclaw-plugin/__tests__/duplicate-registration-948.test.ts deleted file mode 100644 index f562ea2e7..000000000 --- a/examples/openclaw-plugin/__tests__/duplicate-registration-948.test.ts +++ /dev/null @@ -1,268 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; - -const DUPLICATE_REGISTRATION_LOG = - "openviking: plugin registration already active, skipping duplicate registration"; - -type MockApi = { - pluginConfig: Record; - logger: { - info: ReturnType; - warn: ReturnType; - error: ReturnType; - debug: ReturnType; - }; - registerTool: ReturnType; - registerService: ReturnType; - registerContextEngine: ReturnType; - on: ReturnType; -}; - -type MockService = { - id: string; - start: (ctx?: unknown) => void | Promise; - stop?: (ctx?: unknown) => void | Promise; -}; - -function createParsedConfig(overrides: Record = {}) { - return { - mode: "remote", - configPath: "/tmp/openviking-test.conf", - port: 1933, - baseUrl: "http://127.0.0.1:8000", - agentId: "test-agent", - apiKey: "test-key", - targetUri: "viking://user/memories", - timeoutMs: 30_000, - autoCapture: true, - captureMode: "semantic", - captureMaxLength: 1_000, - autoRecall: true, - recallLimit: 5, - recallScoreThreshold: 0.7, - recallMaxContentChars: 500, - recallPreferAbstract: true, - recallTokenBudget: 2_000, - ingestReplyAssist: true, - ingestReplyAssistMinSpeakerTurns: 2, - ingestReplyAssistMinChars: 120, - ...overrides, - }; -} - -function createMockApi(): MockApi { - return { - pluginConfig: {}, - logger: { - info: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - debug: vi.fn(), - }, - registerTool: vi.fn(), - registerService: vi.fn(), - registerContextEngine: vi.fn(), - on: vi.fn(), - }; -} - -async function loadPlugin( - options: { - parseImpl?: (value: unknown) => Record; - } = {}, -) { - vi.resetModules(); - - const parse = vi.fn((value: unknown) => { - if (options.parseImpl) { - return options.parseImpl(value); - } - return createParsedConfig(value as Record); - }); - - const localClientCache = new Map(); - const localClientPendingPromises = new Map< - string, - { - promise: Promise; - resolve: (client: unknown) => void; - reject: (err: unknown) => void; - } - >(); - class MockOpenVikingClient { - healthCheck = vi.fn().mockResolvedValue(undefined); - find = vi.fn().mockResolvedValue({ memories: [] }); - read = vi.fn().mockResolvedValue(""); - addSessionMessage = vi.fn().mockResolvedValue(undefined); - commitSession = vi.fn().mockResolvedValue({ archived: true, memories_extracted: 0 }); - deleteSession = vi.fn().mockResolvedValue(undefined); - deleteUri = vi.fn().mockResolvedValue(undefined); - getSession = vi.fn().mockResolvedValue({ message_count: 0 }); - } - - vi.doMock("../config.js", () => ({ - memoryOpenVikingConfigSchema: { parse }, - })); - vi.doMock("../client.js", () => ({ - OpenVikingClient: MockOpenVikingClient, - localClientCache, - localClientPendingPromises, - isMemoryUri: vi.fn((uri: string) => uri.startsWith("viking://")), - })); - vi.doMock("../process-manager.js", () => ({ - IS_WIN: false, - waitForHealth: vi.fn().mockResolvedValue(undefined), - quickRecallPrecheck: vi.fn().mockResolvedValue({ ok: true }), - withTimeout: vi.fn((promise: Promise) => promise), - resolvePythonCommand: vi.fn().mockReturnValue("python3"), - prepareLocalPort: vi.fn().mockResolvedValue(8000), - })); - vi.doMock("../context-engine.js", () => ({ - createMemoryOpenVikingContextEngine: vi.fn(() => ({ - commitOVSession: vi.fn().mockResolvedValue(undefined), - })), - })); - - const module = await import("../index.js"); - return { - plugin: module.default, - parse, - localClientPendingPromises, - }; -} - -function getRegisteredService(api: MockApi, callIndex = 0): MockService { - return api.registerService.mock.calls[callIndex]![0] as MockService; -} - -describe("duplicate registration guard (issue #948)", () => { - beforeEach(() => { - vi.restoreAllMocks(); - vi.resetModules(); - }); - - it("registers tools, hooks, context engine, and service on first call", async () => { - const { plugin } = await loadPlugin(); - const api = createMockApi(); - - plugin.register(api); - - expect(api.registerTool).toHaveBeenCalledTimes(3); - expect(api.registerService).toHaveBeenCalledTimes(1); - expect(api.registerContextEngine).toHaveBeenCalledTimes(1); - expect(api.on).toHaveBeenCalled(); - expect(api.logger.info).not.toHaveBeenCalledWith(DUPLICATE_REGISTRATION_LOG); - }); - - it("skips duplicate registration on the same module instance", async () => { - const { plugin } = await loadPlugin(); - const api = createMockApi(); - - plugin.register(api); - - api.logger.info.mockClear(); - api.registerTool.mockClear(); - api.registerService.mockClear(); - api.registerContextEngine.mockClear(); - api.on.mockClear(); - - plugin.register(api); - - expect(api.logger.info).toHaveBeenCalledWith(DUPLICATE_REGISTRATION_LOG); - expect(api.registerTool).not.toHaveBeenCalled(); - expect(api.registerService).not.toHaveBeenCalled(); - expect(api.registerContextEngine).not.toHaveBeenCalled(); - expect(api.on).not.toHaveBeenCalled(); - }); - - it("rolls back registration state after config parse failure", async () => { - let parseAttempts = 0; - const { plugin, localClientPendingPromises } = await loadPlugin({ - parseImpl: () => { - parseAttempts += 1; - if (parseAttempts === 1) { - throw new Error("config parse failed"); - } - return createParsedConfig(); - }, - }); - - expect(() => plugin.register(createMockApi())).toThrow("config parse failed"); - expect(localClientPendingPromises.size).toBe(0); - - const retryApi = createMockApi(); - plugin.register(retryApi); - - expect(retryApi.registerService).toHaveBeenCalledTimes(1); - expect(retryApi.registerContextEngine).toHaveBeenCalledTimes(1); - }); - - it("cleans pending local-client state if registration fails after creating it", async () => { - const { plugin, localClientPendingPromises } = await loadPlugin({ - parseImpl: () => createParsedConfig({ mode: "local" }), - }); - - const failingApi = createMockApi(); - failingApi.registerService.mockImplementation(() => { - throw new Error("registerService failed"); - }); - - expect(() => plugin.register(failingApi)).toThrow("registerService failed"); - expect(localClientPendingPromises.size).toBe(0); - - const retryApi = createMockApi(); - plugin.register(retryApi); - - expect(retryApi.registerService).toHaveBeenCalledTimes(1); - expect(localClientPendingPromises.size).toBe(1); - }); - - it("allows clean re-registration after stop", async () => { - const { plugin } = await loadPlugin(); - const api = createMockApi(); - - plugin.register(api); - const service = getRegisteredService(api); - - api.logger.info.mockClear(); - api.registerTool.mockClear(); - api.registerService.mockClear(); - api.registerContextEngine.mockClear(); - api.on.mockClear(); - - service.stop?.(); - plugin.register(api); - - expect(api.logger.info).not.toHaveBeenCalledWith(DUPLICATE_REGISTRATION_LOG); - expect(api.registerTool).toHaveBeenCalledTimes(3); - expect(api.registerService).toHaveBeenCalledTimes(1); - expect(api.registerContextEngine).toHaveBeenCalledTimes(1); - expect(api.on).toHaveBeenCalled(); - }); - - it("ignores stale stop calls from an older registration", async () => { - const { plugin } = await loadPlugin(); - - const firstApi = createMockApi(); - plugin.register(firstApi); - const firstService = getRegisteredService(firstApi); - firstService.stop?.(); - - const secondApi = createMockApi(); - plugin.register(secondApi); - - secondApi.logger.info.mockClear(); - secondApi.registerTool.mockClear(); - secondApi.registerService.mockClear(); - secondApi.registerContextEngine.mockClear(); - secondApi.on.mockClear(); - - firstService.stop?.(); - plugin.register(secondApi); - - expect(secondApi.logger.info).toHaveBeenCalledWith(DUPLICATE_REGISTRATION_LOG); - expect(secondApi.registerTool).not.toHaveBeenCalled(); - expect(secondApi.registerService).not.toHaveBeenCalled(); - expect(secondApi.registerContextEngine).not.toHaveBeenCalled(); - expect(secondApi.on).not.toHaveBeenCalled(); - }); -}); diff --git a/examples/openclaw-plugin/index.ts b/examples/openclaw-plugin/index.ts index 4baa91cb8..401102b34 100644 --- a/examples/openclaw-plugin/index.ts +++ b/examples/openclaw-plugin/index.ts @@ -68,49 +68,9 @@ type OpenClawPluginApi = { ) => void; }; -type PluginRegistrationState = "idle" | "registering" | "registered"; - const MAX_OPENVIKING_STDERR_LINES = 200; const MAX_OPENVIKING_STDERR_CHARS = 256_000; const AUTO_RECALL_TIMEOUT_MS = 5_000; -const DUPLICATE_REGISTRATION_LOG = - "openviking: plugin registration already active, skipping duplicate registration"; - -let pluginRegistrationState: PluginRegistrationState = "idle"; -let activeRegistrationToken: number | null = null; -let nextRegistrationToken = 0; - -function beginPluginRegistration(api: OpenClawPluginApi): number | null { - if (pluginRegistrationState !== "idle") { - api.logger.info(DUPLICATE_REGISTRATION_LOG); - return null; - } - - pluginRegistrationState = "registering"; - const token = ++nextRegistrationToken; - activeRegistrationToken = token; - return token; -} - -function commitPluginRegistration(token: number) { - if (activeRegistrationToken === token && pluginRegistrationState === "registering") { - pluginRegistrationState = "registered"; - } -} - -function rollbackPluginRegistration(token: number) { - if (activeRegistrationToken === token && pluginRegistrationState === "registering") { - pluginRegistrationState = "idle"; - activeRegistrationToken = null; - } -} - -function resetPluginRegistration(token: number) { - if (activeRegistrationToken === token) { - pluginRegistrationState = "idle"; - activeRegistrationToken = null; - } -} const contextEnginePlugin = { id: "openviking", @@ -120,66 +80,54 @@ const contextEnginePlugin = { configSchema: memoryOpenVikingConfigSchema, register(api: OpenClawPluginApi) { - const registrationToken = beginPluginRegistration(api); - if (registrationToken == null) { - return; - } - - let localCacheKey = ""; - let createdPendingClientEntry = false; + const cfg = memoryOpenVikingConfigSchema.parse(api.pluginConfig); + const localCacheKey = `${cfg.mode}:${cfg.baseUrl}:${cfg.configPath}:${cfg.apiKey}`; + + let clientPromise: Promise; + let localProcess: ReturnType | null = null; + let resolveLocalClient: ((c: OpenVikingClient) => void) | null = null; + let rejectLocalClient: ((err: unknown) => void) | null = null; + let localUnavailableReason: string | null = null; + const markLocalUnavailable = (reason: string, err?: unknown) => { + if (!localUnavailableReason) { + localUnavailableReason = reason; + api.logger.warn( + `openviking: local mode marked unavailable (${reason})${err ? `: ${String(err)}` : ""}`, + ); + } + if (rejectLocalClient) { + rejectLocalClient( + err instanceof Error ? err : new Error(`openviking unavailable: ${reason}`), + ); + rejectLocalClient = null; + } + resolveLocalClient = null; + }; - try { - const cfg = memoryOpenVikingConfigSchema.parse(api.pluginConfig); - localCacheKey = `${cfg.mode}:${cfg.baseUrl}:${cfg.configPath}:${cfg.apiKey}`; - - let clientPromise: Promise; - let localProcess: ReturnType | null = null; - let resolveLocalClient: ((c: OpenVikingClient) => void) | null = null; - let rejectLocalClient: ((err: unknown) => void) | null = null; - let localUnavailableReason: string | null = null; - const markLocalUnavailable = (reason: string, err?: unknown) => { - if (!localUnavailableReason) { - localUnavailableReason = reason; - api.logger.warn( - `openviking: local mode marked unavailable (${reason})${err ? `: ${String(err)}` : ""}`, - ); - } - if (rejectLocalClient) { - rejectLocalClient( - err instanceof Error ? err : new Error(`openviking unavailable: ${reason}`), - ); - rejectLocalClient = null; - } - resolveLocalClient = null; - }; - - if (cfg.mode === "local") { - const cached = localClientCache.get(localCacheKey); - if (cached) { - localProcess = cached.process; - clientPromise = Promise.resolve(cached.client); + if (cfg.mode === "local") { + const cached = localClientCache.get(localCacheKey); + if (cached) { + localProcess = cached.process; + clientPromise = Promise.resolve(cached.client); + } else { + const existingPending = localClientPendingPromises.get(localCacheKey); + if (existingPending) { + clientPromise = existingPending.promise; } else { - const existingPending = localClientPendingPromises.get(localCacheKey); - if (existingPending) { - clientPromise = existingPending.promise; - } else { - const entry = {} as PendingClientEntry; - entry.promise = new Promise((resolve, reject) => { - entry.resolve = resolve; - entry.reject = reject; - }); - clientPromise = entry.promise; - localClientPendingPromises.set(localCacheKey, entry); - createdPendingClientEntry = true; - } + const entry = {} as PendingClientEntry; + entry.promise = new Promise((resolve, reject) => { + entry.resolve = resolve; + entry.reject = reject; + }); + clientPromise = entry.promise; + localClientPendingPromises.set(localCacheKey, entry); } - } else { - clientPromise = Promise.resolve( - new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs), - ); } + } else { + clientPromise = Promise.resolve(new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs)); + } - const getClient = (): Promise => clientPromise; + const getClient = (): Promise => clientPromise; api.registerTool( { @@ -649,9 +597,9 @@ const contextEnginePlugin = { ); } - api.registerService({ - id: "openviking", - start: async () => { + api.registerService({ + id: "openviking", + start: async () => { // Claim the pending entry — only the first start() call to claim it spawns the process. // Subsequent start() calls (from other registrations sharing the same promise) fall through. const pendingEntry = localClientPendingPromises.get(localCacheKey); @@ -673,7 +621,7 @@ const contextEnginePlugin = { // Inherit system environment; optionally override Go/Python paths via env vars const pathSep = IS_WIN ? ";" : ":"; - const { ALL_PROXY, all_proxy, HTTP_PROXY, http_proxy, HTTPS_PROXY, https_proxy, ...filteredEnv } = process.env; + const { ALL_PROXY, all_proxy, HTTP_PROXY, http_proxy, HTTPS_PROXY, https_proxy, ...filteredEnv } = process.env; const env = { ...filteredEnv, PYTHONUNBUFFERED: "1", @@ -762,32 +710,19 @@ const contextEnginePlugin = { `openviking: initialized (url: ${cfg.baseUrl}, targetUri: ${cfg.targetUri}, search: hybrid endpoint)`, ); } - }, - stop: () => { - try { - if (localProcess) { - localProcess.kill("SIGTERM"); - localClientCache.delete(localCacheKey); - localClientPendingPromises.delete(localCacheKey); - localProcess = null; - api.logger.info("openviking: local server stopped"); - } else { - api.logger.info("openviking: stopped"); - } - } finally { - resetPluginRegistration(registrationToken); - } - }, - }); - - commitPluginRegistration(registrationToken); - } catch (err) { - if (createdPendingClientEntry && localCacheKey) { - localClientPendingPromises.delete(localCacheKey); - } - rollbackPluginRegistration(registrationToken); - throw err; - } + }, + stop: () => { + if (localProcess) { + localProcess.kill("SIGTERM"); + localClientCache.delete(localCacheKey); + localClientPendingPromises.delete(localCacheKey); + localProcess = null; + api.logger.info("openviking: local server stopped"); + } else { + api.logger.info("openviking: stopped"); + } + }, + }); }, };