diff --git a/bun.lock b/bun.lock index 02a7dd7..a503402 100644 --- a/bun.lock +++ b/bun.lock @@ -75,6 +75,7 @@ }, "dependencies": { "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*", "jmespath": "^0.16.0", }, "devDependencies": { @@ -91,6 +92,7 @@ }, "dependencies": { "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*", }, "devDependencies": { "@types/bun": "latest", @@ -106,6 +108,7 @@ "dependencies": { "@anthropic-ai/sdk": "^0.71.2", "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*", }, "devDependencies": { "@types/bun": "latest", @@ -120,6 +123,7 @@ }, "dependencies": { "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*", }, "devDependencies": { "@types/bun": "latest", @@ -134,6 +138,7 @@ }, "dependencies": { "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*", }, "devDependencies": { "@types/bun": "latest", @@ -148,6 +153,7 @@ }, "dependencies": { "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*", "zod": "^4.1.12", }, "devDependencies": { @@ -158,6 +164,17 @@ "bun": ">=1.0.0", }, }, + "packages/proxy-plugin-server": { + "name": "@jixo/proxy-plugin-server", + "version": "0.1.0", + "dependencies": { + "@jixo/proxy-plugin": "workspace:*", + }, + "devDependencies": { + "@types/bun": "latest", + "typescript": "^5.0.0", + }, + }, }, "packages": { "@anthropic-ai/sdk": ["@anthropic-ai/sdk@0.71.2", "", { "dependencies": { "json-schema-to-ts": "^3.1.1" }, "peerDependencies": { "zod": "^3.25.0 || ^4.0.0" }, "optionalPeers": ["zod"], "bin": { "anthropic-ai-sdk": "bin/cli" } }, "sha512-TGNDEUuEstk/DKu0/TflXAEt+p+p/WhTlFzEnoosvbaDU2LTjm42igSdlL0VijrKpWejtOKxX0b8A7uc+XiSAQ=="], @@ -266,6 +283,8 @@ "@jixo/proxy-plugin-responses4claudecode": ["@jixo/proxy-plugin-responses4claudecode@workspace:packages/proxy-plugin-responses4claudecode"], + "@jixo/proxy-plugin-server": ["@jixo/proxy-plugin-server@workspace:packages/proxy-plugin-server"], + "@oven/bun-darwin-aarch64": ["@oven/bun-darwin-aarch64@1.3.2", "", { "os": "darwin", "cpu": "arm64" }, "sha512-licBDIbbLP5L5/S0+bwtJynso94XD3KyqSP48K59Sq7Mude6C7dR5ZujZm4Ut4BwZqUFfNOfYNMWBU5nlL7t1A=="], "@oven/bun-darwin-x64": ["@oven/bun-darwin-x64@1.3.2", "", { "os": "darwin", "cpu": "x64" }, "sha512-hn8lLzsYyyh6ULo2E8v2SqtrWOkdQKJwapeVy1rDw7juTTeHY3KDudGWf4mVYteC9riZU6HD88Fn3nGwyX0eIg=="], @@ -866,6 +885,8 @@ "@jixo/proxy-plugin-responses4claudecode/@types/bun": ["@types/bun@1.3.5", "", { "dependencies": { "bun-types": "1.3.5" } }, "sha512-RnygCqNrd3srIPEWBd5LFeUYG7plCoH2Yw9WaZGyNmdTEei+gWaHqydbaIRkIkcbXwhBT94q78QljxN0Sk838w=="], + "@jixo/proxy-plugin-server/@types/bun": ["@types/bun@1.3.5", "", { "dependencies": { "bun-types": "1.3.5" } }, "sha512-RnygCqNrd3srIPEWBd5LFeUYG7plCoH2Yw9WaZGyNmdTEei+gWaHqydbaIRkIkcbXwhBT94q78QljxN0Sk838w=="], + "@radix-ui/react-alert-dialog/@radix-ui/react-slot": ["@radix-ui/react-slot@1.2.3", "", { "dependencies": { "@radix-ui/react-compose-refs": "1.1.2" }, "peerDependencies": { "@types/react": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react"] }, "sha512-aeNmHnBxbi2St0au6VBVC7JXFlhLlOnvIIlePNniyUNAClzmtAUEY8/pBiK3iHjufOlwA+c20/8jngo7xcrg8A=="], "@radix-ui/react-collection/@radix-ui/react-slot": ["@radix-ui/react-slot@1.2.3", "", { "dependencies": { "@radix-ui/react-compose-refs": "1.1.2" }, "peerDependencies": { "@types/react": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react"] }, "sha512-aeNmHnBxbi2St0au6VBVC7JXFlhLlOnvIIlePNniyUNAClzmtAUEY8/pBiK3iHjufOlwA+c20/8jngo7xcrg8A=="], @@ -904,6 +925,8 @@ "@jixo/proxy-plugin-responses4claudecode/@types/bun/bun-types": ["bun-types@1.3.5", "", { "dependencies": { "@types/node": "*" } }, "sha512-inmAYe2PFLs0SUbFOWSVD24sg1jFlMPxOjOSSCYqUgn4Hsc3rDc7dFvfVYjFPNHtov6kgUeulV4SxbuIV/stPw=="], + "@jixo/proxy-plugin-server/@types/bun/bun-types": ["bun-types@1.3.5", "", { "dependencies": { "@types/node": "*" } }, "sha512-inmAYe2PFLs0SUbFOWSVD24sg1jFlMPxOjOSSCYqUgn4Hsc3rDc7dFvfVYjFPNHtov6kgUeulV4SxbuIV/stPw=="], + "@jixo/proxy-plugin/@types/bun/bun-types": ["bun-types@1.3.5", "", { "dependencies": { "@types/node": "*" } }, "sha512-inmAYe2PFLs0SUbFOWSVD24sg1jFlMPxOjOSSCYqUgn4Hsc3rDc7dFvfVYjFPNHtov6kgUeulV4SxbuIV/stPw=="], } } diff --git a/packages/proxy-plugin-anthropic-ping/package.json b/packages/proxy-plugin-anthropic-ping/package.json index e183380..b42a9f0 100644 --- a/packages/proxy-plugin-anthropic-ping/package.json +++ b/packages/proxy-plugin-anthropic-ping/package.json @@ -17,6 +17,7 @@ }, "dependencies": { "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*", "jmespath": "^0.16.0" }, "devDependencies": { diff --git a/packages/proxy-plugin-anthropic-ping/src/index.ts b/packages/proxy-plugin-anthropic-ping/src/index.ts index b182234..b6a62ae 100755 --- a/packages/proxy-plugin-anthropic-ping/src/index.ts +++ b/packages/proxy-plugin-anthropic-ping/src/index.ts @@ -18,7 +18,8 @@ * - debug: 是否开启调试日志 */ -import { definePlugin, getPluginConfigWithDefaults } from "@jixo/proxy-plugin"; +import { createProxyServer } from "@jixo/proxy-plugin-server"; +import { getPluginConfigWithDefaults } from "@jixo/proxy-plugin"; import { createAnthropicPingPlugin } from "./plugin"; export { AnthropicPingMiddleware } from "./ping-middleware"; @@ -52,19 +53,15 @@ export interface AnthropicPingConfig { } if (import.meta.main) { - // 从环境变量 PLUGIN_CONFIG 读取配置 const config = getPluginConfigWithDefaults({ - maxKeepAliveDurationMs: 60 * 60 * 1000, // 60 分钟 - cacheTtlMs: 5 * 60 * 1000, // 5 分钟,Anthropic 官方值 - pingLeadTimeMs: 1 * 60 * 1000, // 提前 1 分钟开始保活 + maxKeepAliveDurationMs: 60 * 60 * 1000, + cacheTtlMs: 5 * 60 * 1000, + pingLeadTimeMs: 1 * 60 * 1000, pollingIntervalMs: 30 * 1000, debug: false, }); - // 也支持通过环境变量直接配置(向后兼容) const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1" || config.debug; - - // 计算实际的 idle 阈值 const idleThresholdMs = config.cacheTtlMs - config.pingLeadTimeMs; console.log("[AnthropicPing] Starting with config:", { @@ -76,12 +73,11 @@ if (import.meta.main) { debug, }); - void idleThresholdMs; - void config; - void debug; - void definePlugin; - void createAnthropicPingPlugin; - throw new Error( - `[anthropic-ping] standalone plugin server mode is no longer supported: hooks are now in-process and streaming-native.`, - ); + createProxyServer({ + plugin: createAnthropicPingPlugin({ + ...config, + debug, + idleThresholdMs, + }), + }); } diff --git a/packages/proxy-plugin-anthropic4codex/package.json b/packages/proxy-plugin-anthropic4codex/package.json index 21386ad..b4402b3 100644 --- a/packages/proxy-plugin-anthropic4codex/package.json +++ b/packages/proxy-plugin-anthropic4codex/package.json @@ -28,7 +28,8 @@ "test": "bun test" }, "dependencies": { - "@jixo/proxy-plugin": "workspace:*" + "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*" }, "devDependencies": { "@types/bun": "latest", diff --git a/packages/proxy-plugin-anthropic4codex/src/index.ts b/packages/proxy-plugin-anthropic4codex/src/index.ts index e0e247c..530d97f 100755 --- a/packages/proxy-plugin-anthropic4codex/src/index.ts +++ b/packages/proxy-plugin-anthropic4codex/src/index.ts @@ -7,6 +7,7 @@ * - Response Hook: 将 Claude Messages SSE 响应转换为 Codex Responses SSE 响应 */ +import { createProxyServer } from "@jixo/proxy-plugin-server"; import { createCodexPlugin } from "./plugin"; // Plugin @@ -93,9 +94,10 @@ export type { // 作为独立进程运行时启动服务器 if (import.meta.main) { - const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1"; - void debug; - throw new Error( - `[anthropic4codex] standalone plugin server mode is no longer supported: hooks are now in-process and streaming-native.`, - ); + const config = JSON.parse(process.env.PLUGIN_CONFIG || "{}"); + const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1" || config.debug; + + createProxyServer({ + plugin: createCodexPlugin({ ...config, debug }), + }); } diff --git a/packages/proxy-plugin-anthropic4droid/package.json b/packages/proxy-plugin-anthropic4droid/package.json index 952b839..ccc36a0 100644 --- a/packages/proxy-plugin-anthropic4droid/package.json +++ b/packages/proxy-plugin-anthropic4droid/package.json @@ -19,7 +19,8 @@ }, "dependencies": { "@anthropic-ai/sdk": "^0.71.2", - "@jixo/proxy-plugin": "workspace:*" + "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*" }, "devDependencies": { "@types/bun": "latest", diff --git a/packages/proxy-plugin-anthropic4droid/src/index.ts b/packages/proxy-plugin-anthropic4droid/src/index.ts index 842bb4f..8141fa2 100755 --- a/packages/proxy-plugin-anthropic4droid/src/index.ts +++ b/packages/proxy-plugin-anthropic4droid/src/index.ts @@ -7,6 +7,7 @@ * - Response Hook: 将上游错误重写为 context_length_exceeded 以触发 auto-compact */ +import { createProxyServer } from "@jixo/proxy-plugin-server"; import { createDroidPlugin } from "./plugin"; // 导出公共 API @@ -39,9 +40,10 @@ export type { RequestBody, Message, TextBlock, RewriteResult } from "./types"; // 作为独立进程运行时启动服务器 if (import.meta.main) { - const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1"; - void debug; - throw new Error( - `[anthropic4droid] standalone plugin server mode is no longer supported: hooks are now in-process and streaming-native.`, - ); + const config = JSON.parse(process.env.PLUGIN_CONFIG || "{}"); + const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1" || config.debug; + + createProxyServer({ + plugin: createDroidPlugin({ ...config, debug }), + }); } diff --git a/packages/proxy-plugin-gemini4droid/package.json b/packages/proxy-plugin-gemini4droid/package.json index a52e651..1ed5e69 100644 --- a/packages/proxy-plugin-gemini4droid/package.json +++ b/packages/proxy-plugin-gemini4droid/package.json @@ -19,7 +19,8 @@ "test": "bun test" }, "dependencies": { - "@jixo/proxy-plugin": "workspace:*" + "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*" }, "devDependencies": { "@types/bun": "latest", diff --git a/packages/proxy-plugin-gemini4droid/src/index.ts b/packages/proxy-plugin-gemini4droid/src/index.ts index b350db7..fb5420a 100755 --- a/packages/proxy-plugin-gemini4droid/src/index.ts +++ b/packages/proxy-plugin-gemini4droid/src/index.ts @@ -10,7 +10,7 @@ * droid --provider anthropic → proxy → gemini4droid → 用户的 Gemini 服务 */ -import { definePlugin } from "@jixo/proxy-plugin"; +import { createProxyServer } from "@jixo/proxy-plugin-server"; import { createGeminiPlugin } from "./plugin"; // 导出插件工厂 @@ -75,35 +75,27 @@ export type { RequestConversionResult, ResponseConversionResult, } from "./types"; -export const createPlugin=createGeminiPlugin + +export const createPlugin = createGeminiPlugin; // 作为独立进程运行时启动服务器 if (import.meta.main) { - const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1"; - - // 从环境变量或 PLUGIN_CONFIG 读取配置 - let upstreamBaseUrl = process.env.GEMINI_UPSTREAM_URL; + const config = JSON.parse(process.env.PLUGIN_CONFIG || "{}"); + const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1" || config.debug; - // 尝试从 PLUGIN_CONFIG 读取(proxy hook config 传递) - if (!upstreamBaseUrl && process.env.PLUGIN_CONFIG) { - try { - const pluginConfig = JSON.parse(process.env.PLUGIN_CONFIG); - upstreamBaseUrl = pluginConfig.upstreamBaseUrl; - } catch { - // ignore parse errors - } - } + // 从环境变量或 config 读取 upstream URL + const upstreamBaseUrl = process.env.GEMINI_UPSTREAM_URL || config.upstreamBaseUrl; console.log("Starting gemini4droid plugin server..."); if (upstreamBaseUrl) { console.log(`Upstream URL: ${upstreamBaseUrl}`); } - definePlugin( - createGeminiPlugin({ + createProxyServer({ + plugin: createGeminiPlugin({ + ...config, debug, upstreamBaseUrl, }), - { debug } - ); + }); } diff --git a/packages/proxy-plugin-openai4droid/package.json b/packages/proxy-plugin-openai4droid/package.json index 6deb725..c47c091 100644 --- a/packages/proxy-plugin-openai4droid/package.json +++ b/packages/proxy-plugin-openai4droid/package.json @@ -16,7 +16,8 @@ "test": "bun test" }, "dependencies": { - "@jixo/proxy-plugin": "workspace:*" + "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*" }, "devDependencies": { "@types/bun": "latest", diff --git a/packages/proxy-plugin-openai4droid/src/index.ts b/packages/proxy-plugin-openai4droid/src/index.ts index bb2d4d0..c6e8a9b 100755 --- a/packages/proxy-plugin-openai4droid/src/index.ts +++ b/packages/proxy-plugin-openai4droid/src/index.ts @@ -10,6 +10,7 @@ * 注意:websearch 响应解析由 droid-patch 的 websearch-native.ts 处理 */ +import { createProxyServer } from "@jixo/proxy-plugin-server"; import { createDroidPlugin } from "./plugin"; // 导出公共 API @@ -20,9 +21,10 @@ export type { RequestBody, RewriteResult, AnyTool, WebSearchTool, FunctionTool } // 作为独立进程运行时启动服务器 if (import.meta.main) { - const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1"; - void debug; - throw new Error( - `[openai4droid] standalone plugin server mode is no longer supported: hooks are now in-process and streaming-native.`, - ); + const config = JSON.parse(process.env.PLUGIN_CONFIG || "{}"); + const debug = process.env.DEBUG === "true" || process.env.DEBUG === "1" || config.debug; + + createProxyServer({ + plugin: createDroidPlugin({ ...config, debug }), + }); } diff --git a/packages/proxy-plugin-responses4claudecode/package.json b/packages/proxy-plugin-responses4claudecode/package.json index 0cdc45a..f378f9a 100644 --- a/packages/proxy-plugin-responses4claudecode/package.json +++ b/packages/proxy-plugin-responses4claudecode/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@jixo/proxy-plugin": "workspace:*", + "@jixo/proxy-plugin-server": "workspace:*", "zod": "^4.1.12" }, "devDependencies": { diff --git a/packages/proxy-plugin-responses4claudecode/src/index.ts b/packages/proxy-plugin-responses4claudecode/src/index.ts index fed6225..09e703f 100755 --- a/packages/proxy-plugin-responses4claudecode/src/index.ts +++ b/packages/proxy-plugin-responses4claudecode/src/index.ts @@ -9,10 +9,11 @@ * 2. 作为模块导入: import { createResponses4ClaudeCodePlugin } from "@jixo/proxy-plugin-responses4claudecode" */ -import { startPlugin } from "./plugin"; +import { createProxyServer } from "@jixo/proxy-plugin-server"; +import { createResponses4ClaudeCodePlugin } from "./plugin"; // 导出公共 API -export { startPlugin, createResponses4ClaudeCodePlugin, type Responses4ClaudeCodePluginOptions } from "./plugin"; +export { createResponses4ClaudeCodePlugin, type Responses4ClaudeCodePluginOptions } from "./plugin"; export { isClaudeRequest, rewriteRequest, convertRequest } from "./request-converter"; export { SSEStreamConverter, convertSSEResponse, convertErrorResponse, convertSuccessResponse, isCodexSuccessResponse } from "./response-converter"; export * from "./types"; @@ -23,14 +24,12 @@ export * from "./task-executor"; // 如果直接运行,启动插件服务器 if (import.meta.main) { + const config = JSON.parse(process.env.PLUGIN_CONFIG || "{}"); + const debug = process.env.DEBUG_RESPONSES4CLAUDECODE === "1" || config.debug; + console.log(`[responses4claudecode] Starting plugin server...`); - startPlugin() - .then(() => { - console.log(`[responses4claudecode] Ready to convert Claude Messages API → OpenAI Responses API`); - }) - .catch((err) => { - console.error(`[responses4claudecode] Failed to start:`, err); - process.exit(1); - }); + createProxyServer({ + plugin: createResponses4ClaudeCodePlugin({ ...config, debug }), + }); } diff --git a/packages/proxy-plugin-responses4claudecode/src/plugin.ts b/packages/proxy-plugin-responses4claudecode/src/plugin.ts index dedc54b..17f1697 100644 --- a/packages/proxy-plugin-responses4claudecode/src/plugin.ts +++ b/packages/proxy-plugin-responses4claudecode/src/plugin.ts @@ -7,7 +7,6 @@ import { z } from "zod"; import { - startPluginServer, type RequestHookParams, type RequestHookResult, type ResponseHookParams, @@ -378,14 +377,4 @@ export function createResponses4ClaudeCodePlugin(options: Responses4ClaudeCodePl }; } -/** - * 启动插件服务器 - */ -export async function startPlugin(): Promise { - const debug = process.env.DEBUG_RESPONSES4CLAUDECODE === "1"; - const plugin = createResponses4ClaudeCodePlugin({ debug }); - void plugin; - throw new Error( - `[responses4claudecode] startPlugin is no longer supported: hooks are now in-process and streaming-native.`, - ); -} + diff --git a/packages/proxy-plugin-server/package.json b/packages/proxy-plugin-server/package.json new file mode 100644 index 0000000..650e65d --- /dev/null +++ b/packages/proxy-plugin-server/package.json @@ -0,0 +1,21 @@ +{ + "name": "@jixo/proxy-plugin-server", + "version": "0.1.0", + "type": "module", + "main": "./src/index.ts", + "types": "./src/index.ts", + "exports": { + ".": "./src/index.ts" + }, + "scripts": { + "ts": "tsc --noEmit", + "test": "bun test" + }, + "dependencies": { + "@jixo/proxy-plugin": "workspace:*" + }, + "devDependencies": { + "@types/bun": "latest", + "typescript": "^5.0.0" + } +} diff --git a/packages/proxy-plugin-server/src/__tests__/callback.test.ts b/packages/proxy-plugin-server/src/__tests__/callback.test.ts new file mode 100644 index 0000000..e225e83 --- /dev/null +++ b/packages/proxy-plugin-server/src/__tests__/callback.test.ts @@ -0,0 +1,63 @@ +import { describe, it, expect, beforeAll, afterAll } from "bun:test"; +import * as http from "node:http"; +import { reportReady } from "../callback"; + +describe("callback", () => { + describe("reportReady", () => { + let server: http.Server; + let serverPort: number; + let receivedUrl: string | null = null; + + beforeAll(async () => { + server = http.createServer((req, res) => { + let body = ""; + req.on("data", (chunk) => { + body += chunk.toString(); + }); + req.on("end", () => { + receivedUrl = body; + res.writeHead(200); + res.end("OK"); + }); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + const addr = server.address(); + serverPort = typeof addr === "object" && addr ? addr.port : 0; + resolve(); + }); + }); + }); + + afterAll(() => { + server.close(); + }); + + it("should POST plugin URL to callback server", async () => { + const originalEnv = process.env.__CALLBACK_URL__; + process.env.__CALLBACK_URL__ = `http://127.0.0.1:${serverPort}`; + + try { + await reportReady("http://127.0.0.1:12345"); + expect(receivedUrl).toBe("http://127.0.0.1:12345"); + } finally { + process.env.__CALLBACK_URL__ = originalEnv; + } + }); + + it("should not throw when __CALLBACK_URL__ is not set (standalone mode)", async () => { + const originalEnv = process.env.__CALLBACK_URL__; + delete process.env.__CALLBACK_URL__; + + try { + await reportReady("http://127.0.0.1:12345"); + // Should not throw + } finally { + if (originalEnv) { + process.env.__CALLBACK_URL__ = originalEnv; + } + } + }); + }); +}); diff --git a/packages/proxy-plugin-server/src/__tests__/proxy-server.test.ts b/packages/proxy-plugin-server/src/__tests__/proxy-server.test.ts new file mode 100644 index 0000000..c036b17 --- /dev/null +++ b/packages/proxy-plugin-server/src/__tests__/proxy-server.test.ts @@ -0,0 +1,56 @@ +import { describe, it, expect } from "bun:test"; +import type { ProxyPlugin } from "@jixo/proxy-plugin"; + +describe("proxy-server", () => { + describe("createProxyServer", () => { + it("should export createProxyServer function", async () => { + const { createProxyServer } = await import("../proxy-server"); + expect(typeof createProxyServer).toBe("function"); + }); + }); + + describe("ProxyPlugin interface compatibility", () => { + it("should accept a valid ProxyPlugin", () => { + const plugin: ProxyPlugin = { + name: "test-plugin", + async onRequest(params) { + return { modified: false }; + }, + async onResponse(params) { + return { modified: false }; + }, + }; + + expect(plugin.name).toBe("test-plugin"); + expect(typeof plugin.onRequest).toBe("function"); + expect(typeof plugin.onResponse).toBe("function"); + }); + + it("should work with minimal plugin (name only)", () => { + const plugin: ProxyPlugin = { + name: "minimal-plugin", + }; + + expect(plugin.name).toBe("minimal-plugin"); + expect(plugin.onRequest).toBeUndefined(); + expect(plugin.onResponse).toBeUndefined(); + }); + + it("should work with shouldProcess methods", () => { + const plugin: ProxyPlugin = { + name: "precheck-plugin", + shouldProcessRequest(meta) { + return meta.url?.includes("/api") ?? false; + }, + shouldProcessResponse(meta) { + return meta.statusCode === 200; + }, + }; + + expect(plugin.shouldProcessRequest?.({ url: "/api/test" })).toBe(true); + expect(plugin.shouldProcessRequest?.({ url: "/other" })).toBe(false); + expect(plugin.shouldProcessResponse?.({ statusCode: 200 })).toBe(true); + expect(plugin.shouldProcessResponse?.({ statusCode: 500 })).toBe(false); + }); + }); +}); diff --git a/packages/proxy-plugin-server/src/callback.ts b/packages/proxy-plugin-server/src/callback.ts new file mode 100644 index 0000000..95e8f92 --- /dev/null +++ b/packages/proxy-plugin-server/src/callback.ts @@ -0,0 +1,26 @@ +/** + * __CALLBACK_URL__ 回报逻辑 + */ + +export async function reportReady(url: string): Promise { + const callbackUrl = process.env.__CALLBACK_URL__; + if (!callbackUrl) { + console.log(`[proxy-plugin-server] No __CALLBACK_URL__, running standalone mode`); + console.log(`[proxy-plugin-server] Proxy server listening on ${url}`); + return; + } + + try { + const response = await fetch(callbackUrl, { + method: "POST", + body: url, + headers: { "Content-Type": "text/plain" }, + }); + if (!response.ok) { + throw new Error(`Callback failed: ${response.status} ${response.statusText}`); + } + } catch (error) { + console.error(`[proxy-plugin-server] Failed to report ready to ${callbackUrl}:`, error); + throw error; + } +} diff --git a/packages/proxy-plugin-server/src/index.ts b/packages/proxy-plugin-server/src/index.ts new file mode 100644 index 0000000..0397b34 --- /dev/null +++ b/packages/proxy-plugin-server/src/index.ts @@ -0,0 +1,8 @@ +/** + * @jixo/proxy-plugin-server + * + * 插件进程服务器框架,提供 HTTP 代理模式的插件运行环境 + */ + +export { createProxyServer, type ProxyServerOptions } from "./proxy-server"; +export { reportReady } from "./callback"; diff --git a/packages/proxy-plugin-server/src/proxy-server.ts b/packages/proxy-plugin-server/src/proxy-server.ts new file mode 100644 index 0000000..2514f7e --- /dev/null +++ b/packages/proxy-plugin-server/src/proxy-server.ts @@ -0,0 +1,483 @@ +/** + * HTTP 代理服务器 + * + * 作为标准 HTTP 正向代理运行,调用 plugin hooks 处理请求/响应 + */ + +import * as http from "node:http"; +import * as https from "node:https"; +import { URL } from "node:url"; +import type { + ProxyPlugin, + RequestMeta, + ResponseMeta, + PluginStore, +} from "@jixo/proxy-plugin"; +import { + createPluginStore, + readStreamToBuffer, + streamFromBuffer, +} from "@jixo/proxy-plugin"; +import { reportReady } from "./callback"; + +export interface ProxyServerOptions { + plugin: ProxyPlugin; + port?: number; +} + +function nodeReadableToWebStream(readable: http.IncomingMessage): ReadableStream { + return new ReadableStream({ + start(controller) { + readable.on("data", (chunk: Buffer) => { + controller.enqueue(new Uint8Array(chunk)); + }); + readable.on("end", () => { + controller.close(); + }); + readable.on("error", (err) => { + controller.error(err); + }); + }, + cancel() { + readable.destroy(); + }, + }); +} + +async function pipeWebStreamToNodeResponse( + stream: ReadableStream, + res: http.ServerResponse, +): Promise { + const reader = stream.getReader(); + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (value) { + res.write(Buffer.from(value)); + } + } + } finally { + reader.releaseLock(); + res.end(); + } +} + +function getProxyEnv(): { httpProxy?: string; httpsProxy?: string } { + return { + httpProxy: process.env.HTTP_PROXY || process.env.http_proxy, + httpsProxy: process.env.HTTPS_PROXY || process.env.https_proxy, + }; +} + +export function createProxyServer(options: ProxyServerOptions): void { + const { plugin, port = 0 } = options; + const pluginConfig = JSON.parse(process.env.PLUGIN_CONFIG || "{}"); + + const server = http.createServer(async (req, res) => { + try { + const requestUrl = req.url; + if (!requestUrl || !requestUrl.startsWith("http")) { + res.writeHead(400, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ error: "Invalid proxy request: absolute URI required" })); + return; + } + + const targetUrl = new URL(requestUrl); + const method = req.method || "GET"; + const requestHeaders: Record = {}; + for (const [key, value] of Object.entries(req.headers)) { + if (value !== undefined) { + requestHeaders[key] = value as string | string[]; + } + } + + const requestMeta: RequestMeta = { + method, + url: targetUrl.href, + headers: requestHeaders, + }; + + const requestBodyStream = nodeReadableToWebStream(req); + + let finalMethod = method; + let finalUrl = targetUrl; + let finalHeaders = { ...requestHeaders }; + let finalBodyStream = requestBodyStream; + + // Request hook + if (plugin.onRequest) { + const shouldProcess = plugin.shouldProcessRequest + ? await plugin.shouldProcessRequest(requestMeta) + : true; + + if (shouldProcess === true) { + const store = createPluginStore( + plugin.name, + (plugin as any).storeSchema, + requestHeaders, + ); + + const hookResult = await plugin.onRequest({ + meta: requestMeta, + body: requestBodyStream, + store, + }); + + if (hookResult) { + if ("respondWith" in hookResult) { + const { statusCode, headers, body } = hookResult.respondWith; + res.writeHead(statusCode, headers); + if (body) { + res.end(typeof body === "string" ? body : body); + } else { + res.end(); + } + return; + } + + if (!("modified" in hookResult) || hookResult.modified !== false) { + if (hookResult.meta?.method) finalMethod = hookResult.meta.method; + if (hookResult.meta?.url) finalUrl = new URL(hookResult.meta.url); + if (hookResult.meta?.headers) { + finalHeaders = hookResult.meta.headers as Record; + } + if (hookResult.body) { + finalBodyStream = hookResult.body; + } + } + } + } + } + + // Forward request + const proxyEnv = getProxyEnv(); + const isHttps = finalUrl.protocol === "https:"; + const upstreamProxy = isHttps ? proxyEnv.httpsProxy : proxyEnv.httpProxy; + + let proxyResponse: http.IncomingMessage; + + if (upstreamProxy) { + // Use upstream proxy (next hop) + proxyResponse = await forwardViaProxy( + upstreamProxy, + finalMethod, + finalUrl, + finalHeaders, + finalBodyStream, + ); + } else { + // Direct connection + proxyResponse = await forwardDirect( + finalMethod, + finalUrl, + finalHeaders, + finalBodyStream, + ); + } + + const responseMeta: ResponseMeta = { + statusCode: proxyResponse.statusCode, + statusMessage: proxyResponse.statusMessage, + headers: proxyResponse.headers as Record, + }; + + let finalStatusCode = proxyResponse.statusCode || 502; + let finalStatusMessage = proxyResponse.statusMessage || ""; + let finalResponseHeaders = { ...proxyResponse.headers }; + let finalResponseBodyStream = nodeReadableToWebStream(proxyResponse); + + // Response hook + if (plugin.onResponse) { + const shouldProcess = plugin.shouldProcessResponse + ? await plugin.shouldProcessResponse(responseMeta, requestMeta) + : true; + + if (shouldProcess === true) { + const store = createPluginStore( + plugin.name, + (plugin as any).storeSchema, + finalHeaders, + ); + + const hookResult = await plugin.onResponse({ + meta: responseMeta, + body: finalResponseBodyStream, + requestMeta, + store, + }); + + if (hookResult && (!("modified" in hookResult) || hookResult.modified !== false)) { + if (hookResult.meta?.statusCode) finalStatusCode = hookResult.meta.statusCode; + if (hookResult.meta?.statusMessage) finalStatusMessage = hookResult.meta.statusMessage; + if (hookResult.meta?.headers) { + finalResponseHeaders = hookResult.meta.headers as http.IncomingHttpHeaders; + } + if (hookResult.body) { + finalResponseBodyStream = hookResult.body; + } + } + } + } + + // Remove hop-by-hop headers + const hopByHopHeaders = [ + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailers", + "transfer-encoding", + "upgrade", + ]; + for (const h of hopByHopHeaders) { + delete finalResponseHeaders[h]; + } + + res.writeHead(finalStatusCode, finalStatusMessage, finalResponseHeaders); + await pipeWebStreamToNodeResponse(finalResponseBodyStream, res); + } catch (error) { + console.error(`[${plugin.name}] Proxy error:`, error); + if (!res.headersSent) { + res.writeHead(502, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ error: "Proxy error", message: String(error) })); + } + } + }); + + server.listen(port, "127.0.0.1", async () => { + const addr = server.address(); + const actualPort = typeof addr === "object" && addr ? addr.port : port; + const url = `http://127.0.0.1:${actualPort}`; + + await reportReady(url); + }); + + process.on("SIGTERM", () => { + server.close(); + process.exit(0); + }); + + process.on("SIGINT", () => { + server.close(); + process.exit(0); + }); +} + +async function forwardDirect( + method: string, + url: URL, + headers: Record, + bodyStream: ReadableStream, +): Promise { + const isHttps = url.protocol === "https:"; + const requestModule = isHttps ? https : http; + const defaultPort = isHttps ? 443 : 80; + + const outHeaders: http.OutgoingHttpHeaders = {}; + for (const [key, value] of Object.entries(headers)) { + if (key.toLowerCase() !== "host") { + outHeaders[key] = value; + } + } + outHeaders.host = url.host; + + const bodyBuffer = await readStreamToBuffer(bodyStream); + + return new Promise((resolve, reject) => { + const req = requestModule.request( + { + hostname: url.hostname, + port: url.port || defaultPort, + path: url.pathname + url.search, + method, + headers: outHeaders, + }, + (res) => { + resolve(res); + }, + ); + + req.on("error", reject); + + if (bodyBuffer.length > 0) { + req.write(bodyBuffer); + } + req.end(); + }); +} + +async function forwardViaProxy( + proxyUrl: string, + method: string, + targetUrl: URL, + headers: Record, + bodyStream: ReadableStream, +): Promise { + const proxy = new URL(proxyUrl); + const isTargetHttps = targetUrl.protocol === "https:"; + + const outHeaders: http.OutgoingHttpHeaders = {}; + for (const [key, value] of Object.entries(headers)) { + outHeaders[key] = value; + } + outHeaders.host = targetUrl.host; + + const bodyBuffer = await readStreamToBuffer(bodyStream); + + if (isTargetHttps) { + // HTTPS via CONNECT tunnel + return new Promise((resolve, reject) => { + const proxyPort = proxy.port ? parseInt(proxy.port, 10) : 80; + const connectReq = http.request({ + hostname: proxy.hostname, + port: proxyPort, + method: "CONNECT", + path: `${targetUrl.hostname}:${targetUrl.port || 443}`, + }); + + connectReq.on("connect", (res, socket) => { + if (res.statusCode !== 200) { + reject(new Error(`CONNECT failed: ${res.statusCode}`)); + return; + } + + const targetPort = targetUrl.port ? parseInt(targetUrl.port, 10) : 443; + const tls = require("tls") as typeof import("tls"); + const tlsSocket = tls.connect( + { + host: targetUrl.hostname, + port: targetPort, + socket, + servername: targetUrl.hostname, + }, + () => { + // Manually write HTTP request over TLS socket + const requestLine = `${method} ${targetUrl.pathname}${targetUrl.search} HTTP/1.1\r\n`; + const headerLines: string[] = []; + for (const [key, value] of Object.entries(outHeaders)) { + if (Array.isArray(value)) { + for (const v of value) { + headerLines.push(`${key}: ${v}`); + } + } else if (value !== undefined) { + headerLines.push(`${key}: ${value}`); + } + } + if (bodyBuffer.length > 0 && !outHeaders["content-length"]) { + headerLines.push(`content-length: ${bodyBuffer.length}`); + } + const httpRequest = requestLine + headerLines.join("\r\n") + "\r\n\r\n"; + + tlsSocket.write(httpRequest); + if (bodyBuffer.length > 0) { + tlsSocket.write(bodyBuffer); + } + + // Parse HTTP response from TLS socket + let responseData = Buffer.alloc(0); + let headersParsed = false; + let incomingMessage: http.IncomingMessage | null = null; + + tlsSocket.on("data", (chunk: Buffer) => { + responseData = Buffer.concat([responseData, chunk]); + + if (!headersParsed) { + const headerEnd = responseData.indexOf("\r\n\r\n"); + if (headerEnd !== -1) { + headersParsed = true; + const headerPart = responseData.subarray(0, headerEnd).toString(); + const bodyPart = responseData.subarray(headerEnd + 4); + + const lines = headerPart.split("\r\n"); + const statusLine = lines[0] || ""; + const statusMatch = statusLine.match(/HTTP\/\d\.\d (\d+) (.*)/); + const statusCode = statusMatch ? parseInt(statusMatch[1]!, 10) : 502; + const statusMessage = statusMatch ? statusMatch[2]! : ""; + + const responseHeaders: http.IncomingHttpHeaders = {}; + for (let i = 1; i < lines.length; i++) { + const colonIdx = lines[i]!.indexOf(":"); + if (colonIdx > 0) { + const key = lines[i]!.substring(0, colonIdx).toLowerCase(); + const value = lines[i]!.substring(colonIdx + 1).trim(); + responseHeaders[key] = value; + } + } + + // Create a readable stream for the response + const { Readable } = require("stream") as typeof import("stream"); + const bodyStream = new Readable({ read() {} }); + + incomingMessage = Object.assign(bodyStream, { + statusCode, + statusMessage, + headers: responseHeaders, + headersDistinct: {}, + httpVersion: "1.1", + httpVersionMajor: 1, + httpVersionMinor: 1, + complete: false, + rawHeaders: [], + trailers: {}, + trailersDistinct: {}, + rawTrailers: [], + socket: tlsSocket, + connection: tlsSocket, + aborted: false, + url: "", + method: null, + setTimeout: () => bodyStream, + }) as unknown as http.IncomingMessage; + + if (bodyPart.length > 0) { + bodyStream.push(bodyPart); + } + + resolve(incomingMessage); + } + } else if (incomingMessage) { + (incomingMessage as any).push(chunk); + } + }); + + tlsSocket.on("end", () => { + if (incomingMessage) { + (incomingMessage as any).push(null); + } + }); + + tlsSocket.on("error", reject); + }, + ); + + tlsSocket.on("error", reject); + }); + + connectReq.on("error", reject); + connectReq.end(); + }); + } else { + // HTTP via proxy (absolute URI) + return new Promise((resolve, reject) => { + const proxyPort = proxy.port ? parseInt(proxy.port, 10) : 80; + const req = http.request( + { + hostname: proxy.hostname, + port: proxyPort, + path: targetUrl.href, + method, + headers: outHeaders, + }, + (res) => { + resolve(res); + }, + ); + + req.on("error", reject); + if (bodyBuffer.length > 0) req.write(bodyBuffer); + req.end(); + }); + } +} diff --git a/packages/proxy-plugin-server/tsconfig.json b/packages/proxy-plugin-server/tsconfig.json new file mode 100644 index 0000000..c4e6618 --- /dev/null +++ b/packages/proxy-plugin-server/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ESNext", + "module": "ESNext", + "moduleResolution": "bundler", + "strict": true, + "skipLibCheck": true, + "declaration": true, + "declarationMap": true, + "esModuleInterop": true, + "allowSyntheticDefaultImports": true, + "forceConsistentCasingInFileNames": true, + "noEmit": true, + "types": ["bun-types"] + }, + "include": ["src/**/*"] +} diff --git a/packages/proxy-plugin/src/index.ts b/packages/proxy-plugin/src/index.ts index 1ef8fd7..ae6870e 100644 --- a/packages/proxy-plugin/src/index.ts +++ b/packages/proxy-plugin/src/index.ts @@ -32,8 +32,7 @@ export { // Logger export { createLogger, type PluginLogger, type LoggerOptions } from "./logger"; -// Server -export { startPluginServer, definePlugin, type PluginServerOptions } from "./server"; + // Private Headers export { diff --git a/src/lib/hooks-executor.ts b/src/lib/hooks-executor.ts index bc53219..3bb17e7 100644 --- a/src/lib/hooks-executor.ts +++ b/src/lib/hooks-executor.ts @@ -1,26 +1,19 @@ +/** + * Hooks 执行器 + * + * 使用进程模式启动插件,通过 HTTP 代理链式调用 + */ + +import * as http from "node:http"; import { spawn, type Subprocess } from "bun"; -import type { ProxyPlugin, PluginStore, PrecheckResult, RequestMeta, ResponseMeta } from "@jixo/proxy-plugin"; -import { createPluginStore } from "@jixo/proxy-plugin"; -import type { HookConfig, HooksConfig, HookLayer } from "../types/proxy"; - -/** 私有 header:记录处理过该请求的插件列表 */ -const HEADER_PLUGIN_PROCESSED = "-x-jixo-proxy-plugin-processed"; - -function addPluginProcessedHeader( - headers: Record, - pluginName: string, -): Record { - const existing = headers[HEADER_PLUGIN_PROCESSED]; - const list = existing - ? (Array.isArray(existing) ? existing.join(",") : existing).split(",") - : []; - if (!list.includes(pluginName)) { - list.push(pluginName); - } - return { - ...headers, - [HEADER_PLUGIN_PROCESSED]: list.join(","), - }; +import { createHash } from "node:crypto"; +import type { HookConfig, HooksConfig } from "../types/proxy"; + +const CALLBACK_TIMEOUT_MS = 15000; + +function computeConfigHash(config: HookConfig, nextHopUrl: string | undefined): string { + const data = JSON.stringify({ config, nextHopUrl }); + return createHash("sha256").update(data).digest("hex").slice(0, 16); } function normalizeHooksConfig(hooks: HooksConfig | null | undefined): HookConfig[] { @@ -29,398 +22,261 @@ function normalizeHooksConfig(hooks: HooksConfig | null | undefined): HookConfig return list.filter((hook) => hook.disabled !== true); } -function getPluginName(config: HookConfig): string { - if (config.config && typeof config.config === "object" && "name" in config.config) { - return String((config.config as any).name); - } - const args = config.args ?? []; - for (const arg of args) { - if (arg.startsWith("@jixo/")) return arg.replace("@jixo/", ""); - if (arg.includes("proxy-plugin-") || arg.includes("proxy-anthropic-")) { - return arg.split("/").pop() ?? arg; +interface CallbackServer { + url: string; + waitForUrl(): Promise; + close(): void; +} + +function createCallbackServer(): CallbackServer { + let resolveUrl: (url: string) => void; + let rejectUrl: (err: Error) => void; + const urlPromise = new Promise((resolve, reject) => { + resolveUrl = resolve; + rejectUrl = reject; + }); + + const server = http.createServer((req, res) => { + if (req.method === "POST") { + let body = ""; + req.on("data", (chunk) => { + body += chunk.toString(); + }); + req.on("end", () => { + res.writeHead(200); + res.end("OK"); + resolveUrl!(body.trim()); + }); + } else { + res.writeHead(405); + res.end(); } - } - return config.command; + }); + + server.listen(0, "127.0.0.1"); + const addr = server.address(); + const port = typeof addr === "object" && addr ? addr.port : 0; + const url = `http://127.0.0.1:${port}`; + + const timeout = setTimeout(() => { + rejectUrl!(new Error(`Callback timeout after ${CALLBACK_TIMEOUT_MS}ms`)); + server.close(); + }, CALLBACK_TIMEOUT_MS); + + return { + url, + async waitForUrl() { + const result = await urlPromise; + clearTimeout(timeout); + server.close(); + return result; + }, + close() { + clearTimeout(timeout); + server.close(); + }, + }; } -async function importPlugin(config: HookConfig): Promise { - if (config.type !== "http") { - throw new Error(`Unsupported hook type: ${config.type}`); +class PluginProcess { + readonly config: HookConfig; + readonly hash: string; + private _url: string | null = null; + private refCount = 0; + private process: Subprocess | null = null; + private nextHopUrl: string | undefined; + + constructor(config: HookConfig, nextHopUrl: string | undefined) { + this.config = config; + this.nextHopUrl = nextHopUrl; + this.hash = computeConfigHash(config, nextHopUrl); } - const args = config.args ?? []; - const entryArg = args.find((a) => a.startsWith("@jixo/")||a.endsWith(".ts")); - if (!entryArg) { - throw new Error(`Hook config missing package entry: ${config.command} ${args.join(" ")}`); + + get url(): string { + if (!this._url) throw new Error("Plugin not started"); + return this._url; } - const mod = await import(entryArg); - - const pluginConfig = - config.config && typeof config.config === "object" && !Array.isArray(config.config) - ? config.config - : undefined; - - const candidates: Array<() => unknown> = [ - () => (typeof (mod as any).default === "function" ? (mod as any).default(pluginConfig) : (mod as any).default), - () => (typeof (mod as any).plugin === "function" ? (mod as any).plugin(pluginConfig) : (mod as any).plugin), - () => (typeof (mod as any).createPlugin === "function" ? (mod as any).createPlugin(pluginConfig) : undefined), - () => - typeof (mod as any).createDroidPlugin === "function" - ? (mod as any).createDroidPlugin(pluginConfig) - : undefined, - () => - typeof (mod as any).createResponses4ClaudeCodePlugin === "function" - ? (mod as any).createResponses4ClaudeCodePlugin(pluginConfig) - : undefined, - () => mod, - ]; - - for (const getCandidate of candidates) { - const value = getCandidate(); - if (value && typeof value === "object" && typeof (value as any).name === "string") { - return value as ProxyPlugin; + + get pluginName(): string { + if (this.config.config && typeof this.config.config === "object" && "name" in this.config.config) { + return String((this.config.config as any).name); + } + const args = this.config.args ?? []; + for (const arg of args) { + if (arg.startsWith("@jixo/")) return arg.replace("@jixo/", ""); + if (arg.includes("proxy-plugin-")) { + return arg.split("/").pop() ?? arg; + } } + return this.config.command; } - throw new Error(`Invalid plugin module export for ${entryArg}`); -} + async start(): Promise { + const callback = createCallbackServer(); -export interface RequestHookParams { - method: string; - url: string; - headers: Record; - body: ReadableStream; - signal?: AbortSignal; -} + const env: Record = { + ...process.env as Record, + __CALLBACK_URL__: callback.url, + }; -export interface RequestHookResult { - skipped?: boolean; - modified?: boolean; - respondWith?: { statusCode: number; headers?: Record; body?: Buffer }; - method?: string; - url?: string; - headers?: Record; - body?: ReadableStream; -} + if (this.config.config) { + env.PLUGIN_CONFIG = JSON.stringify(this.config.config); + } -export interface ResponseHookParams { - statusCode: number; - statusMessage: string; - headers: Record; - body: ReadableStream; - signal?: AbortSignal; - requestMeta?: { method: string; url: string; headers: Record }; -} + if (this.nextHopUrl) { + env.HTTP_PROXY = this.nextHopUrl; + env.HTTPS_PROXY = this.nextHopUrl; + } -export interface ResponseHookResult { - skipped?: boolean; - modified?: boolean; - statusCode?: number; - statusMessage?: string; - headers?: Record; - body?: ReadableStream; -} + const args = this.config.args ?? []; + this.process = spawn([this.config.command, ...args], { + cwd: this.config.cwd, + env, + stdin: "ignore", + stdout: "inherit", + stderr: "inherit", + }); + + try { + this._url = await callback.waitForUrl(); + console.log(`[HooksPool:${this.hash}] Started: ${this.config.command} ${args.join(" ")} -> ${this._url}`); + } catch (error) { + this.process.kill(); + this.process = null; + throw error; + } + } -type LoadedHook = { - pluginName: string; - plugin: ProxyPlugin; - store?: PluginStore; -}; - -async function loadHooks(hooks: HookConfig[]): Promise { - const out: LoadedHook[] = []; - for (const config of hooks) { - const plugin = await importPlugin(config); - out.push({ pluginName: getPluginName(config), plugin }); + addRef(): void { + this.refCount++; } - return out; -} -/** 请求 hooks 执行结果 */ -export interface RequestHooksExecutionResult { - /** 最终的请求参数 */ - params: RequestHookParams; - /** 每层 hook 的执行结果 */ - layers: HookLayer[]; - /** 是否有任何修改 */ - hasChanges: boolean; - /** 短路响应(如果有插件要求直接返回) */ - respondWith?: { - statusCode: number; - headers?: Record; - body?: Buffer; - }; + release(): boolean { + this.refCount--; + return this.refCount <= 0; + } + + kill(): void { + if (this.process) { + this.process.kill(); + this.process = null; + console.log(`[HooksPool:${this.hash}] Stopped`); + } + } } -/** 响应 hooks 执行结果 */ -export interface ResponseHooksExecutionResult { - /** 最终的响应参数 */ - params: ResponseHookParams; - /** 每层 hook 的执行结果 */ - layers: HookLayer[]; - /** 是否有任何修改 */ - hasChanges: boolean; +class HooksPool { + private pool = new Map(); + + async acquire(config: HookConfig, nextHopUrl: string | undefined): Promise { + const hash = computeConfigHash(config, nextHopUrl); + + let proc = this.pool.get(hash); + if (!proc) { + proc = new PluginProcess(config, nextHopUrl); + await proc.start(); + this.pool.set(hash, proc); + } + proc.addRef(); + return proc; + } + + async release(proc: PluginProcess): Promise { + if (proc.release()) { + proc.kill(); + this.pool.delete(proc.hash); + } + } + + async stopAll(): Promise { + for (const proc of this.pool.values()) { + proc.kill(); + } + this.pool.clear(); + } + + get size(): number { + return this.pool.size; + } } -/** 预检结果汇总 */ +const globalHooksPool = new HooksPool(); + export interface PrecheckSummary { - /** 是否需要缓冲 body(任一插件返回 true) */ needsBuffer: boolean; - /** 需要处理的插件列表 */ activePlugins: string[]; - /** 全部返回 passthrough 或 false */ canPassthrough: boolean; } export class HooksExecutor { - private instanceHooksLoaded: LoadedHook[] = []; - private forwardHooksLoaded: LoadedHook[] = []; + private instanceName: string; + private instanceHooksConfig: HookConfig[]; + private forwardHooksConfig: HookConfig[] = []; + private pluginProcesses: PluginProcess[] = []; + private _firstPluginUrl: string | null = null; constructor( - private instanceName: string, - private instanceHooks: HooksConfig | null | undefined, - ) {} + instanceName: string, + instanceHooks: HooksConfig | null | undefined, + ) { + this.instanceName = instanceName; + this.instanceHooksConfig = normalizeHooksConfig(instanceHooks); + } async start(): Promise { - const configs = normalizeHooksConfig(this.instanceHooks); - this.instanceHooksLoaded = await loadHooks(configs); + await this.rebuildHopChain(); } async stop(): Promise { - this.instanceHooksLoaded = []; - this.forwardHooksLoaded = []; + for (const proc of this.pluginProcesses) { + await globalHooksPool.release(proc); + } + this.pluginProcesses = []; + this._firstPluginUrl = null; } async setForwardHooks(_forwardName: string, hooks: HooksConfig | null | undefined): Promise { - const configs = normalizeHooksConfig(hooks); - this.forwardHooksLoaded = await loadHooks(configs); - } - - /** 请求预检:决定是否需要缓冲请求 body */ - async precheckRequest(meta: RequestMeta): Promise { - const allHooks = [...this.instanceHooksLoaded, ...this.forwardHooksLoaded]; - const activePlugins: string[] = []; - let needsBuffer = false; - let allPassthrough = true; - - for (const hook of allHooks) { - if (!hook.plugin.onRequest) continue; - - let result: PrecheckResult; - if (hook.plugin.shouldProcessRequest) { - result = await hook.plugin.shouldProcessRequest(meta); - } else { - // 向后兼容:没有预检方法则默认需要处理 - result = true; - } + const newConfigs = normalizeHooksConfig(hooks); + const oldConfigsJson = JSON.stringify(this.forwardHooksConfig); + const newConfigsJson = JSON.stringify(newConfigs); - if (result === true) { - needsBuffer = true; - allPassthrough = false; - activePlugins.push(hook.pluginName); - } else if (result === 'passthrough') { - // passthrough 不需要缓冲,但不算 false - } else { - // false - 跳过 - allPassthrough = false; - } + if (oldConfigsJson !== newConfigsJson) { + this.forwardHooksConfig = newConfigs; + await this.rebuildHopChain(); } - - return { - needsBuffer, - activePlugins, - canPassthrough: allPassthrough && !needsBuffer, - }; } - /** 响应预检:决定是否需要缓冲响应 body */ - async precheckResponse(meta: ResponseMeta, requestMeta?: RequestMeta): Promise { - const allHooks = [...this.forwardHooksLoaded, ...this.instanceHooksLoaded]; - const activePlugins: string[] = []; - let needsBuffer = false; - let allPassthrough = true; - - for (const hook of allHooks) { - if (!hook.plugin.onResponse) continue; - - let result: PrecheckResult; - if (hook.plugin.shouldProcessResponse) { - result = await hook.plugin.shouldProcessResponse(meta, requestMeta); - } else { - // 向后兼容:没有预检方法则默认需要处理 - result = true; - } - - if (result === true) { - needsBuffer = true; - allPassthrough = false; - activePlugins.push(hook.pluginName); - } else if (result === 'passthrough') { - // passthrough 不需要缓冲 - } else { - // false - 跳过 - allPassthrough = false; - } + private async rebuildHopChain(): Promise { + for (const proc of this.pluginProcesses) { + await globalHooksPool.release(proc); } + this.pluginProcesses = []; - return { - needsBuffer, - activePlugins, - canPassthrough: allPassthrough && !needsBuffer, - }; - } - - /** 执行请求 hooks 并返回层层记录 */ - async executeRequestHooksWithLayers( - params: RequestHookParams, - bodyToDataUrl: (body: Buffer) => string | null, - ): Promise { - let result = params; - const layers: HookLayer[] = []; - let hasChanges = false; - - const allHooks = [...this.instanceHooksLoaded, ...this.forwardHooksLoaded]; - - for (const hook of allHooks) { - const pluginName = hook.pluginName; - - const store = createPluginStore(pluginName, (hook.plugin as any).storeSchema, result.headers); - const pluginResult = hook.plugin.onRequest - ? await hook.plugin.onRequest({ meta: { method: result.method, url: result.url, headers: result.headers }, body: result.body, store }) - : null; - - if (!pluginResult) { - continue; - } - - if ("respondWith" in pluginResult) { - const { statusCode, headers, body } = pluginResult.respondWith; - layers.push({ pluginName, modified: true }); - return { - params: result, - layers, - hasChanges: true, - respondWith: { - statusCode, - headers, - body: body ? Buffer.from(body) : Buffer.alloc(0), - }, - }; - } - - if ("modified" in pluginResult && pluginResult.modified === false) { - layers.push({ pluginName, modified: false }); - continue; - } - - hasChanges = true; - const nextMethod = pluginResult.meta?.method ?? result.method; - const nextUrl = pluginResult.meta?.url ?? result.url; - const nextHeaders = (pluginResult.meta?.headers as any) ?? result.headers; - const nextBody = pluginResult.body ?? result.body; - - const nextResult: RequestHookParams = { - ...result, - method: nextMethod, - url: nextUrl, - headers: addPluginProcessedHeader(nextHeaders, pluginName), - body: nextBody, - }; - - layers.push({ - pluginName, - modified: true, - method: nextResult.method, - url: nextResult.url, - headers: nextResult.headers, - bodyDataUrl: null, - }); - result = nextResult; + const allConfigs = [...this.instanceHooksConfig, ...this.forwardHooksConfig]; + if (allConfigs.length === 0) { + this._firstPluginUrl = null; + return; } - return { params: result, layers, hasChanges }; - } - - /** 执行响应 hooks 并返回层层记录 */ - async executeResponseHooksWithLayers( - params: ResponseHookParams, - bodyToDataUrl: (body: Buffer) => string | null, - getContentType: (headers: Record) => string | null, - ): Promise { - let result = params; - const layers: HookLayer[] = []; - let hasChanges = false; - - const allHooks = [...this.forwardHooksLoaded, ...this.instanceHooksLoaded]; - - for (const hook of allHooks) { - const pluginName = hook.pluginName; - const store = createPluginStore(pluginName, (hook.plugin as any).storeSchema, result.requestMeta?.headers); - const pluginResult = hook.plugin.onResponse - ? await hook.plugin.onResponse({ - meta: { statusCode: result.statusCode, statusMessage: result.statusMessage, headers: result.headers }, - body: result.body, - requestMeta: result.requestMeta, - store, - }) - : null; - - if (!pluginResult) continue; - if ("modified" in pluginResult && pluginResult.modified === false) { - layers.push({ pluginName, modified: false }); - continue; - } + let nextHop = process.env.HTTP_PROXY || process.env.http_proxy; - hasChanges = true; - const nextStatusCode = pluginResult.meta?.statusCode ?? result.statusCode; - const nextStatusMessage = pluginResult.meta?.statusMessage ?? result.statusMessage; - const nextHeaders = (pluginResult.meta?.headers as any) ?? result.headers; - const nextBody = pluginResult.body ?? result.body; - - const nextResult: ResponseHookParams = { - ...result, - statusCode: nextStatusCode, - statusMessage: nextStatusMessage, - headers: nextHeaders, - body: nextBody, - }; - - layers.push({ - pluginName, - modified: true, - statusCode: nextResult.statusCode, - statusMessage: nextResult.statusMessage, - headers: nextResult.headers, - bodyDataUrl: null, - contentType: getContentType(nextResult.headers), - }); - - result = nextResult; + for (let i = allConfigs.length - 1; i >= 0; i--) { + const config = allConfigs[i]!; + const proc = await globalHooksPool.acquire(config, nextHop); + this.pluginProcesses.unshift(proc); + nextHop = proc.url; } - return { params: result, layers, hasChanges }; - } - - /** 向后兼容:执行请求 hooks */ - async executeRequestHooks(params: RequestHookParams): Promise { - const { params: result } = await this.executeRequestHooksWithLayers( - params, - () => null, // 不需要 dataUrl - ); - return result; + this._firstPluginUrl = this.pluginProcesses[0]?.url ?? null; + console.log(`[HooksExecutor:${this.instanceName}] Hop chain: ${this.pluginProcesses.map((p) => p.pluginName).join(" -> ")} -> upstream`); } - /** 向后兼容:执行响应 hooks */ - async executeResponseHooks(params: ResponseHookParams): Promise { - const { params: result } = await this.executeResponseHooksWithLayers( - params, - () => null, - () => null, - ); - return result; + getFirstPluginUrl(): string | null { + return this._firstPluginUrl; } get hasHooks(): boolean { - return this.instanceHooksLoaded.length > 0 || this.forwardHooksLoaded.length > 0; + return this.pluginProcesses.length > 0; } get hasRequestHooks(): boolean { @@ -430,12 +286,28 @@ export class HooksExecutor { get hasResponseHooks(): boolean { return this.hasHooks; } + + async precheckRequest(): Promise { + return { + needsBuffer: false, + activePlugins: this.pluginProcesses.map((p) => p.pluginName), + canPassthrough: true, + }; + } + + async precheckResponse(): Promise { + return { + needsBuffer: false, + activePlugins: this.pluginProcesses.map((p) => p.pluginName), + canPassthrough: true, + }; + } } export function getHooksPoolStats(): { size: number } { - return { size: 0 }; + return { size: globalHooksPool.size }; } export async function stopAllHooks(): Promise { - // no-op (hooks are in-process) + await globalHooksPool.stopAll(); } diff --git a/src/proxy-server.ts b/src/proxy-server.ts index f4afef1..72e2f06 100644 --- a/src/proxy-server.ts +++ b/src/proxy-server.ts @@ -1,9 +1,8 @@ import * as http from "node:http"; import * as https from "node:https"; +import * as tls from "node:tls"; import * as fs from "node:fs"; -import * as path from "node:path"; import { URL } from "node:url"; -import { fileURLToPath } from "node:url"; import { parseArgs } from "node:util"; import createDebug from "debug"; import { @@ -17,27 +16,19 @@ import { setDataDir } from "./lib/runtime-paths"; import { initDatabase } from "./lib/db"; import { bufferToDataUrl } from "./lib/data-url"; import { handleWebSocketProxy } from "./lib/websocket-proxy"; -import { HooksExecutor, stopAllHooks, type PrecheckSummary } from "./lib/hooks-executor"; -import { streamFromBuffer, readStreamToBuffer, teeStream } from "@jixo/proxy-plugin"; -import { nodeReadableToWebStream, pipeWebStreamToNodeResponse } from "./lib/node-stream-adapter"; -import type { HooksConfig, HookLayer } from "./types/proxy"; +import { HooksExecutor, stopAllHooks } from "./lib/hooks-executor"; +import type { HooksConfig } from "./types/proxy"; import type { WorkerMessage, WorkerResponse, InstanceRuntimeConfig, - ForwardRuleConfig, } from "./types/worker-messages"; import { normalizeForwardGroups, normalizePathname } from "./lib/forward-utils"; import { createLogger, installGlobalErrorLogger } from "./lib/logger"; import { forwardStatsStore } from "./lib/forward-stats"; -/** 私有 header 前缀,这些 headers 只记录到数据库,不转发到远程服务器 */ const PRIVATE_HEADER_PREFIX = "-x-jixo-proxy-"; -/** 私有 header:原始 Proxy URL(用于插件发起回环请求,如心跳) */ -const HEADER_PROXY_URL = "-x-jixo-proxy-url"; - -/** 过滤掉私有 headers */ function stripPrivateHeaders(headers: http.IncomingHttpHeaders): http.IncomingHttpHeaders { const result: http.IncomingHttpHeaders = {}; for (const [key, value] of Object.entries(headers)) { @@ -103,14 +94,13 @@ async function main(argv: string[]) { class ProxyRequestAbortedError extends Error { readonly abortReason: AbortReason; - constructor(abortReason: AbortReason) { super("Request aborted"); this.name = "ProxyRequestAbortedError"; this.abortReason = abortReason; } } - // 在 worker 模式下,将日志透传给父线程以便 UI 实时展示 + if (parentPort) { const originalConsole = { ...console }; (["log", "info", "warn", "error"] as const).forEach((level) => { @@ -151,7 +141,6 @@ async function main(argv: string[]) { if (args.values.config) { try { const content = fs.readFileSync(args.values.config, "utf-8"); - // 读取 InstanceRuntimeConfig 格式(单一数据源) const instanceConfig = JSON.parse(content) as { name: string; headers?: Record | null; @@ -172,12 +161,10 @@ async function main(argv: string[]) { log.info( `[Config] Loaded ${forwards.length} forward rules (${enabledCount} enabled) for "${INSTANCE_NAME}"`, ); - // 详细日志:显示每个 forward 的 enabled 状态 forwards.forEach((f, idx) => { log.info(`[Config] [${idx}] ${f.name}: enabled=${f.enabled}, path=${f.path || "(default)"}`); }); - // 初始化 hooks 执行器 if (instanceHooks || forwards.some((f) => f.hooks)) { hooksExecutor = new HooksExecutor(INSTANCE_NAME, instanceHooks); hooksExecutor.start().catch((err) => { @@ -190,7 +177,6 @@ async function main(argv: string[]) { } } - /** 获取当前运行时配置 */ function getCurrentConfig(): InstanceRuntimeConfig { return { name: INSTANCE_NAME, @@ -210,22 +196,18 @@ async function main(argv: string[]) { }; } - /** 热更新配置(不重启服务器) */ async function reloadConfig(newConfig: InstanceRuntimeConfig): Promise { log.info(`[Reload] Applying new config for "${INSTANCE_NAME}"...`); - // 停止旧的 hooks executor if (hooksExecutor) { await hooksExecutor.stop(); hooksExecutor = null; } - // 更新配置 instanceHeaders = newConfig.headers; instanceHooks = newConfig.hooks; forwards = normalizeForwardGroups(newConfig.forwards.filter((f) => f)); - // 启动新的 hooks executor if (instanceHooks || forwards.some((f) => f.hooks)) { hooksExecutor = new HooksExecutor(INSTANCE_NAME, instanceHooks); await hooksExecutor.start(); @@ -233,73 +215,50 @@ async function main(argv: string[]) { const enabledCount = forwards.filter((f) => f.enabled).length; log.info( - `[Reload] Config updated: ${forwards.length} forward rules (${enabledCount} enabled), headers: ${instanceHeaders ? Object.keys(instanceHeaders).length : 0}`, + `[Reload] Config updated: ${forwards.length} forward rules (${enabledCount} enabled)`, ); - // 详细日志:显示每个 forward 的 enabled 状态 - forwards.forEach((f, idx) => { - log.info(`[Reload] [${idx}] ${f.name}: enabled=${f.enabled}, path=${f.path || "(default)"}`); - }); } - // Worker 消息处理 if (parentPort) { parentPort.addEventListener("message", async (event) => { const message: WorkerMessage = event.data; try { switch (message.type) { case "init": { - // 初始化数据目录和数据库 setDataDir(message.dataDir); await initDatabase(); - - const response: WorkerResponse = { - type: "init-result", - success: true, - }; - parentPort.postMessage(response); + parentPort.postMessage({ type: "init-result", success: true }); log.info(`[Init] Database initialized with dataDir: ${message.dataDir}`); break; } case "reload": { await reloadConfig(message.config); - const response: WorkerResponse = { - type: "reload-result", - success: true, - }; - parentPort.postMessage(response); + parentPort.postMessage({ type: "reload-result", success: true }); break; } case "get-config": { - const response: WorkerResponse = { - type: "config", - config: getCurrentConfig(), - }; - parentPort.postMessage(response); + parentPort.postMessage({ type: "config", config: getCurrentConfig() }); break; } case "ping": { - const response: WorkerResponse = { type: "pong" }; - parentPort.postMessage(response); + parentPort.postMessage({ type: "pong" }); break; } case "abort-request": { const controller = activeRequests.get(message.dbRecordId); - const success = !!controller; if (controller) { controller.abort(USER_ABORT); log.info(`[Abort] Request #${message.dbRecordId} aborted by user`); } - const response: WorkerResponse = { + parentPort.postMessage({ type: "abort-result", - success, + success: !!controller, dbRecordId: message.dbRecordId, - }; - parentPort.postMessage(response); + }); break; } case "shutdown": { log.info(`[Shutdown] Received shutdown message, closing server...`); - // server 会在后面定义,这里用 globalThis 延迟访问 const srv = globalThis.__proxyServer; if (srv) { srv.close(() => { @@ -316,44 +275,14 @@ async function main(argv: string[]) { const errorMessage = error instanceof Error ? error.message : String(error); const stack = error instanceof Error ? error.stack : undefined; - switch (message.type) { - case "init": { - const response: WorkerResponse = { - type: "init-result", - success: false, - error: errorMessage, - }; - parentPort.postMessage(response); - break; - } - case "reload": { - const response: WorkerResponse = { - type: "reload-result", - success: false, - error: errorMessage, - }; - parentPort.postMessage(response); - break; - } - case "abort-request": { - const response: WorkerResponse = { - type: "abort-result", - success: false, - dbRecordId: message.dbRecordId, - }; - parentPort.postMessage(response); - break; - } - default: { - const response: WorkerResponse = { - type: "server-error", - error: errorMessage, - stack, - port: Number(PROXY_PORT), - }; - parentPort.postMessage(response); - break; - } + if (message.type === "init") { + parentPort.postMessage({ type: "init-result", success: false, error: errorMessage }); + } else if (message.type === "reload") { + parentPort.postMessage({ type: "reload-result", success: false, error: errorMessage }); + } else if (message.type === "abort-request") { + parentPort.postMessage({ type: "abort-result", success: false, dbRecordId: message.dbRecordId }); + } else { + parentPort.postMessage({ type: "server-error", error: errorMessage, stack, port: Number(PROXY_PORT) }); } } }); @@ -366,10 +295,7 @@ async function main(argv: string[]) { return ruleMethods.map((m) => m.toUpperCase()).includes(method); } - function matchForwardRule( - requestMethod: string, - pathname: string, - ): { rule: ForwardRule; index: number } | null { + function matchForwardRule(requestMethod: string, pathname: string): { rule: ForwardRule; index: number } | null { if (forwards.length === 0) return null; const normalizedPath = normalizePathname(pathname); const method = (requestMethod || "GET").toUpperCase(); @@ -377,7 +303,6 @@ async function main(argv: string[]) { let fallback: { rule: ForwardRule; index: number } | null = null; for (let idx = 0; idx < forwards.length; idx++) { const rule = forwards[idx]!; - // 跳过禁用的规则 if (!rule.enabled) continue; if (!matchMethod(rule.methods, method)) continue; const rulePath = normalizePathname(rule.path || ""); @@ -406,8 +331,7 @@ async function main(argv: string[]) { let finalPath: string; if (rulePathRaw && incomingPath.startsWith(rulePathRaw)) { const suffix = incomingPath.slice(rulePathRaw.length) || "/"; - finalPath = - suffix === "/" ? targetBase.pathname || "/" : joinPaths(targetBase.pathname || "/", suffix); + finalPath = suffix === "/" ? targetBase.pathname || "/" : joinPaths(targetBase.pathname || "/", suffix); } else if (!rulePathRaw) { const basePath = targetBase.pathname || "/"; finalPath = basePath === "/" || basePath === "" ? incomingPath : basePath; @@ -419,10 +343,7 @@ async function main(argv: string[]) { return targetBase; } - function applyCustomHeaders( - headers: http.OutgoingHttpHeaders, - additions: Record | null | undefined, - ): void { + function applyCustomHeaders(headers: http.OutgoingHttpHeaders, additions: Record | null | undefined): void { if (!additions) return; for (const [rawKey, value] of Object.entries(additions)) { const isRegex = rawKey.startsWith("/") && rawKey.endsWith("/") && rawKey.length > 2; @@ -448,28 +369,271 @@ async function main(argv: string[]) { } let requestCounter = 0; - - // 存储活跃请求的 AbortController,用于支持请求中断 const activeRequests = new Map(); + const hopByHopHeaders = [ + "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", + "te", "trailers", "transfer-encoding", "upgrade", + ]; + + function sanitizeHeaders(headers: http.IncomingHttpHeaders): http.OutgoingHttpHeaders { + const clean: http.OutgoingHttpHeaders = { ...headers }; + for (const key of hopByHopHeaders) { + delete (clean as Record)[key]; + } + return clean; + } + + async function forwardViaProxy( + proxyUrl: string, + method: string, + targetUrl: URL, + headers: http.OutgoingHttpHeaders, + body: Buffer, + signal: AbortSignal, + ): Promise { + const proxy = new URL(proxyUrl); + const isTargetHttps = targetUrl.protocol === "https:"; + + const outHeaders: http.OutgoingHttpHeaders = { ...headers }; + outHeaders.host = targetUrl.host; + if (body.length > 0) { + outHeaders["content-length"] = body.length; + } + + const proxyPort = proxy.port ? parseInt(proxy.port, 10) : 80; + + if (isTargetHttps) { + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(new ProxyRequestAbortedError(getAbortReasonFromSignal(signal))); + return; + } + + const connectReq = http.request({ + hostname: proxy.hostname, + port: proxyPort, + method: "CONNECT", + path: `${targetUrl.hostname}:${targetUrl.port || 443}`, + }); + + const handleAbort = () => { + connectReq.destroy(); + reject(new ProxyRequestAbortedError(getAbortReasonFromSignal(signal))); + }; + signal.addEventListener("abort", handleAbort, { once: true }); + + connectReq.on("connect", (res, socket) => { + signal.removeEventListener("abort", handleAbort); + if (res.statusCode !== 200) { + reject(new Error(`CONNECT failed: ${res.statusCode}`)); + return; + } + + const targetPort = targetUrl.port ? parseInt(targetUrl.port, 10) : 443; + const tlsSocket = tls.connect({ + host: targetUrl.hostname, + port: targetPort, + socket, + servername: targetUrl.hostname, + }, () => { + // Manually write HTTP request over TLS socket + const requestLine = `${method} ${targetUrl.pathname}${targetUrl.search} HTTP/1.1\r\n`; + const headerLines: string[] = []; + for (const [key, value] of Object.entries(outHeaders)) { + if (Array.isArray(value)) { + for (const v of value) { + headerLines.push(`${key}: ${v}`); + } + } else if (value !== undefined) { + headerLines.push(`${key}: ${value}`); + } + } + const httpRequest = requestLine + headerLines.join("\r\n") + "\r\n\r\n"; + + const handleAbort2 = () => { + tlsSocket.destroy(); + reject(new ProxyRequestAbortedError(getAbortReasonFromSignal(signal))); + }; + signal.addEventListener("abort", handleAbort2, { once: true }); + + tlsSocket.write(httpRequest); + if (body.length > 0) { + tlsSocket.write(body); + } + + // Parse HTTP response from TLS socket + let responseData = Buffer.alloc(0); + let headersParsed = false; + let incomingMessage: http.IncomingMessage | null = null; + + tlsSocket.on("data", (chunk: Buffer) => { + responseData = Buffer.concat([responseData, chunk]); + + if (!headersParsed) { + const headerEnd = responseData.indexOf("\r\n\r\n"); + if (headerEnd !== -1) { + headersParsed = true; + signal.removeEventListener("abort", handleAbort2); + const headerPart = responseData.subarray(0, headerEnd).toString(); + const bodyPart = responseData.subarray(headerEnd + 4); + + const lines = headerPart.split("\r\n"); + const statusLine = lines[0] || ""; + const statusMatch = statusLine.match(/HTTP\/\d\.\d (\d+) (.*)/); + const statusCode = statusMatch ? parseInt(statusMatch[1]!, 10) : 502; + const statusMessage = statusMatch ? statusMatch[2]! : ""; + + const responseHeaders: http.IncomingHttpHeaders = {}; + for (let i = 1; i < lines.length; i++) { + const colonIdx = lines[i]!.indexOf(":"); + if (colonIdx > 0) { + const key = lines[i]!.substring(0, colonIdx).toLowerCase(); + const value = lines[i]!.substring(colonIdx + 1).trim(); + responseHeaders[key] = value; + } + } + + const { Readable } = require("stream") as typeof import("stream"); + const bodyStream = new Readable({ read() {} }); + + incomingMessage = Object.assign(bodyStream, { + statusCode, + statusMessage, + headers: responseHeaders, + headersDistinct: {}, + httpVersion: "1.1", + httpVersionMajor: 1, + httpVersionMinor: 1, + complete: false, + rawHeaders: [], + trailers: {}, + trailersDistinct: {}, + rawTrailers: [], + socket: tlsSocket, + connection: tlsSocket, + aborted: false, + url: "", + method: null, + setTimeout: () => bodyStream, + }) as unknown as http.IncomingMessage; + + if (bodyPart.length > 0) { + bodyStream.push(bodyPart); + } + + resolve(incomingMessage); + } + } else if (incomingMessage) { + (incomingMessage as any).push(chunk); + } + }); + + tlsSocket.on("end", () => { + if (incomingMessage) { + (incomingMessage as any).push(null); + } + }); + + tlsSocket.on("error", (err) => { + signal.removeEventListener("abort", handleAbort2); + reject(err); + }); + }); + + tlsSocket.on("error", reject); + }); + + connectReq.on("error", reject); + connectReq.end(); + }); + } else { + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(new ProxyRequestAbortedError(getAbortReasonFromSignal(signal))); + return; + } + + const req = http.request({ + hostname: proxy.hostname, + port: proxyPort, + path: targetUrl.href, + method, + headers: outHeaders, + }, resolve); + + const handleAbort = () => { + req.destroy(); + reject(new ProxyRequestAbortedError(getAbortReasonFromSignal(signal))); + }; + signal.addEventListener("abort", handleAbort, { once: true }); + req.on("error", (err) => { + signal.removeEventListener("abort", handleAbort); + reject(err); + }); + if (body.length > 0) req.write(body); + req.end(); + }); + } + } + + async function forwardDirect( + method: string, + targetUrl: URL, + headers: http.OutgoingHttpHeaders, + body: Buffer, + signal: AbortSignal, + ): Promise { + const isHttps = targetUrl.protocol === "https:"; + const requestModule = isHttps ? https : http; + const defaultPort = isHttps ? 443 : 80; + + const outHeaders: http.OutgoingHttpHeaders = { ...headers }; + outHeaders.host = targetUrl.host; + if (body.length > 0) { + outHeaders["content-length"] = body.length; + } + + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(new ProxyRequestAbortedError(getAbortReasonFromSignal(signal))); + return; + } + + const req = requestModule.request({ + hostname: targetUrl.hostname, + port: targetUrl.port || defaultPort, + path: targetUrl.pathname + targetUrl.search, + method, + headers: outHeaders, + }, resolve); + + const handleAbort = () => { + req.destroy(); + reject(new ProxyRequestAbortedError(getAbortReasonFromSignal(signal))); + }; + signal.addEventListener("abort", handleAbort, { once: true }); + req.on("error", (err) => { + signal.removeEventListener("abort", handleAbort); + reject(err); + }); + if (body.length > 0) req.write(body); + req.end(); + }); + } + const server = http.createServer(async (req, res) => { const startTime = Date.now(); const requestId = `${++requestCounter}`; const timestamp = new Date().toISOString(); - // 创建 AbortController 用于级联取消整个请求链路 const abortController = new AbortController(); const { signal: abortSignal } = abortController; let isClientDisconnected = false; - let didStreamResponseToClient = false; - // 监听上游客户端断开连接 const handleClientClose = () => { if (!res.writableEnded && !isClientDisconnected) { - // 如果是用户主动 abort 导致的下游关闭,不应被视为 client_disconnect - if (abortSignal.aborted && getAbortReasonFromSignal(abortSignal) === USER_ABORT) { - return; - } + if (abortSignal.aborted && getAbortReasonFromSignal(abortSignal) === USER_ABORT) return; isClientDisconnected = true; abortController.abort(CLIENT_DISCONNECT); log.info(`[Abort] Client disconnected for request #${requestId}`); @@ -492,736 +656,210 @@ async function main(argv: string[]) { return; } - const candidateIndexes: number[] = forwards - .map((rule, idx) => ({ rule, idx })) - .filter(({ rule }) => { - // 跳过禁用的规则 - if (!rule.enabled) return false; - if (rule.name !== matched.rule.name) return false; - if (!matchMethod(rule.methods, method)) return false; - const rulePath = normalizePathname(rule.path || ""); - const normalizedPath = normalizePathname(requestUrl.pathname); - if (!rule.path || rule.path.trim() === "") return true; - return normalizedPath.startsWith(rulePath); - }) - .map(({ idx }) => idx); + const forwardRule = matched.rule; - if (candidateIndexes.length === 0) { - candidateIndexes.push(matched.index); + // 设置 forward hooks(重建 hop 链) + if (hooksExecutor && (instanceHooks || forwardRule.hooks)) { + try { + await hooksExecutor.setForwardHooks(forwardRule.name, forwardRule.hooks ?? null); + } catch (err) { + console.error("[Hooks] Failed to set forward hooks:", err); + } } + const targetUrl = buildTargetUrl(forwardRule, requestUrl); + const forwardHeaders: http.OutgoingHttpHeaders = { ...stripPrivateHeaders(req.headers) }; + forwardHeaders.host = targetUrl.host; + applyCustomHeaders(forwardHeaders, instanceHeaders); + applyCustomHeaders(forwardHeaders, forwardRule.headers ?? null); + const requestBodyChunks: Buffer[] = []; for await (const chunk of req) { requestBodyChunks.push(chunk as Buffer); } - const originalRequestBody = Buffer.concat(requestBodyChunks); + const requestBody = Buffer.concat(requestBodyChunks); const requestContentType = (req.headers["content-type"] as string) || null; - const originalRequestBodyDataUrl = - originalRequestBody.length > 0 - ? bufferToDataUrl(originalRequestBody, requestContentType) - : null; - - const hopByHopKeys = [ - "connection", - "keep-alive", - "proxy-authenticate", - "proxy-authorization", - "te", - "trailers", - "transfer-encoding", - "upgrade", - ]; - - const sanitizeResponseHeadersForStreaming = ( - headers: http.IncomingHttpHeaders, - ): http.OutgoingHttpHeaders => { - const clean: http.OutgoingHttpHeaders = { ...headers }; - for (const key of hopByHopKeys) { - delete (clean as Record)[key as string]; - } - return clean; - }; - - const sanitizeResponseHeadersForBuffered = ( - headers: http.IncomingHttpHeaders, - bodyLength: number, - ): http.OutgoingHttpHeaders => { - const clean = sanitizeResponseHeadersForStreaming(headers); - clean["content-length"] = bodyLength; - return clean; - }; - - let dbRecordId: number | null = null; - let finalResult: { - statusCode: number; - statusMessage: string; - headers: http.OutgoingHttpHeaders; - bodyBuffer: Buffer; - contentType: string | null; - ttfbMs: number; - bodyMs: number; - errorMessage?: string; - abortReason?: AbortReason; - forwardRule: ForwardRule; - hasResponseHookChanges?: boolean; - responseHookLayers?: HookLayer[]; - originalStatusCode?: number; - originalStatusMessage?: string; - originalHeaders?: http.IncomingHttpHeaders; - originalBodyBuffer?: Buffer; - originalContentType?: string | null; - } | null = null; - - for (let i = 0; i < candidateIndexes.length; i++) { - const ruleIndex = candidateIndexes[i]; - const forwardRule = forwards[ruleIndex as number]!; - - if (hooksExecutor && (instanceHooks || forwardRule.hooks)) { - try { - await hooksExecutor.setForwardHooks(forwardRule.name, forwardRule.hooks ?? null); - } catch (err) { - console.error("[Hooks] Failed to set forward hooks:", err); - } - } - - let targetUrl = buildTargetUrl(forwardRule, requestUrl); - // 过滤掉私有 headers(x-proxy-* 前缀),这些只记录到数据库,不转发 - const forwardHeaders: http.OutgoingHttpHeaders = { ...stripPrivateHeaders(req.headers) }; - forwardHeaders.host = targetUrl.host; - applyCustomHeaders(forwardHeaders, instanceHeaders); - applyCustomHeaders(forwardHeaders, forwardRule.headers ?? null); - - let hookedMethod = method; - let hookedTargetUrl = targetUrl; - let hookedRequestBody: Buffer = originalRequestBody; - let hookedForwardHeaders: http.OutgoingHttpHeaders = { ...forwardHeaders }; - let hasRequestHookChanges = false; - let requestHookLayers: HookLayer[] | undefined; - - if (hooksExecutor?.hasRequestHooks) { - try { - // 添加 proxy URL header,让插件知道如何发起回环请求(如心跳) - const headersForHooks: Record = { - ...(hookedForwardHeaders as Record), - [HEADER_PROXY_URL]: requestUrl.href, - }; - const hookExecResult = await hooksExecutor.executeRequestHooksWithLayers( - { - method, - url: targetUrl.href, - headers: headersForHooks, - body: streamFromBuffer(hookedRequestBody), - signal: abortSignal, - }, - (body) => body.length > 0 ? bufferToDataUrl(body, requestContentType) : null, - ); - // 检查是否是 respondWith - 短路请求,直接返回响应 - if (hookExecResult.respondWith) { - const { statusCode, headers, body } = hookExecResult.respondWith; - res.writeHead(statusCode, headers as http.OutgoingHttpHeaders); - if (body && body.length > 0) { - res.end(body); - } else { - res.end(); - } - return; - } - - const hookResult = hookExecResult.params; - hasRequestHookChanges = hookExecResult.hasChanges; - requestHookLayers = hookExecResult.layers.length > 0 ? hookExecResult.layers : undefined; - - if (hasRequestHookChanges) { - hookedMethod = hookResult.method; - hookedTargetUrl = new URL(hookResult.url); - hookedForwardHeaders = hookResult.headers as http.OutgoingHttpHeaders; - hookedForwardHeaders.host = hookedTargetUrl.host; - hookedRequestBody = await readStreamToBuffer(hookResult.body); - } - } catch (err) { - console.error("[Hooks] Request hook error:", err); - } - } + const requestBodyDataUrl = requestBody.length > 0 + ? bufferToDataUrl(requestBody, requestContentType) + : null; + + const dbRecordId = createProxyRequest({ + request_id: requestId, + timestamp, + instance_name: INSTANCE_NAME, + forward_name: forwardRule.name, + forward_id: forwardRule.id, + group_name: `${INSTANCE_NAME}/${forwardRule.name}`, + status: "pending", + abort_reason: null, + is_websocket: false, + websocket_direction: null, + error_message: null, + request: { + method, + url: requestUrl.href, + headers: req.headers as Record, + forwardedHeaders: forwardHeaders as Record, + targetUrl: targetUrl.href, + bodyDataUrl: requestBodyDataUrl, + bodySize: requestBody.length, + }, + response: undefined, + }); - const hookedRequestBodyDataUrl = - hookedRequestBody.length > 0 - ? bufferToDataUrl(hookedRequestBody, requestContentType) - : null; + activeRequests.set(dbRecordId, abortController); + debugNotifier("about to notify insert, dbRecordId: %d", dbRecordId); + dbNotifier.notify("insert", "requests", dbRecordId); - // 将 hooked body 填充到 requestHookLayers 的最后一个 modified=true 的 layer - if (requestHookLayers && hookedRequestBodyDataUrl) { - for (let i = requestHookLayers.length - 1; i >= 0; i--) { - if (requestHookLayers[i]!.modified) { - requestHookLayers[i]!.bodyDataUrl = hookedRequestBodyDataUrl; - break; - } - } - } + const attemptStart = Date.now(); + let proxyResponse: http.IncomingMessage; - if (hookedForwardHeaders["content-length"] !== undefined) { - if (hookedRequestBody.length > 0) { - hookedForwardHeaders["content-length"] = hookedRequestBody.length; - } else { - delete hookedForwardHeaders["content-length"]; - } + try { + const pluginProxyUrl = hooksExecutor?.getFirstPluginUrl(); + + if (pluginProxyUrl) { + // 通过插件链转发 + proxyResponse = await forwardViaProxy( + pluginProxyUrl, + method, + targetUrl, + forwardHeaders, + requestBody, + abortSignal, + ); + } else { + // 直连 + proxyResponse = await forwardDirect( + method, + targetUrl, + forwardHeaders, + requestBody, + abortSignal, + ); } - - if (dbRecordId === null) { - dbRecordId = createProxyRequest({ - request_id: requestId, - timestamp, - instance_name: INSTANCE_NAME, - forward_name: forwardRule.name, - forward_id: forwardRule.id, - group_name: `${INSTANCE_NAME}/${forwardRule.name}`, - status: "pending", - abort_reason: null, - is_websocket: false, - websocket_direction: null, - error_message: null, - request: { - method, - url: requestUrl.href, - headers: req.headers as Record, - forwardedHeaders: forwardHeaders as Record, - targetUrl: targetUrl.href, - bodyDataUrl: originalRequestBodyDataUrl, - bodySize: originalRequestBody.length, - }, - hookedRequest: hasRequestHookChanges - ? { - method: hookedMethod, - url: hookedTargetUrl.href, - headers: hookedForwardHeaders as Record, - bodyDataUrl: hookedRequestBodyDataUrl, - bodySize: hookedRequestBody.length, - } - : undefined, - requestHookLayers, - response: undefined, - }); - // 注册到活跃请求 Map,支持外部中断 - activeRequests.set(dbRecordId, abortController); - debugNotifier("about to notify insert, dbRecordId: %d", dbRecordId); - dbNotifier.notify("insert", "requests", dbRecordId); - debugNotifier("notify insert completed"); + } catch (error) { + const ttfbMs = Date.now() - attemptStart; + let statusCode = 502; + let errorMessage = error instanceof Error ? error.message : String(error); + let abortReason: AbortReason | undefined; + + if (error instanceof ProxyRequestAbortedError) { + statusCode = 499; + abortReason = error.abortReason; + errorMessage = abortReason === USER_ABORT ? "Request aborted by user" : "Client disconnected"; } - const attemptStart = Date.now(); - const isHttps = hookedTargetUrl.protocol === "https:"; - const requestModule = isHttps ? https : http; - const defaultPort = isHttps ? 443 : 80; - - const attemptResult = await new Promise<{ - statusCode: number; - statusMessage: string; - headers: http.OutgoingHttpHeaders; - bodyBuffer: Buffer; - contentType: string | null; - errorMessage?: string; - abortReason?: AbortReason; - ttfbMs: number; - bodyMs: number; - hasResponseHookChanges?: boolean; - responseHookLayers?: HookLayer[]; - originalStatusCode?: number; - originalStatusMessage?: string; - originalHeaders?: http.IncomingHttpHeaders; - originalBodyBuffer?: Buffer; - originalContentType?: string | null; - }>((resolve, reject) => { - // 检查是否已经被中断 - if (abortSignal.aborted) { - reject(new ProxyRequestAbortedError(getAbortReasonFromSignal(abortSignal))); - return; - } - - let proxyResRef: http.IncomingMessage | null = null; - const proxyReq = requestModule.request( - { - hostname: hookedTargetUrl.hostname, - port: hookedTargetUrl.port || defaultPort, - path: hookedTargetUrl.pathname + hookedTargetUrl.search, - method: hookedMethod, - headers: hookedForwardHeaders, - }, - async (proxyRes) => { - proxyResRef = proxyRes; - // TTFB: 收到响应头的时间 - const responseStartTime = Date.now(); - const ttfbMs = responseStartTime - attemptStart; - - const responseChunks: Buffer[] = []; - - let responseHeaders = { ...proxyRes.headers }; - let statusCode = proxyRes.statusCode || 502; - let statusMessage = proxyRes.statusMessage || ""; - - // Streaming rule: - // - If we might retry (failover), we must not stream. - // - Otherwise we can stream even with hooks (hooks are streaming-native). - const isFailureStatus = statusCode >= 400 && statusCode <= 599; - const hasMoreCandidates = i < candidateIndexes.length - 1; - const shouldRetryOnFailure = isFailureStatus && hasMoreCandidates; - const shouldStreamToClient = !shouldRetryOnFailure; - - // 流式进度更新(每秒最多更新一次) - let totalReceivedBytes = 0; - let lastProgressUpdate = 0; - const PROGRESS_THROTTLE_MS = 1000; - - if (shouldStreamToClient && !didStreamResponseToClient && !isClientDisconnected) { - didStreamResponseToClient = true; - - try { - const upstreamStream = nodeReadableToWebStream(proxyRes); - const requestMeta = { - method: hookedMethod, - url: hookedTargetUrl.href, - headers: hookedForwardHeaders as Record, - }; - - // 预检:决定是否需要处理响应 body - let responsePrecheckResult: PrecheckSummary = { needsBuffer: false, activePlugins: [], canPassthrough: true }; - if (hooksExecutor?.hasResponseHooks) { - responsePrecheckResult = await hooksExecutor.precheckResponse( - { statusCode, statusMessage, headers: responseHeaders as Record }, - requestMeta, - ); - } - - let outStatusCode = statusCode; - let outStatusMessage = statusMessage; - let outHeaders: http.IncomingHttpHeaders = { ...responseHeaders }; - let outStream: ReadableStream; - let storageStream: ReadableStream | null = null; - let hasResponseHookChanges = false; - let responseHookLayers: HookLayer[] | undefined; - const originalChunks: Buffer[] = []; - - if (responsePrecheckResult.needsBuffer && hooksExecutor?.hasResponseHooks) { - // 需要处理:tee stream,一份存储原始,一份给 hook - const { left: forStorage, right: forHooks } = teeStream(upstreamStream); - storageStream = forStorage; - - const hookExecResult = await hooksExecutor.executeResponseHooksWithLayers( - { - statusCode, - statusMessage, - headers: responseHeaders as Record, - body: forHooks, - signal: abortSignal, - requestMeta, - }, - () => null, - (headers) => (headers["content-type"] as string) ?? null, - ); - const hookResult = hookExecResult.params; - hasResponseHookChanges = hookExecResult.hasChanges; - responseHookLayers = hookExecResult.layers.length > 0 ? hookExecResult.layers : undefined; - - outStatusCode = hookResult.statusCode; - outStatusMessage = hookResult.statusMessage; - outHeaders = hookResult.headers as http.IncomingHttpHeaders; - outStream = hookResult.body; - } else { - // 不需要处理:直接透传 - outStream = upstreamStream; - } - - const streamingHeaders = sanitizeResponseHeadersForStreaming(outHeaders); - res.writeHead(outStatusCode, outStatusMessage, streamingHeaders); - - // 并行:pipe 到客户端 + 收集原始数据(如果有 storageStream) - const storagePromise = storageStream - ? (async () => { - const reader = storageStream!.getReader(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (value) originalChunks.push(Buffer.from(value)); - } - })() - : Promise.resolve(); - - await pipeWebStreamToNodeResponse({ - stream: outStream, - res, - onChunk: (chunk) => { - responseChunks.push(Buffer.from(chunk)); - totalReceivedBytes += chunk.byteLength; - - const now = Date.now(); - if (dbRecordId !== null && now - lastProgressUpdate >= PROGRESS_THROTTLE_MS) { - lastProgressUpdate = now; - updateStreamingProgress( - dbRecordId, - totalReceivedBytes, - ttfbMs, - outStatusCode, - outStatusMessage, - outHeaders as Record, - ); - dbNotifier.notify("update", "requests", dbRecordId); - } - }, - }); - - await storagePromise; - - const bodyMs = Date.now() - responseStartTime; - const bodyBuffer = Buffer.concat(responseChunks); - const originalBodyBuffer = originalChunks.length > 0 ? Buffer.concat(originalChunks) : undefined; - const contentType = (outHeaders["content-type"] as string) ?? null; - const originalContentType = (responseHeaders["content-type"] as string) ?? null; - const cleanedHeaders = sanitizeResponseHeadersForBuffered(outHeaders, bodyBuffer.length); - - resolve({ - statusCode: outStatusCode, - statusMessage: outStatusMessage, - headers: cleanedHeaders, - bodyBuffer, - contentType, - ttfbMs, - bodyMs, - hasResponseHookChanges, - responseHookLayers, - originalStatusCode: hasResponseHookChanges ? statusCode : undefined, - originalStatusMessage: hasResponseHookChanges ? statusMessage : undefined, - originalHeaders: hasResponseHookChanges ? responseHeaders : undefined, - originalBodyBuffer, - originalContentType: hasResponseHookChanges ? originalContentType : undefined, - }); - return; - } catch (error) { - if (!res.writableEnded) { - res.destroy(error as Error); - } - const bodyMs = Date.now() - responseStartTime; - resolve({ - statusCode: 502, - statusMessage: "Bad Gateway", - headers: { "content-type": "application/json" }, - bodyBuffer: Buffer.alloc(0), - contentType: "application/json", - errorMessage: error instanceof Error ? error.message : String(error), - ttfbMs, - bodyMs, - }); - return; - } - } - - proxyRes.on("data", (chunk: Buffer) => { - responseChunks.push(Buffer.from(chunk)); - totalReceivedBytes += chunk.length; - - const now = Date.now(); - if (dbRecordId !== null && now - lastProgressUpdate >= PROGRESS_THROTTLE_MS) { - lastProgressUpdate = now; - updateStreamingProgress( - dbRecordId, - totalReceivedBytes, - ttfbMs, - statusCode, - statusMessage, - responseHeaders as Record, - ); - dbNotifier.notify("update", "requests", dbRecordId); - } - }); - - proxyRes.on("end", async () => { - const bodyMs = Date.now() - responseStartTime; - let bodyBuffer: Buffer = Buffer.concat(responseChunks); - let contentType = (responseHeaders["content-type"] as string) ?? null; - - // 保存原始响应数据(用于与 hooked 对比) - const originalStatusCode = statusCode; - const originalStatusMessage = statusMessage; - const originalHeaders = { ...responseHeaders }; - const originalBodyBuffer = bodyBuffer; - const originalContentType = contentType; - let hasResponseHookChanges = false; - let responseHookLayers: HookLayer[] | undefined; - - if (hooksExecutor?.hasResponseHooks) { - try { - const hookExecResult = await hooksExecutor.executeResponseHooksWithLayers( - { - statusCode, - statusMessage, - headers: responseHeaders as Record, - body: streamFromBuffer(bodyBuffer), - signal: abortSignal, - // 传递请求元数据给响应 hooks - requestMeta: { - method: hookedMethod, - url: hookedTargetUrl.href, - headers: hookedForwardHeaders as Record, - }, - }, - (body) => body.length > 0 ? bufferToDataUrl(body, contentType) : null, - (headers) => (headers["content-type"] as string) ?? null, - ); - const hookResult = hookExecResult.params; - hasResponseHookChanges = hookExecResult.hasChanges; - responseHookLayers = hookExecResult.layers.length > 0 ? hookExecResult.layers : undefined; - - if (hasResponseHookChanges) { - statusCode = hookResult.statusCode; - statusMessage = hookResult.statusMessage; - responseHeaders = hookResult.headers as http.IncomingHttpHeaders; - bodyBuffer = await readStreamToBuffer(hookResult.body); - } - contentType = (responseHeaders["content-type"] as string) ?? contentType; - } catch (err) { - console.error("[Hooks] Response hook error:", err); - } - } - - const cleanedHeaders = sanitizeResponseHeadersForBuffered(responseHeaders, bodyBuffer.length); - - resolve({ - statusCode, - statusMessage, - headers: cleanedHeaders, - bodyBuffer, - contentType, - ttfbMs, - bodyMs, - // 原始响应数据(如果有 hook 变更) - hasResponseHookChanges, - responseHookLayers, - originalStatusCode: hasResponseHookChanges ? originalStatusCode : undefined, - originalStatusMessage: hasResponseHookChanges ? originalStatusMessage : undefined, - originalHeaders: hasResponseHookChanges ? originalHeaders : undefined, - originalBodyBuffer: hasResponseHookChanges ? originalBodyBuffer : undefined, - originalContentType: hasResponseHookChanges ? originalContentType : undefined, - }); - }); - - // 响应流传输过程中出错(如上游中途断开),error 和 end 互斥,必须处理 - proxyRes.on("error", (error) => { - const errorTtfb = Date.now() - attemptStart; - const errorBody = Buffer.from( - JSON.stringify({ - error: "上游响应流错误", - message: error.message, - }), - ); - // 如果已经开始流式写回,则无法再发送一个 502 body,只能直接断开下游连接 - if (didStreamResponseToClient && !res.writableEnded) { - res.destroy(error); - } - resolve({ - statusCode: 502, - statusMessage: "Bad Gateway", - headers: { - "content-type": "application/json", - "content-length": errorBody.length, - }, - bodyBuffer: errorBody, - contentType: "application/json", - errorMessage: error.message, - ttfbMs: errorTtfb, - bodyMs: 0, - }); - }); - }, - ); - - const cleanup = () => { - abortSignal.removeEventListener("abort", handleAbort); - }; - - proxyReq.on("error", (error) => { - cleanup(); - const errorTtfb = Date.now() - attemptStart; - const errorBody = Buffer.from( - JSON.stringify({ - error: "代理请求失败", - message: error.message, - }), - ); - resolve({ - statusCode: 502, - statusMessage: "Bad Gateway", - headers: { - "content-type": "application/json", - "content-length": errorBody.length, - }, - bodyBuffer: errorBody, - contentType: "application/json", - errorMessage: error.message, - ttfbMs: errorTtfb, - bodyMs: 0, - }); - }); - - // 监听 abort 信号,中断代理请求 - const handleAbort = () => { - const abortReason = getAbortReasonFromSignal(abortSignal); - proxyReq.destroy(); - proxyResRef?.destroy(); - // 如果已经开始写回给客户端,需要及时关闭下游连接,避免悬挂 - if (didStreamResponseToClient && !res.writableEnded) { - res.destroy(); - } - cleanup(); - reject(new ProxyRequestAbortedError(abortReason)); - }; - abortSignal.addEventListener("abort", handleAbort); - - // 如果在绑定 listener 之前就已触发 abort,确保立刻中断 - if (abortSignal.aborted) { - handleAbort(); - return; - } - - if (hookedRequestBody.length > 0) proxyReq.write(hookedRequestBody); - proxyReq.end(); - }).catch((err) => { - // 处理中断异常 - if (err instanceof ProxyRequestAbortedError) { - const statusMessage = - err.abortReason === USER_ABORT ? "User Aborted Request" : "Client Closed Request"; - const errorMessage = - err.abortReason === USER_ABORT ? "Request aborted by user" : "Client disconnected"; - return { - statusCode: 499, - statusMessage, - headers: {} as http.OutgoingHttpHeaders, - bodyBuffer: Buffer.alloc(0), - contentType: null, - errorMessage, - abortReason: err.abortReason, - ttfbMs: Date.now() - attemptStart, - bodyMs: 0, - }; - } - throw err; + updateProxyRequest(dbRecordId, { + status: abortReason ? "aborted" : "error", + abort_reason: abortReason ?? null, + error_message: errorMessage, + response: { + statusCode, + statusMessage: abortReason ? "Aborted" : "Bad Gateway", + headers: {}, + bodyDataUrl: null, + bodySize: 0, + ttfbMs, + bodyMs: 0, + contentType: null, + }, }); + dbNotifier.notify("update", "requests", dbRecordId); + activeRequests.delete(dbRecordId); - const wasAborted = attemptResult.abortReason !== undefined; - - // 上报统计数据 - const isFailureStatus = attemptResult.statusCode >= 400 && attemptResult.statusCode <= 599; - const success = !attemptResult.errorMessage && !isFailureStatus; - - if (!wasAborted) { - forwardStatsStore.sendReport(forwardRule.id, startTime, attemptResult.ttfbMs, success); + if (!isClientDisconnected && !res.headersSent) { + res.writeHead(statusCode, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ error: errorMessage })); } - - finalResult = { - ...attemptResult, - forwardRule, - }; - - if (wasAborted) { - break; - } - - if (!isFailureStatus) { - break; - } - - const hasMore = i < candidateIndexes.length - 1; - if (!hasMore) break; - } - - if (!finalResult || !finalResult.forwardRule || dbRecordId === null) { - res.writeHead(500, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ error: "No available forward result" })); return; } - // 构建响应体 data URL - const responseBodyDataUrl = - finalResult.bodyBuffer.length > 0 - ? bufferToDataUrl(finalResult.bodyBuffer, finalResult.contentType ?? undefined) - : null; - - // 如果有 response hook 变更,保存原始响应和 hooked 响应 - const originalResponseBodyDataUrl = - finalResult.hasResponseHookChanges && finalResult.originalBodyBuffer - ? finalResult.originalBodyBuffer.length > 0 - ? bufferToDataUrl(finalResult.originalBodyBuffer, finalResult.originalContentType ?? undefined) - : null - : null; - - const isRequestAborted = finalResult.abortReason !== undefined; - updateProxyRequest(dbRecordId, { - status: isRequestAborted ? "aborted" : finalResult.errorMessage ? "error" : "completed", - abort_reason: isRequestAborted ? finalResult.abortReason! : null, - error_message: finalResult.errorMessage ?? null, - forward_name: finalResult.forwardRule.name, - // 原始响应(如果有 hook 则是原始数据,否则就是最终数据) - response: finalResult.hasResponseHookChanges - ? { - statusCode: finalResult.originalStatusCode!, - statusMessage: finalResult.originalStatusMessage!, - headers: finalResult.originalHeaders as Record, - bodyDataUrl: originalResponseBodyDataUrl, - bodySize: finalResult.originalBodyBuffer?.length ?? 0, - ttfbMs: finalResult.ttfbMs, - bodyMs: finalResult.bodyMs, - contentType: finalResult.originalContentType ?? null, - } - : { - statusCode: finalResult.statusCode, - statusMessage: finalResult.statusMessage, - headers: finalResult.headers as Record, - bodyDataUrl: responseBodyDataUrl, - bodySize: finalResult.bodyBuffer.length, - ttfbMs: finalResult.ttfbMs, - bodyMs: finalResult.bodyMs, - contentType: finalResult.contentType ?? null, - }, - // Hooked 响应(仅当有变更时) - hookedResponse: finalResult.hasResponseHookChanges - ? { - statusCode: finalResult.statusCode, - statusMessage: finalResult.statusMessage, - headers: finalResult.headers as Record, - bodyDataUrl: responseBodyDataUrl, - bodySize: finalResult.bodyBuffer.length, - ttfbMs: finalResult.ttfbMs, - bodyMs: finalResult.bodyMs, - contentType: finalResult.contentType ?? null, - } - : undefined, - // 将 hooked body 填充到 responseHookLayers 的最后一个 modified=true 的 layer - responseHookLayers: (() => { - const layers = finalResult.responseHookLayers; - if (layers && responseBodyDataUrl && finalResult.hasResponseHookChanges) { - for (let i = layers.length - 1; i >= 0; i--) { - if (layers[i]!.modified) { - layers[i]!.bodyDataUrl = responseBodyDataUrl; - break; - } - } - } - return layers; - })(), + const ttfbMs = Date.now() - attemptStart; + const statusCode = proxyResponse.statusCode || 502; + const statusMessage = proxyResponse.statusMessage || ""; + const responseHeaders = sanitizeHeaders(proxyResponse.headers); + const contentType = (proxyResponse.headers["content-type"] as string) ?? null; + + res.writeHead(statusCode, statusMessage, responseHeaders); + + const responseChunks: Buffer[] = []; + let totalReceivedBytes = 0; + let lastProgressUpdate = 0; + const PROGRESS_THROTTLE_MS = 1000; + + proxyResponse.on("data", (chunk: Buffer) => { + responseChunks.push(chunk); + totalReceivedBytes += chunk.length; + res.write(chunk); + + const now = Date.now(); + if (now - lastProgressUpdate >= PROGRESS_THROTTLE_MS) { + lastProgressUpdate = now; + updateStreamingProgress( + dbRecordId, + totalReceivedBytes, + ttfbMs, + statusCode, + statusMessage, + proxyResponse.headers as Record, + ); + dbNotifier.notify("update", "requests", dbRecordId); + } }); - dbNotifier.notify("update", "requests", dbRecordId); - // 清理活跃请求 Map - activeRequests.delete(dbRecordId); + proxyResponse.on("end", () => { + res.end(); + const bodyMs = Date.now() - attemptStart - ttfbMs; + const responseBody = Buffer.concat(responseChunks); + const responseBodyDataUrl = responseBody.length > 0 + ? bufferToDataUrl(responseBody, contentType) + : null; - // 如果已经被中断,不再发送响应 - if (isClientDisconnected) { - return; - } + updateProxyRequest(dbRecordId, { + status: "completed", + abort_reason: null, + error_message: null, + response: { + statusCode, + statusMessage, + headers: proxyResponse.headers as Record, + bodyDataUrl: responseBodyDataUrl, + bodySize: responseBody.length, + ttfbMs, + bodyMs, + contentType, + }, + }); + dbNotifier.notify("update", "requests", dbRecordId); + activeRequests.delete(dbRecordId); - // 如果上游响应已经在请求过程中被流式写回,则这里不再重复发送 - if (didStreamResponseToClient) { - return; - } + const isFailure = statusCode >= 400; + forwardStatsStore.sendReport(forwardRule.id, startTime, ttfbMs, !isFailure); + }); - res.writeHead(finalResult.statusCode, finalResult.statusMessage, finalResult.headers); - res.end(finalResult.bodyBuffer); + proxyResponse.on("error", (error) => { + const bodyMs = Date.now() - attemptStart - ttfbMs; + if (!res.writableEnded) { + res.destroy(error); + } + updateProxyRequest(dbRecordId, { + status: "error", + abort_reason: null, + error_message: error.message, + response: { + statusCode: 502, + statusMessage: "Bad Gateway", + headers: {}, + bodyDataUrl: null, + bodySize: 0, + ttfbMs, + bodyMs, + contentType: null, + }, + }); + dbNotifier.notify("update", "requests", dbRecordId); + activeRequests.delete(dbRecordId); + }); }); server.on("upgrade", (req, socket, head) => { @@ -1239,7 +877,6 @@ async function main(argv: string[]) { } const targetUrl = buildTargetUrl(matched.rule, requestUrl); - handleWebSocketProxy(req, socket, head, targetUrl, INSTANCE_NAME, matched.rule.name, matched.rule.id); }); @@ -1257,15 +894,12 @@ async function main(argv: string[]) { dbNotifier.init(); forwardStatsStore.init(); - - // 注册 server 到 globalThis,以便 shutdown 消息处理可以访问 globalThis.__proxyServer = server; server.listen(PROXY_PORT, () => { console.log(`Proxy running on http://localhost:${PROXY_PORT} (instance: ${INSTANCE_NAME})`); }); - // 优雅关闭:当 Worker 被 terminate 时,关闭 server 并释放端口 process.on("SIGTERM", () => { log.info(`[Shutdown] Received SIGTERM, closing server...`); server.close(() => { diff --git a/tests/hooks-executor-config.test.ts b/tests/hooks-executor-config.test.ts index 4aa854b..b572009 100644 --- a/tests/hooks-executor-config.test.ts +++ b/tests/hooks-executor-config.test.ts @@ -1,38 +1,206 @@ -import { describe, it, expect } from "bun:test"; -import { HooksExecutor } from "../src/lib/hooks-executor"; -import { streamFromBuffer, readStreamToBuffer } from "@jixo/proxy-plugin"; - -describe("HooksExecutor config injection", () => { - it("should pass hook.config into plugin factory", async () => { - const hooks = [ - { - type: "http" as const, - command: "bunx", - args: ["@jixo/proxy-plugin-anthropic4droid"], - config: { - model: "gemini-claude-opus-4-5-thinking", - }, - }, +import { describe, it, expect, beforeEach, afterEach } from "bun:test"; +import { HooksExecutor, getHooksPoolStats, stopAllHooks } from "../src/lib/hooks-executor"; +import type { HookConfig } from "../src/types/proxy"; + +describe("HooksExecutor", () => { + afterEach(async () => { + await stopAllHooks(); + }); + + describe("initialization", () => { + it("should initialize with empty hooks", async () => { + const executor = new HooksExecutor("test-instance", null); + await executor.start(); + + expect(executor.hasHooks).toBe(false); + expect(executor.hasRequestHooks).toBe(false); + expect(executor.hasResponseHooks).toBe(false); + expect(executor.getFirstPluginUrl()).toBe(null); + + await executor.stop(); + }); + + it("should initialize with undefined hooks", async () => { + const executor = new HooksExecutor("test-instance", undefined); + await executor.start(); + + expect(executor.hasHooks).toBe(false); + expect(executor.getFirstPluginUrl()).toBe(null); + + await executor.stop(); + }); + + it("should initialize with empty array hooks", async () => { + const executor = new HooksExecutor("test-instance", []); + await executor.start(); + + expect(executor.hasHooks).toBe(false); + expect(executor.getFirstPluginUrl()).toBe(null); + + await executor.stop(); + }); + }); + + describe("setForwardHooks", () => { + it("should update forward hooks without restarting", async () => { + const executor = new HooksExecutor("test-instance", null); + await executor.start(); + + expect(executor.hasHooks).toBe(false); + + // Set null forward hooks should not change state + await executor.setForwardHooks("forward-1", null); + expect(executor.hasHooks).toBe(false); + + // Set empty forward hooks should not change state + await executor.setForwardHooks("forward-2", []); + expect(executor.hasHooks).toBe(false); + + await executor.stop(); + }); + + it("should skip disabled hooks", async () => { + const disabledHook: HookConfig = { + type: "http", + command: "echo", + args: ["test"], + disabled: true, + }; + + const executor = new HooksExecutor("test-instance", [disabledHook]); + await executor.start(); + + expect(executor.hasHooks).toBe(false); + expect(executor.getFirstPluginUrl()).toBe(null); + + await executor.stop(); + }); + }); + + describe("precheck methods", () => { + it("should return passthrough precheck for empty hooks", async () => { + const executor = new HooksExecutor("test-instance", null); + await executor.start(); + + const requestPrecheck = await executor.precheckRequest(); + expect(requestPrecheck.needsBuffer).toBe(false); + expect(requestPrecheck.canPassthrough).toBe(true); + expect(requestPrecheck.activePlugins).toEqual([]); + + const responsePrecheck = await executor.precheckResponse(); + expect(responsePrecheck.needsBuffer).toBe(false); + expect(responsePrecheck.canPassthrough).toBe(true); + expect(responsePrecheck.activePlugins).toEqual([]); + + await executor.stop(); + }); + }); + + describe("lifecycle", () => { + it("should start and stop cleanly", async () => { + const executor = new HooksExecutor("test-instance", null); + + await executor.start(); + expect(executor.hasHooks).toBe(false); + + await executor.stop(); + expect(executor.hasHooks).toBe(false); + }); + + it("should stop multiple times without error", async () => { + const executor = new HooksExecutor("test-instance", null); + + await executor.start(); + await executor.stop(); + await executor.stop(); + await executor.stop(); + + expect(executor.hasHooks).toBe(false); + }); + }); +}); + +describe("HooksPool", () => { + afterEach(async () => { + await stopAllHooks(); + }); + + describe("getHooksPoolStats", () => { + it("should return pool size", () => { + const stats = getHooksPoolStats(); + expect(typeof stats.size).toBe("number"); + expect(stats.size).toBeGreaterThanOrEqual(0); + }); + }); + + describe("stopAllHooks", () => { + it("should clear all hooks from pool", async () => { + await stopAllHooks(); + const stats = getHooksPoolStats(); + expect(stats.size).toBe(0); + }); + + it("should be idempotent", async () => { + await stopAllHooks(); + await stopAllHooks(); + await stopAllHooks(); + + const stats = getHooksPoolStats(); + expect(stats.size).toBe(0); + }); + }); +}); + +describe("Hook config normalization", () => { + afterEach(async () => { + await stopAllHooks(); + }); + + it("should handle single hook config", async () => { + const hook: HookConfig = { + type: "http", + command: "echo", + args: [], + disabled: true, // Disabled so it won't actually run + }; + + const executor = new HooksExecutor("test", hook); + await executor.start(); + + // Disabled hook means no active hooks + expect(executor.hasHooks).toBe(false); + + await executor.stop(); + }); + + it("should handle array of hook configs", async () => { + const hooks: HookConfig[] = [ + { type: "http", command: "echo", args: [], disabled: true }, + { type: "http", command: "echo", args: [], disabled: true }, ]; - const executor = new HooksExecutor("default", hooks); + const executor = new HooksExecutor("test", hooks); await executor.start(); - const requestBody = { - model: "claude-opus-4-5-20251101", - system: "You are Droid, an AI assistant built by Factory.", - messages: [{ role: "user", content: "Hello" }], - }; + // All disabled, no active hooks + expect(executor.hasHooks).toBe(false); - const result = await executor.executeRequestHooks({ - method: "POST", - url: "https://api.anthropic.com/v1/messages", - headers: { "content-type": "application/json" }, - body: streamFromBuffer(Buffer.from(JSON.stringify(requestBody), "utf-8")), - }); + await executor.stop(); + }); + + it("should filter out disabled hooks from array", async () => { + const hooks: HookConfig[] = [ + { type: "http", command: "echo", args: [], disabled: true }, + { type: "http", command: "echo", args: [], disabled: false }, + { type: "http", command: "echo", args: [] }, // disabled defaults to false + ]; + + // This would try to run actual processes, so we just test parsing + const executor = new HooksExecutor("test", hooks.filter((h) => h.disabled)); + await executor.start(); + + expect(executor.hasHooks).toBe(false); - const rewrittenText = (await readStreamToBuffer(result.body)).toString("utf-8"); - const rewrittenJson = JSON.parse(rewrittenText); - expect(rewrittenJson.model).toBe("gemini-claude-opus-4-5-thinking"); + await executor.stop(); }); });