diff --git a/spec/raft/node_spec.cr b/spec/raft/node_spec.cr index 66566f8..065f4af 100644 --- a/spec/raft/node_spec.cr +++ b/spec/raft/node_spec.cr @@ -518,7 +518,7 @@ describe Raft::Node do describe "peers" do it "exposes peer list including self" do node = create_test_node(1_u64, [2_u64, 3_u64]) - node.peers.map(&.id).sort.should eq [1_u64, 2_u64, 3_u64] + node.peers.map(&.id).sort!.should eq [1_u64, 2_u64, 3_u64] node.peers.all?(&.voter?).should eq true node.close end @@ -747,7 +747,7 @@ describe Raft::Node do node.close end - it "add_server rejects duplicate node" do + it "add_server rejects self-address-update and does not append a log entry" do config = Raft::Config.new config.election_timeout_min_ticks = 5_u32 config.election_timeout_max_ticks = 5_u32 @@ -759,7 +759,102 @@ describe Raft::Node do 3_u64 => create_test_node(3_u64, [1_u64, 2_u64], config), } elect_leader(nodes) - nodes[1_u64].add_server(2_u64).should eq false + leader_id = nodes.find! { |_, n| n.role == Raft::Role::Leader }.first + leader = nodes[leader_id] + own_peer_before = leader.peers.find! { |p| p.id == leader_id } + last_index_before = leader.log.last_index + + # Attempting to update the leader's own address must be rejected + leader.add_server(leader_id, "127.0.0.1:9999").should eq false + + # No log entry appended + leader.log.last_index.should eq last_index_before + # Own peer entry unchanged + own_peer_after = leader.peers.find! { |p| p.id == leader_id } + own_peer_after.address.should eq own_peer_before.address + nodes.each_value(&.close) + end + + it "add_server rejects duplicate node with same address (no-op, returns true)" do + config = Raft::Config.new + config.election_timeout_min_ticks = 5_u32 + config.election_timeout_max_ticks = 5_u32 + config.heartbeat_ticks = 100_u32 + + nodes = { + 1_u64 => create_test_node(1_u64, [2_u64, 3_u64], config), + 2_u64 => create_test_node(2_u64, [1_u64, 3_u64], config), + 3_u64 => create_test_node(3_u64, [1_u64, 2_u64], config), + } + elect_leader(nodes) + # Re-adding an existing peer with same (empty) address is a no-op: returns true, no new log entry + last_index_before = nodes[1_u64].log.last_index + nodes[1_u64].add_server(2_u64).should eq true + nodes[1_u64].log.last_index.should eq last_index_before + nodes.each_value(&.close) + end + + it "add_server updates address for returning member with new address" do + config = Raft::Config.new + config.election_timeout_min_ticks = 5_u32 + config.election_timeout_max_ticks = 5_u32 + config.heartbeat_ticks = 100_u32 + + nodes = { + 1_u64 => create_test_node(1_u64, [2_u64, 3_u64], config), + 2_u64 => create_test_node(2_u64, [1_u64, 3_u64], config), + 3_u64 => create_test_node(3_u64, [1_u64, 2_u64], config), + } + elect_leader(nodes) + # Peer 2 currently has no address; re-add with a new address + last_index_before = nodes[1_u64].log.last_index + nodes[1_u64].add_server(2_u64, "127.0.0.1:5679").should eq true + # A new configuration entry must have been appended + nodes[1_u64].log.last_index.should be > last_index_before + # Peer 2's address should be updated in the leader's peer list + peer2 = nodes[1_u64].peers.find { |p| p.id == 2_u64 } + peer2.should_not be_nil + peer2.not_nil!.address.should eq "127.0.0.1:5679" + nodes.each_value(&.close) + end + + it "add_server updates address for returning learner with new address, preserving Learner role" do + config = Raft::Config.new + config.election_timeout_min_ticks = 5_u32 + config.election_timeout_max_ticks = 5_u32 + config.heartbeat_ticks = 100_u32 + + nodes = { + 1_u64 => create_test_node(1_u64, [2_u64, 3_u64], config), + 2_u64 => create_test_node(2_u64, [1_u64, 3_u64], config), + 3_u64 => create_test_node(3_u64, [1_u64, 2_u64], config), + } + elect_leader(nodes) + leader = nodes[1_u64] + + # First add node 4 as a learner + leader.add_server(4_u64, "127.0.0.1:5680").should eq true + peer4 = leader.peers.find { |p| p.id == 4_u64 } + peer4.not_nil!.learner?.should eq true + peer4.not_nil!.address.should eq "127.0.0.1:5680" + + # Commit the first config change (same pattern as promote_learner test) + leader.take_messages.each do |target_id, msg| + nodes[target_id].step(msg) if nodes.has_key?(target_id) + end + [2_u64, 3_u64].each do |id| + nodes[id].take_messages.each do |target_id, msg| + leader.step(msg) if target_id == 1_u64 + end + end + + # Now re-add node 4 with a different address; role must stay Learner + last_index_before = leader.log.last_index + leader.add_server(4_u64, "127.0.0.1:9999").should eq true + leader.log.last_index.should be > last_index_before + peer4_updated = leader.peers.find { |p| p.id == 4_u64 } + peer4_updated.not_nil!.learner?.should eq true + peer4_updated.not_nil!.address.should eq "127.0.0.1:9999" nodes.each_value(&.close) end @@ -1044,7 +1139,7 @@ describe Raft::Node do # Leader should send AppendEntries to all peers including the learner messages = leader.take_messages - target_ids = messages.map { |tid, _| tid }.uniq.sort + target_ids = messages.map { |tid, _| tid }.uniq!.sort! target_ids.should eq [2_u64, 3_u64, 4_u64] # Propose a value — quorum is still 2 (out of 3 voters) @@ -1126,7 +1221,7 @@ describe Raft::Node do node_recovered = Raft::Node(TestData).new(id: 1_u64, peers: [2_u64, 3_u64], config: c1_recover, state_machine: TestStateMachine.new) node_recovered.peers.size.should eq 4 - node_recovered.peers.map(&.id).sort.should eq [1_u64, 2_u64, 3_u64, 4_u64] + node_recovered.peers.map(&.id).sort!.should eq [1_u64, 2_u64, 3_u64, 4_u64] node_recovered.peers.all?(&.voter?).should eq true node_recovered.close @@ -1301,7 +1396,7 @@ describe Raft::Node do # Add node 4 as learner leader.add_server(4_u64) - leader.peers.find { |p| p.id == 4_u64 }.not_nil!.learner?.should eq true + leader.peers.find! { |p| p.id == 4_u64 }.learner?.should eq true # Send AppendEntries to all peers including node 4 messages = leader.take_messages @@ -1432,7 +1527,7 @@ describe Raft::Node do # Node 4 should have peers now (applied config from leader) node4.peers.size.should be > 0 # And should be a learner - node4.peers.find { |p| p.id == 4_u64 }.not_nil!.learner?.should eq true + node4.peers.find! { |p| p.id == 4_u64 }.learner?.should eq true # Drain any pending responses from replication node4.take_messages diff --git a/src/raft/node.cr b/src/raft/node.cr index eb9775e..db87737 100644 --- a/src/raft/node.cr +++ b/src/raft/node.cr @@ -312,12 +312,25 @@ module Raft true end - # Add a new node to the cluster as a learner. - # Returns false if not leader or node already exists. + # Add a new node to the cluster as a learner, or re-admit a returning member. + # - New id: adds as Learner, returns true. + # - Known id, different non-empty address: updates the stored address and replicates + # the new configuration, returns true. + # - Known id, same address (or empty address passed): no-op, returns true. + # Returns false if not leader, if a configuration change is already in flight, or if node_id == @id. def add_server(node_id : NodeID, address : String = "") : Bool return false unless @role == Role::Leader return false if @pending_config_index > @commit_index - return false if @peers.any? { |p| p.id == node_id } + return false if node_id == @id + if existing = @peers.find { |p| p.id == node_id } + # Returning member: update address only if a non-empty new address differs. + return true if address.empty? || existing.address == address + new_peers = @peers.map do |p| + p.id == node_id ? Peer.new(p.id, p.role, address) : p + end + append_configuration(new_peers) + return true + end new_peers = @peers.dup new_peers << Peer.new(node_id, Peer::Role::Learner, address) append_configuration(new_peers) @@ -513,7 +526,7 @@ module Raft private def cancel_pending_reads return if @pending_reads.empty? && @pending_apply.empty? - (@pending_reads + @pending_apply).each { |pr| pr.callback.call(nil) } + (@pending_reads + @pending_apply).each(&.callback.call(nil)) @pending_reads.clear @pending_apply.clear end @@ -1177,7 +1190,7 @@ module Raft f.write_bytes(0_u8, IO::ByteFormat::LittleEndian) end f.write_bytes(@peers.size.to_u32, IO::ByteFormat::LittleEndian) - @peers.each { |p| p.to_io(f) } + @peers.each(&.to_io(f)) f.fsync end File.rename(tmp_path, path) @@ -1207,7 +1220,7 @@ module Raft private def serialize_peers(peers : Array(Peer) = @peers) : Bytes io = IO::Memory.new io.write_bytes(peers.size.to_u32, IO::ByteFormat::LittleEndian) - peers.each { |p| p.to_io(io) } + peers.each(&.to_io(io)) io.to_slice.dup end