-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathnode.go
More file actions
348 lines (304 loc) · 9.52 KB
/
node.go
File metadata and controls
348 lines (304 loc) · 9.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
package gorums
import (
"context"
"fmt"
"net"
"sort"
"strconv"
"sync/atomic"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/relab/gorums/internal/stream"
)
const nilAngleString = "<nil>"
// NodeContext is a context that carries a node for unicast and RPC calls.
// It embeds context.Context and provides access to the Node.
//
// Use [Node.Context] to create a NodeContext from an existing context.
type NodeContext struct {
context.Context
node *Node
}
// Node returns the Node associated with this context.
func (c NodeContext) Node() *Node {
return c.node
}
// enqueue enqueues a request to this node's channel. If the channel is nil,
// e.g., for the self-node, the request is silently dropped.
func (c NodeContext) enqueue(req stream.Request) {
c.node.Enqueue(req)
}
// nextMsgID returns the next message ID from this client's manager.
func (c NodeContext) nextMsgID() uint64 {
return c.node.msgIDGen()
}
// Node encapsulates the state of a node on which a remote procedure call
// can be performed.
type Node struct {
// Only assigned at creation.
id uint32
addr string
mgr *Manager // only used for backward compatibility to allow Configuration.Manager()
msgIDGen func() uint64
router *stream.MessageRouter
channel atomic.Pointer[stream.Channel]
}
// Context creates a new NodeContext from the given parent context
// and this node.
//
// Example:
//
// nodeCtx := node.Context(context.Background())
// resp, err := service.GRPCCall(nodeCtx, req)
func (n *Node) Context(parent context.Context) *NodeContext {
if n == nil {
panic("gorums: Context called with nil node")
}
return &NodeContext{Context: parent, node: n}
}
// nodeOptions contains configuration options for creating a new Node.
type nodeOptions struct {
ID uint32
SendBufferSize uint
MsgIDGen func() uint64
Metadata metadata.MD
PerNodeMD func(uint32) metadata.MD
DialOpts []grpc.DialOption
RequestHandler stream.RequestHandler
Manager *Manager // only used for backward compatibility to allow Configuration.Manager()
}
// newOutboundNode creates a new node using the provided options. It establishes
// the connection (lazy dial) and initializes the outbound channel.
func newOutboundNode(addr string, opts nodeOptions) (*Node, error) {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
}
n := &Node{
id: opts.ID,
addr: tcpAddr.String(),
mgr: opts.Manager,
msgIDGen: opts.MsgIDGen,
router: stream.NewMessageRouter(opts.RequestHandler),
}
// Create gRPC connection to the node without connecting (lazy dial).
conn, err := grpc.NewClient(n.addr, opts.DialOpts...)
if err != nil {
return nil, nodeError{nodeID: n.id, cause: err}
}
// Create outgoing context with metadata for this node's stream.
md := opts.Metadata.Copy()
if opts.PerNodeMD != nil {
md = metadata.Join(md, opts.PerNodeMD(n.id))
}
ctx := metadata.NewOutgoingContext(context.Background(), md)
// Create new outbound channel and establish gRPC node stream
n.channel.Store(stream.NewOutboundChannel(ctx, n.id, opts.SendBufferSize, conn, n.router))
return n, nil
}
// newInboundNode creates a Node for a known peer or self without an active
// channel. Used by inboundManager at construction time for all configured
// peers; the channel is attached when the peer's stream arrives.
func newInboundNode(id uint32, addr string, msgIDGen func() uint64) *Node {
return &Node{
id: id,
addr: addr,
msgIDGen: msgIDGen,
router: stream.NewMessageRouter(),
}
}
// newLocalNode creates a Node that dispatches calls in-process, bypassing the
// network. It is used for the self-node when this process is both client and
// server in a symmetric peer configuration. The provided handler (typically
// the local *Server) serves requests directly without a gRPC round-trip.
func newLocalNode(id uint32, addr string, msgIDGen func() uint64, handler stream.RequestHandler, mgr *Manager) *Node {
router := stream.NewMessageRouter(handler)
n := &Node{
id: id,
addr: addr,
mgr: mgr,
msgIDGen: msgIDGen,
router: router,
}
n.channel.Store(stream.NewLocalChannel(id, router))
return n
}
// IsInbound returns true if the node has an active inbound channel.
func (n *Node) IsInbound() bool {
if n == nil {
return false
}
ch := n.channel.Load()
return ch != nil && ch.IsInbound()
}
// attachStream attaches a new inbound channel to the node when a peer connects.
// If the node already has an active channel (e.g., a stale stream from a previous
// connection), it is atomically replaced and the old channel is closed.
func (n *Node) attachStream(streamCtx context.Context, inboundStream stream.BidiStream, sendBufferSize uint) {
newCh := stream.NewInboundChannel(streamCtx, n.id, sendBufferSize, inboundStream, n.router)
if old := n.channel.Swap(newCh); old != nil {
old.Close()
}
}
// detachStream closes and removes the node's inbound channel when the peer disconnects.
func (n *Node) detachStream() {
if ch := n.channel.Swap(nil); ch != nil {
ch.Close()
}
}
// RouteResponse routes an incoming message as a response to a pending
// server-initiated call. Returns true if the message matched a pending
// call and was handled; false if it should be dispatched as a new request.
// This implements the [stream.PeerNode] interface.
func (n *Node) RouteResponse(msg *stream.Message) bool {
return n.router.RouteResponse(msg.GetMessageSeqNo(), NodeResponse[*stream.Message]{
NodeID: n.id,
Value: msg,
Err: msg.ErrorStatus(),
})
}
// Enqueue enqueues a request to this node's channel.
// For local channels the channel handles in-process dispatch directly.
// If no channel is available, the request is silently dropped.
// This implements the [stream.PeerNode] interface.
func (n *Node) Enqueue(req stream.Request) {
if ch := n.channel.Load(); ch != nil {
ch.Enqueue(req)
}
}
// close this node.
func (n *Node) close() error {
if ch := n.channel.Load(); ch != nil {
return ch.Close()
}
return nil
}
// ID returns the ID of n.
func (n *Node) ID() uint32 {
if n != nil {
return n.id
}
return 0
}
// Address returns network address of n.
func (n *Node) Address() string {
if n != nil {
return n.addr
}
return nilAngleString
}
// Host returns the network host of n.
func (n *Node) Host() string {
if n == nil {
return nilAngleString
}
host, _, _ := net.SplitHostPort(n.addr)
return host
}
// Port returns network port of n.
func (n *Node) Port() string {
if n != nil {
_, port, _ := net.SplitHostPort(n.addr)
return port
}
return nilAngleString
}
func (n *Node) String() string {
if n != nil {
return fmt.Sprintf("addr: %s", n.addr)
}
return nilAngleString
}
// FullString returns a more descriptive string representation of n that
// includes id, network address and latency information.
func (n *Node) FullString() string {
if n != nil {
return fmt.Sprintf("node %d | addr: %s", n.id, n.addr)
}
return nilAngleString
}
// LastErr returns the last error encountered (if any) for this node.
func (n *Node) LastErr() error {
if ch := n.channel.Load(); ch != nil {
return ch.LastErr()
}
return nil
}
// Latency returns the latency between the client and this node.
func (n *Node) Latency() time.Duration {
return n.router.Latency()
}
type lessFunc func(n1, n2 *Node) bool
// MultiSorter implements the Sort interface, sorting the nodes within.
type MultiSorter struct {
nodes []*Node
less []lessFunc
}
// Sort sorts the argument slice according to the less functions passed to
// OrderedBy.
func (ms *MultiSorter) Sort(nodes []*Node) {
ms.nodes = nodes
sort.Sort(ms)
}
// OrderedBy returns a Sorter that sorts using the less functions, in order.
// Call its Sort method to sort the data.
func OrderedBy(less ...lessFunc) *MultiSorter {
return &MultiSorter{
less: less,
}
}
// Len is part of sort.Interface.
func (ms *MultiSorter) Len() int {
return len(ms.nodes)
}
// Swap is part of sort.Interface.
func (ms *MultiSorter) Swap(i, j int) {
ms.nodes[i], ms.nodes[j] = ms.nodes[j], ms.nodes[i]
}
// Less is part of sort.Interface. It is implemented by looping along the
// less functions until it finds a comparison that is either Less or not
// Less. Note that it can call the less functions twice per call. We
// could change the functions to return -1, 0, 1 and reduce the
// number of calls for greater efficiency: an exercise for the reader.
func (ms *MultiSorter) Less(i, j int) bool {
p, q := ms.nodes[i], ms.nodes[j]
// Try all but the last comparison.
var k int
for k = range len(ms.less) - 1 {
less := ms.less[k]
switch {
case less(p, q):
// p < q, so we have a decision.
return true
case less(q, p):
// p > q, so we have a decision.
return false
}
// p == q; try the next comparison.
}
// All comparisons to here said "equal", so just return whatever
// the final comparison reports.
return ms.less[k](p, q)
}
// ID sorts nodes by their identifier in increasing order.
var ID = func(n1, n2 *Node) bool {
return n1.id < n2.id
}
// Port sorts nodes by their port number in increasing order.
// Warning: This function may be removed in the future.
var Port = func(n1, n2 *Node) bool {
p1, _ := strconv.Atoi(n1.Port())
p2, _ := strconv.Atoi(n2.Port())
return p1 < p2
}
// LastNodeError sorts nodes by their LastErr() status in increasing order. A
// node with LastErr() != nil is larger than a node with LastErr() == nil.
var LastNodeError = func(n1, n2 *Node) bool {
if n1.LastErr() != nil && n2.LastErr() == nil {
return false
}
return true
}
// compile-time assertion for interface compliance.
var _ stream.PeerNode = (*Node)(nil)