diff --git a/CLAUDE.md b/CLAUDE.md index 38c76af..c30259a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -52,10 +52,39 @@ launchctl list com.openclaw.claude-max-proxy ## Architecture -- `src/types/claude-cli.ts` - Claude CLI JSON streaming types and type guards -- `src/types/openai.ts` - OpenAI-compatible API types +The proxy uses a session-aware process pool to eliminate per-request spawn overhead +(3–10s) and prevent cross-agent context contamination. + +### Key files + +- `src/subprocess/router.ts` - **SessionPoolRouter**: session-key locking, per-model warm pools, orphan reclamation, context accumulation recycling, nightly sweep +- `src/subprocess/manager.ts` - **ClaudeSubprocess**: single-request subprocess fallback (retained for headerless requests and non-pooled models) +- `src/server/routes.ts` - Express route handlers; routes pool requests via `x-openclaw-session-key` header; falls back to ClaudeSubprocess when header is absent +- `src/server/standalone.ts` - Server entry point; initializes pool, schedules 3 AM ET sweep via node-cron, handles graceful shutdown - `src/adapter/openai-to-cli.ts` - Converts OpenAI requests to CLI input - `src/adapter/cli-to-openai.ts` - Converts CLI output to OpenAI responses -- `src/subprocess/manager.ts` - Spawns and manages Claude CLI subprocesses -- `src/server/routes.ts` - Express route handlers (streaming + non-streaming) -- `src/server/standalone.js` - Server entry point +- `src/types/claude-cli.ts` - Claude CLI JSON streaming types and type guards +- `src/types/openai.ts` - OpenAI-compatible API types + +### Request routing + +``` +POST /v1/chat/completions + x-openclaw-session-key present AND model is opus/sonnet + → SessionPoolRouter.execute() → locked warm process (33% faster) + header absent OR model is haiku + → ClaudeSubprocess (subprocess-per-request, original behavior) +``` + +### Pool env vars + +| Var | Default | Description | +|-----|---------|-------------| +| POOL_OPUS_SIZE | 6 | Warm opus processes | +| 
POOL_SONNET_SIZE | 4 | Warm sonnet processes | +| MAX_TOTAL_PROCESSES | 30 | Hard cap (locked + warm) | +| POOL_MAX_REQUESTS_PER_PROCESS | 50 | Context accumulation threshold | +| POOL_REQUEST_QUEUE_DEPTH | 3 | Per-process queue depth before 429 | +| POOL_REQUEST_TIMEOUT_MS | 300000 | Per-request timeout (5 min) | +| SWEEP_IDLE_THRESHOLD_MS | 7200000 | Idle time before sweep recycles (2 hr) | +| SWEEP_HOUR | 3 | Hour in ET for nightly sweep | diff --git a/package-lock.json b/package-lock.json index f6561bd..b74f35c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,9 @@ "version": "1.0.0", "license": "MIT", "dependencies": { + "@types/node-cron": "^3.0.11", "express": "^4.21.2", + "node-cron": "^4.2.1", "uuid": "^11.0.5" }, "bin": { @@ -96,6 +98,12 @@ "undici-types": "~6.21.0" } }, + "node_modules/@types/node-cron": { + "version": "3.0.11", + "resolved": "https://registry.npmjs.org/@types/node-cron/-/node-cron-3.0.11.tgz", + "integrity": "sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==", + "license": "MIT" + }, "node_modules/@types/qs": { "version": "6.14.0", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.14.0.tgz", @@ -652,6 +660,15 @@ "node": ">= 0.6" } }, + "node_modules/node-cron": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-4.2.1.tgz", + "integrity": "sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==", + "license": "ISC", + "engines": { + "node": ">=6.0.0" + } + }, "node_modules/object-inspect": { "version": "1.13.4", "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.13.4.tgz", diff --git a/package.json b/package.json index ec0555e..3c4f75b 100644 --- a/package.json +++ b/package.json @@ -39,7 +39,9 @@ }, "homepage": "https://github.com/atalovesyou/claude-max-api-proxy#readme", "dependencies": { + "@types/node-cron": "^3.0.11", "express": "^4.21.2", + "node-cron": "^4.2.1", 
"uuid": "^11.0.5" }, "devDependencies": { diff --git a/specs/session-pooling.spec.md b/specs/session-pooling.spec.md new file mode 100644 index 0000000..6ac652e --- /dev/null +++ b/specs/session-pooling.spec.md @@ -0,0 +1,658 @@ +# Session Pooling — Spec (Tier 1) — Rev 5 + +**Rev 5 (2026-03-21):** Addresses 4 findings from fourth Opus sub-agent review. 1 major, 3 minor. All prior resolutions verified adequate. + +**Rev 4 (2026-03-21):** Addresses 7 findings from third Opus sub-agent review. 3 major, 4 minor. All prior resolutions verified adequate. + +**Rev 3 (2026-03-21):** Addresses 7 new findings from second Opus sub-agent review. 3 major, 4 minor. All Rev 1 resolutions verified adequate. + +**Rev 2 (2026-03-21):** Addresses 14 findings from Opus sub-agent architectural review. 3 critical, 5 major, 6 minor. All resolved. + +## Purpose + +Replace the subprocess-per-request architecture in the Claude Max API proxy with a session-aware process pool that locks warm CLI processes to OpenClaw session keys, eliminating 3–10s spawn overhead per request while preventing cross-agent context contamination. + +## Context + +The proxy currently spawns a new `claude --print` subprocess for every API request. Each process pays the full startup cost (CLI initialization, auth handshake), then dies after one response. With 20 agent sessions and ~5 concurrent, this overhead is the dominant latency contributor. + +The prototype (`src/subprocess/pool.ts`, `src/server/standalone-pool.ts`) proved the concept: 33% faster single requests, clean concurrent handling, queue draining. But it uses a shared stateless pool — any process serves any agent. Testing confirmed that CLI processes in `--input-format stream-json` mode accumulate context across messages (the CLI treats sequential stdin messages as one continuous conversation). This means a shared pool causes cross-agent context contamination and unbounded context growth. 
+ +The production design locks each CLI process to a specific OpenClaw session key (which encodes agent + channel/thread). No cross-contamination. Context accumulation is bounded to one session's traffic. A nightly sweep recycles idle processes. + +## Architecture + +### Components + +``` +┌──────────────────────────────────────────────────┐ +│ routes.ts (existing) │ +│ handleChatCompletions() │ +│ ├─ reads x-openclaw-session-key header │ +│ ├─ reads model from request body │ +│ └─ calls SessionPoolRouter.execute() │ +└──────────────┬───────────────────────────────────┘ + │ +┌──────────────▼───────────────────────────────────┐ +│ SessionPoolRouter (NEW — src/subprocess/router.ts) │ +│ ├─ lockedSessions: Map │ +│ ├─ warmPool: Map │ +│ │ ├─ opus pool (default 6) │ +│ │ └─ sonnet pool (default 4) │ +│ ├─ execute(prompt, model, sessionKey) → Emitter │ +│ │ 1. Check lockedSessions for sessionKey │ +│ │ 2. If found + idle → route to locked process │ +│ │ 3. If found + busy → enqueue on per-process │ +│ │ request queue (max depth: 3) │ +│ │ 4. If not found → claim from warmPool[model] │ +│ │ 5. If warmPool empty → spawn cold process │ +│ │ 6. 
Lock claimed/spawned process to sessionKey │ +│ ├─ sweep() — 3 AM ET nightly │ +│ │ Recycle locked processes idle > 2 hours │ +│ │ Recycle locked processes with requestCount > 50│ +│ │ Respawn recycled into warmPool │ +│ │ Refill warmPool to configured size │ +│ │ Enforce MAX_TOTAL_PROCESSES cap │ +│ └─ shutdown() — graceful teardown │ +└──────────────────────────────────────────────────┘ + │ +┌──────────────▼───────────────────────────────────┐ +│ PooledProcess (enhanced from prototype pool.ts) │ +│ ├─ CLI process (--input-format stream-json) │ +│ ├─ model: "opus" | "sonnet" │ +│ ├─ lockedTo: sessionKey | null │ +│ ├─ agentChannel: string | null (lineage key) │ +│ ├─ lastRequestAt: timestamp │ +│ ├─ spawnedAt: timestamp │ +│ ├─ requestCount: number │ +│ ├─ state: "idle" | "busy" | "recycling" │ +│ └─ requestQueue: Array │ +└──────────────────────────────────────────────────┘ +``` + +### Request Flow + +``` +1. OpenClaw gateway → POST /v1/chat/completions + Headers: x-openclaw-session-key: "agent:scope:discord:channel:1475832162648461316" + Body: { model: "claude-opus-4", messages: [...], stream: true } + +2. routes.ts extracts sessionKey from header, model from body + +3. SessionPoolRouter.execute(prompt, "opus", sessionKey): + a) lockedSessions.has(sessionKey)? + YES + idle → write to stdin, mark busy, start request timeout timer + YES + busy → enqueue on process's requestQueue (max 3, reject with 429 if full) + YES + PENDING_SENTINEL → enqueue (claim in progress, will drain when ready) + NO → set PENDING_SENTINEL in lockedSessions (synchronous, prevents race) + → warmPool["opus"].pop() → lock to sessionKey (replace sentinel) + if warmPool empty AND total processes < MAX_TOTAL_PROCESSES → spawn new process (3-10s cold start, one-time), lock to sessionKey + if warmPool empty AND total processes >= MAX_TOTAL_PROCESSES → reject queued requests on sentinel, delete sentinel, fall back to ClaudeSubprocess (log warning with process count) + +4. 
Write prompt to process stdin as stream-json message +5. Read response from process stdout, emit events to caller + - If POOL_REQUEST_TIMEOUT_MS exceeded → treat as process death (kill, reject queue, respawn) +6. Mark process idle, drain requestQueue if non-empty (next queued request starts) +7. Process stays locked to sessionKey for future requests +``` + +### Per-Process Request Serialization (Critical — Finding #1) + +Each `PooledProcess` has a `requestQueue` (FIFO, max depth 3). The CLI's stdin pipe can only handle one message at a time — interleaving would corrupt both responses. + +**When a request arrives for a locked-busy process:** +1. If `requestQueue.length < 3` → enqueue, return a pending emitter +2. If `requestQueue.length >= 3` → reject with HTTP 429 "Too Many Requests" + `Retry-After: 5` +3. When the active request completes → dequeue next, write to stdin, emit to its caller + +**Why max depth 3:** A queue deeper than 3 means the session is being hammered faster than the model can respond. Backpressure via 429 is safer than unbounded queueing. The gateway will retry. + +### Orphan Reclamation via Lineage Tracking (Critical — Finding #2) + +Session resets generate a new session key. The old process is orphaned. Without lineage tracking, orphans accumulate until the 3 AM sweep — 20+ manual resets in a day = 20+ leaked processes. + +**Solution:** Each `PooledProcess` stores `agentChannel` — extracted from the session key (e.g., `agent:scope:discord:channel:1475832162648461316` → `scope:discord:channel:1475832162648461316`). When a new session key arrives: + +1. Parse the `agentChannel` from the new key +2. Scan `lockedSessions` for any process with the same `agentChannel` but a *different* session key +3. If found → that process is orphaned. If idle: kill immediately, respawn into warm pool. If busy: mark for reclamation after current request completes. + +This catches resets instantly without waiting for the nightly sweep. 
+ +### Orphan Queue Rejection (Major — Finding N1) + +When a session reset triggers orphan reclamation (see "Orphan Reclamation" above), the orphaned process may have queued requests from the old session key. These requests belong to a dead session — the gateway has already moved on to a new session key. + +**Rule:** When a process is marked as orphaned: +- If idle: kill immediately, respawn into warm pool +- If busy with an active request: let the active request complete, then kill +- **All queued requests on the orphaned process are rejected immediately with HTTP 503 + `Retry-After: 3`** — they belong to the dead session key. The gateway will retry with the new key, which will route to the new process. + +Do NOT drain queued requests on an orphaned process. The queue contents are stale — they were enqueued under a session key that no longer exists. + +### Canonical Lock Clearing (Major — Finding N8) + +Multiple flows remove session locks: timeout recovery, context recycling, orphan reclamation, process death, sweep, and shutdown. Each MUST use the same canonical cleanup sequence: + +``` +function clearSessionLock(sessionKey: string, process: PooledProcess): + 1. lockedSessions.delete(sessionKey) // remove the mapping entirely + 2. process.lockedTo = null // clear the process's back-reference + 3. process.agentChannel = null // clear lineage key + 4. process.requestCount = 0 // reset counter for reuse + 5. // caller then either: kills the process, or returns it to warmPool +``` + +**Rule:** Every code path that unlocks a session MUST call this single function. No inline `lockedSessions.delete()` scattered across the codebase. This prevents stale artifacts (e.g., deleting the map entry but leaving `process.lockedTo` set, or forgetting to clear lineage). + +Deliverable: a private `clearSessionLock()` method on `SessionPoolRouter`. All unlock paths call it. 
+ +### Per-Request Timeout (Major — Finding N2) + +If a CLI process hangs (stuck inference, deadlocked stdin/stdout pipe, unresponsive model), it stays in "busy" state indefinitely. The per-process queue fills to max depth, then all subsequent requests get 429'd. The nightly sweep skips busy processes. The session is permanently wedged. + +**Solution:** `POOL_REQUEST_TIMEOUT_MS` (default 300000 / 5 minutes). + +When a request has been in-flight longer than this threshold: +1. Treat the process as dead — trigger the standard process death recovery flow +2. The active request's emitter receives a timeout error +3. Reject all queued requests with 503 + `Retry-After: 3` +4. Remove the process from `lockedSessions` +5. Spawn a replacement into the warm pool +6. Log the timeout event with session key, PID, and elapsed time + +**Why 5 minutes:** The existing proxy has a 15-minute timeout for subprocess-per-request. Pooled processes should be tighter — a 5-minute inference is extremely unusual and likely indicates a hang. The model typically responds in 10-60 seconds. This is configurable via env var. + +### Atomic Pool Claim (Major — Finding N3) + +Two simultaneous requests for the same *new* session key can race: both check `lockedSessions.has(key)` → false, both attempt to claim from the warm pool. One overwrites the other's lock, orphaning a process with no lineage reclamation. + +Node.js is single-threaded, but the `execute()` path has async yield points (cold spawn with `await`). Between the `has()` check and the lock write, another request can enter the same path. + +**Solution:** Set a pending-lock sentinel immediately (synchronously) on first touch: + +``` +execute(prompt, model, sessionKey): + if lockedSessions.has(sessionKey): + // route to existing locked process (or queue) + else: + // SYNCHRONOUS: set sentinel before any async work + lockedSessions.set(sessionKey, PENDING_SENTINEL) + try: + process = warmPool[model].pop() ?? 
await spawnCold(model) + lockedSessions.set(sessionKey, process) // replace sentinel with real process + catch: + lockedSessions.delete(sessionKey) // clean up sentinel on failure + throw + + // Second request for same key hits the `has()` check → true → queues on the sentinel + // When sentinel is replaced with real process, queued requests are drained +``` + +The `PENDING_SENTINEL` is a special marker that causes incoming requests for that key to enqueue. When the real process is assigned, the queue drains. + +**Implementation note (Finding N16):** `lockedSessions` is typed `Map<string, PooledProcess>`, but the sentinel is not a real `PooledProcess`. Implement the sentinel as a lightweight object with `{ isPending: true, requestQueue: PendingRequest[] }` — it has a queue (where requests accumulate while the real process is being claimed/spawned) but no actual CLI process. When the real process is assigned, transfer the sentinel's `requestQueue` to the new `PooledProcess` and drain it. The router checks `isPending` to distinguish sentinels from real processes. Type the map as `Map<string, PooledProcess | PendingSentinel>` or use a discriminated union. + +### Failed Cold Spawn Recovery (Major — Finding N9) + +If a `PENDING_SENTINEL` is set and the cold spawn fails (CLI binary missing, auth broken, OOM), the catch block deletes the sentinel — but requests that queued against the sentinel are now waiting on nothing. They hang forever. + +**Rule:** Before deleting the sentinel on spawn failure: +1. Collect all requests queued against this session key's sentinel +2. Reject each with HTTP 503 + `Retry-After: 3` +3. Then delete the sentinel from `lockedSessions` +4. 
Log the spawn failure with model, session key, and error + +``` +catch (error): + rejectQueuedRequests(sessionKey, 503, "Retry-After: 3") // reject waiters before delete + lockedSessions.delete(sessionKey) + log({ event: "cold_spawn_failed", sessionKey, model, error }) + throw // propagate to the original requester +``` + +### Context Accumulation Threshold (Critical — Finding #3) + +Each request adds its full prompt to the CLI's accumulated context. After 50 requests, the CLI could have 500K+ tokens of accumulated noise, risking context window overflow and degraded responses. + +**Rule:** When a process completes a request and `requestCount > POOL_MAX_REQUESTS_PER_PROCESS` (default 50): +- If `requestQueue` is empty → recycle immediately: call `clearSessionLock()`, kill process, respawn into warm pool. Next request from this key claims a new process. +- If `requestQueue` is non-empty → set state to `"recycling"` (new arrivals are not enqueued on this process — they get a fresh process instead — and the sweep skips it). Drain the existing queue normally. When queue empties, THEN call `clearSessionLock()`, kill, respawn. **Important (Finding N17):** `clearSessionLock()` resets `requestCount` to 0, so it must be called AFTER the drain-then-recycle decision, not before. The `"recycling"` state is the guard — it signals that this process is committed to being recycled regardless of the counter. + +The threshold is configurable via `POOL_MAX_REQUESTS_PER_PROCESS` (default 50). 
+ +### Nightly Sweep (3:00 AM ET) + +``` +For each locked process: + If lastRequestAt < (now - 2 hours) OR requestCount > 50: + If state == "busy" OR state == "recycling" → skip (don't interrupt active work or double-kill a mid-recycle process — Finding N14) + Kill process + Remove from lockedSessions + Spawn fresh process into warmPool (if below configured size) + +Refill warmPool to configured sizes (check cap before EACH spawn, not once — Finding N11): + for each model in [opus, sonnet]: + while warmPool[model].length < configured size: + if total(locked + warm) >= MAX_TOTAL_PROCESSES: + log warning, stop refilling + break + spawn one process, add to warmPool[model] + +Enforce MAX_TOTAL_PROCESSES: + If total(locked + warm) > MAX_TOTAL_PROCESSES: + Do NOT spawn new warm processes — let the pool recover naturally + Log a warning with current counts +``` + +**DST Handling (Finding #9):** Use `Intl.DateTimeFormat` with `timeZone: 'America/New_York'` to resolve the current ET hour, not a fixed UTC offset. This handles EST/EDT transitions correctly. Alternatively, use `node-cron` with timezone support (`{ timezone: 'America/New_York' }`). + +### Session Reset Handling + +When an agent's session is reset in OpenClaw, the gateway generates a new `sessionId`, which changes the `x-openclaw-session-key`. The proxy sees a new key → lineage tracking detects the orphan → orphan is reclaimed immediately (if idle) or after its current request (if busy) → new key claims a fresh process from the warm pool. + +### Process Death Recovery (Major — Finding #4) + +When a CLI process exits unexpectedly: + +1. **If idle:** Remove from `lockedSessions` (if locked) or from `warmPool`. Spawn replacement into warm pool. +2. 
**If busy (mid-request):** + - The active request's emitter receives an error event + - Remove the dead process from `lockedSessions` + - **Do NOT auto-retry** — the gateway handles retries at the HTTP level + - If the process had queued requests, reject them with 503 + `Retry-After: 3` + - Spawn a replacement into the warm pool + +The key invariant: a dead process is never left in `lockedSessions`. The `exit` handler atomically removes the mapping and spawns a replacement. + +### Client Disconnect Handling (Major — Finding #6) + +When a client disconnects mid-stream: +- Detach the response emitter (stop sending to the dead connection) +- Let the CLI process finish generating its response (the flat-rate subscription means no cost for wasted inference) +- The process returns to `idle` state, still locked to its session key +- **Context divergence:** The CLI now has a response in its accumulated context that the gateway never received. This is harmless because OpenClaw sends the full messages array with every request — the gateway's context is authoritative, and the CLI's accumulated context is noise that gets recycled away eventually. + +### Auth Token Expiration (Major — Finding #7) + +Long-lived CLI processes (up to 24 hours between sweeps) may outlive their auth token. + +**Detection:** If a CLI process returns an auth error (non-zero exit or error message containing "auth", "unauthorized", "token expired"): +1. Mark the process as dead — trigger the process death recovery flow +2. Log the auth failure for monitoring +3. The replacement process will authenticate fresh on spawn + +**No preemptive refresh needed.** The Claude CLI handles token refresh internally for most cases. This catch handles the edge case where it doesn't. 
+ +### Unknown Model Routing (Major — Finding #8, updated Finding N15) + +The existing `extractModel()` in `openai-to-cli.ts` normalizes model strings: it maps known names to `ClaudeModel` values (`"opus" | "sonnet" | "haiku"`) and defaults unrecognized strings to `"opus"`. This means truly unknown model strings (e.g., `"gpt-4"`) silently become `"opus"` and will route to the opus pool — the "unknown model" fallback path is unreachable via the adapter. + +**The router must check against the set of pooled models, not rely on type-level unknowns:** + +``` +const POOLED_MODELS = new Set(["opus", "sonnet"]); // models that have warm pools + +if (POOLED_MODELS.has(resolvedModel)): + // route through SessionPoolRouter +else: + // fall back to ClaudeSubprocess (e.g., haiku) +``` + +This correctly handles: +- `haiku` — a valid `ClaudeModel` but not pooled → falls back to `ClaudeSubprocess` +- Truly unknown strings — already mapped to `"opus"` by `extractModel()` → routes to opus pool (correct, since the CLI will run opus on the Max subscription regardless) +- Future models — add to `POOLED_MODELS` when a pool is created, otherwise they fall back + +**Do not modify `extractModel()`** — its defaulting behavior is correct for the CLI (which needs a valid model name). The pooling decision is a separate layer. + +Log any non-pooled model name for tracking — if haiku or a future model appears frequently, consider adding a pool. + +### Model Pool Flex Zone (Major — Finding #5) + +Static 6/4 opus/sonnet split can starve one model if usage patterns shift. The warm pool sizing is a *target*, not a hard partition: + +- On startup, spawn `POOL_OPUS_SIZE` opus + `POOL_SONNET_SIZE` sonnet processes +- If the opus warm pool is empty but the sonnet pool has excess (> `POOL_SONNET_SIZE`): **do not cross-assign** (different model). Spawn a cold opus process. 
+- If total processes < `MAX_TOTAL_PROCESSES`, cold spawns are always allowed regardless of per-model targets +- The 3 AM sweep refills to configured targets, naturally rebalancing + +This is simpler than a dynamic flex zone and maintains the invariant that model pools are pure. The `MAX_TOTAL_PROCESSES` cap (default 30) prevents runaway spawning. + +### Serialization Loss in Fallback Mode (Major — Finding N10) + +When `MAX_TOTAL_PROCESSES` is reached, new session-keyed requests fall back to `ClaudeSubprocess` (subprocess-per-request). Two simultaneous requests for the same session key both get separate subprocesses — no serialization, no locking. The per-process request queue does not apply to fallback subprocesses. + +**This is a known degradation, not a bug.** Fallback mode is a safety valve for when the pool is saturated. In practice: +- If the cap is routinely hit, the correct fix is to increase `MAX_TOTAL_PROCESSES`, not to add serialization to the fallback path. +- Subprocess-per-request was the *entire* architecture before pooling — it works, it's just slower. +- Log a warning each time fallback is used with the current process count, so operators can detect when the cap needs raising. + +## Invariants (non-negotiable) + +- A locked process serves ONLY the session key it is locked to. No exceptions. No "borrowing" idle locked processes. +- A process serves ONLY one request at a time on its stdin pipe. Concurrent requests queue on the per-process request queue. +- The warm pool is partitioned by model. An opus request never gets a sonnet process or vice versa. +- If the warm pool is empty and no process is available, the proxy spawns a cold subprocess (same as current behavior). It NEVER fails a request due to pool exhaustion — it degrades to cold-start or subprocess fallback. +- Total process count (locked + warm) must not exceed `MAX_TOTAL_PROCESSES` (default 30). Beyond this, new requests fall back to `ClaudeSubprocess`. 
+- The production port (3456) and health endpoint format remain unchanged. Existing OpenClaw provider config works without modification. +- `ClaudeSubprocess` (subprocess-per-request) is retained in the codebase as fallback for non-pooled models (e.g., haiku), headerless requests, and session-keyed requests that arrive once `MAX_TOTAL_PROCESSES` is reached. Fallback subprocesses are NOT counted against `MAX_TOTAL_PROCESSES` — they are short-lived (die after one request) and self-limiting. +- The nightly sweep runs at 3:00 AM America/New_York (DST-aware), not UTC. +- Pool claim for a new session key is atomic: a `PENDING_SENTINEL` is set synchronously before any async work. No two requests for the same new key can both claim a process. +- Every in-flight request has a timeout (`POOL_REQUEST_TIMEOUT_MS`). A hung process is treated as dead and recovered automatically. +- All session lock clearing goes through `clearSessionLock()`. No inline `lockedSessions.delete()` calls. +- Failed cold spawns reject all queued requests before deleting the sentinel. No orphaned waiters. +- Sweep refill checks `MAX_TOTAL_PROCESSES` before each individual spawn, not once at the start. +- Sweep skips processes in `recycling` state (same as `busy`). No double-kill. +- Shutdown closes the listening socket before draining. No new connections accepted during teardown. + +## Forbidden Patterns + +- **No cross-session routing:** A process locked to session A must never serve session B, even if A is idle and B is queued. Claim a new process instead. +- **No model mixing within a process:** A process spawned with `--model opus` must never receive a sonnet request. Model is baked at spawn time. +- **No eager recycling of active sessions:** The sweep recycles only processes idle > 2 hours OR exceeding the request count threshold. A process that served a request 30 minutes ago stays locked, even during the 3 AM sweep (unless it's over the request count limit). 
**Clarification:** The inline `requestCount > 50` recycling (in "Context Accumulation Threshold") is not eager recycling — it triggers only after a request completes and only when the queue is empty. It is a safety valve, not a sweep. +- **No shared mutable state between pool and routes:** The router exposes `execute()`, `stats()`, and `sweep()` only. Routes do not directly manipulate pool internals. +- **No removal of the existing subprocess manager:** `ClaudeSubprocess` stays for fallback and backward compatibility. +- **No concurrent writes to a process's stdin:** The per-process request queue serializes all writes. Any code path that bypasses the queue is a critical bug. +- **No draining queued requests on orphaned processes:** When lineage reclamation marks a process as orphaned, queued requests are rejected (503), not drained. They belong to a dead session key. +- **No pool claim without sentinel:** The `lockedSessions.set(key, PENDING_SENTINEL)` must execute synchronously before any `await`. Skipping the sentinel creates a race condition between concurrent requests for the same new key. 
+ +## Modules + +### Module 1: SessionPoolRouter (`src/subprocess/router.ts`) + +**Status:** Planning +**Owner:** Max +**Dependencies:** Existing pool.ts (to be refactored), existing openai-to-cli.ts adapter + +**Deliverables:** +- `SessionPoolRouter` class with `execute()`, `stats()`, `sweep()`, `shutdown()` +- Per-model warm pools (opus, sonnet) with configurable sizes +- Session-key locking with `lockedSessions` map +- Per-process FIFO request queue (max depth 3) with 429 backpressure +- Lineage tracking (`agentChannel`) for orphan detection on session reset +- Context accumulation guard: recycle process when `requestCount > POOL_MAX_REQUESTS_PER_PROCESS` +- Cold-start fallback when warm pool is empty (subject to `MAX_TOTAL_PROCESSES` cap) +- `ClaudeSubprocess` fallback when `MAX_TOTAL_PROCESSES` is reached or the model is not pooled (e.g., haiku) +- Nightly sweep logic (idle > 2 hours OR requestCount > 50 → recycle → refill) +- Process health monitoring: auto-respawn on unexpected death with atomic `lockedSessions` cleanup +- Auth error detection: treat auth failures as process death, trigger recovery flow +- Per-request timeout (`POOL_REQUEST_TIMEOUT_MS`): hung processes treated as dead, triggers recovery +- Atomic pool claim via `PENDING_SENTINEL`: prevents race condition on simultaneous new-key requests +- Orphan queue rejection: queued requests on orphaned processes are rejected (503), not drained +- Canonical `clearSessionLock()` method used by all unlock paths (timeout, recycle, orphan, death, sweep, shutdown) +- Failed cold spawn recovery: reject queued requests before deleting sentinel +- Fallback serialization loss documented and logged (warning when fallback triggered) + +**Acceptance Criteria:** +- [ ] Requests with the same `x-openclaw-session-key` route to the same process for as long as that process stays locked to the key (i.e., until it is recycled, reclaimed, or dies) +- [ ] Requests with different session keys never share a process +- [ ] Two rapid requests on the same session key are serialized — second waits for first to complete, no stdin interleaving 
+- [ ] A request arriving at a busy process whose queue is already full (3 queued requests) returns HTTP 429 with `Retry-After: 5` +- [ ] When warm pool is empty, a cold process is spawned and the request succeeds (no failure) +- [ ] When `MAX_TOTAL_PROCESSES` is reached, new session requests fall back to `ClaudeSubprocess` +- [ ] Opus requests only go to opus processes; sonnet to sonnet +- [ ] Non-pooled model (e.g., haiku) with session key header falls back to `ClaudeSubprocess` +- [ ] Session reset (new key, same agent+channel) orphan-reclaims the old process — immediately if it is idle, or as soon as its active request completes if it is busy +- [ ] Process with `requestCount > 50` is recycled on next idle transition (queue drained first) +- [ ] `stats()` returns: `{ total, locked: { total, opus, sonnet }, warm: { opus, sonnet }, busy, queued, orphansReclaimed, totalRequests, processRecycles, routeHits: { locked, warm, cold, fallback } }` +- [ ] Sweep correctly identifies and recycles processes idle > 2 hours OR requestCount > 50 +- [ ] Sweep skips busy processes (never interrupts active work) +- [ ] Sweep refills warm pool to configured size after recycling (respecting `MAX_TOTAL_PROCESSES`) +- [ ] Process death triggers atomic cleanup: remove from `lockedSessions`, reject queued requests with 503, spawn replacement +- [ ] Auth errors (exit code + error message matching) trigger process death recovery +- [ ] Request timeout (`POOL_REQUEST_TIMEOUT_MS`) kills hung process and triggers death recovery +- [ ] Orphan reclamation rejects queued requests on the orphaned process with 503 (does not drain them) +- [ ] Two simultaneous requests for the same new session key do not both claim a process — second request queues behind the pending sentinel +- [ ] `agentChannel` extraction validates session key format and falls back to full key if format is unexpected +- [ ] Graceful shutdown kills all processes and drains/rejects queued requests +- [ ] All unlock paths (timeout, recycle, orphan, death, sweep, shutdown) use `clearSessionLock()` — no inline `lockedSessions.delete()` +- [ ] 
Failed cold spawn rejects queued requests with 503 before deleting sentinel +- [ ] Fallback to ClaudeSubprocess logs a warning with current total process count +- [ ] Sweep refill checks MAX_TOTAL_PROCESSES before each spawn (not once) +- [ ] Sweep skips processes in `recycling` state + +### Module 2: Route Integration (`src/server/routes.ts`) + +**Status:** Planning +**Owner:** Max +**Dependencies:** Module 1 + +**Deliverables:** +- Extract `x-openclaw-session-key` and `x-openclaw-agent-id` from request headers +- Route through `SessionPoolRouter.execute()` instead of `new ClaudeSubprocess()` +- Fallback to `ClaudeSubprocess` if session key header is missing (backward compat) +- Client disconnect handling: detach emitter, let process finish, return to locked-idle state +- Structured logging: log session key, model, process PID, latency, queue depth per request + +**Acceptance Criteria:** +- [ ] Requests with `x-openclaw-session-key` header use pooled routing +- [ ] Requests without the header fall back to subprocess-per-request (existing behavior) +- [ ] Streaming and non-streaming both work through the pool +- [ ] Client disconnect does not kill the pooled process — it completes and returns to locked-idle +- [ ] Health endpoint includes pool stats from `stats()` +- [ ] No changes to the response format (OpenAI-compatible output unchanged) +- [ ] Each request logs: `{ sessionKey, model, processPid, latencyMs, queueDepth, cacheHit: "locked"|"warm"|"cold"|"fallback" }` + +### Module 3: Server Startup & Sweep Scheduling (`src/server/standalone.ts`) + +**Status:** Planning +**Owner:** Max +**Dependencies:** Module 1 + +**Deliverables:** +- Initialize `SessionPoolRouter` at server startup with configured pool sizes +- Schedule nightly sweep at 3:00 AM ET using `node-cron` with `timezone: 'America/New_York'` (DST-aware) +- Environment variable configuration: + - `POOL_OPUS_SIZE` (default 6) — warm opus processes + - `POOL_SONNET_SIZE` (default 4) — warm sonnet processes 
+ - `POOL_MAX_REQUESTS_PER_PROCESS` (default 50) — context accumulation threshold + - `MAX_TOTAL_PROCESSES` (default 30) — hard cap on all pool processes (locked + warm) + - `SWEEP_HOUR` (default 3) — hour in ET for nightly sweep + - `SWEEP_IDLE_THRESHOLD_MS` (default 7200000) — idle time before sweep recycles + - `POOL_REQUEST_QUEUE_DEPTH` (default 3) — per-process queue depth + - `POOL_REQUEST_TIMEOUT_MS` (default 300000) — per-request timeout; hung process treated as dead +- Graceful shutdown integration (SIGTERM/SIGINT): + 1. **Immediately** close the listening socket (stop accepting new connections — Finding N12) + 2. Wait for in-flight requests to complete (30s timeout) + 3. Call `SessionPoolRouter.shutdown()` — rejects all queued requests with 503, kills all pool processes + 4. Exit + + **Implementation note (Finding N18):** `standalone.ts` should handle the full shutdown sequence directly rather than delegating to the existing `stopServer()` in `index.ts`. The current `stopServer()` has no pool awareness. The standalone entry point owns both the HTTP server and the pool router, so it is the natural place for the coordinated shutdown: `server.close()` (step 1), then timeout (step 2), then `router.shutdown()` (step 3), then `process.exit()` (step 4). 
+ +**Acceptance Criteria:** +- [ ] Pool initializes with configured sizes on server start +- [ ] Sweep runs at 3:00 AM ET daily, correctly handling EST↔EDT transitions +- [ ] Pool sizes configurable via env vars without code changes +- [ ] Server startup logs pool configuration and initial stats +- [ ] Graceful shutdown closes listening socket immediately on SIGTERM, then drains in-flight (30s timeout) +- [ ] Queued requests are rejected with 503 during shutdown +- [ ] No new connections accepted after shutdown signal + +### Module 4: Prototype Cleanup + +**Status:** Planning +**Owner:** Max +**Dependencies:** Modules 1-3 merged and verified + +**Deliverables:** +- Remove `src/server/standalone-pool.ts` (prototype — superseded by production integration) +- Refactor `src/subprocess/pool.ts` into the router or remove if fully superseded +- Update CLAUDE.md with new architecture documentation +- **Separate commit from Modules 1-3** (allows clean revert if prototype removal causes issues) + +**Acceptance Criteria:** +- [ ] No orphaned prototype files in the codebase +- [ ] CLAUDE.md reflects the pooled architecture +- [ ] Build succeeds with no unused imports or dead code +- [ ] Committed separately from the main pooling implementation + +## Validation Criteria + +What "done" looks like — these must be verified by someone other than the builder: + +1. **Functional:** Send 5 requests from different session keys → each gets a dedicated process. Send a 6th from a key that already has a process → routed to the existing one (verify via logs or health endpoint). +2. **Serialization:** Send 2 rapid requests on the same session key → both succeed, responses are correct and not interleaved. Verify via log timestamps that the second waited for the first. +3. **Backpressure:** Send 5 rapid requests on the same session key (queue depth 3) → first executes, next 3 queue, 5th returns 429. +4. 
**Concurrency:** Send 3 simultaneous requests from different keys → all served in parallel with no queueing. +5. **Cold start:** Send a request after all warm processes are claimed → new process spawned, request succeeds, latency logged. +6. **Overflow cap:** Fill the pool to `MAX_TOTAL_PROCESSES`, send another request with a new session key → falls back to `ClaudeSubprocess`, still succeeds. +7. **Sweep:** Manually trigger sweep → processes idle > 2 hours recycled, processes with requestCount > 50 recycled, active processes retained. Warm pool refilled. +8. **Session reset:** Send request with key A, then send with key A' (different session ID, same agent+channel) → gets a fresh process. Key A's process reclaimed immediately. +9. **Orphan reclamation:** Simulate 5 rapid session resets from the same agent → verify no orphan accumulation (all old processes reclaimed, not leaked until 3 AM). +10. **Process death:** Kill a locked process's PID externally → verify: removed from lockedSessions, replacement spawned, next request on that key gets a new process. +11. **Client disconnect:** Start a streaming request, disconnect mid-stream → process not killed, returns to locked-idle state, next request on same key works. +12. **Context degradation:** Send 60 requests through the same locked process → verify the process is recycled after request 50 (on next idle). Verify subsequent request gets a fresh process. +13. **Backward compat:** Send a request without `x-openclaw-session-key` header → falls back to subprocess-per-request. No error. +14. **Unknown model:** Send a request with session key + model "haiku" → falls back to `ClaudeSubprocess`. No error. +15. **Health:** `/health` endpoint returns pool stats including locked/warm/busy/queued/orphansReclaimed counts. Stats include aggregate route-hit counters: locked, warm, cold, fallback. +16. **Request timeout:** Simulate a hung process (e.g., suspend with SIGSTOP). 
Verify request times out after `POOL_REQUEST_TIMEOUT_MS`, process is killed and replaced, queued requests rejected with 503. +17. **Atomic claim:** Send 2 simultaneous requests for the same new session key → only one process claimed, second request queued and served after the first completes. +18. **Orphan queue rejection:** Enqueue 2 requests on a locked process, then trigger a session reset. Verify queued requests receive 503 (not drained through the orphaned process). +19. **Build:** `npm run build` succeeds with no errors or warnings. +20. **Lock clearing consistency:** Grep the codebase for `lockedSessions.delete` — it should appear ONLY inside `clearSessionLock()`. No other direct deletions. +21. **Failed cold spawn:** Simulate a spawn failure (e.g., invalid CLAUDE_BIN) after sentinel is set with queued requests → queued requests receive 503, sentinel is cleaned up. +22. **Sweep refill cap:** Set MAX_TOTAL_PROCESSES=12, fill 10 locked + 2 warm. Trigger sweep that recycles 1 → refill spawns 1. Verify total never exceeds 12. +23. **Shutdown socket close:** Send SIGTERM, then immediately attempt a new connection → connection refused (socket closed). In-flight requests complete within 30s. + +## Observability (Finding #13) + +### Logging + +Every request logs a structured JSON line: +```json +{ + "ts": "2026-03-21T17:30:00Z", + "event": "request", + "sessionKey": "agent:scope:discord:channel:...", + "model": "opus", + "pid": 12345, + "latencyMs": 1770, + "queueDepth": 0, + "routeType": "locked|warm|cold|fallback", + "requestCount": 12 +} +``` + +Pool lifecycle events (spawn, recycle, death, orphan-reclaim, sweep) are also logged as structured JSON. 
+ +### Health Endpoint + +`GET /health` returns: +```json +{ + "status": "ok", + "provider": "claude-code-cli", + "pool": { + "total": 12, + "locked": { "total": 7, "opus": 5, "sonnet": 2 }, + "warm": { "opus": 3, "sonnet": 2 }, + "busy": 2, + "queued": 0, + "maxTotal": 30, + "orphansReclaimed": 3, + "totalRequests": 142, + "processRecycles": 5, + "requestTimeouts": 0, + "routeHits": { "locked": 98, "warm": 32, "cold": 8, "fallback": 4 }, + "uptime": 43200 + } +} +``` + +## Decision Log + +| Date | Decision | Rationale | Alternatives Considered | +|------|----------|-----------|------------------------| +| 2026-03-21 | Per-session-key locking over shared stateless pool | CLI accumulates context in stream-json mode (empirically verified). Shared pool causes cross-agent contamination. Locking prevents this. | Shared pool + aggressive recycling — rejected because it doesn't prevent cross-contamination within the recycling window | +| 2026-03-21 | Per-model pools over single mixed pool | CLI `--model` flag is set at spawn time, cannot be changed per-request in stream-json mode | Single pool with model flag per message — not supported by CLI | +| 2026-03-21 | 3 AM ET sweep over idle timeout | Simpler rules (one daily check vs. continuous timeout tracking). Session resets handle intra-day cleanup. User controls context freshness via manual reset. | Rolling idle timeout (30 min) — adds complexity, may recycle processes the user wants kept warm | +| 2026-03-21 | 6 opus + 4 sonnet pool sizing | Matches observed usage pattern: ~70% opus, ~30% sonnet. 10 total covers expected 5-concurrent peak with buffer. Configurable via env vars. | 5+5 — doesn't match actual model distribution | +| 2026-03-21 | Retain ClaudeSubprocess as fallback | Backward compatibility for requests without session key header. Also serves unknown models (haiku, future models) not in the pool. 
| Remove entirely — breaks headerless requests, limits future model support | +| 2026-03-21 | Per-process request queue (max 3) + 429 backpressure | **Rev 2 — Finding #1.** CLI stdin pipe cannot handle concurrent writes. Serialization via FIFO queue prevents data corruption. Max depth 3 with 429 prevents unbounded memory growth. | Reject immediately if busy (too aggressive — normal gateway retry creates brief bursts). Unbounded queue (memory risk). | +| 2026-03-21 | Lineage-based orphan reclamation | **Rev 2 — Finding #2.** Session resets orphan processes. Without immediate reclamation, 20+ resets/day = 20+ leaked processes until 3 AM. Lineage key (agent+channel) detects orphans on the spot. | Wait for 3 AM sweep (unacceptable leak rate). Timer-based idle reclamation (adds complexity, doesn't catch rapid resets). | +| 2026-03-21 | Request count threshold (50) for recycling | **Rev 2 — Finding #3.** CLI accumulates context from every request. 50 requests ≈ 100-200K tokens of noise. Recycling at this threshold prevents context window overflow while staying well within safe bounds for a day's active session. | No threshold, rely on manual resets only (risky for agents that never reset). Lower threshold like 20 (too aggressive, causes unnecessary cold starts). | +| 2026-03-21 | MAX_TOTAL_PROCESSES cap (30) | **Rev 2 — Finding #14.** Prevents runaway process spawning from consuming all system memory. 30 processes × 200MB = 6GB worst case — well within 128GB. Beyond 30, requests fall back to ClaudeSubprocess. | No cap (dangerous). Per-model cap only (doesn't prevent total runaway). | +| 2026-03-21 | Reject (not drain) queued requests on orphaned processes | **Rev 3 — Finding N1.** Queued requests belong to the dead session key. Draining them serves stale requests and delays the process kill. Rejecting with 503 lets the gateway retry with the new key. | Drain queue first (serves stale-session requests, delays reclamation). 
| +| 2026-03-21 | Per-request timeout (5 min default) | **Rev 3 — Finding N2.** Without a timeout, a hung CLI process permanently wedges a session. 5 min is generous for inference (typical: 10-60s) but catches real hangs. Triggers standard death recovery. | No timeout (process stays busy forever). 15-min matching old proxy (too long for pooled — blocks the session). | +| 2026-03-21 | PENDING_SENTINEL for atomic pool claim | **Rev 3 — Finding N3.** Async yield points between `has()` and `set()` create a window for duplicate claims. Synchronous sentinel closes the window. Second request queues instead of claiming. | Mutex/lock (overkill for single-threaded Node). Accept the race (orphans a process silently). | +| 2026-03-21 | Canonical clearSessionLock() method | **Rev 4 — Finding N8.** Six different flows clear session locks with inconsistent cleanup. Single method prevents stale artifacts (dangling lockedTo, leftover lineage keys). | Inline cleanup per flow (error-prone, already caused inconsistency). | +| 2026-03-21 | Reject queued requests on failed cold spawn | **Rev 4 — Finding N9.** Sentinel deletion without queue drain leaves waiters hanging forever. Explicit rejection before deletion ensures no orphaned promises. | Let waiters timeout naturally (up to 5 min hang per request). | +| 2026-03-21 | Document fallback serialization loss as known degradation | **Rev 4 — Finding N10.** Adding serialization to ClaudeSubprocess fallback adds complexity to a safety valve that shouldn't be hit often. If it's hit often, increase the cap. | Add fallback serialization (complexity for a rare path). | +| 2026-03-21 | Route by POOLED_MODELS set, not type system | **Rev 5 — Finding N15.** `extractModel()` defaults unknowns to "opus" — the type system can't distinguish pooled from non-pooled. Explicit set check is the correct routing mechanism. | Modify extractModel to return null (breaks existing callers). 
| +| 2026-03-21 | Standalone.ts owns shutdown sequence | **Rev 5 — Finding N18.** The server module has no pool awareness. Rather than threading the router through `startServer`/`stopServer`, let the entry point coordinate both. | Thread router into index.ts (unnecessary coupling). | + +## Review History + +| Rev | Date | Reviewer | Findings | Status | +|-----|------|----------|----------|--------| +| 1 | 2026-03-21 | Opus sub-agent | 14 (3C/5M/6m) | All resolved in Rev 2 | +| 2 | 2026-03-21 | Opus sub-agent | 7 (0C/3M/4m) | All resolved in Rev 3 | +| 3 | 2026-03-21 | Opus sub-agent | 7 (0C/3M/4m) | All resolved in Rev 4 | +| 4 | 2026-03-21 | Opus sub-agent | 4 (0C/1M/3m) | All resolved in Rev 5 | + +### Finding Resolution Index + +| # | Severity | Finding | Resolution | +|---|----------|---------|------------| +| 1 | CRITICAL | No per-process request serialization | Per-process FIFO request queue (max 3) + 429 backpressure. See "Per-Process Request Serialization" section. | +| 2 | CRITICAL | Orphaned processes leak until 3 AM | Lineage tracking via `agentChannel` field. Immediate reclamation on session reset. See "Orphan Reclamation" section. | +| 3 | CRITICAL | No context accumulation mitigation | `requestCount > 50` triggers recycling on next idle. Configurable via env var. See "Context Accumulation Threshold" section. | +| 4 | MAJOR | Process death while locked — race condition | Atomic cleanup in `exit` handler: remove from lockedSessions, reject queued requests with 503, spawn replacement. See "Process Death Recovery" section. | +| 5 | MAJOR | Static pool sizing can starve one model | Pools are targets, not hard partitions. Cold spawns allowed up to MAX_TOTAL_PROCESSES. See "Model Pool Flex Zone" section. | +| 6 | MAJOR | Client disconnect leaves CLI context diverged | Documented as harmless — OpenClaw sends full context every request, CLI accumulation is noise. See "Client Disconnect Handling" section. 
| +| 7 | MAJOR | No auth expiration handling | Auth errors treated as process death, triggering standard recovery flow. See "Auth Token Expiration" section. | +| 8 | MAJOR | Unknown model + session key — ambiguous routing | Unknown models bypass pool entirely, fall back to ClaudeSubprocess. See "Unknown Model Routing" section. | +| 9 | MINOR | DST handling for sweep | Use `node-cron` with `timezone: 'America/New_York'` or `Intl.DateTimeFormat`. Specified in "Nightly Sweep" section. | +| 10 | MINOR | `stats()` interface not defined | Full interface specified in "Health Endpoint" section under Observability. | +| 11 | MINOR | No validation test for context degradation | Added as Validation Criterion #12: send 60 requests, verify recycling at 50. | +| 12 | MINOR | Prototype cleanup should be separate commit | Module 4 now specifies separate commit. | +| 13 | MINOR | No logging/observability spec | Added "Observability" section with structured logging and health endpoint schemas. | +| 14 | MINOR | Pool size conflates warm vs. total | Added `MAX_TOTAL_PROCESSES` (default 30) as hard cap distinct from per-model warm targets. | +| N1 | MAJOR | Orphan reclamation races with in-flight queue | Queued requests on orphaned processes are rejected (503), not drained. They belong to the dead session. See "Orphan Queue Rejection" section. | +| N2 | MAJOR | No per-request timeout for hung processes | `POOL_REQUEST_TIMEOUT_MS` (default 300000). Timeout triggers process death recovery. See "Per-Request Timeout" section. | +| N3 | MAJOR | Pool claim not atomic — race on new session key | `PENDING_SENTINEL` set synchronously before async work. See "Atomic Pool Claim" section. | +| N4 | MINOR | `agentChannel` extraction assumes fixed key format | Validation added — falls back to full session key if format is unexpected. AC added to Module 1. | +| N5 | MINOR | Off-by-one in backpressure validation criterion | Fixed wording: "first executes, next 3 queue, 5th returns 429." 
| +| N6 | MINOR | ClaudeSubprocess fallback not counted in MAX_TOTAL_PROCESSES | Documented as intentional — fallback processes are short-lived and self-limiting. | +| N7 | MINOR | No aggregate route-hit counters in stats() | Added `routeHits: { locked, warm, cold, fallback }` to health endpoint. | +| N8 | MAJOR | Inconsistent lock-clearing language across flows | Canonical `clearSessionLock()` method — all unlock paths must call it. See "Canonical Lock Clearing" section. | +| N9 | MAJOR | Queued requests orphaned on failed cold spawn | Reject queued requests with 503 before deleting sentinel on spawn failure. See "Failed Cold Spawn Recovery" section. | +| N10 | MAJOR | Serialization loss in ClaudeSubprocess fallback | Documented as known degradation. Log warning when fallback triggered. See "Serialization Loss in Fallback Mode" section. | +| N11 | MINOR | Sweep refill could overshoot MAX_TOTAL_PROCESSES | Check cap before each individual spawn in refill loop. Updated in "Nightly Sweep" section. | +| N12 | MINOR | Shutdown doesn't specify when to stop accepting connections | Close listening socket immediately on SIGTERM. Updated in Module 3. | +| N13 | MINOR | Health endpoint missing locked counts per model | Added `locked: { total, opus, sonnet }` to health endpoint. | +| N14 | MINOR | Sweep could double-kill a process mid-inline-recycle | Sweep skips processes in `recycling` state. Updated in "Nightly Sweep" section. | +| N15 | MAJOR | `extractModel` defaults unknowns to "opus", defeating unknown-model fallback | Router checks against `POOLED_MODELS` set, not type-level unknowns. `extractModel` unchanged. See updated "Unknown Model Routing" section. | +| N16 | MINOR | PENDING_SENTINEL has no queue data structure | Sentinel is a `{ isPending: true, requestQueue: [] }` object. Queue transfers to real process on claim. See updated "Atomic Pool Claim" section. 
| +| N17 | MINOR | `clearSessionLock` resets requestCount before drain completes | Drain-then-recycle sets `"recycling"` state first, calls `clearSessionLock` only after drain completes. See updated "Context Accumulation Threshold" section. | +| N18 | MINOR | Shutdown has no mechanism to access pool from server module | `standalone.ts` owns the full shutdown sequence directly. See updated Module 3. | + +## Pre-Change Impact Statement + +**Risk:** 🟡 Yellow + +**Impact on dependent systems:** +- **OpenClaw gateway:** No changes required. Gateway already sends `x-openclaw-session-key` and `x-openclaw-agent-id` headers. Pool is transparent — same endpoint, same response format. +- **All 20+ agent sessions:** Transparent improvement. Agents see faster responses, no behavioral change. +- **systemd service:** No changes to the service file. Same binary, same port. +- **Monitoring:** Health endpoint gains pool stats — additive, not breaking. + +**What could break:** +- If the session key header extraction is wrong, all requests fall through to subprocess (safe degradation, but no pooling benefit) +- If pool process dies mid-request and respawn races with the next request on the same key, the next request could get a cold start instead of its locked process (handled by atomic cleanup in Finding #4 resolution) +- If the 3 AM sweep timezone calculation is wrong, sweep runs at wrong time (consequences are minor — processes accumulate a few extra hours of context; mitigated by DST-aware scheduling in Finding #9 resolution) +- If `MAX_TOTAL_PROCESSES` is set too low, more requests fall back to subprocess than expected (safe degradation, adjust the cap) + +**Rollback path:** +1. Set `MAX_TOTAL_PROCESSES=0` together with `POOL_OPUS_SIZE=0` and `POOL_SONNET_SIZE=0` → all requests fall back to `ClaudeSubprocess` (no pooling, same as current behavior). Zeroing only the two warm pool sizes is not sufficient: an empty warm pool triggers a cold spawn, so session-keyed requests would still be pooled up to the cap. +2. 
Or: revert to the pre-pooling commit — `ClaudeSubprocess` is never removed + +**What needs testing:** +- End-to-end request with session key header → confirm routing to locked process +- Concurrent requests from different keys → confirm parallel execution +- Rapid requests on same key → confirm serialization, no interleaving +- Process death → confirm auto-respawn and session mapping cleanup +- Session reset → confirm orphan reclamation +- Sweep with mix of active and idle processes → confirm correct recycling +- Request without header → confirm fallback to subprocess diff --git a/src/adapter/openai-to-cli.ts b/src/adapter/openai-to-cli.ts index 5774e34..7428158 100644 --- a/src/adapter/openai-to-cli.ts +++ b/src/adapter/openai-to-cli.ts @@ -7,7 +7,8 @@ import type { OpenAIChatRequest, OpenAIContentBlock } from "../types/openai.js"; export type ClaudeModel = "opus" | "sonnet" | "haiku"; export interface CliInput { - prompt: string; + prompt: string; // Full prompt (system + history + user) — for first turn + latestPrompt: string; // Latest user message only — for subsequent turns model: ClaudeModel; sessionId?: string; } @@ -61,7 +62,11 @@ function extractText(content: string | OpenAIContentBlock[]): string { } if (Array.isArray(content)) { return content - .filter((block) => block.type === "text" || block.type === "input_text") + .filter( + (block) => + (block.type === "text" || block.type === "input_text") && + block.text != null + ) .map((block) => block.text) .join("\n"); } @@ -132,12 +137,29 @@ export function messagesToPrompt( return parts.join("\n").trim(); } +/** + * Extract only the latest user message from the messages array. + * Used by pooled processes on subsequent turns (requestCount > 0) + * where the CLI already has system context and prior turns in memory. 
+ */ +export function latestUserMessage( + messages: OpenAIChatRequest["messages"] +): string { + for (let i = messages.length - 1; i >= 0; i--) { + if (messages[i].role === "user") { + return extractText(messages[i].content); + } + } + return ""; +} + /** * Convert OpenAI chat request to CLI input format */ export function openaiToCli(request: OpenAIChatRequest): CliInput { return { prompt: messagesToPrompt(request.messages), + latestPrompt: latestUserMessage(request.messages), model: extractModel(request.model), sessionId: request.user, // Use OpenAI's user field for session mapping }; diff --git a/src/server/routes.ts b/src/server/routes.ts index 8695879..8e6a6d3 100644 --- a/src/server/routes.ts +++ b/src/server/routes.ts @@ -1,25 +1,48 @@ /** * API Route Handlers * - * Implements OpenAI-compatible endpoints for Clawdbot integration + * Implements OpenAI-compatible endpoints for Clawdbot integration. + * Routes session-keyed requests through SessionPoolRouter; falls back to + * ClaudeSubprocess for headerless or non-pooled requests. 
*/ import type { Request, Response } from "express"; import { v4 as uuidv4 } from "uuid"; import { ClaudeSubprocess } from "../subprocess/manager.js"; +import { + SessionPoolRouter, + type ExecuteResult, +} from "../subprocess/router.js"; import { openaiToCli } from "../adapter/openai-to-cli.js"; import { cliResultToOpenai, createDoneChunk, } from "../adapter/cli-to-openai.js"; -import type { OpenAIChatRequest, OpenAIToolCall } from "../types/openai.js"; -import type { ClaudeCliAssistant, ClaudeCliResult, ClaudeCliStreamEvent } from "../types/claude-cli.js"; +import type { OpenAIChatRequest } from "../types/openai.js"; +import type { + ClaudeCliAssistant, + ClaudeCliResult, + ClaudeCliStreamEvent, +} from "../types/claude-cli.js"; + +// --------------------------------------------------------------------------- +// Module-level router reference (set by standalone.ts at startup) +// --------------------------------------------------------------------------- + +let poolRouter: SessionPoolRouter | null = null; + +export function setPoolRouter(router: SessionPoolRouter): void { + poolRouter = router; +} + +export function getPoolRouter(): SessionPoolRouter | null { + return poolRouter; +} + +// --------------------------------------------------------------------------- +// POST /v1/chat/completions +// --------------------------------------------------------------------------- -/** - * Handle POST /v1/chat/completions - * - * Main endpoint for chat requests, supports both streaming and non-streaming - */ export async function handleChatCompletions( req: Request, res: Response @@ -27,10 +50,26 @@ export async function handleChatCompletions( const requestId = uuidv4().replace(/-/g, "").slice(0, 24); const body = req.body as OpenAIChatRequest; const stream = body.stream === true; + const startTime = Date.now(); + const earlySessionKey = (req.headers["x-openclaw-session-key"] as string | undefined) || (body as any).sessionId; + console.log(JSON.stringify({ + ts: new 
Date().toISOString(), + event: "request_received", + requestId, + sessionKey: earlySessionKey || "(none)", + model: body.model || "(none)", + stream, + messageCount: body.messages?.length ?? 0, + contentLength: req.headers["content-length"] || "(none)", + })); try { // Validate request - if (!body.messages || !Array.isArray(body.messages) || body.messages.length === 0) { + if ( + !body.messages || + !Array.isArray(body.messages) || + body.messages.length === 0 + ) { res.status(400).json({ error: { message: "messages is required and must be a non-empty array", @@ -41,8 +80,69 @@ export async function handleChatCompletions( return; } - // Convert to CLI input format const cliInput = openaiToCli(body); + const sessionKey = (req.headers["x-openclaw-session-key"] as string | undefined) || cliInput.sessionId; + const agentId = req.headers["x-openclaw-agent-id"] as string | undefined; + + // --- Pool routing --- + if (sessionKey && poolRouter) { + const result = poolRouter.execute( + cliInput.prompt, + cliInput.latestPrompt, + cliInput.model, + sessionKey, + body.messages.length + ); + + if (result) { + // Pooled route + const { emitter, routeType, pid, queueDepth } = result; + console.log(JSON.stringify({ + ts: new Date().toISOString(), + event: "request_routed", + requestId, + sessionKey, + model: cliInput.model, + routeType, + pid, + queueDepth, + elapsedMs: Date.now() - startTime, + })); + + if (stream) { + await handlePooledStreaming( + req, + res, + emitter, + requestId, + startTime, + sessionKey, + agentId, + cliInput.model, + routeType, + pid, + queueDepth + ); + } else { + await handlePooledNonStreaming( + res, + emitter, + requestId, + startTime, + sessionKey, + agentId, + cliInput.model, + routeType, + pid, + queueDepth + ); + } + return; + } + // result === null → fall through to ClaudeSubprocess + } + + // --- Fallback: ClaudeSubprocess (no session key, unpooled model, or at capacity) --- const subprocess = new ClaudeSubprocess(); if (stream) { @@ -66,21 
+166,280 @@ export async function handleChatCompletions( } } -/** - * Convert Claude tool_use ID to OpenAI-compatible call ID. - * Claude uses "toolu_abc123", OpenAI uses "call_abc123". - */ -function toOpenAICallId(claudeId: string): string { - return `call_${claudeId.replace("toolu_", "")}`; +// --------------------------------------------------------------------------- +// Pooled streaming response +// --------------------------------------------------------------------------- + +async function handlePooledStreaming( + _req: Request, + res: Response, + emitter: ExecuteResult["emitter"], + requestId: string, + startTime: number, + sessionKey: string, + agentId: string | undefined, + model: string, + routeType: string, + pid: number | null, + queueDepth: number +): Promise { + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Request-Id", requestId); + res.flushHeaders(); + res.write(":ok\n\n"); + + return new Promise((resolve) => { + let isFirst = true; + let lastModel = "claude-sonnet-4"; + let isComplete = false; + let hasEmittedText = false; + + + + const onTextBlockStart = () => { + if (hasEmittedText && !res.writableEnded) { + const sepChunk = { + id: `chatcmpl-${requestId}`, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model: lastModel, + choices: [ + { index: 0, delta: { content: "\n\n" }, finish_reason: null }, + ], + }; + res.write(`data: ${JSON.stringify(sepChunk)}\n\n`); + } + }; + + const onContentDelta = (event: ClaudeCliStreamEvent) => { + const delta = event.event.delta; + const text = (delta?.type === "text_delta" && delta.text) || ""; + if (text && !res.writableEnded) { + const chunk = { + id: `chatcmpl-${requestId}`, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model: lastModel, + choices: [ + { + index: 0, + delta: { + role: isFirst ? 
("assistant" as const) : undefined, + content: text, + }, + finish_reason: null, + }, + ], + }; + res.write(`data: ${JSON.stringify(chunk)}\n\n`); + isFirst = false; + hasEmittedText = true; + } + }; + + const onAssistant = (message: ClaudeCliAssistant) => { + lastModel = message.message.model; + }; + + const onResult = (result: ClaudeCliResult) => { + isComplete = true; + const latencyMs = Date.now() - startTime; + console.log( + JSON.stringify({ + ts: new Date().toISOString(), + event: "request", + sessionKey, + agentId, + model, + pid, + latencyMs, + queueDepth, + routeType, + cacheHit: routeType, + requestCount: result.num_turns, + }) + ); + + if (!res.writableEnded) { + const doneChunk = createDoneChunk(requestId, lastModel); + if (result.usage) { + doneChunk.usage = { + prompt_tokens: result.usage.input_tokens || 0, + completion_tokens: result.usage.output_tokens || 0, + total_tokens: + (result.usage.input_tokens || 0) + + (result.usage.output_tokens || 0), + }; + } + res.write(`data: ${JSON.stringify(doneChunk)}\n\n`); + res.write("data: [DONE]\n\n"); + res.end(); + } + resolve(); + }; + + const onError = (error: Error) => { + isComplete = true; + emitter.removeListener("text_block_start", onTextBlockStart); + emitter.removeListener("content_delta", onContentDelta); + emitter.removeListener("assistant", onAssistant); + emitter.removeListener("result", onResult); + const latencyMs = Date.now() - startTime; + const errWithStatus = error as Error & { + statusCode?: number; + retryAfter?: number; + }; + + console.error( + JSON.stringify({ + ts: new Date().toISOString(), + event: "request_error", + sessionKey, + model, + pid, + latencyMs, + routeType, + error: error.message, + }) + ); + + if (!res.headersSent) { + const status = errWithStatus.statusCode || 500; + if (status === 429) { + res.setHeader("Retry-After", String(errWithStatus.retryAfter || 5)); + } + res.status(status).json({ + error: { + message: error.message, + type: status === 429 ? 
"rate_limit_error" : "server_error", + code: null, + }, + }); + } else if (!res.writableEnded) { + res.write( + `data: ${JSON.stringify({ + error: { + message: error.message, + type: "server_error", + code: null, + }, + })}\n\n` + ); + res.end(); + } + resolve(); + }; + + // Client disconnect: remove request-specific listeners and release the + // CLI process so it doesn't stay "busy" with a response nobody consumes. + res.on("close", () => { + if (!isComplete) { + emitter.removeListener("text_block_start", onTextBlockStart); + emitter.removeListener("content_delta", onContentDelta); + emitter.removeListener("assistant", onAssistant); + emitter.removeListener("result", onResult); + emitter.removeListener("error", onError); + + // Tell the router to kill+respawn the process — it's mid-response + // with buffered output and no consumer. + const router = getPoolRouter(); + if (router && sessionKey) { + router.cancelRequest(sessionKey); + } + } + resolve(); + }); + + emitter.on("text_block_start", onTextBlockStart); + emitter.on("content_delta", onContentDelta); + emitter.on("assistant", onAssistant); + emitter.on("result", onResult); + emitter.on("error", onError); + }); } -/** - * Handle streaming response (SSE) - * - * IMPORTANT: The Express req.on("close") event fires when the request body - * is fully received, NOT when the client disconnects. For SSE connections, - * we use res.on("close") to detect actual client disconnection. 
- */ +// --------------------------------------------------------------------------- +// Pooled non-streaming response +// --------------------------------------------------------------------------- + +async function handlePooledNonStreaming( + res: Response, + emitter: ExecuteResult["emitter"], + requestId: string, + startTime: number, + sessionKey: string, + agentId: string | undefined, + model: string, + routeType: string, + pid: number | null, + queueDepth: number +): Promise { + return new Promise((resolve) => { + emitter.on("result", (result: ClaudeCliResult) => { + const latencyMs = Date.now() - startTime; + console.log( + JSON.stringify({ + ts: new Date().toISOString(), + event: "request", + sessionKey, + agentId, + model, + pid, + latencyMs, + queueDepth, + routeType, + cacheHit: routeType, + requestCount: result.num_turns, + }) + ); + res.json(cliResultToOpenai(result, requestId)); + resolve(); + }); + + emitter.on("error", (error: Error) => { + const latencyMs = Date.now() - startTime; + const errWithStatus = error as Error & { + statusCode?: number; + retryAfter?: number; + }; + + console.error( + JSON.stringify({ + ts: new Date().toISOString(), + event: "request_error", + sessionKey, + model, + pid, + latencyMs, + routeType, + error: error.message, + }) + ); + + if (!res.headersSent) { + const status = errWithStatus.statusCode || 500; + if (status === 429) { + res.setHeader("Retry-After", String(errWithStatus.retryAfter || 5)); + } + res.status(status).json({ + error: { + message: error.message, + type: status === 429 ? 
"rate_limit_error" : "server_error", + code: null, + }, + }); + } + resolve(); + }); + }); +} + +// --------------------------------------------------------------------------- +// Fallback: ClaudeSubprocess streaming (existing behavior, unchanged) +// --------------------------------------------------------------------------- + async function handleStreamingResponse( req: Request, res: Response, @@ -88,17 +447,11 @@ async function handleStreamingResponse( cliInput: ReturnType, requestId: string ): Promise { - // Set SSE headers res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); res.setHeader("X-Request-Id", requestId); - - // CRITICAL: Flush headers immediately to establish SSE connection - // Without this, headers are buffered and client times out waiting res.flushHeaders(); - - // Send initial comment to confirm connection is alive res.write(":ok\n\n"); return new Promise((resolve, reject) => { @@ -106,20 +459,14 @@ async function handleStreamingResponse( let lastModel = "claude-sonnet-4"; let isComplete = false; let hasEmittedText = false; - let toolCallIndex = 0; - let inToolBlock = false; - // Handle actual client disconnect (response stream closed) res.on("close", () => { if (!isComplete) { - // Client disconnected before response completed - kill subprocess subprocess.kill(); } resolve(); }); - // When a new text content block starts after we've already emitted text, - // insert a separator so text from different blocks doesn't run together subprocess.on("text_block_start", () => { if (hasEmittedText && !res.writableEnded) { const sepChunk = { @@ -127,19 +474,18 @@ async function handleStreamingResponse( object: "chat.completion.chunk", created: Math.floor(Date.now() / 1000), model: lastModel, - choices: [{ - index: 0, - delta: { - content: "\n\n", + choices: [ + { + index: 0, + delta: { content: "\n\n" }, + finish_reason: null, }, - finish_reason: null, - }], + ], }; 
res.write(`data: ${JSON.stringify(sepChunk)}\n\n`); } }); - // Handle streaming content deltas subprocess.on("content_delta", (event: ClaudeCliStreamEvent) => { const delta = event.event.delta; const text = (delta?.type === "text_delta" && delta.text) || ""; @@ -149,14 +495,16 @@ async function handleStreamingResponse( object: "chat.completion.chunk", created: Math.floor(Date.now() / 1000), model: lastModel, - choices: [{ - index: 0, - delta: { - role: isFirst ? "assistant" : undefined, - content: text, + choices: [ + { + index: 0, + delta: { + role: isFirst ? ("assistant" as const) : undefined, + content: text, + }, + finish_reason: null, }, - finish_reason: null, - }], + ], }; res.write(`data: ${JSON.stringify(chunk)}\n\n`); isFirst = false; @@ -164,78 +512,6 @@ async function handleStreamingResponse( } }); - // DISABLED: Tool call forwarding causes an agentic loop — OpenClaw interprets - // Claude Code's internal tool_use (Read, Bash, etc.) as calls it needs to - // handle, triggering repeated requests. Claude Code handles tools internally - // via --print mode; only the final text result should be forwarded. - // TODO: Re-enable with a non-tool_calls display mechanism (e.g. inline text). - // - // subprocess.on("tool_use_start", (event: ClaudeCliStreamEvent) => { - // if (res.writableEnded) return; - // const block = event.event.content_block; - // if (block?.type !== "tool_use") return; - // - // inToolBlock = true; - // const chunk = { - // id: `chatcmpl-${requestId}`, - // object: "chat.completion.chunk", - // created: Math.floor(Date.now() / 1000), - // model: lastModel, - // choices: [{ - // index: 0, - // delta: { - // role: isFirst ? 
"assistant" : undefined, - // tool_calls: [{ - // index: toolCallIndex, - // id: toOpenAICallId(block.id), - // type: "function" as const, - // function: { - // name: block.name, - // arguments: "", - // }, - // }], - // }, - // finish_reason: null, - // }], - // }; - // res.write(`data: ${JSON.stringify(chunk)}\n\n`); - // isFirst = false; - // }); - // - // subprocess.on("input_json_delta", (event: ClaudeCliStreamEvent) => { - // if (res.writableEnded) return; - // const delta = event.event.delta; - // if (delta?.type !== "input_json_delta") return; - // - // const chunk = { - // id: `chatcmpl-${requestId}`, - // object: "chat.completion.chunk", - // created: Math.floor(Date.now() / 1000), - // model: lastModel, - // choices: [{ - // index: 0, - // delta: { - // tool_calls: [{ - // index: toolCallIndex, - // function: { - // arguments: delta.partial_json, - // }, - // }], - // }, - // finish_reason: null, - // }], - // }; - // res.write(`data: ${JSON.stringify(chunk)}\n\n`); - // }); - // - // subprocess.on("content_block_stop", () => { - // if (inToolBlock) { - // toolCallIndex++; - // inToolBlock = false; - // } - // }); - - // Handle final assistant message (for model name) subprocess.on("assistant", (message: ClaudeCliAssistant) => { lastModel = message.message.model; }); @@ -243,14 +519,14 @@ async function handleStreamingResponse( subprocess.on("result", (result: ClaudeCliResult) => { isComplete = true; if (!res.writableEnded) { - // Send final done chunk with finish_reason and usage data const doneChunk = createDoneChunk(requestId, lastModel); if (result.usage) { doneChunk.usage = { prompt_tokens: result.usage.input_tokens || 0, completion_tokens: result.usage.output_tokens || 0, total_tokens: - (result.usage.input_tokens || 0) + (result.usage.output_tokens || 0), + (result.usage.input_tokens || 0) + + (result.usage.output_tokens || 0), }; } res.write(`data: ${JSON.stringify(doneChunk)}\n\n`); @@ -265,7 +541,11 @@ async function handleStreamingResponse( if 
(!res.writableEnded) { res.write( `data: ${JSON.stringify({ - error: { message: error.message, type: "server_error", code: null }, + error: { + message: error.message, + type: "server_error", + code: null, + }, })}\n\n` ); res.end(); @@ -274,13 +554,17 @@ async function handleStreamingResponse( }); subprocess.on("close", (code: number | null) => { - // Subprocess exited - ensure response is closed if (!res.writableEnded) { if (code !== 0 && !isComplete) { - // Abnormal exit without result - send error - res.write(`data: ${JSON.stringify({ - error: { message: `Process exited with code ${code}`, type: "server_error", code: null }, - })}\n\n`); + res.write( + `data: ${JSON.stringify({ + error: { + message: `Process exited with code ${code}`, + type: "server_error", + code: null, + }, + })}\n\n` + ); } res.write("data: [DONE]\n\n"); res.end(); @@ -288,20 +572,22 @@ async function handleStreamingResponse( resolve(); }); - // Start the subprocess - subprocess.start(cliInput.prompt, { - model: cliInput.model, - sessionId: cliInput.sessionId, - }).catch((err) => { - console.error("[Streaming] Subprocess start error:", err); - reject(err); - }); + subprocess + .start(cliInput.prompt, { + model: cliInput.model, + sessionId: cliInput.sessionId, + }) + .catch((err) => { + console.error("[Streaming] Subprocess start error:", err); + reject(err); + }); }); } -/** - * Handle non-streaming response - */ +// --------------------------------------------------------------------------- +// Fallback: ClaudeSubprocess non-streaming (existing behavior, unchanged) +// --------------------------------------------------------------------------- + async function handleNonStreamingResponse( res: Response, subprocess: ClaudeSubprocess, @@ -310,23 +596,6 @@ async function handleNonStreamingResponse( ): Promise { return new Promise((resolve) => { let finalResult: ClaudeCliResult | null = null; - // DISABLED: see tool call forwarding comment in handleStreamingResponse - // const 
accumulatedToolCalls: OpenAIToolCall[] = []; - // - // subprocess.on("assistant", (message: ClaudeCliAssistant) => { - // for (const block of message.message.content) { - // if (block.type === "tool_use") { - // accumulatedToolCalls.push({ - // id: toOpenAICallId(block.id), - // type: "function", - // function: { - // name: block.name, - // arguments: JSON.stringify(block.input), - // }, - // }); - // } - // } - // }); subprocess.on("result", (result: ClaudeCliResult) => { finalResult = result; @@ -359,7 +628,6 @@ async function handleNonStreamingResponse( resolve(); }); - // Start the subprocess subprocess .start(cliInput.prompt, { model: cliInput.model, @@ -378,11 +646,10 @@ async function handleNonStreamingResponse( }); } -/** - * Handle GET /v1/models - * - * Returns available models - */ +// --------------------------------------------------------------------------- +// GET /v1/models +// --------------------------------------------------------------------------- + export function handleModels(_req: Request, res: Response): void { const now = Math.floor(Date.now() / 1000); const modelIds = [ @@ -405,15 +672,20 @@ export function handleModels(_req: Request, res: Response): void { }); } -/** - * Handle GET /health - * - * Health check endpoint - */ +// --------------------------------------------------------------------------- +// GET /health — includes pool stats when available +// --------------------------------------------------------------------------- + export function handleHealth(_req: Request, res: Response): void { - res.json({ + const base: Record = { status: "ok", provider: "claude-code-cli", timestamp: new Date().toISOString(), - }); + }; + + if (poolRouter) { + base.pool = poolRouter.stats(); + } + + res.json(base); } diff --git a/src/server/standalone.ts b/src/server/standalone.ts index 0d4881f..9dcd902 100644 --- a/src/server/standalone.ts +++ b/src/server/standalone.ts @@ -1,25 +1,50 @@ #!/usr/bin/env node /** - * Standalone server for testing 
without Clawdbot + * Standalone server with session-aware process pooling + * + * Initializes the SessionPoolRouter, schedules the nightly sweep, + * and owns the full graceful-shutdown sequence. * * Usage: * npm run start - * # or * node dist/server/standalone.js [port] */ -import { startServer, stopServer } from "./index.js"; +import cron from "node-cron"; +import { startServer } from "./index.js"; +import { setPoolRouter } from "./routes.js"; +import { SessionPoolRouter } from "../subprocess/router.js"; import { verifyClaude, verifyAuth } from "../subprocess/manager.js"; +import type { Server } from "http"; + +// --------------------------------------------------------------------------- +// Environment configuration +// --------------------------------------------------------------------------- const DEFAULT_PORT = 3456; +const env = { + port: parseInt(process.env.PORT || process.argv[2] || String(DEFAULT_PORT), 10), + opusSize: parseInt(process.env.POOL_OPUS_SIZE || "6", 10), + sonnetSize: parseInt(process.env.POOL_SONNET_SIZE || "4", 10), + maxRequestsPerProcess: parseInt(process.env.POOL_MAX_REQUESTS_PER_PROCESS || "50", 10), + maxTotalProcesses: parseInt(process.env.MAX_TOTAL_PROCESSES || "30", 10), + sweepHour: parseInt(process.env.SWEEP_HOUR || "3", 10), + sweepIdleThresholdMs: parseInt(process.env.SWEEP_IDLE_THRESHOLD_MS || "7200000", 10), + requestQueueDepth: parseInt(process.env.POOL_REQUEST_QUEUE_DEPTH || "3", 10), + requestTimeoutMs: parseInt(process.env.POOL_REQUEST_TIMEOUT_MS || "300000", 10), +}; + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + async function main(): Promise { - console.log("Claude Code CLI Provider - Standalone Server"); - console.log("============================================\n"); + console.log("Claude Code CLI Provider - Session Pool Server"); + 
console.log("===============================================\n"); - // Parse port from command line - const port = parseInt(process.argv[2] || String(DEFAULT_PORT), 10); - if (isNaN(port) || port < 1 || port > 65535) { + // Validate port + if (isNaN(env.port) || env.port < 1 || env.port > 65535) { console.error(`Invalid port: ${process.argv[2]}`); process.exit(1); } @@ -43,30 +68,120 @@ async function main(): Promise { } console.log(" Authentication: OK\n"); - // Start server + // --- Initialize SessionPoolRouter --- + console.log("Pool configuration:"); + console.log(` Opus pool size: ${env.opusSize}`); + console.log(` Sonnet pool size: ${env.sonnetSize}`); + console.log(` Max total processes: ${env.maxTotalProcesses}`); + console.log(` Max requests/process: ${env.maxRequestsPerProcess}`); + console.log(` Request queue depth: ${env.requestQueueDepth}`); + console.log(` Request timeout: ${env.requestTimeoutMs}ms`); + console.log(` Sweep hour (ET): ${env.sweepHour}:00`); + console.log(` Sweep idle threshold: ${env.sweepIdleThresholdMs}ms\n`); + + const router = new SessionPoolRouter({ + opusSize: env.opusSize, + sonnetSize: env.sonnetSize, + maxRequestsPerProcess: env.maxRequestsPerProcess, + maxTotalProcesses: env.maxTotalProcesses, + requestQueueDepth: env.requestQueueDepth, + requestTimeoutMs: env.requestTimeoutMs, + sweepIdleThresholdMs: env.sweepIdleThresholdMs, + }); + + // Register router with routes module + setPoolRouter(router); + + // Initialize warm pools + await router.initialize(); + + // --- Start HTTP server --- + let server: Server; try { - await startServer({ port }); - console.log("\nServer ready. 
Test with:"); - console.log(` curl -X POST http://localhost:${port}/v1/chat/completions \\`); - console.log(` -H "Content-Type: application/json" \\`); - console.log(` -d '{"model": "claude-sonnet-4", "messages": [{"role": "user", "content": "Hello!"}]}'`); - console.log("\nPress Ctrl+C to stop.\n"); + server = await startServer({ port: env.port }); } catch (err) { console.error("Failed to start server:", err); + await router.shutdown(); process.exit(1); } - // Handle graceful shutdown - const shutdown = async () => { - console.log("\nShutting down..."); - await stopServer(); + console.log(`\n[Server] Pool stats: ${JSON.stringify(router.stats())}`); + console.log("\nServer ready. Test with:"); + console.log( + ` curl -X POST http://localhost:${env.port}/v1/chat/completions \\` + ); + console.log(` -H "Content-Type: application/json" \\`); + console.log( + ` -d '{"model": "claude-sonnet-4", "messages": [{"role": "user", "content": "Hello!"}]}'` + ); + console.log("\nPress Ctrl+C to stop.\n"); + + // --- Schedule nightly sweep --- + const sweepJob = cron.schedule( + `0 ${env.sweepHour} * * *`, + () => { + console.log("[Sweep] Nightly sweep triggered"); + router.sweep().catch((err: unknown) => { + console.error("[Sweep] Error:", err); + }); + }, + { timezone: "America/New_York" } + ); + + // --- Graceful shutdown --- + let shutdownInProgress = false; + + const shutdown = async (signal: string) => { + if (shutdownInProgress) return; + shutdownInProgress = true; + + console.log(`\n[Shutdown] ${signal} received — starting graceful shutdown`); + + // 1. Stop the cron job + sweepJob.stop(); + + // 2. Close listening socket FIRST (stop new connections) + console.log("[Shutdown] Closing listening socket..."); + server.close(); + + // 3. 
Wait for in-flight requests (30s timeout) + console.log("[Shutdown] Waiting up to 30s for in-flight requests..."); + await new Promise((resolve) => { + const timeout = setTimeout(() => { + console.log("[Shutdown] 30s timeout reached — forcing shutdown"); + resolve(); + }, 30000); + + // Check if all connections are done + server.on("close", () => { + clearTimeout(timeout); + resolve(); + }); + }); + + // 4. Shutdown the pool router (rejects queued, kills processes) + console.log("[Shutdown] Shutting down pool router..."); + await router.shutdown(); + + // 5. Exit + console.log("[Shutdown] Complete."); process.exit(0); }; - process.on("SIGINT", shutdown); - process.on("SIGTERM", shutdown); + process.on("SIGINT", () => shutdown("SIGINT")); + process.on("SIGTERM", () => shutdown("SIGTERM")); } +process.on("unhandledRejection", (reason: unknown) => { + console.error("[FATAL] Unhandled promise rejection:", reason); + process.exit(1); +}); + +process.on("uncaughtException", (err: Error) => { + console.error("[FATAL] Uncaught exception:", err); + process.exit(1); +}); + main().catch((err) => { console.error("Unexpected error:", err); process.exit(1); diff --git a/src/subprocess/router.ts b/src/subprocess/router.ts new file mode 100644 index 0000000..9387127 --- /dev/null +++ b/src/subprocess/router.ts @@ -0,0 +1,1223 @@ +/** + * Session Pool Router + * + * Maintains per-model warm pools of persistent Claude CLI processes and locks + * them to OpenClaw session keys. Each session key gets a dedicated process — + * no cross-session contamination, no concurrent stdin writes. + * + * See specs/session-pooling.spec.md (Rev 5) for the full design. + */ + +import { spawn, ChildProcess } from "child_process"; +import { EventEmitter } from "events"; + +/** + * Factory that creates an EventEmitter with a permanent no-op error listener + * so it never reaches a zero-listener state. 
Prevents Node's default throw + * behavior when callers detach their listeners before an async error fires. + * The permanent listener is attached at creation time — no complex tracking needed. + */ +function safeEmitter(): EventEmitter { + const emitter = new EventEmitter(); + // Permanent no-op guard: emitter always has ≥1 error listener, so Node never + // throws on emit("error", ...) regardless of what callers attach or detach. + emitter.on("error", (err: Error) => { + console.error("[Router] Suppressed emitter error:", err?.message ?? err); + }); + return emitter; +} + +import type { + ClaudeCliMessage, + ClaudeCliStreamEvent, +} from "../types/claude-cli.js"; +import { + isAssistantMessage, + isResultMessage, + isContentDelta, + isTextBlockStart, + isToolUseBlockStart, + isInputJsonDelta, + isContentBlockStop, + isSystemInit, +} from "../types/claude-cli.js"; +import type { ClaudeModel } from "../adapter/openai-to-cli.js"; + +// --------------------------------------------------------------------------- +// Tool mapping prompt (shared with manager.ts / pool.ts) +// --------------------------------------------------------------------------- + +const OPENCLAW_TOOL_MAPPING_PROMPT = [ + "## Tool Name Mapping", + "You are running inside Claude Code CLI, not OpenClaw. The system prompt may reference OpenClaw tool names — map them to your actual tools:", + "", + "### Direct tool replacements", + "- `exec` or `process` → use `Bash` (run shell commands)", + "- `read` → use `Read` (read file contents)", + "- `write` → use `Write` (write files)", + "- `edit` → use `Edit` (edit files)", + "- `grep` → use `Grep` (search file contents)", + "- `find` or `ls` → use `Glob` or `Bash(ls ...)`", + "- `web_search` → use `WebSearch`", + "- `web_fetch` → use `WebFetch`", + "- `image` → use `Read` (Claude Code can read images)", + "", + "### OpenClaw CLI tools (use via Bash)", + "These OpenClaw tools are available through the `openclaw` CLI. 
Use `Bash` to run them:", + '- `memory_search` → `Bash(openclaw memory search "")` — semantic search across memory files', + "- `memory_get` → `Read` on the memory file directly, OR `Bash(openclaw memory search \"\")` for discovery", + '- `message` → `Bash(openclaw message send --to "")` — send messages to channels (Telegram, Discord, etc.)', + " - Also: `openclaw message read`, `openclaw message broadcast`, `openclaw message react`, `openclaw message poll`", + "- `cron` → `Bash(openclaw cron list)`, `Bash(openclaw cron add ...)`, `Bash(openclaw cron status)` — manage scheduled jobs", + " - Also: `openclaw cron rm`, `openclaw cron enable`, `openclaw cron disable`, `openclaw cron runs`, `openclaw cron run`, `openclaw cron edit`", + '- `sessions_list` → `Bash(openclaw agent --local --message "list sessions")` or check session files directly', + '- `sessions_history` → `Bash(openclaw agent --local --message "show history for session ")` or check session files', + "- `nodes` → `Bash(openclaw nodes status)`, `Bash(openclaw nodes describe )`, `Bash(openclaw nodes invoke --node --command )`", + ' - Also: `openclaw nodes run --node ""` for running commands on paired nodes', + "", + "### Not available via CLI", + "- `browser` — requires OpenClaw's dedicated browser server (no CLI equivalent)", + "- `canvas` — requires paired node with canvas capability; use `openclaw nodes invoke` if a node is available", + "", + "### Skills", + "When a skill says to run a bash/python command, use the `Bash` tool directly.", + "Skills are located in the `skills/` directory relative to your working directory.", + "To use a skill: `Read` its SKILL.md file first, then follow the instructions using `Bash`.", + "Run `openclaw skills list --eligible --json` to see all available skills.", +].join("\n"); + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** Models that have 
dedicated warm pools. */ +export const POOLED_MODELS = new Set(["opus", "sonnet"]); + +export type PooledModel = "opus" | "sonnet"; + +export interface PooledProcess { + id: number; + process: ChildProcess; + model: PooledModel; + lockedTo: string | null; + agentChannel: string | null; + lastRequestAt: number; + spawnedAt: number; + requestCount: number; + lastMessageCount: number; + state: "idle" | "busy" | "recycling"; + requestQueue: PendingRequest[]; + buffer: string; + currentEmitter: EventEmitter | null; + ready: boolean; + requestTimeoutTimer: NodeJS.Timeout | null; + orphaned: boolean; +} + +export interface PendingRequest { + fullPrompt: string; + latestPrompt: string; + messageCount: number; + emitter: EventEmitter; +} + +export interface PendingSentinel { + isPending: true; + requestQueue: PendingRequest[]; +} + +function isPendingSentinel( + v: PooledProcess | PendingSentinel +): v is PendingSentinel { + return (v as PendingSentinel).isPending === true; +} + +export interface PoolRouterConfig { + opusSize: number; + sonnetSize: number; + maxRequestsPerProcess: number; + maxTotalProcesses: number; + requestQueueDepth: number; + requestTimeoutMs: number; + sweepIdleThresholdMs: number; +} + +export interface PoolStats { + total: number; + locked: { total: number; opus: number; sonnet: number }; + warm: { opus: number; sonnet: number }; + busy: number; + queued: number; + maxTotal: number; + orphansReclaimed: number; + totalRequests: number; + processRecycles: number; + requestTimeouts: number; + routeHits: { locked: number; warm: number; cold: number; fallback: number }; + uptime: number; +} + +export interface ExecuteResult { + emitter: EventEmitter; + routeType: "locked" | "warm" | "cold" | "fallback"; + pid: number | null; + queueDepth: number; +} + +// --------------------------------------------------------------------------- +// SessionPoolRouter +// --------------------------------------------------------------------------- + +export class 
SessionPoolRouter { + private config: PoolRouterConfig; + private lockedSessions = new Map(); + private warmPool = new Map(); + private allProcesses = new Map(); + private nextId = 0; + private shuttingDown = false; + private startedAt = Date.now(); + + // Counters for stats + private orphansReclaimed = 0; + private totalRequests = 0; + private processRecycles = 0; + private requestTimeouts = 0; + private routeHits = { locked: 0, warm: 0, cold: 0, fallback: 0 }; + + constructor(config: Partial = {}) { + this.config = { + opusSize: config.opusSize ?? 6, + sonnetSize: config.sonnetSize ?? 4, + maxRequestsPerProcess: config.maxRequestsPerProcess ?? 50, + maxTotalProcesses: config.maxTotalProcesses ?? 30, + requestQueueDepth: config.requestQueueDepth ?? 3, + requestTimeoutMs: config.requestTimeoutMs ?? 300000, + sweepIdleThresholdMs: config.sweepIdleThresholdMs ?? 7200000, + }; + this.warmPool.set("opus", []); + this.warmPool.set("sonnet", []); + } + + // ------------------------------------------------------------------------- + // Initialization + // ------------------------------------------------------------------------- + + async initialize(): Promise { + console.log( + `[Router] Initializing — opus: ${this.config.opusSize}, sonnet: ${this.config.sonnetSize}` + ); + const promises: Promise[] = []; + for (let i = 0; i < this.config.opusSize; i++) { + promises.push(this.spawnWarm("opus")); + } + for (let i = 0; i < this.config.sonnetSize; i++) { + promises.push(this.spawnWarm("sonnet")); + } + await Promise.all(promises); + console.log( + `[Router] Ready — ${this.allProcesses.size} warm processes spawned` + ); + } + + // ------------------------------------------------------------------------- + // Execute — main entry point + // ------------------------------------------------------------------------- + + execute( + prompt: string, + latestPrompt: string, + model: ClaudeModel, + sessionKey: string, + messageCount: number = 0 + ): ExecuteResult | null { + if 
(this.shuttingDown) { + const emitter = safeEmitter(); + process.nextTick(() => { + emitter.emit("error", new Error("Server is shutting down")); + }); + return { emitter, routeType: "fallback", pid: null, queueDepth: 0 }; + } + + // Only pool opus and sonnet + if (!POOLED_MODELS.has(model)) { + console.log( + JSON.stringify({ + ts: new Date().toISOString(), + event: "fallback_unpooled_model", + model, + sessionKey, + totalProcesses: this.allProcesses.size, + }) + ); + this.routeHits.fallback++; + return null; // caller uses ClaudeSubprocess + } + + const pooledModel = model as PooledModel; + + // --- Lineage-based orphan reclamation --- + const agentChannel = this.extractAgentChannel(sessionKey); + this.reclaimOrphans(sessionKey, agentChannel); + + // --- Check lockedSessions --- + const existing = this.lockedSessions.get(sessionKey); + + if (existing) { + if (isPendingSentinel(existing)) { + return this.enqueueOnSentinel(existing, prompt, latestPrompt, sessionKey, messageCount); + } + + const proc = existing; + + // Detect gateway session reset: message count dropped below stored value. + // Normal flow is monotonically increasing; any drop means the gateway's + // context diverged (reset via /new, idle timeout, or compaction). + if (messageCount > 0 && proc.lastMessageCount > 0 && messageCount < proc.lastMessageCount) { + console.log(JSON.stringify({ + ts: new Date().toISOString(), + event: "session_reset_detected", + sessionKey, + previousMessageCount: proc.lastMessageCount, + incomingMessageCount: messageCount, + processId: proc.id, + pid: proc.process.pid, + requestCount: proc.requestCount, + })); + if (proc.state === "busy") { + // Can't kill mid-request — remove lock now and let releaseProcess kill after. 
// NOTE(review): everything down to the first closing brace at column 0 is the tail of a
// routing method whose opening lines are outside this view; it is reproduced unchanged.
        this.lockedSessions.delete(sessionKey);
        proc.lockedTo = null;
        proc.orphaned = true;
      } else {
        this.clearSessionLock(sessionKey, proc);
        this.killAndRespawn(proc);
        this.processRecycles++;
      }
      // Fall through to warm/cold claim below
    } else if (proc.state === "idle") {
      this.routeHits.locked++;
      this.totalRequests++;
      return this.routeToProcess(proc, prompt, latestPrompt, "locked", messageCount);
    } else {
      return this.enqueueOnProcess(proc, prompt, latestPrompt, sessionKey, messageCount);
    }
  }

  // --- New session key: claim a process ---
  // Set PENDING_SENTINEL synchronously BEFORE any async work
  const sentinel: PendingSentinel = { isPending: true, requestQueue: [] };
  this.lockedSessions.set(sessionKey, sentinel);

  const warm = this.warmPool.get(pooledModel)!;
  if (warm.length > 0) {
    // Claim from warm pool (synchronous — no race)
    const proc = warm.pop()!;
    this.lockProcess(proc, sessionKey, agentChannel);
    this.transferSentinelQueue(sentinel, proc);
    this.lockedSessions.set(sessionKey, proc);
    this.routeHits.warm++;
    this.totalRequests++;
    return this.routeToProcess(proc, prompt, latestPrompt, "warm", messageCount);
  }

  // Warm pool empty — need cold spawn
  if (this.allProcesses.size >= this.config.maxTotalProcesses) {
    this.rejectSentinelQueue(sentinel, 503, "Pool at capacity");
    this.clearSessionLock(sessionKey, null);
    this.routeHits.fallback++;
    console.log(
      JSON.stringify({
        ts: new Date().toISOString(),
        event: "fallback_at_capacity",
        sessionKey,
        model,
        totalProcesses: this.allProcesses.size,
        maxTotal: this.config.maxTotalProcesses,
      })
    );
    return null; // caller uses ClaudeSubprocess
  }

  // Cold spawn (async)
  const emitter = safeEmitter();
  this.totalRequests++;
  this.routeHits.cold++;

  this.spawnCold(pooledModel)
    .then((proc) => {
      this.lockProcess(proc, sessionKey, agentChannel);
      this.transferSentinelQueue(sentinel, proc);
      this.lockedSessions.set(sessionKey, proc);
      this.assignToProcess(proc, prompt, latestPrompt, emitter, messageCount);
    })
    .catch((err) => {
      console.log(
        JSON.stringify({
          ts: new Date().toISOString(),
          event: "cold_spawn_failed",
          sessionKey,
          model,
          error: String(err),
        })
      );
      this.rejectSentinelQueue(sentinel, 503, "Cold spawn failed");
      this.clearSessionLock(sessionKey, null);
      emitter.emit("error", new Error(`Cold spawn failed: ${err}`));
    });

  return {
    emitter,
    routeType: "cold",
    pid: null,
    queueDepth: sentinel.requestQueue.length,
  };
}

// -------------------------------------------------------------------------
// Spawn helpers
// -------------------------------------------------------------------------

/** Spawns a process for `model` and appends it to that model's warm pool. */
private async spawnWarm(model: PooledModel): Promise<void> {
  const proc = this.spawnProcess(model);
  this.warmPool.get(model)!.push(proc);
}

/**
 * Spawns a process for `model` outside the warm pool and resolves with it.
 *
 * If the process is not yet marked ready, waits for the first stdout chunk, a
 * process "error" event, or 500 ms — whichever comes first. The promise never
 * rejects. spawnProcess() currently sets `ready` before returning, so the wait
 * is normally skipped.
 */
private async spawnCold(model: PooledModel): Promise<PooledProcess> {
  const proc = this.spawnProcess(model);
  await new Promise<void>((resolve) => {
    if (proc.ready) {
      resolve();
      return;
    }
    // Single settle path: whichever trigger fires first detaches the others so
    // no listener or timer outlives this wait.
    const settle = (): void => {
      clearTimeout(timer);
      proc.process.stdout?.off("data", settle);
      proc.process.off("error", settle);
      resolve();
    };
    const timer = setTimeout(settle, 500);
    proc.process.stdout?.once("data", settle);
    proc.process.once("error", settle);
  });
  return proc;
}

/**
 * Starts one CLI child process in stream-json mode, registers it in
 * `allProcesses`, and wires its stdout/stderr/lifecycle handlers.
 *
 * The returned record starts in state "idle" with `ready` set to true.
 */
private spawnProcess(model: PooledModel): PooledProcess {
  const id = this.nextId++;
  const args = [
    "--print",
    "--input-format",
    "stream-json",
    "--output-format",
    "stream-json",
    "--verbose",
    "--include-partial-messages",
    "--dangerously-skip-permissions",
    "--no-session-persistence",
    "--model",
    model,
    "--append-system-prompt",
    OPENCLAW_TOOL_MAPPING_PROMPT,
  ];

  const child = spawn(process.env.CLAUDE_BIN || "claude", args, {
    cwd: process.env.HOME || "/tmp",
    // Pass the parent environment through, minus CLAUDECODE.
    env: Object.fromEntries(
      Object.entries(process.env).filter(([k]) => k !== "CLAUDECODE")
    ),
    stdio: ["pipe", "pipe", "pipe"],
  });

  const pooled: PooledProcess = {
    id,
    process: child,
    model,
    lockedTo: null,
    agentChannel: null,
    lastRequestAt: 0,
    spawnedAt: Date.now(),
    requestCount: 0,
    lastMessageCount: 0,
    state: "idle",
    requestQueue: [],
    buffer: "",
    currentEmitter: null,
    ready: false,
    requestTimeoutTimer: null,
    orphaned: false,
  };

  this.allProcesses.set(id, pooled);

  // A write to a child that has already exited surfaces as an "error" event on
  // the stdin stream (e.g. EPIPE). Without a listener Node treats that as an
  // uncaught exception; the "close" handler below performs the actual cleanup.
  child.stdin?.on("error", (err: Error) => {
    console.error(`[Router:${id}] stdin error:`, err.message);
  });

  child.stdout?.on("data", (chunk: Buffer) => {
    pooled.buffer += chunk.toString();
    this.processBuffer(pooled);
  });

  child.stderr?.on("data", (chunk: Buffer) => {
    const text = chunk.toString().trim();
    if (process.env.DEBUG_SUBPROCESS) {
      console.error(`[Router:${id}] stderr:`, text.slice(0, 200));
    }
    // Auth-looking stderr output terminates the process; "close" then runs
    // handleProcessDeath().
    if (/\b(auth|unauthorized|token expired|forbidden)\b/i.test(text)) {
      console.log(
        JSON.stringify({
          ts: new Date().toISOString(),
          event: "auth_error",
          pid: child.pid,
          processId: id,
          model,
          stderr: text.slice(0, 200),
        })
      );
      child.kill("SIGTERM");
    }
  });

  child.on("close", (code) => {
    this.handleProcessDeath(pooled, code);
  });

  child.on("error", (err) => {
    console.error(`[Router:${id}] Process error:`, err.message);
    if (pooled.currentEmitter) {
      pooled.currentEmitter.emit("error", err);
      pooled.currentEmitter = null;
    }
  });

  pooled.ready = true;

  console.log(
    JSON.stringify({
      ts: new Date().toISOString(),
      event: "process_spawned",
      processId: id,
      pid: child.pid,
      model,
    })
  );

  return pooled;
}

// -------------------------------------------------------------------------
// Buffer processing
// -------------------------------------------------------------------------

/**
 * Splits buffered stdout into newline-delimited JSON messages and forwards
 * each one to the process's current emitter.
 *
 * The trailing partial line stays in `pooled.buffer`. Lines that are not valid
 * JSON are skipped (logged only when DEBUG_SUBPROCESS is set). A result
 * message releases the process even if a listener throws.
 */
private processBuffer(pooled: PooledProcess): void {
  const lines = pooled.buffer.split("\n");
  pooled.buffer = lines.pop() ?? "";

  for (const line of lines) {
    const trimmed = line.trim();
    if (!trimmed) continue;

    // Parse separately from dispatch so a listener failure is never
    // misreported as malformed CLI output.
    let message: ClaudeCliMessage;
    try {
      message = JSON.parse(trimmed);
    } catch {
      if (process.env.DEBUG_SUBPROCESS) {
        console.error(
          `[Router:${pooled.id}] Non-JSON:`,
          trimmed.slice(0, 100)
        );
      }
      continue;
    }

    if (isSystemInit(message)) {
      pooled.ready = true;
      continue;
    }

    const emitter = pooled.currentEmitter;
    if (!emitter) continue;

    try {
      emitter.emit("message", message);

      if (isTextBlockStart(message)) {
        emitter.emit("text_block_start", message as ClaudeCliStreamEvent);
      }
      if (isToolUseBlockStart(message)) {
        emitter.emit("tool_use_start", message as ClaudeCliStreamEvent);
      }
      if (isInputJsonDelta(message)) {
        emitter.emit("input_json_delta", message as ClaudeCliStreamEvent);
      }
      if (isContentBlockStop(message)) {
        emitter.emit("content_block_stop", message as ClaudeCliStreamEvent);
      }
      if (isContentDelta(message)) {
        emitter.emit("content_delta", message as ClaudeCliStreamEvent);
      } else if (isAssistantMessage(message)) {
        emitter.emit("assistant", message);
      } else if (isResultMessage(message)) {
        try {
          emitter.emit("result", message);
        } finally {
          // The CLI has finished this request regardless of what the listener
          // did; without this the process would stay "busy" until the timeout.
          this.releaseProcess(pooled);
        }
      }
    } catch (err) {
      console.error(
        `[Router:${pooled.id}] Error while dispatching CLI message:`,
        err
      );
    }
  }
}

// -------------------------------------------------------------------------
// Process assignment & release
// -------------------------------------------------------------------------

/**
 * Creates a fresh emitter, hands the request to `proc`, and returns the
 * result descriptor for the caller.
 */
private routeToProcess(
  proc: PooledProcess,
  prompt: string,
  latestPrompt: string,
  routeType: "locked" | "warm" | "cold",
  messageCount: number = 0
): ExecuteResult {
  const emitter = safeEmitter();
  this.assignToProcess(proc, prompt, latestPrompt, emitter, messageCount);
  return {
    emitter,
    routeType,
    pid: proc.process.pid ?? null,
    queueDepth: proc.requestQueue.length,
  };
}

/**
 * Marks `pooled` busy, writes one user message to its stdin, and starts the
 * per-request timeout.
 *
 * The first request on a process (requestCount === 0) sends `fullPrompt`;
 * later requests send only `latestPrompt`, unless it is empty.
 */
private assignToProcess(
  pooled: PooledProcess,
  fullPrompt: string,
  latestPrompt: string,
  emitter: EventEmitter,
  messageCount: number = 0
): void {
  pooled.state = "busy";
  // Select prompt BEFORE incrementing requestCount.
  // Guard: if latestPrompt is empty (no user message in array), fall back to fullPrompt.
  const prompt =
    pooled.requestCount === 0 || !latestPrompt ? fullPrompt : latestPrompt;
  pooled.requestCount++;
  pooled.lastMessageCount = messageCount;
  pooled.lastRequestAt = Date.now();
  pooled.currentEmitter = emitter;

  const message = JSON.stringify({
    type: "user",
    message: { role: "user", content: prompt },
  });
  pooled.process.stdin?.write(message + "\n");

  this.startRequestTimeout(pooled);
}

/**
 * Called by routes.ts when the gateway client disconnects before the CLI
 * finishes responding. Releases the process so it can serve new requests
 * instead of staying "busy" with a response nobody is consuming.
 *
 * No-op unless `sessionKey` is locked to a real process that is currently busy.
 */
public cancelRequest(sessionKey: string): void {
  const proc = this.lockedSessions.get(sessionKey);
  if (!proc || isPendingSentinel(proc)) return;
  if (proc.state !== "busy") return;

  console.log(JSON.stringify({
    ts: new Date().toISOString(),
    event: "request_cancelled",
    processId: proc.id,
    pid: proc.process.pid,
    sessionKey,
    reason: "client_disconnect",
  }));

  // Kill and respawn — the CLI process may be mid-response with buffered
  // output that we can't cleanly drain. Safest to start fresh.
  this.clearRequestTimeout(proc);
  proc.currentEmitter = null;
  this.rejectProcessQueue(proc);
  // Clear the session lock BEFORE kill so the next request gets a fresh
  // warm process instead of enqueuing behind the dead one.
  this.clearSessionLock(sessionKey, proc);
  this.killAndRespawn(proc);
}

/**
 * Runs after a result message: stops the timeout, detaches the emitter, then
 * either recycles the process (orphaned, or past maxRequestsPerProcess),
 * starts the next queued request, or marks the process idle.
 */
private releaseProcess(pooled: PooledProcess): void {
  this.clearRequestTimeout(pooled);
  pooled.currentEmitter = null;

  // Orphan check
  if (pooled.orphaned) {
    this.rejectProcessQueue(pooled);
    if (pooled.lockedTo) {
      this.clearSessionLock(pooled.lockedTo, pooled);
    }
    this.killAndRespawn(pooled);
    return;
  }

  // Context accumulation guard
  if (pooled.requestCount > this.config.maxRequestsPerProcess) {
    if (pooled.requestQueue.length === 0) {
      console.log(
        JSON.stringify({
          ts: new Date().toISOString(),
          event: "context_recycle",
          processId: pooled.id,
          pid: pooled.process.pid,
          requestCount: pooled.requestCount,
        })
      );
      if (pooled.lockedTo) {
        this.clearSessionLock(pooled.lockedTo, pooled);
      }
      this.killAndRespawn(pooled);
      this.processRecycles++;
      return;
    } else {
      // Finish what is already queued; drainNextRequest() recycles once empty.
      pooled.state = "recycling";
      this.drainNextRequest(pooled);
      return;
    }
  }

  // Normal release
  if (pooled.requestQueue.length > 0) {
    this.drainNextRequest(pooled);
  } else {
    pooled.state = "idle";
  }
}

/**
 * Starts the next queued request on `pooled`. With an empty queue the process
 * is recycled if it is in the "recycling" state, otherwise marked idle.
 */
private drainNextRequest(pooled: PooledProcess): void {
  if (pooled.requestQueue.length === 0) {
    if (pooled.state === "recycling") {
      if (pooled.lockedTo) {
        this.clearSessionLock(pooled.lockedTo, pooled);
      }
      this.killAndRespawn(pooled);
      this.processRecycles++;
      return;
    }
    pooled.state = "idle";
    return;
  }

  const next = pooled.requestQueue.shift()!;
  this.totalRequests++;
  this.assignToProcess(pooled, next.fullPrompt, next.latestPrompt, next.emitter, next.messageCount);
}

// -------------------------------------------------------------------------
// Per-request timeout
// -------------------------------------------------------------------------

/**
 * (Re)arms the per-request timer. On expiry the current emitter receives an
 * error, queued requests are rejected, the session lock is cleared, and the
 * process is killed and replaced.
 */
private startRequestTimeout(pooled: PooledProcess): void {
  this.clearRequestTimeout(pooled);
  pooled.requestTimeoutTimer = setTimeout(() => {
    console.log(
      JSON.stringify({
        ts: new Date().toISOString(),
        event: "request_timeout",
        processId: pooled.id,
        pid: pooled.process.pid,
        sessionKey: pooled.lockedTo,
        elapsedMs: this.config.requestTimeoutMs,
      })
    );
    this.requestTimeouts++;

    if (pooled.currentEmitter) {
      pooled.currentEmitter.emit(
        "error",
        new Error(
          `Request timed out after ${this.config.requestTimeoutMs}ms`
        )
      );
      pooled.currentEmitter = null;
    }

    this.rejectProcessQueue(pooled);

    if (pooled.lockedTo) {
      this.clearSessionLock(pooled.lockedTo, pooled);
    }

    this.killAndRespawn(pooled);
  }, this.config.requestTimeoutMs);
}

/** Cancels the per-request timer, if one is armed. */
private clearRequestTimeout(pooled: PooledProcess): void {
  if (pooled.requestTimeoutTimer) {
    clearTimeout(pooled.requestTimeoutTimer);
    pooled.requestTimeoutTimer = null;
  }
}

// -------------------------------------------------------------------------
// Queue management
// -------------------------------------------------------------------------

/**
 * Queues a request behind the one `proc` is currently serving.
 *
 * @returns null when the process is recycling (caller falls back); otherwise
 *   a result whose emitter either carries the eventual response or, when the
 *   queue is already at requestQueueDepth, emits a 429 error on the next tick.
 */
private enqueueOnProcess(
  proc: PooledProcess,
  fullPrompt: string,
  latestPrompt: string,
  sessionKey: string,
  messageCount: number = 0
): ExecuteResult | null {
  if (proc.state === "recycling") {
    this.routeHits.fallback++;
    console.log(
      JSON.stringify({
        ts: new Date().toISOString(),
        event: "fallback_recycling",
        sessionKey,
        processId: proc.id,
      })
    );
    return null;
  }

  if (proc.requestQueue.length >= this.config.requestQueueDepth) {
    const emitter = safeEmitter();
    // Deferred so the caller can attach its "error" listener first.
    process.nextTick(() => {
      emitter.emit(
        "error",
        Object.assign(
          new Error("Too Many Requests — per-session queue full"),
          { statusCode: 429, retryAfter: 5 }
        )
      );
    });
    return {
      emitter,
      routeType: "locked",
      pid: proc.process.pid ?? null,
      queueDepth: proc.requestQueue.length,
    };
  }

  const emitter = safeEmitter();
  const pending: PendingRequest = {
    fullPrompt,
    latestPrompt,
    messageCount,
    emitter,
  };
  proc.requestQueue.push(pending);
  this.routeHits.locked++;

  return {
    emitter,
    routeType: "locked",
    pid: proc.process.pid ?? null,
    queueDepth: proc.requestQueue.length,
  };
}

/**
 * Queues a request on a pending sentinel (session whose process is still
 * being claimed). Emits a 429 error on the next tick when the sentinel queue
 * is already at requestQueueDepth.
 */
private enqueueOnSentinel(
  sentinel: PendingSentinel,
  fullPrompt: string,
  latestPrompt: string,
  _sessionKey: string,
  messageCount: number = 0
): ExecuteResult | null {
  if (sentinel.requestQueue.length >= this.config.requestQueueDepth) {
    const emitter = safeEmitter();
    process.nextTick(() => {
      emitter.emit(
        "error",
        Object.assign(
          new Error("Too Many Requests — per-session queue full"),
          { statusCode: 429, retryAfter: 5 }
        )
      );
    });
    return {
      emitter,
      routeType: "locked",
      pid: null,
      queueDepth: sentinel.requestQueue.length,
    };
  }

  const emitter = safeEmitter();
  sentinel.requestQueue.push({ fullPrompt, latestPrompt, messageCount, emitter });

  return {
    emitter,
    routeType: "locked",
    pid: null,
    queueDepth: sentinel.requestQueue.length,
  };
}

/** Moves every request queued on `sentinel` to the end of `proc`'s queue. */
private transferSentinelQueue(
  sentinel: PendingSentinel,
  proc: PooledProcess
): void {
  for (const pending of sentinel.requestQueue) {
    proc.requestQueue.push(pending);
  }
  sentinel.requestQueue = [];
}

/**
 * Fails every request queued on `sentinel` with the given status code and
 * message (retryAfter: 3), then empties the queue.
 */
private rejectSentinelQueue(
  sentinel: PendingSentinel,
  statusCode: number,
  message: string
): void {
  for (const pending of sentinel.requestQueue) {
    pending.emitter.emit(
      "error",
      Object.assign(new Error(message), { statusCode, retryAfter: 3 })
    );
  }
  sentinel.requestQueue = [];
}

/** Fails every request queued on `proc` with a 503 and empties the queue. */
private rejectProcessQueue(proc: PooledProcess): void {
  for (const pending of proc.requestQueue) {
    pending.emitter.emit(
      "error",
      Object.assign(new Error("Process unavailable"), {
        statusCode: 503,
        retryAfter: 3,
      })
    );
  }
  proc.requestQueue = [];
}
// -------------------------------------------------------------------------
// Locking
// -------------------------------------------------------------------------

/** Records on `proc` which session key and agent channel it now serves. */
private lockProcess(
  proc: PooledProcess,
  sessionKey: string,
  agentChannel: string
): void {
  proc.lockedTo = sessionKey;
  proc.agentChannel = agentChannel;
}

/**
 * Canonical lock clearing — ALL unlock paths MUST use this method.
 * No inline lockedSessions.delete() anywhere else in the codebase.
 * Pass null for proc when clearing a PendingSentinel (no process state to reset).
 *
 * When a process is given, its lock fields are nulled and its requestCount is
 * reset to 0.
 */
private clearSessionLock(sessionKey: string, proc: PooledProcess | null): void {
  this.lockedSessions.delete(sessionKey);
  if (proc) {
    proc.lockedTo = null;
    proc.agentChannel = null;
    proc.requestCount = 0;
  }
}

// -------------------------------------------------------------------------
// Lineage & orphan reclamation
// -------------------------------------------------------------------------

/**
 * Returns everything after the first ":" of `sessionKey`, or the whole key
 * when it contains no ":".
 */
private extractAgentChannel(sessionKey: string): string {
  const parts = sessionKey.split(":");
  if (parts.length >= 2) {
    return parts.slice(1).join(":");
  }
  return sessionKey;
}

/**
 * Reclaims processes locked to a different session key on the same agent
 * channel as the new session.
 *
 * Their queued requests are rejected. Idle processes are recycled
 * immediately; busy ones are flagged `orphaned` and recycled by
 * releaseProcess() once the in-flight request completes.
 */
private reclaimOrphans(
  newSessionKey: string,
  newAgentChannel: string
): void {
  for (const [key, value] of this.lockedSessions.entries()) {
    if (key === newSessionKey) continue;
    if (isPendingSentinel(value)) continue;

    const proc = value;
    if (proc.agentChannel === newAgentChannel) {
      console.log(
        JSON.stringify({
          ts: new Date().toISOString(),
          event: "orphan_reclaimed",
          oldSessionKey: key,
          newSessionKey,
          agentChannel: newAgentChannel,
          processId: proc.id,
          pid: proc.process.pid,
          state: proc.state,
        })
      );
      this.orphansReclaimed++;

      // Reject ALL queued requests (they belong to the dead session)
      this.rejectProcessQueue(proc);

      if (proc.state === "idle") {
        this.clearSessionLock(key, proc);
        this.killAndRespawn(proc);
      } else {
        // Busy — mark for reclamation after current request
        proc.orphaned = true;
      }
    }
  }
}

// -------------------------------------------------------------------------
// Process death & respawn
// -------------------------------------------------------------------------

/**
 * "close" handler for pooled children that exited on their own.
 *
 * Fails the in-flight and queued requests, clears the session lock, removes
 * the process from all bookkeeping, and spawns a warm replacement unless the
 * router is shutting down or the process cap is reached.
 */
private handleProcessDeath(pooled: PooledProcess, code: number | null): void {
  // If already removed from allProcesses (e.g. by killAndRespawn), skip
  if (!this.allProcesses.has(pooled.id)) return;

  console.log(
    JSON.stringify({
      ts: new Date().toISOString(),
      event: "process_death",
      processId: pooled.id,
      pid: pooled.process.pid,
      code,
      model: pooled.model,
      state: pooled.state,
      lockedTo: pooled.lockedTo,
    })
  );

  this.clearRequestTimeout(pooled);

  if (pooled.currentEmitter) {
    pooled.currentEmitter.emit(
      "error",
      new Error(`Pool process ${pooled.id} died with code ${code}`)
    );
    pooled.currentEmitter = null;
  }

  this.rejectProcessQueue(pooled);

  if (pooled.lockedTo) {
    this.clearSessionLock(pooled.lockedTo, pooled);
  }

  this.allProcesses.delete(pooled.id);

  const warm = this.warmPool.get(pooled.model);
  if (warm) {
    const idx = warm.indexOf(pooled);
    if (idx >= 0) warm.splice(idx, 1);
  }

  // Same cap check as killAndRespawn() and sweep(): never respawn past
  // maxTotalProcesses.
  if (
    !this.shuttingDown &&
    this.allProcesses.size < this.config.maxTotalProcesses
  ) {
    this.spawnWarm(pooled.model).catch((err) => {
      console.error(`[Router] Failed to respawn after death:`, err);
    });
  }
}

/**
 * Retires `pooled` and, capacity permitting, spawns a warm replacement.
 *
 * The process is asked to exit by closing stdin and is SIGKILLed if it has
 * not closed within 3 seconds. Callers are responsible for clearing the
 * session lock first.
 */
private killAndRespawn(pooled: PooledProcess): void {
  this.clearRequestTimeout(pooled);
  pooled.currentEmitter = null;

  // Remove from allProcesses BEFORE kill to prevent double-handling
  this.allProcesses.delete(pooled.id);

  const warm = this.warmPool.get(pooled.model);
  if (warm) {
    const idx = warm.indexOf(pooled);
    if (idx >= 0) warm.splice(idx, 1);
  }

  const killTimer = setTimeout(() => {
    try {
      pooled.process.kill("SIGKILL");
    } catch {
      // Already dead
    }
  }, 3000);
  // A child that exits on its own no longer needs the SIGKILL fallback;
  // cancelling it avoids keeping a pending timer per recycled process.
  pooled.process.once("close", () => clearTimeout(killTimer));
  pooled.process.stdin?.end();

  if (
    !this.shuttingDown &&
    this.allProcesses.size < this.config.maxTotalProcesses
  ) {
    this.spawnWarm(pooled.model).catch((err) => {
      console.error(`[Router] Failed to respawn:`, err);
    });
  }
}

// -------------------------------------------------------------------------
// Sweep (nightly 3 AM ET)
// -------------------------------------------------------------------------

/**
 * Recycles locked processes that are neither busy nor recycling and that have
 * either been idle longer than sweepIdleThresholdMs or served more than
 * maxRequestsPerProcess requests, then refills the warm pools up to their
 * configured sizes without exceeding maxTotalProcesses.
 */
async sweep(): Promise<void> {
  console.log(
    JSON.stringify({
      ts: new Date().toISOString(),
      event: "sweep_start",
      totalProcesses: this.allProcesses.size,
      lockedSessions: this.lockedSessions.size,
    })
  );

  const now = Date.now();
  // Collected first, recycled afterwards, so the map is not mutated mid-scan.
  const toRecycle: Array<{ key: string; proc: PooledProcess }> = [];

  for (const [key, value] of this.lockedSessions.entries()) {
    if (isPendingSentinel(value)) continue;
    const proc = value;

    if (proc.state === "busy" || proc.state === "recycling") continue;

    // A process that never served a request is aged from its spawn time.
    const idleMs = now - (proc.lastRequestAt || proc.spawnedAt);
    if (
      idleMs > this.config.sweepIdleThresholdMs ||
      proc.requestCount > this.config.maxRequestsPerProcess
    ) {
      toRecycle.push({ key, proc });
    }
  }

  for (const { key, proc } of toRecycle) {
    console.log(
      JSON.stringify({
        ts: new Date().toISOString(),
        event: "sweep_recycle",
        processId: proc.id,
        pid: proc.process.pid,
        model: proc.model,
        idleMs: now - (proc.lastRequestAt || proc.spawnedAt),
        requestCount: proc.requestCount,
      })
    );
    this.clearSessionLock(key, proc);
    this.killAndRespawn(proc);
    this.processRecycles++;
  }

  // Refill warm pools — check MAX_TOTAL_PROCESSES before EACH spawn
  for (const model of ["opus", "sonnet"] as PooledModel[]) {
    const targetSize =
      model === "opus" ? this.config.opusSize : this.config.sonnetSize;
    const warm = this.warmPool.get(model)!;

    while (warm.length < targetSize) {
      if (this.allProcesses.size >= this.config.maxTotalProcesses) {
        console.log(
          JSON.stringify({
            ts: new Date().toISOString(),
            event: "sweep_refill_cap_reached",
            model,
            totalProcesses: this.allProcesses.size,
            maxTotal: this.config.maxTotalProcesses,
          })
        );
        break;
      }
      await this.spawnWarm(model);
    }
  }

  console.log(
    JSON.stringify({
      ts: new Date().toISOString(),
      event: "sweep_complete",
      totalProcesses: this.allProcesses.size,
      recycled: toRecycle.length,
      warmOpus: this.warmPool.get("opus")!.length,
      warmSonnet: this.warmPool.get("sonnet")!.length,
    })
  );
}

// -------------------------------------------------------------------------
// Stats
// -------------------------------------------------------------------------

/**
 * Returns a snapshot of pool counters. `busy` and `locked` count only
 * processes present in lockedSessions; `queued` also includes requests
 * waiting on pending sentinels.
 */
stats(): PoolStats {
  let busy = 0;
  let queued = 0;
  const locked = { total: 0, opus: 0, sonnet: 0 };

  for (const [, value] of this.lockedSessions.entries()) {
    if (isPendingSentinel(value)) {
      queued += value.requestQueue.length;
      continue;
    }
    const proc = value;
    locked.total++;
    if (proc.model === "opus") locked.opus++;
    else locked.sonnet++;
    if (proc.state === "busy" || proc.state === "recycling") busy++;
    queued += proc.requestQueue.length;
  }

  return {
    total: this.allProcesses.size,
    locked,
    warm: {
      opus: this.warmPool.get("opus")!.length,
      sonnet: this.warmPool.get("sonnet")!.length,
    },
    busy,
    queued,
    maxTotal: this.config.maxTotalProcesses,
    orphansReclaimed: this.orphansReclaimed,
    totalRequests: this.totalRequests,
    processRecycles: this.processRecycles,
    requestTimeouts: this.requestTimeouts,
    routeHits: { ...this.routeHits },
    uptime: Math.floor((Date.now() - this.startedAt) / 1000),
  };
}

// -------------------------------------------------------------------------
// Shutdown
// -------------------------------------------------------------------------
/**
 * Stops the router: rejects every queued request, fails every in-flight
 * request, closes each tracked child's stdin, and waits for the children to
 * close. A child still running after 5 seconds is SIGKILLed. Afterwards all
 * pool bookkeeping is emptied.
 *
 * Only processes still present in `allProcesses` are waited on.
 */
async shutdown(): Promise<void> {
  this.shuttingDown = true;
  console.log(
    `[Router] Shutting down — ${this.allProcesses.size} processes`
  );

  // Reject all queued requests
  for (const value of this.lockedSessions.values()) {
    if (isPendingSentinel(value)) {
      this.rejectSentinelQueue(value, 503, "Server shutting down");
    } else {
      this.rejectProcessQueue(value);
    }
  }

  // Kill all processes
  const kills: Promise<void>[] = [];
  for (const pooled of this.allProcesses.values()) {
    this.clearRequestTimeout(pooled);
    if (pooled.currentEmitter) {
      pooled.currentEmitter.emit(
        "error",
        new Error("Server shutting down")
      );
      pooled.currentEmitter = null;
    }
    kills.push(
      new Promise<void>((resolve) => {
        const child = pooled.process;

        // Already exited: there is nothing left to wait for.
        if (child.exitCode !== null || child.signalCode !== null) {
          resolve();
          return;
        }

        const killTimer = setTimeout(() => {
          try {
            child.kill("SIGKILL");
          } catch {
            // already dead
          }
          resolve();
        }, 5000);

        // Cancel the SIGKILL fallback as soon as the child closes so a clean
        // shutdown does not hold the event loop open for the full 5 seconds.
        child.once("close", () => {
          clearTimeout(killTimer);
          resolve();
        });

        child.stdin?.end();
      })
    );
  }

  await Promise.all(kills);
  this.allProcesses.clear();
  this.lockedSessions.clear();
  this.warmPool.set("opus", []);
  this.warmPool.set("sonnet", []);
  console.log("[Router] Shutdown complete.");
}
} // end of class