diff --git a/packaging.test.ts b/packaging.test.ts index 1363341..21b1669 100644 --- a/packaging.test.ts +++ b/packaging.test.ts @@ -1,9 +1,23 @@ import { assertEquals } from "jsr:@std/assert@^1.0.0"; +import { fromFileUrl } from "jsr:@std/path@^1.0.0/from-file-url"; import { join } from "node:path"; import { pathToFileURL } from "node:url"; const workspaceRoot = new URL(".", import.meta.url); -const workspacePath = workspaceRoot.pathname; +const workspacePath = fromFileUrl(workspaceRoot); +const packagingRunPermissions = await Promise.all([ + Deno.permissions.query({ name: "run", command: "deno" }), + Deno.permissions.query({ name: "run", command: "node" }), + Deno.permissions.query({ + name: "run", + command: Deno.build.os === "windows" ? "where" : "which", + }), + Deno.permissions.query({ name: "run", command: "bun" }), +]); +const packagingRunPermissionGranted = packagingRunPermissions.every( + (permission, index) => index === 3 || permission.state === "granted", +); +const bunRunPermissionGranted = packagingRunPermissions[3]?.state === "granted"; const decodeText = (value: Uint8Array): string => new TextDecoder().decode(value); @@ -40,91 +54,114 @@ const run = async ( }; }; -Deno.test("built npm package loads in node through the published ESM entrypoint", async () => { - const build = await run("deno", ["task", "build"]); - assertEquals(build.code, 0, build.stderr || build.stdout); +Deno.test({ + name: "built npm package loads in node through the published ESM entrypoint", + ignore: !packagingRunPermissionGranted, + fn: async () => { + const build = await run("deno", ["task", "build"]); + assertEquals(build.code, 0, build.stderr || build.stdout); - const builtPackage = JSON.parse( - await Deno.readTextFile(join(workspacePath, "dist/package.json")), - ) as { - dependencies?: Record; - devDependencies?: Record; - }; - assertEquals( - builtPackage.dependencies?.cosmiconfig, - "^9.0.0", - "generated npm package must declare cosmiconfig for runtime config loading", - ); - assertEquals( - typeof builtPackage.devDependencies?.["@types/node"], - "string", - "generated npm package must declare Node typings for dnt typecheck", - ); + const builtPackage = JSON.parse( + await Deno.readTextFile(join(workspacePath, "dist/package.json")), + ) as { + dependencies?: Record; + devDependencies?: Record; + }; + const builtConfig = await Deno.readTextFile( + join(workspacePath, "dist/esm/src/config.js"), + ); + assertEquals( + builtPackage.dependencies?.cosmiconfig, + "^9.0.0", + "generated npm package must declare cosmiconfig for runtime config loading", + ); + assertEquals( + typeof builtPackage.devDependencies?.["@types/node"], + "string", + "generated npm package must declare Node typings for dnt typecheck", + ); + assertEquals( + builtConfig.includes("import-meta-ponyfill-esmodule"), + false, + "generated config loader should not depend on DNT import-meta ponyfill", + ); - const tempDir = await Deno.makeTempDir(); - try { - const esmRunnerPath = join(tempDir, "load-esm.mjs"); - const bunRunnerPath = join(tempDir, "load-bun.mjs"); - const esmEntrypoint = - pathToFileURL(join(workspacePath, "dist/esm/mod.js")).href; - const packageDir = join(tempDir, "node_modules", "opencode-graphiti"); - const isolatedHome = join(tempDir, "home"); - const isolatedConfig = join(isolatedHome, ".config", "opencode"); + const tempDir = await Deno.makeTempDir(); + try { + let optionalOpenCodePath: string | undefined; + try { + optionalOpenCodePath = Deno.env.get("OPENCODE_BIN") ?? undefined; + } catch { + optionalOpenCodePath = undefined; + } - await Deno.mkdir(join(tempDir, "node_modules"), { recursive: true }); - await Deno.mkdir(isolatedConfig, { recursive: true }); - await Deno.symlink(join(workspacePath, "dist"), packageDir, { - type: "dir", - }); + const esmRunnerPath = join(tempDir, "load-esm.mjs"); + const bunRunnerPath = join(tempDir, "load-bun.mjs"); + const esmEntrypoint = + pathToFileURL(join(workspacePath, "dist/esm/mod.js")).href; + const packageDir = join(tempDir, "node_modules", "opencode-graphiti"); + const isolatedHome = join(tempDir, "home"); + const isolatedConfig = join(isolatedHome, ".config", "opencode"); - await Deno.writeTextFile( - esmRunnerPath, - `import * as plugin from ${ - JSON.stringify(esmEntrypoint) - };\nconsole.log(JSON.stringify(Object.keys(plugin).sort()));\n`, - ); - await Deno.writeTextFile( - bunRunnerPath, - 'import * as plugin from "opencode-graphiti";\n' + - "console.log(JSON.stringify(Object.keys(plugin).sort()));\n", - ); + await Deno.mkdir(join(tempDir, "node_modules"), { recursive: true }); + await Deno.mkdir(isolatedConfig, { recursive: true }); + await Deno.symlink(join(workspacePath, "dist"), packageDir, { + type: "dir", + }); - const esmLoad = await run("node", [esmRunnerPath]); - assertEquals(esmLoad.code, 0, esmLoad.stderr || esmLoad.stdout); - assertEquals(esmLoad.stdout.trim(), '["graphiti"]'); + await Deno.writeTextFile( + esmRunnerPath, + `import * as plugin from ${ + JSON.stringify(esmEntrypoint) + };\nconsole.log(JSON.stringify(Object.keys(plugin).sort()));\n`, + ); + await Deno.writeTextFile( + bunRunnerPath, + 'import * as plugin from "opencode-graphiti";\n' + + "console.log(JSON.stringify(Object.keys(plugin).sort()));\n", + ); - if (await commandExists("bun")) { - const bunLoad = await run("bun", [bunRunnerPath], tempDir); - assertEquals(bunLoad.code, 0, bunLoad.stderr || bunLoad.stdout); - assertEquals(bunLoad.stdout.trim(), '["graphiti"]'); - } + const esmLoad = await run("node", [esmRunnerPath]); + assertEquals(esmLoad.code, 0, esmLoad.stderr || esmLoad.stdout); + assertEquals(esmLoad.stdout.trim(), '["graphiti"]'); - const localOpenCodePath = "/Users/vicary/.opencode/bin/opencode"; - try { - const opencodeInfo = await Deno.stat(localOpenCodePath); - if (opencodeInfo.isFile) { - const isolatedOpenCode = await new Deno.Command(localOpenCodePath, { - args: ["--print-logs", "stats"], - cwd: workspacePath, - env: { - HOME: isolatedHome, - XDG_CONFIG_HOME: join(isolatedHome, ".config"), - }, - stdout: "piped", - stderr: "piped", - }).output(); - const isolatedOpenCodeOutput = decodeText(isolatedOpenCode.stdout) + - decodeText(isolatedOpenCode.stderr); - assertEquals( - isolatedOpenCodeOutput.includes("Missing 'default' export"), - false, - isolatedOpenCodeOutput, - ); + if (bunRunPermissionGranted && await commandExists("bun")) { + const bunLoad = await run("bun", [bunRunnerPath], tempDir); + assertEquals(bunLoad.code, 0, bunLoad.stderr || bunLoad.stdout); + assertEquals(bunLoad.stdout.trim(), '["graphiti"]'); } - } catch { - // OpenCode is not available in CI; keep the portable package checks above. + + if (optionalOpenCodePath) { + try { + const opencodeInfo = await Deno.stat(optionalOpenCodePath); + if (opencodeInfo.isFile) { + const isolatedOpenCode = await new Deno.Command( + optionalOpenCodePath, + { + args: ["--print-logs", "stats"], + cwd: workspacePath, + env: { + HOME: isolatedHome, + XDG_CONFIG_HOME: join(isolatedHome, ".config"), + }, + stdout: "piped", + stderr: "piped", + }, + ).output(); + const isolatedOpenCodeOutput = decodeText(isolatedOpenCode.stdout) + + decodeText(isolatedOpenCode.stderr); + assertEquals( + isolatedOpenCodeOutput.includes("Missing 'default' export"), + false, + isolatedOpenCodeOutput, + ); + } + } catch { + // OPENCODE_BIN is optional; keep the portable package checks above. + } + } + } finally { + await Deno.remove(tempDir, { recursive: true }).catch(() => undefined); } - } finally { - await Deno.remove(tempDir, { recursive: true }).catch(() => undefined); - } + }, }); diff --git a/src/config.ts b/src/config.ts index 86c1fa7..c6a3865 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,6 +1,7 @@ import os from "node:os"; import { createRequire } from "node:module"; import { join } from "node:path"; +import process from "node:process"; import { redactEndpointUserInfo } from "./services/endpoint-redaction.ts"; import { notifyPluginWarning } from "./services/opencode-warning.ts"; import type { GraphitiConfig, RawGraphitiConfig } from "./types/index.ts"; @@ -60,7 +61,9 @@ export interface ConfigExplorerAdapter { type ConfigExplorerFactory = () => ConfigExplorerAdapter; -const nodeRequire = createRequire(import.meta.url); +const nodeRequire = createRequire( + join(process.cwd(), "graphiti.config.runtime.cjs"), +); const isRecord = (value: unknown): value is Record => !!value && typeof value === "object" && !Array.isArray(value); diff --git a/src/index.test.ts b/src/index.test.ts index c072512..ac573b7 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -10,6 +10,7 @@ import { warnOnRedisStartupUnavailable, } from "./index.ts"; import { logger } from "./services/logger.ts"; +import { registerRuntimeTeardown } from "./services/runtime-teardown.ts"; import { setOpenCodeClient, setWarningTaskScheduler, @@ -1359,5 +1360,75 @@ describe("index", () => { "redis", ]); }); + + it("gracefully shuts down on first SIGINT in a node-style host runtime", async () => { + const { input, records, dependencies } = createEntrypointHarness(true); + const signalHandlers = new Map<"SIGINT" | "SIGTERM", () => void>(); + const processEventHandlers = new Map<"beforeExit" | "exit", () => void>(); + const exitCalls: number[] = []; + let exitReject!: (reason?: unknown) => void; + const exitPromise = new Promise((_, reject) => { + exitReject = reject; + }); + + const runtime = { + process: { + on(event: string, handler: () => void) { + if (event === "SIGINT" || event === "SIGTERM") { + signalHandlers.set(event, handler); + return; + } + if (event === "beforeExit" || event === "exit") { + processEventHandlers.set(event, handler); + } + }, + off(event: string, _handler: () => void) { + if (event === "SIGINT" || event === "SIGTERM") { + signalHandlers.delete(event); + return; + } + if (event === "beforeExit" || event === "exit") { + processEventHandlers.delete(event); + } + }, + exit(code?: number) { + exitCalls.push(code ?? 0); + exitReject(new Error(`exit:${code ?? 0}`)); + return undefined as never; + }, + exitCode: undefined, + }, + }; + + await invokeGraphiti(input, { + ...dependencies, + registerRuntimeTeardown: ( + tasks: Array<{ + name: string; + run: () => void | Promise; + }>, + ) => registerRuntimeTeardown(tasks, runtime), + }); + + await assertRejects( + async () => { + signalHandlers.get("SIGINT")?.(); + await exitPromise; + }, + Error, + "exit:130", + ); + + assertEquals(records.teardownTaskRuns, [ + "graphiti-drain-flush", + "graphiti-async", + "session-mcp-runtime", + "graphiti", + "redis", + ]); + assertEquals(exitCalls, [130]); + assertEquals(signalHandlers.size, 0); + assertEquals(processEventHandlers.size, 0); + }); }); }); diff --git a/src/services/connection-manager.ts b/src/services/connection-manager.ts index 87aae36..dcc647d 100644 --- a/src/services/connection-manager.ts +++ b/src/services/connection-manager.ts @@ -1,5 +1,7 @@ import { createRequire } from "node:module"; +import { join } from "node:path"; import { pathToFileURL } from "node:url"; +import process from "node:process"; import manifest from "../../deno.json" with { type: "json" }; import { isAbortError } from "../utils.ts"; import { redactEndpointUserInfo } from "./endpoint-redaction.ts"; @@ -26,7 +28,9 @@ type McpRuntimeModules = { StreamableHTTPClientTransport: McpTransportConstructor; }; -const nodeRequire = createRequire(import.meta.url); +const nodeRequire = createRequire( + pathToFileURL(join(process.cwd(), "graphiti.runtime.cjs")).href, +); let mcpRuntimeModulesPromise: Promise | null = null; const importResolvedModule = async (specifier: string): Promise => { diff --git a/src/services/runtime-teardown.smoke-fixture.ts b/src/services/runtime-teardown.smoke-fixture.ts new file mode 100644 index 0000000..d95d97b --- /dev/null +++ b/src/services/runtime-teardown.smoke-fixture.ts @@ -0,0 +1,23 @@ +import process from "node:process"; +import { registerRuntimeTeardown } from "./runtime-teardown.ts"; + +const keepAlive = setInterval(() => {}, 1_000); + +registerRuntimeTeardown([ + { + name: "flush", + run: async () => { + await new Promise((resolve) => setTimeout(resolve, 25)); + clearInterval(keepAlive); + process.stdout.write("teardown-run\n"); + }, + }, +], { + process: { + on: process.on.bind(process), + off: process.off.bind(process), + exit: process.exit.bind(process), + }, +}); + +process.stdout.write("ready\n"); diff --git a/src/services/runtime-teardown.smoke.test.ts b/src/services/runtime-teardown.smoke.test.ts new file mode 100644 index 0000000..94a3596 --- /dev/null +++ b/src/services/runtime-teardown.smoke.test.ts @@ -0,0 +1,149 @@ +import { assertEquals, assertStringIncludes } from "jsr:@std/assert@^1.0.0"; +import { fromFileUrl } from "jsr:@std/path@^1.0.0/from-file-url"; + +const SMOKE_FIXTURE_PATH = fromFileUrl( + new URL( + "./runtime-teardown.smoke-fixture.ts", + import.meta.url, + ), +); + +const smokeRunPermission = await Deno.permissions.query({ + name: "run", + command: Deno.execPath(), +}); +const smokeReadPermission = await Deno.permissions.query({ + name: "read", + path: SMOKE_FIXTURE_PATH, +}); + +const waitForExit = async ( + child: Deno.ChildProcess, + timeoutMs: number, +): Promise => { + let timeoutId: ReturnType | undefined; + try { + return await Promise.race([ + child.status, + new Promise((_, reject) => { + timeoutId = setTimeout(() => { + try { + child.kill("SIGKILL"); + } catch { + // Best-effort timeout cleanup only. + } + reject(new Error(`subprocess did not exit within ${timeoutMs}ms`)); + }, timeoutMs); + }), + ]); + } finally { + if (timeoutId !== undefined) clearTimeout(timeoutId); + } +}; + +const waitForText = async ( + stream: ReadableStream | null, + expected: string, + timeoutMs: number, + onTimeout?: () => void, +): Promise<{ + seen: string; + remainder: Promise; +}> => { + if (!stream) { + return { seen: "", remainder: Promise.resolve("") }; + } + const reader = stream.getReader(); + const decoder = new TextDecoder(); + let seen = ""; + let timeoutId: ReturnType | undefined; + + try { + await Promise.race([ + (async () => { + while (!seen.includes(expected)) { + const { value, done } = await reader.read(); + if (done) break; + if (!value) continue; + seen += decoder.decode(value, { stream: true }); + } + })(), + new Promise((_, reject) => { + timeoutId = setTimeout(() => { + onTimeout?.(); + void reader.cancel(); + reject( + new Error(`timed out waiting for ${JSON.stringify(expected)}`), + ); + }, timeoutMs); + }), + ]); + seen += decoder.decode(); + + const remainder = (async () => { + let output = seen; + try { + for (;;) { + const { value, done } = await reader.read(); + if (done) break; + if (!value) continue; + output += decoder.decode(value, { stream: true }); + } + output += decoder.decode(); + return output; + } finally { + reader.releaseLock(); + } + })(); + + return { seen, remainder }; + } catch (error) { + reader.releaseLock(); + throw error; + } finally { + if (timeoutId !== undefined) clearTimeout(timeoutId); + } +}; + +Deno.test({ + name: + "runtime teardown smoke: gracefully exits a live node-style host process on first SIGINT", + ignore: smokeRunPermission.state !== "granted" || + smokeReadPermission.state === "denied", + fn: async () => { + const child = new Deno.Command(Deno.execPath(), { + args: ["run", SMOKE_FIXTURE_PATH], + stdout: "piped", + stderr: "piped", + }).spawn(); + + const stdoutState = await waitForText( + child.stdout, + "ready\n", + 2_000, + () => { + try { + child.kill("SIGKILL"); + } catch { + // Best-effort cleanup only. + } + }, + ); + const stderrPromise = new Response(child.stderr).text(); + + child.kill("SIGINT"); + + const status = await waitForExit(child, 2_000); + const stdout = await stdoutState.remainder; + const stderr = await stderrPromise; + + assertEquals(status.success, false); + assertEquals(status.code, 130); + assertStringIncludes(stdoutState.seen, "ready\n"); + assertStringIncludes(stdout, "teardown-run\n"); + assertStringIncludes( + stderr, + "Graceful shutdown in progress; waiting for pending memory flush.", + ); + }, +}); diff --git a/src/services/runtime-teardown.test.ts b/src/services/runtime-teardown.test.ts index 9ff4a20..2e33ea4 100644 --- a/src/services/runtime-teardown.test.ts +++ b/src/services/runtime-teardown.test.ts @@ -66,7 +66,7 @@ describe("runtime teardown", () => { assertEquals(signalHandlers.size, 0); }); - it("removes signal listeners as soon as graceful shutdown starts from a signal", async () => { + it("keeps signal listeners active while graceful shutdown is running from a signal", async () => { const signalHandlers = new Map<"SIGINT" | "SIGTERM", () => void>(); const removedSignalHandlers: Array<"SIGINT" | "SIGTERM"> = []; let releaseTask!: () => void; @@ -101,8 +101,8 @@ describe("runtime teardown", () => { signalHandlers.get("SIGINT")?.(); - assertEquals(signalHandlers.size, 0); - assertEquals(removedSignalHandlers.sort(), ["SIGINT", "SIGTERM"]); + assertEquals([...signalHandlers.keys()].sort(), ["SIGINT", "SIGTERM"]); + assertEquals(removedSignalHandlers, []); releaseTask(); await assertRejects( @@ -112,6 +112,8 @@ describe("runtime teardown", () => { Error, "exit:130", ); + + assertEquals(removedSignalHandlers.sort(), ["SIGINT", "SIGTERM"]); }); it("removes signal listeners when graceful shutdown starts from unload", async () => { @@ -398,18 +400,25 @@ describe("runtime teardown", () => { signalHandlers.get("SIGINT")?.(); await taskStarted; - assertEquals([...signalHandlers.keys()].sort(), []); + assertEquals([...signalHandlers.keys()].sort(), ["SIGINT", "SIGTERM"]); assertEquals(warnings.length, 1); assertEquals( warnings[0][0], "Graceful shutdown in progress; waiting for pending memory flush. Press Ctrl+C again to exit immediately and drop pending memories.", ); - releaseTask(); + signalHandlers.get("SIGINT")?.(); await assertRejects(async () => await exitPromise, Error, "exit:130"); assertEquals(exitCalls, [130]); + assertEquals(warnings.length, 2); + assertEquals( + warnings[1][0], + "Forced shutdown requested; exiting immediately and dropping pending memories.", + ); assertEquals(removedSignalHandlers.sort(), ["SIGINT", "SIGTERM"]); + + releaseTask(); } finally { logger.warn = originalWarn; } @@ -469,4 +478,64 @@ describe("runtime teardown", () => { logger.warn = originalWarn; } }); + + it("forces process exit after graceful SIGINT teardown completes in node-style runtimes", async () => { + const signalHandlers = new Map<"SIGINT" | "SIGTERM", () => void>(); + const warnings: unknown[][] = []; + const exitCalls: number[] = []; + let exitReject!: (reason?: unknown) => void; + const exitPromise = new Promise((_, reject) => { + exitReject = reject; + }); + const originalWarn = logger.warn; + logger.warn = (...args: unknown[]) => { + warnings.push(args); + }; + + try { + registerRuntimeTeardown([ + { + name: "redis", + run: () => Promise.resolve(), + }, + ], { + process: { + on(event: string, handler: () => void) { + if (event === "SIGINT" || event === "SIGTERM") { + signalHandlers.set(event, handler); + } + }, + off(event: string, _handler: () => void) { + if (event === "SIGINT" || event === "SIGTERM") { + signalHandlers.delete(event); + } + }, + exit(code?: number) { + exitCalls.push(code ?? 0); + exitReject(new Error(`exit:${code ?? 0}`)); + return undefined as never; + }, + exitCode: undefined, + } as unknown as { + on?: (event: string, handler: () => void) => void; + off?: (event: string, handler: () => void) => void; + exitCode?: number; + }, + }); + + await assertRejects( + async () => { + signalHandlers.get("SIGINT")?.(); + await exitPromise; + }, + Error, + "exit:130", + ); + + assertEquals(exitCalls, [130]); + assertEquals(warnings.length, 1); + } finally { + logger.warn = originalWarn; + } + }); }); diff --git a/src/services/runtime-teardown.ts b/src/services/runtime-teardown.ts index 703adf5..103e1ec 100644 --- a/src/services/runtime-teardown.ts +++ b/src/services/runtime-teardown.ts @@ -10,6 +10,10 @@ export interface RuntimeTeardownRegistration { dispose(): void; } +type RunOptions = { + preserveSignalListeners?: boolean; +}; + type ShutdownTrigger = | { kind: "event"; type: (typeof SHUTDOWN_EVENTS)[number] } | { kind: "signal"; signal: (typeof SHUTDOWN_SIGNALS)[number] }; @@ -39,6 +43,7 @@ type ShutdownRegistrationAdapter = { process?: { on?: (event: string, handler: () => void) => void; off?: (event: string, handler: () => void) => void; + exit?: (code?: number) => never; exitCode?: number; }; }; @@ -138,15 +143,18 @@ export function registerRuntimeTeardown( } if (runtime.process) { runtime.process.exitCode = exitCode; + runtime.process.exit?.(exitCode); } }; - const run = (): Promise => { + const run = (options: RunOptions = {}): Promise => { if (teardownPromise) return teardownPromise; teardownPromise = (async () => { disposeEventListeners(); - disposeSignalListeners(); + if (!options.preserveSignalListeners) { + disposeSignalListeners(); + } releaseRegistration(); try { @@ -161,7 +169,9 @@ export function registerRuntimeTeardown( } } } finally { - disposeSignalListeners(); + if (!options.preserveSignalListeners) { + disposeSignalListeners(); + } } })(); @@ -172,19 +182,19 @@ export function registerRuntimeTeardown( if (gracefulShutdownStarted) return; gracefulShutdownStarted = true; disposeEventListeners(); - disposeSignalListeners(); if (trigger.kind === "signal") { shutdownSignal = trigger.signal; logger.warn(getShutdownNotice(trigger.signal), { signal: trigger.signal, }); - void run().finally(() => { + void run({ preserveSignalListeners: true }).finally(() => { requestExit(trigger.signal); }); return; } + disposeSignalListeners(); void run(); };