diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..bedc149 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,90 @@ +name: CI + +on: + push: + branches: [main, master, fixrace] + pull_request: + branches: [main, master] + +env: + ZIG_VERSION: "0.16.0-dev.1657+985a3565c" + ZIG_URL: "https://ziglang.org/builds/zig-x86_64-linux-0.16.0-dev.1657+985a3565c.tar.xz" + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Cache Zig + uses: actions/cache@v4 + with: + path: ~/zig + key: zig-${{ env.ZIG_VERSION }} + + - name: Install Zig + run: | + if [ ! -d ~/zig ]; then + mkdir -p ~/zig + curl -L ${{ env.ZIG_URL }} | tar -xJ -C ~/zig --strip-components=1 + fi + echo "$HOME/zig" >> $GITHUB_PATH + + - name: Verify Zig version + run: zig version + + - name: Patch ecdsa.zig for API compatibility + run: sed -i 's/mem\.trimLeft/mem.trimStart/g' ~/zig/lib/std/crypto/ecdsa.zig + + - name: Build + run: zig build + + - name: Run unit tests + run: zig build test + + release-build: + runs-on: ubuntu-latest + if: github.event_name == 'push' + needs: build + strategy: + matrix: + include: + - target: x86_64-linux + artifact: load_balancer-x86_64-linux + - target: aarch64-linux + artifact: load_balancer-aarch64-linux + - target: x86_64-macos + artifact: load_balancer-x86_64-macos + - target: aarch64-macos + artifact: load_balancer-aarch64-macos + + steps: + - uses: actions/checkout@v4 + + - name: Cache Zig + uses: actions/cache@v4 + with: + path: ~/zig + key: zig-${{ env.ZIG_VERSION }} + + - name: Install Zig + run: | + if [ ! -d ~/zig ]; then + mkdir -p ~/zig + curl -L ${{ env.ZIG_URL }} | tar -xJ -C ~/zig --strip-components=1 + fi + echo "$HOME/zig" >> $GITHUB_PATH + + - name: Patch ecdsa.zig for API compatibility + run: sed -i 's/mem\.trimLeft/mem.trimStart/g' ~/zig/lib/std/crypto/ecdsa.zig + + - name: Build release binary + run: | + zig build -Doptimize=ReleaseFast -Dtarget=${{ matrix.target }} + mv zig-out/bin/load_balancer ${{ matrix.artifact }} + + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.artifact }} + path: ${{ matrix.artifact }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..6219710 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,80 @@ +name: Release + +on: + push: + tags: + - 'v*' + +env: + ZIG_VERSION: "0.16.0-dev.1657+985a3565c" + ZIG_URL: "https://ziglang.org/builds/zig-x86_64-linux-0.16.0-dev.1657+985a3565c.tar.xz" + +jobs: + build-release: + runs-on: ubuntu-latest + strategy: + matrix: + include: + - target: x86_64-linux + artifact: load_balancer-x86_64-linux + - target: aarch64-linux + artifact: load_balancer-aarch64-linux + - target: x86_64-macos + artifact: load_balancer-x86_64-macos + - target: aarch64-macos + artifact: load_balancer-aarch64-macos + + steps: + - uses: actions/checkout@v4 + + - name: Cache Zig + uses: actions/cache@v4 + with: + path: ~/zig + key: zig-${{ env.ZIG_VERSION }} + + - name: Install Zig + run: | + if [ ! -d ~/zig ]; then + mkdir -p ~/zig + curl -L ${{ env.ZIG_URL }} | tar -xJ -C ~/zig --strip-components=1 + fi + echo "$HOME/zig" >> $GITHUB_PATH + + - name: Build release binary + run: | + zig build -Doptimize=ReleaseFast -Dtarget=${{ matrix.target }} + mv zig-out/bin/load_balancer ${{ matrix.artifact }} + + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.artifact }} + path: ${{ matrix.artifact }} + + create-release: + runs-on: ubuntu-latest + needs: build-release + permissions: + contents: write + steps: + - name: Download all artifacts + uses: actions/download-artifact@v4 + with: + path: artifacts + + - name: Prepare release assets + run: | + cd artifacts + for dir in */; do + name="${dir%/}" + chmod +x "$dir/$name" + tar -czvf "$name.tar.gz" -C "$dir" "$name" + done + ls -la *.tar.gz + + - name: Create GitHub Release + uses: softprops/action-gh-release@v2 + with: + generate_release_notes: true + files: artifacts/*.tar.gz diff --git a/1 b/1 deleted file mode 100644 index e69de29..0000000 diff --git a/__pycache__/h2_backend.cpython-314.pyc b/__pycache__/h2_backend.cpython-314.pyc deleted file mode 100644 index 4b19d1e..0000000 Binary files a/__pycache__/h2_backend.cpython-314.pyc and /dev/null differ diff --git a/main.zig b/main.zig index f481400..f563bd2 100644 --- a/main.zig +++ b/main.zig @@ -629,6 +629,14 @@ fn workerMain( defer threaded.deinit(); const io = threaded.io(); + // CRITICAL: Increase async_limit to prevent synchronous fallback deadlock + // Default is CPU cores - 1. With HTTP/2 multiplexing, each connection spawns + // a reader task. If async_limit is exceeded, Io.async runs synchronously, + // blocking the request coroutine forever (reader waits for data, request + // never gets to wait on its condition). + // Set to unlimited so reader tasks always spawn asynchronously. + threaded.setAsyncLimit(.unlimited); + // Set CPU affinity after Io.Threaded.init setCpuAffinity(worker_id) catch {}; @@ -780,6 +788,10 @@ fn runSingleProcess(parent_allocator: std.mem.Allocator, config: Config) !void { defer threaded.deinit(); const io = threaded.io(); + // CRITICAL: Increase async_limit to prevent synchronous fallback deadlock + // See comment in workerMain for details. + threaded.setAsyncLimit(.unlimited); + // Router var router = switch (lb_config.strategy) { inline else => |s| try createRouter(allocator, &worker_state, s), diff --git a/src/config/config_watcher.zig b/src/config/config_watcher.zig index 59a642b..7ff0c02 100644 --- a/src/config/config_watcher.zig +++ b/src/config/config_watcher.zig @@ -160,7 +160,7 @@ pub const FileWatcher = struct { /// Initialize inotify-based watcher (Linux) fn initInotify(path: []const u8) !Self { - const inotify_fd = try posix.inotify_init1(.{ .CLOEXEC = true }); + const inotify_fd = try posix.inotify_init1(std.os.linux.IN.CLOEXEC); errdefer posix.close(inotify_fd); const mask: u32 = std.os.linux.IN.MODIFY | std.os.linux.IN.MOVE_SELF | std.os.linux.IN.DELETE_SELF; diff --git a/src/http/http2/client.zig b/src/http/http2/client.zig index 0ca7176..2674a73 100644 --- a/src/http/http2/client.zig +++ b/src/http/http2/client.zig @@ -46,9 +46,12 @@ pub const Stream = struct { body: std.ArrayList(u8) = .{}, end_stream: bool = false, - // Async signaling (replaces pipes) - condition: Io.Condition = .{}, - completed: bool = false, + // Per-stream mutex+condition for async multiplexing + // CANNOT be atomic: request() must SLEEP until reader signals completion + // Flow: request() waits on condition → reader dispatches frame → signals condition + mutex: Io.Mutex = .init, + condition: Io.Condition = Io.Condition.init, + completed: bool = false, // Set by reader, checked by request() error_code: ?u32 = null, retry_needed: bool = false, // Set when GOAWAY indicates stream wasn't processed @@ -61,7 +64,7 @@ pub const Stream = struct { self.completed = false; self.error_code = null; self.retry_needed = false; - self.condition = .{}; + // Don't reset mutex/condition - they're managed by lock/unlock // Don't deinit body - reuse capacity self.body.items.len = 0; } @@ -85,7 +88,10 @@ pub const Http2Client = struct { /// Pending streams for multiplexing (fixed-size, no allocation) streams: [MAX_STREAMS]Stream = [_]Stream{.{}} ** MAX_STREAMS, - active_streams: usize = 0, + /// Count of in-flight streams - MUST be atomic + /// Incremented under write_mutex, decremented under stream.mutex (different mutexes!) + /// Without atomic: concurrent decrements cause lost updates → count goes negative + active_streams: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), /// Write buffer for frame batching (enables safe multiplexing) /// Frames are queued here, then flushed in one TLS write @@ -195,7 +201,7 @@ pub const Http2Client = struct { self.streams[slot].body.items.len = 0; self.streams[slot].completed = false; self.streams[slot].error_code = null; - self.active_streams += 1; + _ = self.active_streams.fetchAdd(1, .acq_rel); log.debug("Queueing HTTP/2 request: method={s}, path={s}, stream={d}, slot={d}", .{ method, path, stream_id, slot, @@ -292,7 +298,7 @@ pub const Http2Client = struct { // Clear stream slot (but don't deinit body - it's moved to response) self.streams[slot].active = false; self.streams[slot].body = .{}; // Reset to empty (ownership transferred) - self.active_streams -= 1; + _ = self.active_streams.fetchSub(1, .acq_rel); return response; } @@ -428,8 +434,8 @@ pub const Http2Client = struct { } /// Get count of active streams - pub inline fn activeStreamCount(self: *const Self) usize { - return self.active_streams; + pub inline fn activeStreamCount(self: *Self) usize { + return self.active_streams.load(.acquire); } // --- Stream ID management --- @@ -515,67 +521,66 @@ pub const Http2Client = struct { self: *Self, sock: *UltraSock, shutdown_requested: *bool, - reader_running: *bool, + reader_running: *std.atomic.Value(bool), state: *State, goaway_received: *bool, - stream_mutex: *Io.Mutex, io: Io, ) void { - log.debug("readerTask: started, active_streams={d}", .{self.active_streams}); + log.debug("readerTask: started, active_streams={d}", .{self.active_streams.load(.acquire)}); defer { // CRITICAL: Capture shutdown state BEFORE any other operations // After signalAllStreams: request() wakes -> release() -> deinitAsync() sets shutdown_requested=true const was_clean_shutdown = shutdown_requested.*; log.debug("readerTask: exiting, shutdown_requested={}", .{was_clean_shutdown}); - // On unclean shutdown (GOAWAY, error): send close_notify, close socket, mark dead - // We MUST close the socket here - backend is waiting for TCP FIN after close_notify - // deinitAsync will check state==dead and skip all socket operations + // On unclean shutdown (GOAWAY, error): close socket, mark dead + // Only send close_notify if socket is still connected (TLS state valid) + // If backend already closed, TLS cipher state may be corrupt - skip close_notify if (!was_clean_shutdown) { state.* = .dead; - sock.sendCloseNotify(io); + if (sock.connected) { + sock.sendCloseNotify(io); + } sock.closeSocketOnly(); - log.debug("readerTask: sent close_notify, closed socket, marked dead", .{}); + log.debug("readerTask: closed socket, marked dead", .{}); } // Now signal waiting streams and update state goaway_received.* = self.goaway_received; - self.signalAllStreams(stream_mutex, io); - reader_running.* = false; + self.signalAllStreams(io); + reader_running.store(false, .release); } var recv_buf: [mod.MAX_FRAME_SIZE_DEFAULT + 9]u8 = undefined; while (!shutdown_requested.*) { // Read frame header (9 bytes) - loop to handle partial reads + // Use 5 second timeout to detect dead connections + const RECV_TIMEOUT_MS: u32 = 5000; var header_buf: [9]u8 = undefined; var header_read: usize = 0; log.debug("Reader: waiting for frame header...", .{}); while (header_read < 9) { - const n = sock.recv(io, header_buf[header_read..]) catch |err| { + const n = sock.recvWithTimeout(io, header_buf[header_read..], RECV_TIMEOUT_MS) catch |err| { // Check shutdown first if (shutdown_requested.*) { log.debug("Reader: shutdown requested during recv", .{}); return; } - // If no active streams, this might be an idle timeout while pooled - // Check if socket fd is still valid (recv sets connected=false on any error) - if (self.active_streams == 0) { - // Check if socket is actually dead or just timed out - if (sock.getFd()) |fd| { - // Socket fd still valid - likely just a timeout, not a real disconnect - // Restore connected flag and continue waiting - if (!sock.hasStaleData()) { - sock.connected = true; - log.debug("Reader: idle timeout, continuing (fd={}, no active streams)", .{fd}); - continue; - } + // Timeout with no active streams - exit cleanly (connection idle) + if (err == error.Timeout) { + if (self.active_streams.load(.acquire) == 0) { + log.debug("Reader: timeout with no active streams, exiting", .{}); + return; } + // Active streams waiting - keep trying + log.debug("Reader: timeout but {d} streams active, continuing", .{self.active_streams.load(.acquire)}); + continue; } // Active streams or socket truly dead - exit reader - log.debug("Reader: recv failed: {}, active_streams={}, connected={}", .{ err, self.active_streams, sock.connected }); + log.debug("Reader: recv failed: {}, active_streams={}, connected={}", .{ err, self.active_streams.load(.acquire), sock.connected }); return; }; if (n == 0) { @@ -601,7 +606,11 @@ pub const Http2Client = struct { if (fh.length > 0) { var payload_read: usize = 0; while (payload_read < fh.length) { - const n = sock.recv(io, recv_buf[payload_read..fh.length]) catch |err| { + const n = sock.recvWithTimeout(io, recv_buf[payload_read..fh.length], RECV_TIMEOUT_MS) catch |err| { + if (err == error.Timeout) { + log.debug("Reader: payload timeout, continuing", .{}); + continue; + } log.debug("Reader payload recv error: {}", .{err}); return; }; @@ -615,10 +624,8 @@ pub const Http2Client = struct { const payload = recv_buf[0..fh.length]; - // Dispatch frame to correct stream (mutex-protected) - stream_mutex.lock(io) catch return; // Cancelled + // Dispatch frame to correct stream (per-stream mutex in dispatchFrame) const dispatch_result = self.dispatchFrame(fh, payload, io); - stream_mutex.unlock(io); dispatch_result catch |err| { if (err == error.ConnectionGoaway) { @@ -637,6 +644,10 @@ pub const Http2Client = struct { switch (ft) { .headers => { if (self.findStreamSlot(fh.stream_id)) |slot| { + // Lock per-stream mutex for atomic update+signal + self.streams[slot].mutex.lock(io) catch return; + defer self.streams[slot].mutex.unlock(io); + self.streams[slot].header_count = hpack.Hpack.decodeHeaders( payload, &self.streams[slot].headers, @@ -654,6 +665,10 @@ pub const Http2Client = struct { }, .data => { if (self.findStreamSlot(fh.stream_id)) |slot| { + // Lock per-stream mutex for atomic update+signal + self.streams[slot].mutex.lock(io) catch return; + defer self.streams[slot].mutex.unlock(io); + self.streams[slot].body.appendSlice(self.allocator, payload) catch {}; if (fh.isEndStream()) { @@ -665,6 +680,10 @@ pub const Http2Client = struct { }, .rst_stream => { if (self.findStreamSlot(fh.stream_id)) |slot| { + // Lock per-stream mutex for atomic update+signal + self.streams[slot].mutex.lock(io) catch return; + defer self.streams[slot].mutex.unlock(io); + if (payload.len >= 4) { self.streams[slot].error_code = std.mem.readInt(u32, payload[0..4], .big); } @@ -686,10 +705,13 @@ pub const Http2Client = struct { for (&self.streams) |*stream| { if (stream.active and !stream.completed) { if (stream.id > last_stream_id) { + // Lock per-stream mutex for atomic update+signal + stream.mutex.lock(io) catch continue; log.debug("GOAWAY: signaling stream {d} for retry", .{stream.id}); stream.retry_needed = true; stream.completed = true; stream.condition.signal(io); + stream.mutex.unlock(io); } else { remaining_active += 1; } @@ -698,7 +720,7 @@ pub const Http2Client = struct { // Mark that we received GOAWAY - no new streams allowed self.goaway_received = true; - log.debug("GOAWAY: set goaway_received=true, active_streams={d}, remaining={d}", .{ self.active_streams, remaining_active }); + log.debug("GOAWAY: set goaway_received=true, active_streams={d}, remaining={d}", .{ self.active_streams.load(.acquire), remaining_active }); // FAST EXIT: If no streams are waiting for completion, exit immediately // instead of waiting for read timeout. This prevents 1-2s delay on retry. @@ -728,20 +750,28 @@ pub const Http2Client = struct { } } - /// Signal all active streams (acquires mutex) - pub fn signalAllStreams(self: *Self, stream_mutex: *Io.Mutex, io: Io) void { - stream_mutex.lock(io) catch return; // Cancelled - defer stream_mutex.unlock(io); - self.signalAllStreamsUnlocked(io); + /// Signal all active streams with per-stream mutexes + pub fn signalAllStreams(self: *Self, io: Io) void { + for (&self.streams) |*stream| { + if (stream.active and !stream.completed) { + stream.mutex.lock(io) catch continue; + stream.retry_needed = true; + stream.completed = true; + stream.condition.signal(io); + stream.mutex.unlock(io); + } + } } - /// Signal all active streams (mutex must be held) + /// Signal all active streams (unlocked version for use when mutexes already held) fn signalAllStreamsUnlocked(self: *Self, io: Io) void { for (&self.streams) |*stream| { if (stream.active and !stream.completed) { + stream.mutex.lock(io) catch continue; stream.retry_needed = true; stream.completed = true; stream.condition.signal(io); + stream.mutex.unlock(io); } } } @@ -782,7 +812,7 @@ test "http2 client init" { defer client.deinit(); try std.testing.expectEqual(@as(u31, 1), client.next_stream_id); try std.testing.expectEqual(false, client.preface_sent); - try std.testing.expectEqual(@as(usize, 0), client.active_streams); + try std.testing.expectEqual(@as(usize, 0), client.active_streams.load(.acquire)); } test "http2 client stream slot management" { diff --git a/src/http/http2/connection.zig b/src/http/http2/connection.zig index 1c8ec06..9760b49 100644 --- a/src/http/http2/connection.zig +++ b/src/http/http2/connection.zig @@ -4,6 +4,28 @@ //! Single request() method handles everything: send, spawn reader, await. //! Per-connection TLS buffers for safe concurrent multiplexing. //! +//! ## Synchronization Strategy +//! +//! We use atomics for simple counters/flags, mutexes for multi-step operations: +//! +//! | Primitive | Type | Why | +//! |------------------|---------|----------------------------------------------| +//! | ref_count | Atomic | Single counter, fetchAdd/fetchSub | +//! | active_streams | Atomic | Single counter, concurrent inc/dec | +//! | reader_running | Atomic | Single flag, cmpxchg for race-free spawn | +//! | write_mutex | Mutex | Multi-step: encode → buffer → flush | +//! | stream.mutex | Mutex | Must wait for condition signal | +//! | stream.condition | CondVar | Sleep until reader signals completion | +//! +//! **Why not all atomics?** +//! - Atomics: ONE instruction, never blocks (fetchAdd, cmpxchg) +//! - Mutex: Protects MULTIPLE operations as atomic unit +//! - Condition: Lets coroutine SLEEP until signaled +//! +//! Atomics cannot protect multi-step sequences or wait for events. +//! write_mutex ensures encode→buffer→flush completes without interleaving. +//! stream.condition lets request() sleep until reader dispatches response. +//! //! TigerBeetle style: //! - Fixed-size arrays, no hidden allocations //! - Explicit state, no implicit transitions @@ -49,12 +71,15 @@ pub const H2Connection = struct { /// Binary state machine state: State = .ready, - // Multiplexing mutexes (must be at stable address - struct is heap-allocated) + // Multiplexing mutex (must be at stable address - struct is heap-allocated) + // CANNOT be atomic: protects multi-step TLS write sequence (encode → buffer → flush) + // All streams share this - contention is the price of safe multiplexing write_mutex: Io.Mutex = .init, - stream_mutex: Io.Mutex = .init, // Reader task tracking - reader_running: bool = false, + // CRITICAL: Use atomic for reader_running to prevent race where two requests + // both see false and spawn duplicate readers (causes double-close panic) + reader_running: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), shutdown_requested: bool = false, reader_future: ?Io.Future(void) = null, @@ -65,6 +90,12 @@ pub const H2Connection = struct { backend_idx: u32 = 0, last_used_ns: i64 = 0, + // Reference counting for safe multiplexing + // Incremented by pool.getOrCreate(), decremented by pool.release() + // Only destroy when ref_count hits 0 + // CRITICAL: Must be atomic - multiple coroutines may release concurrently + ref_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + // Per-connection TLS buffers - CRITICAL for concurrent connections! // Threadlocal buffers are shared between connections on same thread. // Concurrent reads/writes corrupt TLS decryption state. @@ -150,53 +181,68 @@ pub const H2Connection = struct { if (trace) tls_log.debug(" H2 step 1: acquiring write_mutex", .{}); log.debug("request: sending {s} {s}", .{ method, path }); try self.write_mutex.lock(io); - const stream_id = try self.h2_client.sendRequest(method, path, host, body); + + // Re-check state after acquiring mutex - connection may have died while waiting + if (self.state == .dead) { + self.write_mutex.unlock(io); + log.debug("request: connection died while waiting for mutex", .{}); + return error.ConnectionDead; + } + + // Send and flush - unlock mutex on ANY exit (success or error) + const stream_id = self.h2_client.sendRequest(method, path, host, body) catch |err| { + self.write_mutex.unlock(io); + return err; + }; if (trace) tls_log.debug(" H2 step 1: flushing stream {d}", .{stream_id}); - try self.h2_client.flush(&self.sock, io); + self.h2_client.flush(&self.sock, io) catch |err| { + self.write_mutex.unlock(io); + return err; + }; self.write_mutex.unlock(io); if (trace) tls_log.debug(" H2 step 1: sent stream {d}, write_mutex released", .{stream_id}); log.debug("request: sent stream {d}", .{stream_id}); - // STEP 2: Lock stream_mutex FIRST (critical ordering) - // When reader tries to lock for dispatch, it BLOCKS -> Io.async yields - log.debug("request: locking stream_mutex", .{}); - try self.stream_mutex.lock(io); - - // STEP 3: Spawn reader if needed (will block on our mutex) - if (!self.reader_running) { - // Reset shutdown flag (may have been set when connection was released to pool) - self.shutdown_requested = false; - log.debug("request: spawning reader", .{}); - if (!self.spawnReader(io)) { - self.stream_mutex.unlock(io); - return error.ConnectionDead; - } - } - - // STEP 4: Wait for completion (condition.wait releases mutex) + // STEP 2: Find our stream slot (just allocated, must exist) const slot = self.h2_client.findStreamSlot(stream_id) orelse { log.err("Stream {d} not found in slots", .{stream_id}); - self.stream_mutex.unlock(io); return error.StreamNotFound; }; + // STEP 3: Spawn reader if needed AFTER send but BEFORE locking mutex + // Critical ordering: + // - After send: so reader has data to receive (won't timeout waiting) + // - Before lock: so when Io.async yields, we don't hold the mutex + // - Reader might dispatch our frame, but we're not in wait yet + // That's OK - completed flag will be set, and our wait loop checks it first + // spawnReader uses atomic cmpxchg internally - safe to call from multiple coroutines + if (!self.spawnReader(io)) { + return error.ConnectionDead; + } + + // STEP 4: Lock per-stream mutex and wait for completion + // If reader already set completed=true, the while loop exits immediately + log.debug("request: about to lock stream {d} mutex, slot {d}", .{ stream_id, slot }); + log.debug("request: locking stream {d} mutex, slot {d}", .{ stream_id, slot }); + try self.h2_client.streams[slot].mutex.lock(io); + log.debug("request: waiting on stream {d}, slot {d}", .{ stream_id, slot }); while (!self.h2_client.streams[slot].completed) { - try self.h2_client.streams[slot].condition.wait(io, &self.stream_mutex); + try self.h2_client.streams[slot].condition.wait(io, &self.h2_client.streams[slot].mutex); } // STEP 5: Check for stream errors and retry conditions if (self.h2_client.streams[slot].retry_needed) { log.debug("Stream {d} needs retry (GOAWAY)", .{stream_id}); + self.h2_client.streams[slot].mutex.unlock(io); self.h2_client.streams[slot].reset(); - self.stream_mutex.unlock(io); return error.RetryNeeded; } if (self.h2_client.streams[slot].error_code) |code| { log.debug("Stream {d} reset with error {d}", .{ stream_id, code }); + self.h2_client.streams[slot].mutex.unlock(io); self.h2_client.streams[slot].reset(); - self.stream_mutex.unlock(io); return error.StreamReset; } @@ -213,11 +259,11 @@ pub const H2Connection = struct { self.h2_client.streams[slot].body = .{}; self.h2_client.streams[slot].active = false; - // TigerBeetle style: assert correct state, bugs should crash - std.debug.assert(self.h2_client.active_streams > 0); - self.h2_client.active_streams -= 1; + // Atomically decrement active_streams (returns previous value) + const prev = self.h2_client.active_streams.fetchSub(1, .acq_rel); + std.debug.assert(prev > 0); // Must have had at least 1 active stream - self.stream_mutex.unlock(io); + self.h2_client.streams[slot].mutex.unlock(io); self.last_used_ns = currentTimeNs(); log.debug("request: completed stream {d}, status {d}", .{ stream_id, response.status }); return response; @@ -226,24 +272,33 @@ pub const H2Connection = struct { /// Spawn async reader task for frame dispatch /// Returns true if reader started, false if connection is dead /// Safe to call multiple times - no-op if already running + /// Uses atomic cmpxchg to prevent race condition where two requests + /// both spawn readers (causes double-close panic) fn spawnReader(self: *Self, io: Io) bool { - if (self.reader_running) { + // Atomic compare-and-swap: only proceed if we're the one who set it to true + // This prevents the race where two requests both see false and spawn readers + if (self.reader_running.cmpxchgStrong(false, true, .acq_rel, .acquire)) |_| { + // cmpxchg returned non-null = failed = someone else already set it log.debug("spawnReader: already running", .{}); return true; } + + // We won the race - we set reader_running from false to true if (self.state == .dead) { log.warn("spawnReader: connection dead", .{}); + self.reader_running.store(false, .release); return false; } if (self.goaway_received) { log.warn("spawnReader: GOAWAY received", .{}); + self.reader_running.store(false, .release); return false; } log.debug("spawnReader: starting reader task", .{}); - self.reader_running = true; + self.shutdown_requested = false; - // Store future for proper cleanup (prevents async closure memory leak) + // Spawn reader and store future for proper cleanup in deinitAsync self.reader_future = Io.async( io, Http2Client.readerTask, @@ -254,10 +309,10 @@ pub const H2Connection = struct { &self.reader_running, &self.state, &self.goaway_received, - &self.stream_mutex, io, }, ); + log.debug("spawnReader: Io.async returned, future={?}", .{if (self.reader_future != null) @as(?*anyopaque, @ptrCast(&self.reader_future.?)) else null}); return true; } @@ -274,7 +329,7 @@ pub const H2Connection = struct { return self.state == .ready and !self.goaway_received; } - /// Clean up resources (async version - properly awaits reader) + /// Clean up resources (async version - waits for reader to exit) /// Use when you have Io context (from request handler) pub fn deinitAsync(self: *Self, io: Io) void { const trace = config_mod.isTlsTraceEnabled(); @@ -285,30 +340,27 @@ pub const H2Connection = struct { self.shutdown_requested = true; if (trace) tls_log.debug(" H2 deinitAsync: shutdown_requested=true", .{}); - // Phase 1: ALWAYS await reader future if it exists - // Critical: reader might still be in defer block even if reader_running=false - // We must wait for it to complete before touching any shared state + // Await reader future if it exists - ensures reader's defer block completes if (self.reader_future) |*future| { - if (trace) tls_log.debug(" H2 deinitAsync: phase 1 - awaiting reader task", .{}); - // Send close_notify to unblock reader from recv() ONLY if: - // - Reader is still running (might be blocked in recv) - // - State is not dead (reader hasn't already sent close_notify and closed socket) - // If state is dead, reader already handled everything - don't touch socket! - if (self.reader_running and self.state != .dead) { + if (trace) tls_log.debug(" H2 deinitAsync: awaiting reader future", .{}); + + // Send close_notify to unblock reader from recv() + // Only if state is not dead (reader hasn't already closed socket) + if (self.reader_running.load(.acquire) and self.state != .dead) { self.sock.sendCloseNotify(io); } + + // Properly await the reader task _ = future.await(io); self.reader_future = null; - if (trace) tls_log.debug(" H2 deinitAsync: phase 1 - reader task completed", .{}); + if (trace) tls_log.debug(" H2 deinitAsync: reader future completed", .{}); } - // Phase 2 & 3: Only if reader didn't already handle shutdown + // Close socket if reader didn't already // When state == .dead, reader already sent close_notify AND closed socket if (self.state != .dead) { - // Clean shutdown path - reader exited without sending close_notify - if (trace) tls_log.debug(" H2 deinitAsync: phase 2 - sending close_notify (clean shutdown)", .{}); + if (trace) tls_log.debug(" H2 deinitAsync: closing socket", .{}); self.sock.sendCloseNotify(io); - if (trace) tls_log.debug(" H2 deinitAsync: phase 3 - closing socket", .{}); self.sock.closeSocketOnly(); } else if (trace) { tls_log.debug(" H2 deinitAsync: reader already closed socket, skipping", .{}); @@ -357,7 +409,7 @@ test "H2Connection: initial state" { try std.testing.expectEqual(State.ready, conn.state); try std.testing.expect(!conn.goaway_received); - try std.testing.expect(!conn.reader_running); + try std.testing.expect(!conn.reader_running.load(.acquire)); try std.testing.expect(conn.isReady()); } diff --git a/src/http/http2/pool.zig b/src/http/http2/pool.zig index f3d09bd..5ee77f7 100644 --- a/src/http/http2/pool.zig +++ b/src/http/http2/pool.zig @@ -40,6 +40,12 @@ pub const H2ConnectionPool = struct { backends: []const BackendServer, allocator: std.mem.Allocator, + /// Per-backend mutex to prevent concurrent connection creation race + /// CANNOT be atomic: protects multi-step sequence (check empty → create → store) + /// Without this, multiple coroutines see "empty" before any assigns, + /// leading to duplicate connections and memory leaks + backend_mutex: [MAX_BACKENDS]Io.Mutex, + const Self = @This(); /// Initialize pool with backend servers @@ -49,6 +55,7 @@ pub const H2ConnectionPool = struct { .slot_state = undefined, .backends = backends, .allocator = allocator, + .backend_mutex = [_]Io.Mutex{.init} ** MAX_BACKENDS, }; // Initialize all slots to empty @@ -64,80 +71,87 @@ pub const H2ConnectionPool = struct { /// Get or create connection for backend /// - /// Strategy: - /// 1. Scan for available connection in this backend's slots - /// 2. If found and healthy, mark in_use and return - /// 3. If found but stale, destroy and create fresh + /// Strategy for HTTP/2 MULTIPLEXING: + /// 1. Lock per-backend mutex (prevents concurrent creation race) + /// 2. Scan for available connection with room for more streams + /// 3. If found and healthy, return it (stays available for more requests) /// 4. If not found, find empty slot and create fresh - /// 5. Connect (TLS + HTTP/2 handshake) + /// 5. Unlock mutex on return pub fn getOrCreate(self: *Self, backend_idx: u32, io: Io) !*H2Connection { std.debug.assert(backend_idx < self.backends.len); - // Phase 1: Scan for existing available connection + // Lock per-backend mutex to prevent concurrent creation race + // Critical: without this, multiple coroutines see "empty" before any assigns + try self.backend_mutex[backend_idx].lock(io); + defer self.backend_mutex[backend_idx].unlock(io); + + // Phase 1: Find existing connection with room for more streams for (&self.slot_state[backend_idx], &self.slots[backend_idx], 0..) |*state, *slot, i| { if (state.* == .available) { if (slot.*) |conn| { - // Check if connection is stale/dead + // Skip stale connections - DON'T destroy here! + // Other requests may still hold references. Let release() handle cleanup. if (self.isConnectionStale(conn)) { - log.debug("Connection stale, destroying: backend={d} slot={d}", .{ backend_idx, i }); - // CRITICAL: Clear slot FIRST to prevent other coroutines from seeing stale pointer - slot.* = null; - state.* = .empty; - self.destroyConnection(conn, io); - continue; // Will create fresh in Phase 2 + log.debug("Connection stale, skipping: backend={d} slot={d}", .{ backend_idx, i }); + continue; } - // Connection healthy, mark in_use and return - state.* = .in_use; - // Reset to aggressive timeout for active request handling - conn.sock.setReadTimeout(500) catch {}; - log.debug("Reusing connection: backend={d} slot={d}", .{ backend_idx, i }); - return conn; + // Check actual slot availability (8 stream slots per connection) + if (conn.h2_client.findFreeSlot() != null) { + const new_refs = conn.ref_count.fetchAdd(1, .acq_rel) + 1; + log.debug("Reusing connection: backend={d} slot={d} streams={d} refs={d}", .{ backend_idx, i, conn.h2_client.active_streams.load(.acquire), new_refs }); + return conn; + } } } } - // Phase 2: No available connection, find empty slot and create fresh + // Phase 2: Create new connection in empty slot for (&self.slot_state[backend_idx], &self.slots[backend_idx], 0..) |*state, *slot, i| { if (state.* == .empty) { - std.debug.assert(slot.* == null); - - // Create fresh connection const conn = try self.createFreshConnection(backend_idx, io); - errdefer self.destroyConnection(conn, io); - - // Store in slot, mark in_use + conn.ref_count.store(1, .release); // First user slot.* = conn; - state.* = .in_use; - log.debug("Created fresh connection: backend={d} slot={d}", .{ backend_idx, i }); + state.* = .available; + log.debug("Created fresh connection: backend={d} slot={d} refs=1", .{ backend_idx, i }); return conn; } } - // Pool exhausted + // All slots full - handler will retry on TooManyStreams log.warn("Connection pool exhausted: backend={d}", .{backend_idx}); return error.PoolExhausted; } /// Release connection back to pool /// + /// Decrements ref_count. Only destroys when ref_count hits 0. /// If success=true and connection healthy: mark available for reuse /// If success=false or connection dead: destroy and mark empty pub fn release(self: *Self, conn: *H2Connection, success: bool, io: Io) void { const backend_idx = conn.backend_idx; std.debug.assert(backend_idx < self.backends.len); - // Find slot containing this connection + // Atomically decrement reference count and get previous value + // This is safe for concurrent releases - only the one that gets prev_refs=1 proceeds to destroy + const prev_refs = conn.ref_count.fetchSub(1, .acq_rel); + std.debug.assert(prev_refs > 0); // Must have had at least 1 ref + + // If other users still have references (prev was > 1, now > 0), just return + if (prev_refs > 1) { + log.debug("Connection released, refs remaining: backend={d} refs={d}", .{ backend_idx, prev_refs - 1 }); + return; + } + + // We were the last user (prev_refs == 1, now 0) - find slot and decide fate for (&self.slot_state[backend_idx], &self.slots[backend_idx]) |*state, *slot| { if (slot.* == conn) { + + // Last user - decide whether to keep or destroy if (success and conn.isReady() and !self.isConnectionStale(conn)) { // Connection healthy - return to pool for reuse - // HTTP/2 multiplexing makes connection reuse highly valuable log.debug("Returning connection to pool: backend={d}", .{backend_idx}); conn.last_used_ns = currentTimeNs(); - // Set longer timeout for idle pooled connection (30 seconds) - // This allows the reader to wait for GOAWAY without triggering false errors - conn.sock.setReadTimeout(30_000) catch {}; state.* = .available; } else { // Failed, dead, or stale - destroy @@ -186,8 +200,9 @@ pub const H2ConnectionPool = struct { &conn.tls_output_buffer, ); - // Set aggressive timeout for fresh connection - conn.sock.setReadTimeout(500) catch {}; + // Enable TCP keepalive to detect dead connections (replaces SO_RCVTIMEO) + // OS will send probes and close dead connections automatically + conn.sock.enableKeepalive() catch {}; // Perform HTTP/2 handshake (send preface + SETTINGS) try conn.connect(io); @@ -197,6 +212,7 @@ pub const H2ConnectionPool = struct { } /// Check if connection is stale or dead + /// Returns true if connection should NOT be used for NEW requests fn isConnectionStale(self: *Self, conn: *H2Connection) bool { _ = self; @@ -205,9 +221,10 @@ pub const H2ConnectionPool = struct { return true; } - // Never destroy connections with active streams - if (conn.h2_client.active_streams > 0) { - return false; + // GOAWAY = no new streams allowed (existing streams can complete) + // Must check BEFORE active_streams - we can't send NEW requests even if conn is busy + if (conn.goaway_received) { + return true; } // Check basic connectivity @@ -215,12 +232,13 @@ pub const H2ConnectionPool = struct { return true; } - // Check GOAWAY received - if (conn.goaway_received) { - return true; + // Skip idle timeout check if connection has active streams + // Active connections are obviously not "idle" + if (conn.h2_client.active_streams.load(.acquire) > 0) { + return false; } - // Check idle timeout + // Check idle timeout (only for connections with no active streams) const now_ns = currentTimeNs(); const idle_ns = now_ns - conn.last_used_ns; if (idle_ns > IDLE_TIMEOUT_NS) { diff --git a/src/http/ultra_sock.zig b/src/http/ultra_sock.zig index f8ee311..dd9a862 100644 --- a/src/http/ultra_sock.zig +++ b/src/http/ultra_sock.zig @@ -551,6 +551,42 @@ pub const UltraSock = struct { } } + /// Receive with timeout - returns error.Timeout if no data within timeout_ms + /// For TLS: uses regular recv (TLS doesn't support raw socket timeout) + /// For plain: uses socket receiveTimeout directly + /// + /// Note: For TLS, timeout is approximate - we rely on the reader task's + /// periodic timeout checks between recv calls. + pub fn recvWithTimeout(self: *UltraSock, io: Io, buffer: []u8, timeout_ms: u32) !usize { + if (!self.connected) { + return error.NotConnected; + } + + // TLS path: can't use raw socket timeout (corrupts TLS state) + // Just use regular recv - timeout handled at reader task level + if (self.tls_conn != null) { + return self.recv(io, buffer); + } + + // Plain socket path - use receiveTimeout directly + if (self.stream) |stream| { + const duration = Io.Clock.Duration{ + .raw = Io.Duration.fromMilliseconds(@as(i64, timeout_ms)), + .clock = .real, + }; + const timeout = Io.Timeout{ .duration = duration }; + const msg = stream.socket.receiveTimeout(io, buffer, timeout) catch |err| { + if (err == error.Timeout) { + return error.Timeout; + } + self.connected = false; + return error.ReadFailed; + }; + return msg.data.len; + } + return error.NotConnected; + } + /// Get the TLS connection (for direct access to writeAll/read/next) pub fn getTlsConnection(self: *UltraSock) ?*tls.Connection { if (self.tls_conn != null) { @@ -636,14 +672,20 @@ pub const UltraSock = struct { /// Send TLS close_notify alert without closing the socket /// Use this for graceful shutdown - allows peer to send their close_notify back - /// Note: Does NOT check `connected` flag - recv errors set connected=false but - /// we should still send close_notify if TLS connection exists + /// Only sends if socket is still connected - if recv failed, TLS state may be corrupt pub fn sendCloseNotify(self: *UltraSock, io: Io) void { const trace_enabled = config_mod.isTlsTraceEnabled(); const tls_log = std.log.scoped(.tls_trace); - // Only check has_reader_writer and tls_conn - NOT connected - // After recv error/EOF, connected=false but we can still send close_notify + // MUST check connected - if recv failed, TLS cipher state may be corrupt + // Trying to encrypt close_notify with corrupt cipher causes panic + if (!self.connected) { + if (trace_enabled) { + tls_log.debug("Skipping close_notify: socket disconnected for {s}:{}", .{ self.host, self.port }); + } + return; + } + if (self.has_reader_writer) { if (self.tls_conn) |*conn| { // Update io context for the writer diff --git a/src/proxy/connection.zig b/src/proxy/connection.zig index 1b2e420..b643275 100644 --- a/src/proxy/connection.zig +++ b/src/proxy/connection.zig @@ -90,8 +90,8 @@ pub fn acquireConnection( return ProxyError.BackendUnavailable; }; - // Set read timeout to detect dead connections. - sock.setReadTimeout(1000) catch {}; + // Enable TCP keepalive to detect dead connections (replaces SO_RCVTIMEO) + sock.enableKeepalive() catch {}; // Log negotiated protocol if (sock.isHttp2()) { diff --git a/src/proxy/handler.zig b/src/proxy/handler.zig index 77ddb94..3421675 100644 --- a/src/proxy/handler.zig +++ b/src/proxy/handler.zig @@ -54,6 +54,9 @@ pub const ProxyError = error{ Timeout, EmptyResponse, InvalidResponse, + /// HTTP/2 GOAWAY exhausted retries - NOT a backend health failure + /// Server gracefully closed connection, just need fresh connection + GoawayRetriesExhausted, }; // ============================================================================ @@ -169,7 +172,10 @@ inline fn proxyWithFailover( state.recordSuccess(primary_idx); return response; } else |err| { - state.recordFailure(primary_idx); + // GOAWAY exhaustion is NOT a backend failure - just connection-level flow control + if (err != ProxyError.GoawayRetriesExhausted) { + state.recordFailure(primary_idx); + } log.warn("[W{d}] Backend {d} failed: {s}", .{ state.worker_id, primary_idx + 1, @errorName(err) }); } } else { @@ -179,7 +185,10 @@ inline fn proxyWithFailover( state.recordSuccess(primary_idx); return response; } else |err| { - state.recordFailure(primary_idx); + // GOAWAY exhaustion is NOT a backend failure - just connection-level flow control + if (err != ProxyError.GoawayRetriesExhausted) { + state.recordFailure(primary_idx); + } log.warn("[W{d}] Backend {d} failed: {s}", .{ state.worker_id, primary_idx + 1, @errorName(err) }); } } @@ -199,7 +208,10 @@ inline fn proxyWithFailover( state.recordSuccess(failover_idx); return response; } else |failover_err| { - state.recordFailure(failover_idx); + // GOAWAY exhaustion is NOT a backend failure + if (failover_err != ProxyError.GoawayRetriesExhausted) { + state.recordFailure(failover_idx); + } const err_name = @errorName(failover_err); log.warn("[W{d}] Failover to backend {d} failed: {s}", .{ state.worker_id, @@ -214,7 +226,10 @@ inline fn proxyWithFailover( state.recordSuccess(failover_idx); return response; } else |failover_err| { - state.recordFailure(failover_idx); + // GOAWAY exhaustion is NOT a backend failure + if (failover_err != ProxyError.GoawayRetriesExhausted) { + state.recordFailure(failover_idx); + } const err_name = @errorName(failover_err); log.warn("[W{d}] Failover to backend {d} failed: {s}", .{ state.worker_id, @@ -519,7 +534,7 @@ fn forwardH2Response( // Forward response to client const client_writer = ctx.writer; // Convert status code - use 502 for invalid status codes from backend - const status: http.Status = std.meta.intToEnum(http.Status, response.status) catch .@"Bad Gateway"; + const status: http.Status = std.enums.fromInt(http.Status, response.status) orelse .@"Bad Gateway"; var http_response = ctx.response; http_response.status = status; http_response.mime = http.Mime.HTML; // Default to HTML for HTTP/2 responses @@ -585,34 +600,78 @@ fn streamingProxyHttp2( // Get pool (must exist) const pool = state.h2_pool orelse return ProxyError.ConnectionFailed; - // Get or create connection (pool handles everything: TLS, handshake, retry) - var conn = pool.getOrCreate(backend_idx, ctx.io) catch |err| { - log.warn("[REQ {d}] H2 pool getOrCreate failed: {}", .{ req_id, err }); - return ProxyError.ConnectionFailed; - }; + // Retry loop for TooManyStreams (connection full, not broken) + const h2_conn = @import("../http/http2/connection.zig"); + const h2_client = @import("../http/http2/client.zig"); + var response: h2_client.Response = undefined; + var used_conn: *h2_conn.H2Connection = undefined; + var attempts: u32 = 0; + var last_was_goaway: bool = false; // Track if exhaustion was due to GOAWAY + while (attempts < 5) : (attempts += 1) { + // Micro-backoff for GOAWAY retries (prevents thundering herd without latency cost) + // 0µs, 100µs, 200µs, 400µs, 800µs - just enough to spread out retries + if (last_was_goaway and attempts > 0) { + const backoff_ns: u64 = @as(u64, 100_000) << @intCast(attempts - 1); + std.Io.sleep(ctx.io, .{ .nanoseconds = @intCast(backoff_ns) }, .awake) catch {}; + } + last_was_goaway = false; - // Make request (connection handles: send, reader spawn, await) - var response = conn.request( - @tagName(ctx.request.method orelse .GET), - ctx.request.uri orelse "/", - backend.getFullHost(), - ctx.request.body, - ctx.io, - ) catch |err| { - log.warn("[REQ {d}] H2 request failed: {}", .{ req_id, err }); - pool.release(conn, false, ctx.io); + // Get or create connection (pool handles everything: TLS, handshake, retry) + const conn = pool.getOrCreate(backend_idx, ctx.io) catch |err| { + log.warn("[REQ {d}] H2 pool getOrCreate failed: {}", .{ req_id, err }); + return ProxyError.ConnectionFailed; + }; + + // Make request (connection handles: send, reader spawn, await) + response = conn.request( + @tagName(ctx.request.method orelse .GET), + ctx.request.uri orelse "/", + backend.getFullHost(), + ctx.request.body, + ctx.io, + ) catch |err| { + if (err == error.TooManyStreams) { + // Connection full (all 8 slots busy) - release as healthy and retry + log.debug("[REQ {d}] Connection full, retrying...", .{req_id}); + pool.release(conn, true, ctx.io); + continue; + } + if (err == error.RetryNeeded or err == error.ConnectionGoaway) { + // GOAWAY received - get fresh connection and retry + // NOT a failure - just graceful connection shutdown + log.debug("[REQ {d}] GOAWAY, retrying on fresh connection", .{req_id}); + pool.release(conn, false, ctx.io); // Destroy this conn, but NOT a backend failure + last_was_goaway = true; + continue; + } + // Other errors - connection is broken + log.warn("[REQ {d}] H2 request failed: {}", .{ req_id, err }); + pool.release(conn, false, ctx.io); + return ProxyError.SendFailed; + }; + + // Success - save connection for release after response forwarding + used_conn = conn; + break; + } else { + // All retries exhausted + if (last_was_goaway) { + // GOAWAY exhausted retries - NOT a backend health failure + // Server is healthy, just aggressively closing connections under load + log.debug("[REQ {d}] GOAWAY exhausted retries (not a failure)", .{req_id}); + return ProxyError.GoawayRetriesExhausted; + } + log.warn("[REQ {d}] H2 request failed after {d} retries", .{ req_id, attempts }); return ProxyError.SendFailed; - }; + } defer response.deinit(); + defer pool.release(used_conn, true, ctx.io); // Update proxy state proxy_state.status_code = response.status; proxy_state.bytes_from_backend = @intCast(response.body.items.len); - // Release connection back to pool - pool.release(conn, true, ctx.io); - - // Forward response to client + // Forward response to client (connection released via defer above) return forwardH2Response(ctx, proxy_state, &response, backend_idx, start_ns, req_id); } diff --git a/vendor/zzz.io/build.zig b/vendor/zzz.io/build.zig index 558c391..880db5d 100644 --- a/vendor/zzz.io/build.zig +++ b/vendor/zzz.io/build.zig @@ -52,7 +52,7 @@ fn add_example( }); if (link_libc) { - example.linkLibC(); + example.root_module.link_libc = true; } example.root_module.addImport("zzz", zzz_module); diff --git a/vendor/zzz.io/src/http/request.zig b/vendor/zzz.io/src/http/request.zig index c74e1f0..ff216d6 100644 --- a/vendor/zzz.io/src/http/request.zig +++ b/vendor/zzz.io/src/http/request.zig @@ -86,7 +86,7 @@ pub const Request = struct { } else { var header_iter = std.mem.tokenizeScalar(u8, line, ':'); const key = header_iter.next() orelse return HTTPError.MalformedRequest; - const value = std.mem.trimLeft(u8, header_iter.rest(), &.{' '}); + const value = std.mem.trimStart(u8, header_iter.rest(), &.{' '}); if (value.len == 0) return HTTPError.MalformedRequest; try self.headers.put(key, value); }