Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 102 additions & 7 deletions spec/raft/node_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ describe Raft::Node do
describe "peers" do
it "exposes peer list including self" do
node = create_test_node(1_u64, [2_u64, 3_u64])
node.peers.map(&.id).sort.should eq [1_u64, 2_u64, 3_u64]
node.peers.map(&.id).sort!.should eq [1_u64, 2_u64, 3_u64]
node.peers.all?(&.voter?).should eq true
node.close
end
Expand Down Expand Up @@ -747,7 +747,7 @@ describe Raft::Node do
node.close
end

it "add_server rejects duplicate node" do
it "add_server rejects self-address-update and does not append a log entry" do
config = Raft::Config.new
config.election_timeout_min_ticks = 5_u32
config.election_timeout_max_ticks = 5_u32
Expand All @@ -759,7 +759,102 @@ describe Raft::Node do
3_u64 => create_test_node(3_u64, [1_u64, 2_u64], config),
}
elect_leader(nodes)
nodes[1_u64].add_server(2_u64).should eq false
leader_id = nodes.find! { |_, n| n.role == Raft::Role::Leader }.first
leader = nodes[leader_id]
own_peer_before = leader.peers.find! { |p| p.id == leader_id }
last_index_before = leader.log.last_index

# Attempting to update the leader's own address must be rejected
leader.add_server(leader_id, "127.0.0.1:9999").should eq false

# No log entry appended
leader.log.last_index.should eq last_index_before
# Own peer entry unchanged
own_peer_after = leader.peers.find! { |p| p.id == leader_id }
own_peer_after.address.should eq own_peer_before.address
nodes.each_value(&.close)
end

it "add_server rejects duplicate node with same address (no-op, returns true)" do
config = Raft::Config.new
config.election_timeout_min_ticks = 5_u32
config.election_timeout_max_ticks = 5_u32
config.heartbeat_ticks = 100_u32

nodes = {
1_u64 => create_test_node(1_u64, [2_u64, 3_u64], config),
2_u64 => create_test_node(2_u64, [1_u64, 3_u64], config),
3_u64 => create_test_node(3_u64, [1_u64, 2_u64], config),
}
elect_leader(nodes)
# Re-adding an existing peer with same (empty) address is a no-op: returns true, no new log entry
last_index_before = nodes[1_u64].log.last_index
nodes[1_u64].add_server(2_u64).should eq true
nodes[1_u64].log.last_index.should eq last_index_before
nodes.each_value(&.close)
end

it "add_server updates address for returning member with new address" do
config = Raft::Config.new
config.election_timeout_min_ticks = 5_u32
config.election_timeout_max_ticks = 5_u32
config.heartbeat_ticks = 100_u32

nodes = {
1_u64 => create_test_node(1_u64, [2_u64, 3_u64], config),
2_u64 => create_test_node(2_u64, [1_u64, 3_u64], config),
3_u64 => create_test_node(3_u64, [1_u64, 2_u64], config),
}
elect_leader(nodes)
# Peer 2 currently has no address; re-add with a new address
last_index_before = nodes[1_u64].log.last_index
nodes[1_u64].add_server(2_u64, "127.0.0.1:5679").should eq true
# A new configuration entry must have been appended
nodes[1_u64].log.last_index.should be > last_index_before
# Peer 2's address should be updated in the leader's peer list
peer2 = nodes[1_u64].peers.find { |p| p.id == 2_u64 }
peer2.should_not be_nil
peer2.not_nil!.address.should eq "127.0.0.1:5679"
nodes.each_value(&.close)
end

it "add_server updates address for returning learner with new address, preserving Learner role" do
config = Raft::Config.new
config.election_timeout_min_ticks = 5_u32
config.election_timeout_max_ticks = 5_u32
config.heartbeat_ticks = 100_u32

nodes = {
1_u64 => create_test_node(1_u64, [2_u64, 3_u64], config),
2_u64 => create_test_node(2_u64, [1_u64, 3_u64], config),
3_u64 => create_test_node(3_u64, [1_u64, 2_u64], config),
}
elect_leader(nodes)
leader = nodes[1_u64]

# First add node 4 as a learner
leader.add_server(4_u64, "127.0.0.1:5680").should eq true
peer4 = leader.peers.find { |p| p.id == 4_u64 }
peer4.not_nil!.learner?.should eq true
peer4.not_nil!.address.should eq "127.0.0.1:5680"

# Commit the first config change (same pattern as promote_learner test)
leader.take_messages.each do |target_id, msg|
nodes[target_id].step(msg) if nodes.has_key?(target_id)
end
[2_u64, 3_u64].each do |id|
nodes[id].take_messages.each do |target_id, msg|
leader.step(msg) if target_id == 1_u64
end
end

