Skip to content
Draft
76 changes: 76 additions & 0 deletions docs/src/examples/http-cluster/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# HTTP Cluster Example with X-Forwarded-For Support

This example demonstrates how to use Pup's HTTP load balancer with clustering to properly forward client IP addresses through the `X-Forwarded-For` and `X-Real-IP` headers.

## Problem

When using Pup's cluster mode with `commonPort`, the default TCP-based load balancer proxies connections at the TCP level. This means that the backend servers always see the connection coming from
`127.0.0.1` (localhost), losing the actual client IP address.

## Solution

The HTTP load balancer (`balancerType: "http"`) operates at the HTTP protocol level and automatically adds the following headers to forwarded requests:

- **`X-Forwarded-For`**: Contains the client IP address. If the header already exists, the client IP is appended to the list.
- **`X-Real-IP`**: Contains the client IP address (single value).

## Configuration

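In your `pup.json`, set `balancerType` to `"http"` inside the `cluster` section of the process you want to balance (the snippet below shows only that section; see the full `pup.json` in this directory):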

```json
{
"cluster": {
"instances": 3,
"startPort": 8080,
"commonPort": 3000,
"strategy": "round-robin",
"balancerType": "http"
}
}
```

## Running the Example

1. Start the cluster:
```bash
pup run
```

2. Make a request to the common port:
```bash
curl http://localhost:3000
```

3. The response will show:
- The server port (one of 8080, 8081, or 8082)
- Your client IP address in the `clientIp` field
- The `X-Forwarded-For` and `X-Real-IP` headers

## Load Balancer Types

Pup supports two types of load balancers:

- **`tcp`** (default): Simple TCP proxy. Fast and protocol-agnostic, but doesn't preserve client IP.
- **`http`**: HTTP-aware proxy. Adds `X-Forwarded-For` and `X-Real-IP` headers to preserve client IP. Only works with HTTP/HTTPS traffic.

## When to Use HTTP Load Balancer

Use the HTTP load balancer when:

- Your application needs to know the real client IP address
- You're serving HTTP/HTTPS traffic
- You need to implement rate limiting, access control, or logging based on client IP
- Your application reads the `X-Forwarded-For` or `X-Real-IP` headers

Use the TCP load balancer when:

- You're proxying anything other than plain HTTP requests (WebSocket connections, database connections, etc.)
- You don't need client IP information
- You want maximum performance with minimal overhead

## Notes

- The HTTP load balancer only works with HTTP/HTTPS traffic
- For other protocols, use Pup's `tcp` balancer, or an external load balancer like NGINX or HAProxy if you also need the client IP preserved
- The `X-Forwarded-For` header can contain multiple IPs if the request passed through multiple proxies
18 changes: 18 additions & 0 deletions docs/src/examples/http-cluster/pup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"$schema": "https://pup.56k.guru/schema.json",
"name": "http-cluster-example",
"processes": [
{
"id": "http-server-cluster",
"cmd": "deno run --allow-net server.ts",
"autostart": true,
"cluster": {
"instances": 3,
"startPort": 8080,
"commonPort": 3000,
"strategy": "round-robin",
"balancerType": "http"
}
}
]
}
35 changes: 35 additions & 0 deletions docs/src/examples/http-cluster/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
 * Example HTTP server that demonstrates the X-Forwarded-For header
 * when using Pup's HTTP load balancer in cluster mode.
 *
 * Every request is answered with a JSON document describing which instance
 * handled it and which client address the instance was able to determine.
 */

// Port used when PUP_CLUSTER_PORT is missing or does not contain a number
const DEFAULT_PORT = 8080

// Port to listen on, read from PUP_CLUSTER_PORT (presumably set by Pup for each cluster instance)
const parsedPort = Number.parseInt(Deno.env.get("PUP_CLUSTER_PORT") ?? "", 10)
const port = Number.isNaN(parsedPort) ? DEFAULT_PORT : parsedPort

Deno.serve({ port, hostname: "127.0.0.1" }, (req, info) => {
  const forwardedFor = req.headers.get("X-Forwarded-For")
  const realIp = req.headers.get("X-Real-IP")
  const remoteAddr = info.remoteAddr

  // X-Forwarded-For can be a comma-separated chain ("client, proxy1, proxy2").
  // The last entry is the address seen by the proxy directly in front of this
  // server, so that is the one reported as the client IP.
  const forwardedIp = forwardedFor?.split(",").pop()?.trim()

  // Without the HTTP load balancer there is no header, and remoteAddr is
  // the address of whatever connected to this server directly
  const clientIp = forwardedIp || (remoteAddr as Deno.NetAddr).hostname

  const response = {
    message: "Hello from clustered server!",
    serverPort: port,
    clientIp: clientIp,
    // Raw header values, echoed back unmodified for inspection
    headers: {
      "X-Forwarded-For": forwardedFor,
      "X-Real-IP": realIp,
    },
    remoteAddr: remoteAddr,
  }

  return new Response(JSON.stringify(response, null, 2), {
    headers: {
      "Content-Type": "application/json",
    },
  })
})

console.log(`Server running on port ${port}`)
12 changes: 10 additions & 2 deletions lib/core/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Process, type ProcessInformation } from "./process.ts"
import { ApiProcessState } from "@pup/api-definitions"
import { LOAD_BALANCER_DEFAULT_VALIDATION_INTERVAL_S, type ProcessConfiguration } from "./configuration.ts"
import type { Pup } from "./pup.ts"
import { BalancingStrategy, type LoadBalancerStartOperation } from "./loadbalancer.ts"
import { BalancingStrategy, type LoadBalancerStartOperation, LoadBalancerType } from "./loadbalancer.ts"

class Cluster extends Process {
public processes: Process[] = []
Expand Down Expand Up @@ -87,9 +87,16 @@ class Cluster extends Process {
strategy = BalancingStrategy.ROUND_ROBIN
}

let type: LoadBalancerType
if (this.config.cluster.balancerType === "http") {
type = LoadBalancerType.HTTP
} else {
type = LoadBalancerType.TCP
}

this.pup.logger.log(
"cluster",
`Setting up load balancer for ${nInstances} instances with common port ${this.config.cluster.commonPort} and strategy ${BalancingStrategy[strategy]}`,
`Setting up ${LoadBalancerType[type]} load balancer for ${nInstances} instances with common port ${this.config.cluster.commonPort} and strategy ${BalancingStrategy[strategy]}`,
this.config,
)

Expand All @@ -103,6 +110,7 @@ class Cluster extends Process {
strategy,
validationInterval: LOAD_BALANCER_DEFAULT_VALIDATION_INTERVAL_S,
commonPort: this.config.cluster.commonPort,
type,
}

this.loadBalancerWorker = new Worker(new URL("../workers/loadbalancer.js", import.meta.url).href, { type: "module" })
Expand Down
2 changes: 2 additions & 0 deletions lib/core/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ interface ClusterConfiguration {
commonPort?: number
startPort?: number
strategy?: string
balancerType?: string
}

interface GlobalWatcherConfiguration {
Expand Down Expand Up @@ -146,6 +147,7 @@ const ConfigurationSchema = z.object({
commonPort: z.number().min(1).max(65535).optional(),
startPort: z.number().min(1).max(65535).optional(),
strategy: z.enum(["ip-hash", "round-robin", "least-connections"]).default("round-robin"),
balancerType: z.enum(["tcp", "http"]).default("tcp"),
})),
pidFile: z.optional(z.string()),
path: z.optional(z.string()),
Expand Down
106 changes: 106 additions & 0 deletions lib/core/loadbalancer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ export enum BalancingStrategy {
LEAST_CONNECTIONS,
}

/**
 * Protocol level at which the load balancer proxies traffic.
 *
 * Numeric enum: the reverse mapping (`LoadBalancerType[value]`) yields the
 * member name.
 */
export enum LoadBalancerType {
  /** Proxy at the TCP level */
  TCP = 0,
  /** Proxy at the HTTP level */
  HTTP = 1,
}

export interface Backend {
host: string
port: number
Expand All @@ -31,18 +36,21 @@ export interface LoadBalancerStartOperation {
strategy: BalancingStrategy
validationInterval: number
commonPort: number
type?: LoadBalancerType
}

export class LoadBalancer {
//public readonly pup: Pup

private listener: Deno.Listener | null = null
private httpServer: Deno.HttpServer | null = null

public backends: InternalBackend[]
private strategy: BalancingStrategy
private currentIndex: number
private validationInterval: number
private validationTimer: number
private type: LoadBalancerType

private loggerCallback: (severity: string, category: string, text: string) => void

Expand All @@ -51,6 +59,7 @@ export class LoadBalancer {
strategy: BalancingStrategy = BalancingStrategy.ROUND_ROBIN,
validationInterval: number = LOAD_BALANCER_DEFAULT_VALIDATION_INTERVAL_S,
loggerCallback: (severity: string, category: string, text: string) => void,
type: LoadBalancerType = LoadBalancerType.TCP,
) {
// Deep copy of incoming backend object, with additional properties
this.backends = this.initializeBackends(backends)
Expand All @@ -60,6 +69,7 @@ export class LoadBalancer {
this.strategy = strategy
this.currentIndex = 0
this.validationInterval = validationInterval
this.type = type
// Validate backends every 120 seconds
this.validationTimer = this.setupValidationTimer() // Continuously validate

Expand Down Expand Up @@ -223,6 +233,14 @@ export class LoadBalancer {
throw new Error("No backends defined")
}

if (this.type === LoadBalancerType.HTTP) {
await this.startHttpServer(port)
} else {
await this.startTcpServer(port)
}
}

private async startTcpServer(port: number): Promise<void> {
this.listener = Deno.listen({ port })
for await (const client of this.listener) {
const backend = this.selectBackend(client)
Expand All @@ -235,12 +253,100 @@ export class LoadBalancer {
}
}

/**
 * Starts the HTTP server on the given port and routes every request through
 * handleHttpRequest().
 *
 * @param port Port to listen on
 * @returns A promise that settles when the server's `finished` promise does
 */
private async startHttpServer(port: number): Promise<void> {
  const server = Deno.serve({
    port,
    handler: (request, connInfo) => this.handleHttpRequest(request, connInfo),
  })
  // Keep a reference so close() can shut the server down
  this.httpServer = server
  await server.finished
}

/**
 * Handles one incoming HTTP request: picks a backend and proxies the request to it.
 *
 * @param req Incoming request
 * @param info Connection details supplied by Deno.serve; only `remoteAddr` is used
 * @returns The backend's response, 503 when no backend could be selected,
 *          or 502 when forwarding threw
 */
private async handleHttpRequest(req: Request, info: Deno.ServeHandlerInfo): Promise<Response> {
  const { remoteAddr } = info

  // selectBackend() takes a connection, so wrap the peer address in a stand-in one
  const target = this.selectBackend(this.createMockConn(remoteAddr))

  if (!target) {
    this.loggerCallback("warn", "loadbalancer", "No available backend for HTTP request")
    return new Response("Service Unavailable", { status: 503 })
  }

  try {
    // Awaited here (not just returned) so that a rejection lands in the catch below
    return await this.forwardHttpRequest(req, target, remoteAddr)
  } catch (err) {
    this.handleConnectionFailure(target)
    this.loggerCallback("error", "loadbalancer", `HTTP proxy error: ${err}`)
    return new Response("Bad Gateway", { status: 502 })
  }
}

/**
 * Builds a stand-in connection object carrying the given peer address, so that
 * HTTP requests can be passed to selectBackend().
 *
 * The result is NOT a usable connection: it has only `remoteAddr` and a
 * placeholder `localAddr`, with no read/write/close methods.
 *
 * @param remoteAddr Address of the peer that sent the HTTP request
 * @returns An object asserted to be a Deno.Conn
 */
private createMockConn(remoteAddr: Deno.NetAddr): Deno.Conn {
  // Placeholder local address
  const localAddr: Deno.NetAddr = { transport: "tcp", hostname: "127.0.0.1", port: 0 }

  // The assertion is needed because the remaining Deno.Conn members are deliberately absent
  return { remoteAddr, localAddr } as Deno.Conn
}

/**
 * Proxies a single HTTP request to the given backend, adding the client
 * address to the `X-Forwarded-For` and `X-Real-IP` headers.
 *
 * @param req Incoming request; its method, headers, body, path and query string are forwarded
 * @param backend Backend that should receive the request
 * @param clientAddr Address of the peer that sent the request
 * @returns The backend's response, returned as-is (including 3xx redirects)
 * @throws Whatever `fetch` rejects with when the backend cannot be reached
 */
private async forwardHttpRequest(req: Request, backend: InternalBackend, clientAddr: Deno.NetAddr): Promise<Response> {
  this.updateBackendConnectionStatus(backend, true)

  try {
    // Build the backend URL: same path and query string, backend host and port
    const url = new URL(req.url)
    const backendUrl = `http://${backend.host}:${backend.port}${url.pathname}${url.search}`

    // Copy the headers so the incoming request is left untouched
    const headers = new Headers(req.headers)

    // Get the client IP address
    const clientIp = clientAddr.transport === "tcp" || clientAddr.transport === "udp" ? clientAddr.hostname : "unknown"

    // Add or append to X-Forwarded-For header, keeping any chain set by earlier proxies
    const existingForwarded = headers.get("X-Forwarded-For")
    if (existingForwarded) {
      headers.set("X-Forwarded-For", `${existingForwarded}, ${clientIp}`)
    } else {
      headers.set("X-Forwarded-For", clientIp)
    }

    // X-Real-IP always carries a single value: the peer seen by this balancer
    headers.set("X-Real-IP", clientIp)

    // Forward the request to the backend
    const backendReq = new Request(backendUrl, {
      method: req.method,
      headers: headers,
      body: req.body,
      // A proxy must hand redirects back to the client instead of following
      // them itself; the default ("follow") would resolve the redirect here
      // and hide the 3xx response and its Location header from the client
      redirect: "manual",
      // @ts-expect-error - duplex is a valid Request option for streaming but not in TypeScript's lib.dom.d.ts yet
      duplex: req.body ? "half" : undefined,
    })

    const backendRes = await fetch(backendReq)

    // Reset failed transmissions on success
    backend.failedTransmissions = 0

    return backendRes
  } finally {
    // NOTE(review): this runs once fetch has resolved, which can be before
    // the response body has finished streaming to the client
    this.updateBackendConnectionStatus(backend, false)
  }
}

/**
 * Stops the load balancer: cancels the validation timer and shuts down
 * whichever server (TCP listener or HTTP server) is currently running.
 *
 * Returns immediately; the HTTP server's shutdown completes in the
 * background and any failure is reported through the logger callback.
 */
close(): void {
  clearInterval(this.validationTimer)
  if (this.listener) {
    this.listener.close()
    this.listener = null
  }
  if (this.httpServer) {
    const server = this.httpServer
    // Clear the reference first so a repeated close() does not shut down twice
    this.httpServer = null
    // shutdown() returns a promise; handle rejection so it cannot surface as
    // an unhandled rejection
    server.shutdown().catch((error: unknown) => {
      this.loggerCallback("error", "loadbalancer", `HTTP server shutdown failed: ${error}`)
    })
  }
}
}

Expand Down
3 changes: 2 additions & 1 deletion lib/workers/loadbalancer.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const loggerCallback = (severity, category, text) => {
* @property {any} strategy
* @property {number} validationInterval
* @property {number} commonPort
* @property {any} type
*/

/**
Expand All @@ -38,7 +39,7 @@ const loggerCallback = (severity, category, text) => {
self.onmessage = (event) => {
if (event && event.data && event.data.operation === "start") {
if (!loadBalancer) {
loadBalancer = new LoadBalancer(event.data.backends, event.data.strategy, event.data.validationInterval, loggerCallback)
loadBalancer = new LoadBalancer(event.data.backends, event.data.strategy, event.data.validationInterval, loggerCallback, event.data.type)
loadBalancer && loadBalancer.start(event.data.commonPort)
}
}
Expand Down
Loading
Loading