Conversation
Add the foundation for server-side peer tracking as specified in doc/symmetric-configuration-phase2.md. config_opts.go: - Add InboundNodeOption interface; only WithNodes satisfies it, giving compile-time safety that WithNodeList cannot be used for inbound configuration (which requires pre-assigned IDs). - Add nodeMap[T].newServerConfig: validates IDs (no zero), normalizes addresses, rejects duplicates, and populates InboundManager.peerAddrs. inbound_manager.go (new): - Add gorumsNodeIDKey constant and nodeID(ctx) helper for extracting a NodeID from the gorums-node-id gRPC metadata key. - Add InboundManager struct with NewInboundManager constructor, KnownIDs() and isKnown() methods. inbound_manager_test.go (new): - Compile-time assertion that nodeMap[T] satisfies InboundNodeOption. - TestNewInboundManager: valid nodes, empty map, node-0, duplicate address, and invalid address cases. - TestInboundNodeOptionNodeListRejected: runtime check that nodeList does not satisfy InboundNodeOption. - TestNodeID: nine cases covering valid IDs, zero, negative, non-numeric, overflow, and missing metadata.
Change the Node.channel field from *stream.Channel to atomic.Pointer[stream.Channel]. This allows the field to be nil (representing a peer node with no active stream) without requiring a separate boolean flag, and makes nil-safe access uniform across outbound and inbound nodes. Changes: - Node.channel: *stream.Channel → atomic.Pointer[stream.Channel] - Add Node.enqueue: loads channel atomically, drops request if nil - NodeContext.enqueue: delegate to Node.enqueue instead of calling node.channel.Enqueue directly - Add newPeerNode: constructs a Node without an active channel, for use by InboundManager before a peer's stream arrives - Add Node.attachStream: stores a new inbound channel on connect - Add Node.detachStream: swaps nil and closes the old channel on disconnect - Update Node.close, LastErr, Latency, LastNodeError to use channel.Load() - client_interceptor: call n.enqueue instead of n.channel.Enqueue - node_test: update TestNodeSort to use channel.Store; add BenchmarkNodeEnqueue and BenchmarkNodeEnqueueSend
…nfig Add the full InboundManager implementation for server-initiated calls. Nodes are pre-created at construction time (channel nil); channels are attached when peers connect and detached when streams end. Changes: - InboundManager struct: replace peerAddrs map with nodes map[uint32]*Node; add myID, inboundCfg, nextMsgID, sendBufferSize, onConfigChange fields - NewInboundManager: add myID and variadic InboundManagerOption parameters; apply options before node creation so sendBufferSize is in effect - Add InboundManager.newNode: encapsulates newPeerNode + map insertion, called by config_opts during construction (no locking needed) - Add InboundManager.getMsgID: atomic counter for server-initiated message IDs - Add InboundManager.InboundConfig: returns current live Configuration - Add InboundManager.AcceptPeer: extracts peer NodeID from gRPC metadata, skips external clients and unknown peers, delegates to registerPeer; error return reserved for future credential validation - Add InboundManager.registerPeer: attaches inbound channel, rebuilds config, returns idempotent cleanup func (sync.Once) that detaches on stream end - Add InboundManager.rebuildConfig: rebuilds sorted inboundCfg from nodes map; includes self-node (myID) unconditionally and connected peers - opts.go: add InboundManagerOption type and WithInboundSendBufferSize - inbound_manager_test.go: add mockBidiStream, newTestInboundManager helper, TestInboundManagerRegisterUnregister (table-driven, covers sort, reconnect, stale cleanup, idempotent unregister), TestAcceptPeer, TestAcceptPeerReturnsCleanup; update TestNewInboundManager to cover WithNodeList and wantCfgIDs
Remove the separate InboundNodeOption interface and add newServerConfig to NodeListOption itself, so a single option value works for both Manager (outbound, via newConfig) and InboundManager (inbound, via newServerConfig). This eliminates the runtime type assertion in NewInboundManager and allows WithNodeList to be passed to NewInboundManager. Changes: - Drop InboundNodeOption interface - Add newServerConfig(*InboundManager) error to NodeListOption - nodeMap[T].newServerConfig: updated comment; call im.newNode instead of reaching into im.nodes and im.getMsgID directly - nodeList.newServerConfig: new method; assigns IDs sequentially starting at 1, with address normalization and duplicate detection - WithNodeList doc comment: note ID-assignment behavior for InboundManager - inbound_manager_test.go: replace compile-time InboundNodeOption assertion with NodeListOption assertions for both nodeMap[T] and nodeList; replace WithNodeListRejected test case with WithNodeListAssignsIDs and WithNodeListEmptyRejected; remove TestInboundNodeOptionNodeListRejected
When a peer connects via AcceptPeer, an inbound Channel is created with its own sender() goroutine. NodeStream's anonymous send goroutine also writes to the same gRPC server stream, violating gRPC's prohibition on concurrent Send() calls. The race is latent today but would trigger the moment server-initiated quorum calls (Phase 3) make node.enqueue() reachable for inbound nodes. Fix: for inbound peers, NodeStream's send goroutine relays handler responses through node.enqueue() instead of calling srv.Send() directly. This makes Channel.sender() the sole writer on the inbound stream. ServerCtx and the finished channel are unchanged; the only difference is one branch inside the existing send goroutine. Also refactor AcceptPeer to return (*Node, error) and replace the closure-based cleanup with an explicit UnregisterPeer(id) method on InboundManager, giving NodeStream access to the peer Node for the relay path and simplifying the unregister API. Changes: - inbound_manager.go: AcceptPeer returns (*Node, error), add UnregisterPeer(id), remove sync.Once closure from registerPeer - server.go: NodeStream relays via node.enqueue() for inbound peers, defers UnregisterPeer(node.ID()), rename field to inboundMgr - inbound_manager_test.go: update tests for new API, rename TestAcceptPeerReturnsCleanup to TestAcceptPeerReturnsNode, remove StaleUnregisterAfterReconnect test - channel.go: update NewInboundChannel doc comments - node.go: trim attachStream doc comment
The original tiebreaker design in registerPeer was incorrect: it rejected new inbound connections when myID < peerID, keeping the old (broken) channel on reconnect. This made peer reconnection fail silently. Replace the tiebreaker with always-replace logic by changing attachStream to use atomic Swap() instead of Store(). If an existing channel is present (e.g., stale stream from a previous connection), it is atomically replaced and the old channel is closed. This encapsulates replacement entirely within Node, simplifying registerPeer. Test reshaped: TestSymmetricStreamTiebreaker → TestRegisterPeerReplacesExisting
Extract response routing machinery from Channel into a standalone MessageRouter type in internal/stream/router.go. Node owns the router and injects it into each Channel, so the router survives channel replacement during inbound reconnects. MessageRouter provides: - Register/RouteResponse for pending call lifecycle - CancelPending/RequeuePending for stream failure handling - Latency tracking via moving average, computed in RouteResponse Key design changes: - Node holds *MessageRouter, passes it to NewOutboundChannel and NewInboundChannel via constructor injection - Node.Latency() delegates directly to the router instead of the channel - Channel no longer owns response routing state or latency tracking - RouteResponse returns bool (latency computed internally) - Register sets SendTime internally, removing it from caller
Add demuxing in NodeStream's Recv loop so that responses to server-initiated calls are routed to the correct pending request via the MessageRouter, instead of being dispatched as new requests. - Add Node.RouteResponse(msg) that unmarshals the response and delegates to router.RouteResponse - Insert demux check in NodeStream before handler dispatch: if the message matches a pending call, route it and continue - Add TestNodeRouteResponse covering match, consumed, and unknown cases
Move Envelope (was gorums.Message wrapper), ServerCtx, Handler, Interceptor, and the NodeStream implementation from the gorums package to internal/stream. This co-locates all stream I/O logic in one package, eliminating import cycles for the upcoming unified handler registry on MessageRouter. - Define PeerAcceptor/PeerNode interfaces to break the cross-package dependency on InboundManager and Node - gorums.Server becomes a thin wrapper around stream.Server - Type aliases in gorums preserve full backward compatibility - AcceptPeer now returns a cleanup function instead of requiring a separate UnregisterPeer call
Add handlers map, SetHandlers, and HandleRequest to MessageRouter for type-safe handler lookup from both server and client sides. - Add SetHandlers to PeerNode interface so NodeStream can propagate the server's handler map to each peer's router on connect - Node.SetHandlers delegates to its router - Shared map by reference: handlers registered once are visible to all routers for the same role - Add TestRouterHandleRequest covering nil, found, not-found, and shared map mutation cases
Update Channel.receiver to dispatch server-initiated requests via the router's HandleRequest when RouteResponse returns false. - Add dispatchRequest method that unmarshals the request, calls the handler, marshals the response, and enqueues it back - Add TestServerCallsClient E2E test validating the full symmetric path: server request → client handler → server response
This adds type assertion for interfaces stream.PeerAcceptor and stream.PeerNode for the InboundManager and Node, respectively.
Replace explicit InboundManager construction and WithInboundManager option with higher-level ServerOptions that create the InboundManager internally: - WithPeers(myID, nodeListOpt): configure known peer tracking - WithDynamicPeers(): accept unknown connecting clients (Phase E) - WithPeerSendBufferSize(size): per-peer send buffer - WithOnConfigChange(f): callback on config changes API before: im, _ := NewInboundManager(myID, WithNodes(peers), WithInboundSendBufferSize(8)) srv := NewServer(WithInboundManager(im)) cfg := im.InboundConfig() API after: srv := NewServer(WithPeers(myID, WithNodes(peers)), WithPeerSendBufferSize(8)) cfg := srv.InboundConfig() Server changes: - NewServer creates InboundManager internally when peer options are set - Add Server.InboundConfig() accessor for the live peer configuration - Panic on configuration errors (programmer errors at startup) InboundManager changes: - Unexport constructor: NewInboundManager → newInboundManager - Accept explicit params (sendBuffer, onConfigChange) instead of variadic InboundManagerOption funcs - Remove InboundManagerOption type (WithInboundSendBufferSize, WithOnConfigChange) from opts.go - Return single value (panics on error) - Fold id == 0 check into isKnown for cleaner AcceptPeer AcceptPeer improvements: - Nil-receiver safe: no-op when InboundManager is nil - Always return a no-op cleanup func (never nil), so callers can unconditionally defer cleanup() - Accept handlers map param and call node.setHandlers internally, removing SetHandlers from PeerNode interface Node changes: - Unexport SetHandlers → setHandlers (only called within gorums package) Test changes: - E2E tests use WithPeers and srv.InboundConfig() instead of manually constructing InboundManager - Extract testPeerServer, peerNodes, shouldPanic test helpers - Remove unused waitForConfig; add waitForServerConfig - Error tests use wantPanic field with shouldPanic helper
Enable the Gorums server to accept connections from unknown clients (e.g. ephemeral clients that do not have pre-configured NodeIDs) and interact with them via server-initiated calls. - Add WithDynamicPeers ServerOption to enable dynamic registration. - Automatically assign sequential NodeIDs (starting from 1<<20 to avoid collisions with known replicas) to unknown clients upon connection. - Append dynamically assigned clients to the live InboundConfig, allowing the server to send requests to them (e.g., QuorumCall, Multicast) using the same bidi stream established by the client. - Treat dynamic nodes as ephemeral: completely remove them from the server's internal node map when their stream disconnects, unlike known peers which persist for reconnection. - Update AcceptPeer in InboundManager to handle the new dynamic mode, branching between known peers, dynamic peers, and rejected clients. - Add E2E tests for dynamic clients: TestDynamicPeerConnects, TestDynamicPeerDisconnects, TestDynamicPeerMixedMode, and TestDynamicPeerServerCallsClient.
This is just a cleanliness commit; we can save some lines of code.
Prevent unprotected read access to the `InboundManager.nodes` map in `AcceptPeer` by replacing the `isKnown(id)` boolean check with a `getNode(id *Node)` helper. Previously, `AcceptPeer` checked `isKnown(id)` (which acquired and released an RLock) and subsequently accessed `im.nodes[id]` directly without a lock. With the introduction of dynamic peers, `im.nodes` is now a mutable map (updated concurrently by `acceptDynamic` and `removeDynamic`), causing a data race between unprotected reads and locked writes. `getNode(id)` performs the lookup and returns the `*Node` reference safely within a single RLock scope, eliminating the race condition.
Refactor the InboundManager connection handling to return cleanup closures directly from registerPeer and acceptDynamic, removing the need for standalone unregisterPeer and removeDynamic methods. - Make registerPeer and acceptDynamic signatures symmetric, both returning (stream.PeerNode, func(), error). - Move node.setHandlers(handlers) inside these methods, simplifying the logic in AcceptPeer. - Remove the unexported unregisterPeer and removeDynamic methods, inlining their logic securely within the returned cleanup closures. - Update inbound_manager_test.go to use AcceptPeer instead of the removed internal methods for simulating peer connections and disconnections. - Maintain separate registerPeer (for known peers) and acceptDynamic (for unknown clients) logic paths, allowing future divergence (e.g., separating peer nodes from dynamic nodes into distinct maps). - Reverted getNode back to isKnown that returns bool now that the registerPeer method handles the work, we no longer need the Node in AcceptPeer.
Clean up outdated test names for InboundManager to align with the new "dynamic peer" test naming conventions, and delete redundant test cases that are now fully covered by table-driven tests. - Rename TestInboundManagerRegisterUnregister to TestAcceptPeerUpdatesConfig to accurately reflect its usage of the AcceptPeer API. - Rename TestRegisterPeerReplacesExisting to TestAcceptPeerReplacesExistingStream. - Delete TestAcceptPeerReturnsNode as its cases (handling external clients, known peers, unknown peers) are already exhaustively tested in the TestAcceptPeer table test. - Rename legacy TestInboundManagerPeerConnects, TestInboundManagerPeerDisconnects, and TestServerCallsClient adding the KnownPeer prefix for symmetry with the new DynamicPeer tests.
This commit introduces a distinction between configurations for known peers and dynamically connected clients in the InboundManager, preventing them from being merged into a single Configuration by default. - InboundManager now maintains a config (known peers) and a dynamicConfig (unknown clients). - Server and System now expose both Config() and DynamicConfig() methods. - Tests in inbound_manager_test have been updated to check the appropriate configuration using a new generic waitForConfigCondition helper.
This architectural shift simplifies the internal stream package into a pure bidirectional multiplexer, completely decoupled from gorums specific RPC implementations and Protobuf decoding. - Move proto.Message decoding out of the channel/router layer. The router now carries raw *stream.Message values instead of decoded proto.Message values. Each call type (RPCCall, quorum calls) calls unmarshalResponse when consuming a response, keeping the wire-format message in the pipeline until needed. - Moved ServerCtx, Handler, Interceptor, Envelope, and marshaling from internal/stream to gorums package. - Introduced RequestHandler interface in internal/stream to replace the direct dependency on gorums handler types. - Updated stream.MessageRouter and stream.NodeResponse to operate on *stream.Message instead of proto.Message, pushing protobuf unmarshaling to the gorums layer (ClientCtx, unicast, rpc). - Updated gorums.Node and inbound_manager to use setRequestHandler instead of setHandlers. - Adapted unit tests and benchmarks to accommodate the architectural changes. - Simplify interceptors: handlers no longer need to re-marshal modified proto messages back into the payload. SendMessage lazily marshals out.Msg on send.
|
|
Overall Grade |
Security Reliability Complexity Hygiene |
Code Review Summary
| Analyzer | Status | Updated (UTC) | Details |
|---|---|---|---|
| Go | Feb 27, 2026 2:07p.m. | Review ↗ | |
| Shell | Feb 27, 2026 2:07p.m. | Review ↗ |
We don't want to deal with secure gRPC servers in benchmarks and tests.
We don't want to deal with secure gRPC servers in benchmarks and tests.
There was a problem hiding this comment.
Pull request overview
This PR introduces symmetric configuration support with dynamic peer registration, enabling servers to track connected clients and perform server-initiated RPC calls. The implementation significantly decouples the stream package from Gorums-specific logic and establishes clear interface boundaries for bidirectional communication.
Changes:
- Introduced
InboundManagerto track known and dynamic peers, maintaining liveConfig()andDynamicConfig()views - Refactored stream multiplexing into
internal/streampackage withPeerAcceptorandPeerNodeinterfaces to decouple from gorums package - Moved
MessageRouterto Node level (from Channel) to survive channel replacement during reconnects, usingatomic.Pointer[Channel]for lock-free swaps - Added
ServerCtx.Config()andServerCtx.DynamicConfig()methods providing handlers with live peer state access - Introduced
WithPeers()andWithDynamicPeers()ServerOptions for configuring peer tracking
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| inbound_manager.go | New component managing server-side peer tracking and dynamic peer registration with auto-assigned IDs |
| handler.go | Extracted handler types, interceptors, and ServerCtx from server.go; added Config/DynamicConfig accessor methods |
| internal/stream/server.go | New stream server handling NodeStream connections with PeerAcceptor integration for peer identification |
| internal/stream/router.go | New MessageRouter for response routing and handler dispatch, owned by Node and injected into channels |
| internal/stream/channel.go | Integrated MessageRouter injection; removed latency tracking (moved to router); added client-side request handler dispatch |
| server.go | Major refactoring - moved handler types to handler.go, integrated InboundManager, added HandleRequest bridge method |
| node.go | Changed channel field to atomic.Pointer for lock-free swaps; added attachStream/detachStream for inbound channels; implements PeerNode interface |
| config_opts.go | Added newServerConfig method to NodeListOption for server-side peer configuration with sequential ID assignment |
| marshaling.go | Moved from internal/stream to gorums package; made functions private (unmarshalRequest/unmarshalResponse) |
| encoding.go | Removed Message type and helper functions (moved to handler.go) |
| rpc.go | Updated to unmarshal responses from stream.Message; fixed error handling to avoid double-error return |
| responses.go | Changed response type from NodeResponse[msg] to NodeResponse[*stream.Message] for consistency |
| unicast.go | Updated reply channel type to NodeResponse[*stream.Message] |
| client_interceptor.go | Updated reply channel type to NodeResponse[*stream.Message]; changed Enqueue calls to use Node method |
| system.go | Added Config() and DynamicConfig() proxy methods to Server |
| testing_shared.go | Added TestManager helper for creating Manager with custom options |
| inbound_manager_test.go | Comprehensive test suite with E2E tests for known/dynamic peers, connect/disconnect, and symmetric communication |
| internal/stream/router_test.go | Unit tests for MessageRouter registration, routing, and handler lookup |
| internal/stream/channel_test.go | Updated tests for router integration and inbound channel behavior |
| node_test.go | Added tests for RouteResponse, Enqueue overhead benchmarks, and end-to-end send benchmarks |
| server_test.go | Updated interceptor test to reflect lazy marshaling changes |
| responses_test.go | Updated to marshal responses into stream.Message format |
| .vscode/gorums.txt | Added new dictionary words (benchmem, demultiplexing, unregistrations) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This also uses the generic ParseInteger instead of std lib one.
This automatically assigns this system's NodeID in the connection metadata, enabling the remove server to identify the replica.
Replace the "known peers / dynamic peers" naming with a cleaner Config / ClientConfig distinction throughout the public API: - WithPeers(myID, opt) → WithConfig(myID, opt, ...onChange) - WithDynamicPeers() → WithClientConfig(...onChange) - Server.DynamicConfig() → Server.ClientConfig() - System.DynamicConfig() → System.ClientConfig() - ServerCtx.DynamicConfig() → ServerCtx.ClientConfig() - InboundManager.DynamicConfig() → InboundManager.ClientConfig() WithOnConfigChange is removed; the change callbacks are now optional variadic arguments on WithConfig and WithClientConfig respectively, co-locating callback registration with the option that owns it. Internally: dynamicPeers→clientPeers, dynamicConfig→clientConfig, dynamicIDStart→clientIDStart, nextDynamicID→nextClientID, acceptDynamic→acceptClient. Fixes one of the code review comments raised by copilot.
Add a pre-increment check in acceptClient that returns an error when nextClientID reaches math.MaxUint32, preventing silent wraparound and potential ID collisions with known peer IDs or the reserved external client ID (0). Also update the clientIDStart comment to document the ~4.3 billion ID capacity and the overflow guard behavior. This fixes another copilot code review comment.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 27 out of 27 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This PR introduces the Inbound Manager, implements support for Symmetric Configuration, and significantly decouples the internal stream package from Gorums-specific RPC logic. These changes enhance connection management, support dynamic client registration, and lay the foundation for fully symmetric peer-to-peer RPC capabilities.
Fixes #274
Fixes #269
Key Changes relative to
masterInboundManagerto track and manage incoming connections from both known peers and previously unknown ("dynamic") clients.Server.Config()andServer.DynamicConfig()methods to provide live views of currently connected known and dynamic peers, making this state accessible dynamically.WithDynamicPeers()ServerOption. When enabled, the server accepts connections from unrecognized nodes, assigns them a sequential auto-generated ID, and tracks them in theDynamicConfig.MessageRouter, handler types, and internal stream abstractions into theinternal/streampackage.HandleRequestnow acts as the bridge that dispatches decoded messages to registered handlers.RequestHandler):RequestHandlerinterface to facilitate bidirectional request routing. Clients and servers can now leverage unified handler registries to dispatch incoming requests symmetrically regardless of who initiated the connection.ServerCtx):Config()andDynamicConfig()methods to theServerCtxstruct, providing RPC request handlers with direct, live access to connected peer state and routing information.Sendrace on inbound channels by centralizing the send path ensuring onlyChannel.sender()writes to the gRPC stream.enqueue()method onNodeand replaced standard locking withatomic.Pointer[stream.Channel]for atomic stream swaps, greatly reducing lock contention during connection tiebreakers.Context and Migration
ServerCtxnow exposeConfig()andDynamicConfig()methods, providing handlers with access to live routing information.unregisterPeerhave been removed in favor of inline connection cleanup directly within the scopes returned byInboundManager.WithDynamicPeers().