# Now re-add node 4 with a different address; role must stay Learner
last_index_before = leader.log.last_index
leader.add_server(4_u64, "127.0.0.1:9999").should eq true
leader.log.last_index.should be > last_index_before
peer4_updated = leader.peers.find { |p| p.id == 4_u64 }
peer4_updated.not_nil!.learner?.should eq true
peer4_updated.not_nil!.address.should eq "127.0.0.1:9999"
nodes.each_value(&.close)
end

Expand Down Expand Up @@ -1044,7 +1139,7 @@ describe Raft::Node do

# Leader should send AppendEntries to all peers including the learner
messages = leader.take_messages
target_ids = messages.map { |tid, _| tid }.uniq.sort
target_ids = messages.map { |tid, _| tid }.uniq!.sort!
target_ids.should eq [2_u64, 3_u64, 4_u64]

# Propose a value — quorum is still 2 (out of 3 voters)
Expand Down Expand Up @@ -1126,7 +1221,7 @@ describe Raft::Node do

node_recovered = Raft::Node(TestData).new(id: 1_u64, peers: [2_u64, 3_u64], config: c1_recover, state_machine: TestStateMachine.new)
node_recovered.peers.size.should eq 4
node_recovered.peers.map(&.id).sort.should eq [1_u64, 2_u64, 3_u64, 4_u64]
node_recovered.peers.map(&.id).sort!.should eq [1_u64, 2_u64, 3_u64, 4_u64]
node_recovered.peers.all?(&.voter?).should eq true

node_recovered.close
Expand Down Expand Up @@ -1301,7 +1396,7 @@ describe Raft::Node do

# Add node 4 as learner
leader.add_server(4_u64)
leader.peers.find { |p| p.id == 4_u64 }.not_nil!.learner?.should eq true
leader.peers.find! { |p| p.id == 4_u64 }.learner?.should eq true

# Send AppendEntries to all peers including node 4
messages = leader.take_messages
Expand Down Expand Up @@ -1432,7 +1527,7 @@ describe Raft::Node do
# Node 4 should have peers now (applied config from leader)
node4.peers.size.should be > 0
# And should be a learner
node4.peers.find { |p| p.id == 4_u64 }.not_nil!.learner?.should eq true
node4.peers.find! { |p| p.id == 4_u64 }.learner?.should eq true

# Drain any pending responses from replication
node4.take_messages
Expand Down
25 changes: 19 additions & 6 deletions src/raft/node.cr
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,25 @@ module Raft
true
end

# Add a new node to the cluster as a learner.
# Returns false if not leader or node already exists.
# Add a new node to the cluster as a learner, or re-admit a returning member.
# - New id: adds as Learner, returns true.
# - Known id, different non-empty address: updates the stored address and replicates
# the new configuration, returns true.
# - Known id, same address (or empty address passed): no-op, returns true.
# Returns false if not leader, if a configuration change is already in flight, or if node_id == @id.
def add_server(node_id : NodeID, address : String = "") : Bool
return false unless @role == Role::Leader
return false if @pending_config_index > @commit_index
return false if @peers.any? { |p| p.id == node_id }
return false if node_id == @id
if existing = @peers.find { |p| p.id == node_id }
# Returning member: update address only if a non-empty new address differs.
return true if address.empty? || existing.address == address
new_peers = @peers.map do |p|
p.id == node_id ? Peer.new(p.id, p.role, address) : p
end
append_configuration(new_peers)
return true
end
new_peers = @peers.dup
new_peers << Peer.new(node_id, Peer::Role::Learner, address)
append_configuration(new_peers)
Expand Down Expand Up @@ -513,7 +526,7 @@ module Raft

private def cancel_pending_reads
return if @pending_reads.empty? && @pending_apply.empty?
(@pending_reads + @pending_apply).each { |pr| pr.callback.call(nil) }
(@pending_reads + @pending_apply).each(&.callback.call(nil))
@pending_reads.clear
@pending_apply.clear
end
Expand Down Expand Up @@ -1177,7 +1190,7 @@ module Raft
f.write_bytes(0_u8, IO::ByteFormat::LittleEndian)
end
f.write_bytes(@peers.size.to_u32, IO::ByteFormat::LittleEndian)
@peers.each { |p| p.to_io(f) }
@peers.each(&.to_io(f))
f.fsync
end
File.rename(tmp_path, path)
Expand Down Expand Up @@ -1207,7 +1220,7 @@ module Raft
private def serialize_peers(peers : Array(Peer) = @peers) : Bytes
io = IO::Memory.new
io.write_bytes(peers.size.to_u32, IO::ByteFormat::LittleEndian)
peers.each { |p| p.to_io(io) }
peers.each(&.to_io(io))
io.to_slice.dup
end

Expand Down