diff --git a/docs/src/examples/http-cluster/README.md b/docs/src/examples/http-cluster/README.md new file mode 100644 index 0000000..370f5a0 --- /dev/null +++ b/docs/src/examples/http-cluster/README.md @@ -0,0 +1,76 @@ +# HTTP Cluster Example with X-Forwarded-For Support + +This example demonstrates how to use Pup's HTTP load balancer with clustering to properly forward client IP addresses through the `X-Forwarded-For` and `X-Real-IP` headers. + +## Problem + +When using Pup's cluster mode with `commonPort`, the default TCP-based load balancer proxies connections at the TCP level. This means that the backend servers always see the connection coming from +`127.0.0.1` (localhost), losing the actual client IP address. + +## Solution + +The HTTP load balancer (`balancerType: "http"`) operates at the HTTP protocol level and automatically adds the following headers to forwarded requests: + +- **`X-Forwarded-For`**: Contains the client IP address. If the header already exists, the client IP is appended to the list. +- **`X-Real-IP`**: Contains the client IP address (single value). + +## Configuration + +In the `cluster` section of a process definition in your `pup.json`, set `balancerType` to `"http"`: + +```json +{ + "cluster": { + "instances": 3, + "startPort": 8080, + "commonPort": 3000, + "strategy": "round-robin", + "balancerType": "http" + } +} +``` + +## Running the Example + +1. Start the cluster: + ```bash + pup run + ``` + +2. Make a request to the common port: + ```bash + curl http://localhost:3000 + ``` + +3. The response will show: + - The server port (one of 8080, 8081, or 8082) + - Your client IP address in the `clientIp` field + - The `X-Forwarded-For` and `X-Real-IP` headers + +## Load Balancer Types + +Pup supports two types of load balancers: + +- **`tcp`** (default): Simple TCP proxy. Fast and protocol-agnostic, but doesn't preserve client IP. +- **`http`**: HTTP-aware proxy. Adds `X-Forwarded-For` and `X-Real-IP` headers to preserve client IP. Only works with HTTP/HTTPS traffic. 
+ +## When to Use HTTP Load Balancer + +Use the HTTP load balancer when: + +- Your application needs to know the real client IP address +- You're serving HTTP/HTTPS traffic +- You need to implement rate limiting, access control, or logging based on client IP +- Your application reads the `X-Forwarded-For` or `X-Real-IP` headers + +Use the TCP load balancer when: + +- You're proxying non-HTTP protocols (websockets, database connections, etc.) +- You don't need client IP information +- You want maximum performance with minimal overhead + +## Notes + +- The HTTP load balancer only works with HTTP/HTTPS traffic +- For other protocols, use the `tcp` balancer type or an external load balancer like NGINX or HAProxy +- The `X-Forwarded-For` header can contain multiple IPs if the request passed through multiple proxies diff --git a/docs/src/examples/http-cluster/pup.json b/docs/src/examples/http-cluster/pup.json new file mode 100644 index 0000000..7beead2 --- /dev/null +++ b/docs/src/examples/http-cluster/pup.json @@ -0,0 +1,18 @@ +{ + "$schema": "https://pup.56k.guru/schema.json", + "name": "http-cluster-example", + "processes": [ + { + "id": "http-server-cluster", + "cmd": "deno run --allow-net --allow-env=PUP_CLUSTER_PORT server.ts", + "autostart": true, + "cluster": { + "instances": 3, + "startPort": 8080, + "commonPort": 3000, + "strategy": "round-robin", + "balancerType": "http" + } + } + ] +} diff --git a/docs/src/examples/http-cluster/server.ts b/docs/src/examples/http-cluster/server.ts new file mode 100644 index 0000000..1921044 --- /dev/null +++ b/docs/src/examples/http-cluster/server.ts @@ -0,0 +1,35 @@ +/** + * Example HTTP server that demonstrates the X-Forwarded-For header + * when using Pup's HTTP load balancer in cluster mode + */ + +const port = Deno.env.get("PUP_CLUSTER_PORT") ? parseInt(Deno.env.get("PUP_CLUSTER_PORT")!) 
: 8080 + +Deno.serve({ port, hostname: "127.0.0.1" }, (req, info) => { + const forwardedFor = req.headers.get("X-Forwarded-For") + const realIp = req.headers.get("X-Real-IP") + const remoteAddr = info.remoteAddr + + // Prefer X-Real-IP (a single value); X-Forwarded-For may be a comma-separated list of IPs + // remoteAddr is the last-resort fallback: behind the common port it is the load balancer (127.0.0.1) + const clientIp = realIp || forwardedFor || (remoteAddr as Deno.NetAddr).hostname + + const response = { + message: "Hello from clustered server!", + serverPort: port, + clientIp: clientIp, + headers: { + "X-Forwarded-For": forwardedFor, + "X-Real-IP": realIp, + }, + remoteAddr: remoteAddr, + } + + return new Response(JSON.stringify(response, null, 2), { + headers: { + "Content-Type": "application/json", + }, + }) +}) + +console.log(`Server running on port ${port}`) diff --git a/lib/core/cluster.ts b/lib/core/cluster.ts index a08029f..b1e6b94 100644 --- a/lib/core/cluster.ts +++ b/lib/core/cluster.ts @@ -11,7 +11,7 @@ import { Process, type ProcessInformation } from "./process.ts" import { ApiProcessState } from "@pup/api-definitions" import { LOAD_BALANCER_DEFAULT_VALIDATION_INTERVAL_S, type ProcessConfiguration } from "./configuration.ts" import type { Pup } from "./pup.ts" -import { BalancingStrategy, type LoadBalancerStartOperation } from "./loadbalancer.ts" +import { BalancingStrategy, type LoadBalancerStartOperation, LoadBalancerType } from "./loadbalancer.ts" class Cluster extends Process { public processes: Process[] = [] @@ -87,9 +87,16 @@ class Cluster extends Process { strategy = BalancingStrategy.ROUND_ROBIN } + let type: LoadBalancerType + if (this.config.cluster.balancerType === "http") { + type = LoadBalancerType.HTTP + } else { + type = LoadBalancerType.TCP + } + this.pup.logger.log( "cluster", - `Setting up load balancer for ${nInstances} instances with common port ${this.config.cluster.commonPort} and strategy ${BalancingStrategy[strategy]}`, + `Setting up ${LoadBalancerType[type]} 
load balancer for ${nInstances} instances with common port ${this.config.cluster.commonPort} and strategy ${BalancingStrategy[strategy]}`, this.config, ) @@ -103,6 +110,7 @@ class Cluster extends Process { strategy, validationInterval: LOAD_BALANCER_DEFAULT_VALIDATION_INTERVAL_S, commonPort: this.config.cluster.commonPort, + type, } this.loadBalancerWorker = new Worker(new URL("../workers/loadbalancer.js", import.meta.url).href, { type: "module" }) diff --git a/lib/core/configuration.ts b/lib/core/configuration.ts index be30014..4c8423a 100644 --- a/lib/core/configuration.ts +++ b/lib/core/configuration.ts @@ -71,6 +71,7 @@ interface ClusterConfiguration { commonPort?: number startPort?: number strategy?: string + balancerType?: string } interface GlobalWatcherConfiguration { @@ -146,6 +147,7 @@ const ConfigurationSchema = z.object({ commonPort: z.number().min(1).max(65535).optional(), startPort: z.number().min(1).max(65535).optional(), strategy: z.enum(["ip-hash", "round-robin", "least-connections"]).default("round-robin"), + balancerType: z.enum(["tcp", "http"]).default("tcp"), })), pidFile: z.optional(z.string()), path: z.optional(z.string()), diff --git a/lib/core/loadbalancer.ts b/lib/core/loadbalancer.ts index 7d3be50..7e68384 100644 --- a/lib/core/loadbalancer.ts +++ b/lib/core/loadbalancer.ts @@ -14,6 +14,11 @@ export enum BalancingStrategy { LEAST_CONNECTIONS, } +export enum LoadBalancerType { + TCP, + HTTP, +} + export interface Backend { host: string port: number @@ -31,18 +36,21 @@ export interface LoadBalancerStartOperation { strategy: BalancingStrategy validationInterval: number commonPort: number + type?: LoadBalancerType } export class LoadBalancer { //public readonly pup: Pup private listener: Deno.Listener | null = null + private httpServer: Deno.HttpServer | null = null public backends: InternalBackend[] private strategy: BalancingStrategy private currentIndex: number private validationInterval: number private validationTimer: number + private 
type: LoadBalancerType private loggerCallback: (severity: string, category: string, text: string) => void @@ -51,6 +59,7 @@ export class LoadBalancer { strategy: BalancingStrategy = BalancingStrategy.ROUND_ROBIN, validationInterval: number = LOAD_BALANCER_DEFAULT_VALIDATION_INTERVAL_S, loggerCallback: (severity: string, category: string, text: string) => void, + type: LoadBalancerType = LoadBalancerType.TCP, ) { // Deep copy of incoming backend object, with additional properties this.backends = this.initializeBackends(backends) @@ -60,6 +69,7 @@ export class LoadBalancer { this.strategy = strategy this.currentIndex = 0 this.validationInterval = validationInterval + this.type = type // Validate backends every 120 seconds this.validationTimer = this.setupValidationTimer() // Continuously validate @@ -223,6 +233,14 @@ export class LoadBalancer { throw new Error("No backends defined") } + if (this.type === LoadBalancerType.HTTP) { + await this.startHttpServer(port) + } else { + await this.startTcpServer(port) + } + } + + private async startTcpServer(port: number): Promise<void> { this.listener = Deno.listen({ port }) for await (const client of this.listener) { const backend = this.selectBackend(client) @@ -235,12 +253,100 @@ export class LoadBalancer { } } + private async startHttpServer(port: number): Promise<void> { + this.httpServer = Deno.serve({ + port, + handler: (req, info) => this.handleHttpRequest(req, info), + }) + await this.httpServer.finished + } + + private async handleHttpRequest(req: Request, info: Deno.ServeHandlerInfo): Promise<Response> { + // Select a backend using the configured balancing strategy + // selectBackend() takes a connection, so wrap the client address in a mock one (used by IP hash) + const mockConn = this.createMockConn(info.remoteAddr) + const backend = this.selectBackend(mockConn) + + if (!backend) { + this.loggerCallback("warn", "loadbalancer", "No available backend for HTTP request") + return new Response("Service Unavailable", { status: 503 }) + } + + try { + // Forward the 
request to the backend + return await this.forwardHttpRequest(req, backend, info.remoteAddr) + } catch (error) { + this.handleConnectionFailure(backend) + this.loggerCallback("error", "loadbalancer", `HTTP proxy error: ${error}`) + return new Response("Bad Gateway", { status: 502 }) + } + } + + private createMockConn(remoteAddr: Deno.NetAddr): Deno.Conn { + // Create a minimal mock connection object for strategy selection + // Only remoteAddr is used by selectBackend() for IP hash strategy + // Type assertion is safe here as we only access remoteAddr in the selection logic + return { + remoteAddr, + localAddr: { transport: "tcp", hostname: "127.0.0.1", port: 0 }, + } as Deno.Conn + } + + private async forwardHttpRequest(req: Request, backend: InternalBackend, clientAddr: Deno.NetAddr): Promise<Response> { + this.updateBackendConnectionStatus(backend, true) + + try { + // Build the backend URL + const url = new URL(req.url) + const backendUrl = `http://${backend.host}:${backend.port}${url.pathname}${url.search}` + + // Clone headers and add X-Forwarded-For + const headers = new Headers(req.headers) + + // Get the client IP address + const clientIp = clientAddr.transport === "tcp" || clientAddr.transport === "udp" ? clientAddr.hostname : "unknown" + + // Add or append to X-Forwarded-For header + const existingForwarded = headers.get("X-Forwarded-For") + if (existingForwarded) { + headers.set("X-Forwarded-For", `${existingForwarded}, ${clientIp}`) + } else { + headers.set("X-Forwarded-For", clientIp) + } + + // Set X-Real-IP to the connecting client (overwrites any client-supplied value) + headers.set("X-Real-IP", clientIp) + + // Forward the request to the backend + const backendReq = new Request(backendUrl, { + method: req.method, + headers: headers, + body: req.body, + // @ts-expect-error - duplex is a valid Request option for streaming but not in TypeScript's lib.dom.d.ts yet + duplex: req.body ? 
"half" : undefined, + }) + // Do not follow redirects here: backend 3xx responses must reach the client unchanged + const backendRes = await fetch(backendReq, { redirect: "manual" }) + + // Reset failed transmissions on success + backend.failedTransmissions = 0 + + return backendRes + } finally { + this.updateBackendConnectionStatus(backend, false) + } + } + close(): void { clearInterval(this.validationTimer) if (this.listener) { this.listener.close() this.listener = null } + if (this.httpServer) { + this.httpServer.shutdown() + this.httpServer = null + } } } diff --git a/lib/workers/loadbalancer.js b/lib/workers/loadbalancer.js index 98d93ce..1259ba7 100644 --- a/lib/workers/loadbalancer.js +++ b/lib/workers/loadbalancer.js @@ -29,6 +29,7 @@ const loggerCallback = (severity, category, text) => { * @property {any} strategy * @property {number} validationInterval * @property {number} commonPort + * @property {any} type */ /** @@ -38,7 +39,7 @@ const loggerCallback = (severity, category, text) => { self.onmessage = (event) => { if (event && event.data && event.data.operation === "start") { if (!loadBalancer) { - loadBalancer = new LoadBalancer(event.data.backends, event.data.strategy, event.data.validationInterval, loggerCallback) + loadBalancer = new LoadBalancer(event.data.backends, event.data.strategy, event.data.validationInterval, loggerCallback, event.data.type) loadBalancer && loadBalancer.start(event.data.commonPort) } } diff --git a/test/core/loadbalancer-http.test.ts b/test/core/loadbalancer-http.test.ts new file mode 100644 index 0000000..44a02bb --- /dev/null +++ b/test/core/loadbalancer-http.test.ts @@ -0,0 +1,144 @@ +// Test HTTP load balancer with X-Forwarded-For forwarding +import { assertEquals } from "@std/assert" +import { type Backend, BalancingStrategy, LoadBalancer, LoadBalancerType } from "../../lib/core/loadbalancer.ts" +import { test } from "@cross/test" + +const loggerCallback = (severity: string, category: string, text: string) => { + console.log(`[${severity.toUpperCase()}][${category}] ${text}`) +} + +test("LoadBalancer HTTP - X-Forwarded-For 
header is added", async () => { + // Create a simple backend HTTP server that echoes headers + const backendPort = 8095 + + const backendServer = Deno.serve({ + port: backendPort, + hostname: "127.0.0.1", + handler: (req) => { + return new Response( + JSON.stringify({ + forwardedFor: req.headers.get("X-Forwarded-For"), + realIp: req.headers.get("X-Real-IP"), + }), + { + headers: { "Content-Type": "application/json" }, + }, + ) + }, + }) + + // Give the backend server time to start + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Create HTTP load balancer + const lbPort = 8096 + const backends: Backend[] = [{ host: "127.0.0.1", port: backendPort }] + const loadBalancer = new LoadBalancer( + backends, + BalancingStrategy.ROUND_ROBIN, + 120, + loggerCallback, + LoadBalancerType.HTTP, + ) + + // Start load balancer in background + loadBalancer.start(lbPort) + + // Give the load balancer time to start + await new Promise((resolve) => setTimeout(resolve, 100)) + + try { + // Make a request through the load balancer + const response = await fetch(`http://127.0.0.1:${lbPort}/test`, { + headers: { + "User-Agent": "test-client", + }, + }) + + assertEquals(response.status, 200) + + const body = await response.json() + + // Verify X-Forwarded-For header was added + assertEquals(typeof body.forwardedFor, "string") + assertEquals(body.forwardedFor.includes("127.0.0.1"), true) + + // Verify X-Real-IP header was added + assertEquals(typeof body.realIp, "string") + assertEquals(body.realIp, "127.0.0.1") + + console.log("Headers received by backend:", { + forwardedFor: body.forwardedFor, + realIp: body.realIp, + }) + } finally { + // Cleanup + loadBalancer.close() + await backendServer.shutdown() + } +}) + +test("LoadBalancer HTTP - X-Forwarded-For header is appended to existing header", async () => { + // Create a simple backend HTTP server that echoes headers + const backendPort = 8097 + + const backendServer = Deno.serve({ + port: backendPort, + hostname: 
"127.0.0.1", + handler: (req) => { + return new Response( + JSON.stringify({ + forwardedFor: req.headers.get("X-Forwarded-For"), + }), + { + headers: { "Content-Type": "application/json" }, + }, + ) + }, + }) + + // Give the backend server time to start + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Create HTTP load balancer + const lbPort = 8098 + const backends: Backend[] = [{ host: "127.0.0.1", port: backendPort }] + const loadBalancer = new LoadBalancer( + backends, + BalancingStrategy.ROUND_ROBIN, + 120, + loggerCallback, + LoadBalancerType.HTTP, + ) + + // Start load balancer in background + loadBalancer.start(lbPort) + + // Give the load balancer time to start + await new Promise((resolve) => setTimeout(resolve, 100)) + + try { + // Make a request through the load balancer with an existing X-Forwarded-For header + const response = await fetch(`http://127.0.0.1:${lbPort}/test`, { + headers: { + "X-Forwarded-For": "192.168.1.100", + }, + }) + + assertEquals(response.status, 200) + + const body = await response.json() + + // Verify X-Forwarded-For header contains both the original and the new IP + assertEquals(typeof body.forwardedFor, "string") + assertEquals(body.forwardedFor.includes("192.168.1.100"), true) + assertEquals(body.forwardedFor.includes("127.0.0.1"), true) + assertEquals(body.forwardedFor.includes(","), true) // Should have comma separator + + console.log("Appended X-Forwarded-For:", body.forwardedFor) + } finally { + // Cleanup + loadBalancer.close() + await backendServer.shutdown() + } +}) diff --git a/test/core/loadbalancer.test.ts b/test/core/loadbalancer.test.ts index b91782a..5d62c3b 100644 --- a/test/core/loadbalancer.test.ts +++ b/test/core/loadbalancer.test.ts @@ -1,6 +1,6 @@ // load_balancer_test.ts import { assertEquals, assertThrows } from "@std/assert" -import { type Backend, BalancingStrategy, hashCode, LoadBalancer } from "../../lib/core/loadbalancer.ts" +import { type Backend, BalancingStrategy, hashCode, 
LoadBalancer, LoadBalancerType } from "../../lib/core/loadbalancer.ts" import { test } from "@cross/test" // Define logger callback function @@ -140,3 +140,25 @@ test("LoadBalancer - Selects Backend with LEAST_CONNECTIONS Strategy", () => { // Cleanup loadBalancer.close() }) + +test("LoadBalancer - HTTP Type Initialization", () => { + const backends: Backend[] = [ + { host: "127.0.0.1", port: 8081 }, + { host: "127.0.0.1", port: 8082 }, + ] + const loadBalancer = new LoadBalancer(backends, BalancingStrategy.ROUND_ROBIN, 120, loggerCallback, LoadBalancerType.HTTP) + assertEquals(loadBalancer instanceof LoadBalancer, true) + // Cleanup + loadBalancer.close() +}) + +test("LoadBalancer - HTTP Type Initialization (single backend)", () => { + // Header forwarding needs a real backend server and is covered in loadbalancer-http.test.ts + // Here we only verify the load balancer can be created with HTTP type + const backends: Backend[] = [ + { host: "127.0.0.1", port: 8091 }, + ] + const loadBalancer = new LoadBalancer(backends, BalancingStrategy.ROUND_ROBIN, 120, loggerCallback, LoadBalancerType.HTTP) + assertEquals(loadBalancer instanceof LoadBalancer, true) + loadBalancer.close() +})