Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,326 changes: 1,326 additions & 0 deletions docs/testing-plans/raft-cr-project-stability.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/kv/jepsen/src/jepsen/raft_kv.clj
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@
:linear (independent/checker
(checker/linearizable
{:model (model/register nil)
:algorithm :linear}))})
:algorithm :competition}))})
:generator (->> (independent/concurrent-generator
5 ; threads per key
(range) ; infinite key sequence
Expand Down
250 changes: 250 additions & 0 deletions examples/queue/spec/queue_under_partition_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
# AUTO-GENERATED from test plan: docs/testing-plans/raft-cr-project-stability.md
# Scenario: S13 queue_per_key_fifo_under_partition_and_handoff
# Falsifies: C33 (per-queue FIFO), C34 (queue replication via Raft commit+apply),
# C35 (Consume bridge exactly-once on leader)
# Plan sha at generation: 3997446
#
# REVIEW BEFORE TREATING AS A PERMANENT REGRESSION:
# this file was generated by executing-distributed-system-tests in author mode.
#
# In-process variant of the plan's Jepsen-based S13. Drives the queue example
# under a synthetic partition (deliver_all_except skips messages to/from the
# isolated node). Tests:
# - per-producer FIFO across the published tag stream
# - cross-replica state equivalence after heal (snapshot bytes byte-equal)
# - exactly-once consume bridge delivery
# - total tag count preserved across the partition window

require "spec"
require "file_utils"
require "../../../src/raft"
require "../src/queue_command"
require "../src/queue_state_machine"

# Deliver all pending messages between cluster nodes, except for messages
# to/from nodes in `partitioned`. Mimics a one-cluster-cut partition: the
# partitioned node sees nothing from the rest and vice-versa.
private def deliver_all_except(
nodes : Hash(Raft::NodeID, Raft::Node(QueueCommand)),
partitioned : Set(Raft::NodeID),
)
loop do
any_delivered = false
pending = [] of {Raft::NodeID, Raft::NodeID, Raft::Message}

nodes.each do |from_id, node|
node.take_messages.each do |target_id, msg|
pending << {from_id, target_id, msg}
end
end

pending.each do |from_id, target_id, msg|
# Drop if either side is partitioned away
next if partitioned.includes?(from_id) || partitioned.includes?(target_id)
if target_node = nodes[target_id]?
target_node.step(msg)
any_delivered = true
end
end

break unless any_delivered
end
end

# Wrapper that delivers everything (no partition).
private def deliver_all(nodes : Hash(Raft::NodeID, Raft::Node(QueueCommand)))
deliver_all_except(nodes, Set(Raft::NodeID).new)
end

private def make_queue_cluster(node_ids : Array(UInt64) = [1_u64, 2_u64, 3_u64], group_id : UInt64 = 1_u64)
state_machines = {} of Raft::NodeID => QueueStateMachine
nodes = {} of Raft::NodeID => Raft::Node(QueueCommand)
dirs = [] of String

node_ids.each do |id|
sm = QueueStateMachine.new
state_machines[id] = sm
dir = File.tempname("queue_partition_#{id}_#{group_id}")
Dir.mkdir_p(dir)
dirs << dir
cfg = Raft::Config.new
cfg.data_dir = dir
cfg.election_timeout_min_ticks = 5_u32
cfg.election_timeout_max_ticks = 5_u32
cfg.heartbeat_ticks = 2_u32
peers = node_ids.reject(id)
nodes[id] = Raft::Node(QueueCommand).new(
id: id, peers: peers, config: cfg, state_machine: sm, group_id: group_id,
)
end

{nodes, state_machines, dirs}
end

private def cleanup(nodes : Hash(Raft::NodeID, Raft::Node(QueueCommand)), dirs : Array(String))
nodes.each_value(&.close)
dirs.each { |d| FileUtils.rm_rf(d) rescue nil }
end

# Compute snapshot bytes from a state machine — used for cross-replica equality.
private def snapshot_bytes(sm : QueueStateMachine) : Bytes
io = IO::Memory.new
sm.snapshot(io)
io.to_slice.dup
end

# Drain a state machine's deque via the Consume bridge. Returns the bodies
# in the order the state machine pops them.
private def drain_via_bridge(
nodes : Hash(Raft::NodeID, Raft::Node(QueueCommand)),
sms : Hash(Raft::NodeID, QueueStateMachine),
leader_id : Raft::NodeID,
expected_count : Int32,
) : Array(String)
drained = [] of String
expected_count.times do |i|
req_id = "drain-#{i}"
ch = Channel(Bytes?).new(1)
sms[leader_id].register_request(req_id, ch)
nodes[leader_id].propose(QueueCommand.new(QueueAction::Consume, "q", req_id: req_id)).should be_true
deliver_all(nodes)
2.times { nodes[leader_id].tick }
deliver_all(nodes)
body = ch.receive
body.should_not be_nil
drained << String.new(body.not_nil!)
end
drained
end

describe "S13: queue_per_key_fifo_under_partition_and_handoff (plan §7 S13)" do
it "preserves per-producer FIFO and cross-replica state under partition + heal" do
nodes, sms, dirs = make_queue_cluster
begin
# 1. Elect node 1.
5.times { nodes[1_u64].tick }
deliver_all(nodes)
nodes[1_u64].role.should eq Raft::Role::Leader

# 2. Phase 1 — pre-partition: 3 producers × 5 msgs each, interleaved.
published = [] of String # tag in publish order
n_per_producer_phase1 = 5
n_per_producer_phase1.times do |seq|
(1..3).each do |producer|
tag = "p#{producer}-#{seq.to_s.rjust(2, '0')}"
nodes[1_u64].propose(QueueCommand.new(QueueAction::Publish, "q", body: tag.to_slice)).should be_true
published << tag
end
end
deliver_all(nodes)
2.times { nodes[1_u64].tick }
deliver_all(nodes)

# Sanity: all three replicas have phase-1 messages applied.
expected_phase1 = 3 * n_per_producer_phase1
sms[1_u64].depth.should eq expected_phase1
sms[2_u64].depth.should eq expected_phase1
sms[3_u64].depth.should eq expected_phase1

# 3. Partition node 3 (follower) from {n1, n2}. Quorum on {n1, n2} side.
partitioned = Set{3_u64}

# 4. Phase 2 — during partition: 3 producers × 5 msgs each.
n_per_producer_phase2 = 5
n_per_producer_phase2.times do |seq|
(1..3).each do |producer|
tag = "p#{producer}-#{(seq + n_per_producer_phase1).to_s.rjust(2, '0')}"
nodes[1_u64].propose(QueueCommand.new(QueueAction::Publish, "q", body: tag.to_slice)).should be_true
published << tag
end
end
# Deliver only between {n1, n2} — n3 sees nothing.
deliver_all_except(nodes, partitioned)
2.times { nodes[1_u64].tick }
deliver_all_except(nodes, partitioned)

# Quorum-side replicas should have all 30 messages; n3 stuck at 15.
total_published = 3 * (n_per_producer_phase1 + n_per_producer_phase2)
sms[1_u64].depth.should eq total_published
sms[2_u64].depth.should eq total_published
sms[3_u64].depth.should eq expected_phase1 # n3 didn't see phase 2

# 5. Heal partition.
partitioned.clear

# 6. Tick + deliver until n3 catches up. The leader's next heartbeat will
# bring n3 along via the standard AppendEntries / reject_hint flow.
catchup_iters = 0
while sms[3_u64].depth != total_published && catchup_iters < 50
nodes[1_u64].tick
deliver_all(nodes)
catchup_iters += 1
end
sms[3_u64].depth.should eq total_published

# 7. Oracle: cross-replica state equivalence (C34). Every replica's
# snapshot bytes are byte-equal.
bytes1 = snapshot_bytes(sms[1_u64])
bytes2 = snapshot_bytes(sms[2_u64])
bytes3 = snapshot_bytes(sms[3_u64])
bytes1.should eq bytes2
bytes2.should eq bytes3

# 8. Oracle: drain via consume bridge on the leader; sequence must match
# the published order (because there's only one publisher path — the
# leader — so global order is well-defined).
drained = drain_via_bridge(nodes, sms, 1_u64, total_published)
drained.size.should eq total_published

# Total set equality: every published tag was consumed exactly once (C35).
drained.to_set.should eq published.to_set
# Per-producer FIFO: extract per-producer subsequences; each must match
# the publish order for that producer (C33).
(1..3).each do |producer|
published_for_p = published.select { |t| t.starts_with?("p#{producer}-") }
drained_for_p = drained.select { |t| t.starts_with?("p#{producer}-") }
drained_for_p.should eq published_for_p
end
# Cross-producer ordering: since there's a single proposer/leader, the
# drained order should equal the published order exactly.
drained.should eq published
ensure
cleanup(nodes, dirs)
end
end

it "rejects proposes on a minority-partitioned node (C34 — no commit without quorum)" do
nodes, sms, dirs = make_queue_cluster
begin
# Elect node 1.
5.times { nodes[1_u64].tick }
deliver_all(nodes)
nodes[1_u64].role.should eq Raft::Role::Leader

# Partition n2 and n3 from n1 (leader is alone — no quorum).
partitioned = Set{2_u64, 3_u64}

# Leader can still accept proposes — but commit can't advance.
pre_commit = nodes[1_u64].commit_index
nodes[1_u64].propose(QueueCommand.new(QueueAction::Publish, "q", body: "lonely".to_slice)).should be_true
deliver_all_except(nodes, partitioned)

# Without quorum response, commit_index doesn't advance past the
# entry's index. The propose appended an entry but it isn't committed.
sms[1_u64].depth.should eq 0
nodes[1_u64].log.last_index.should be > pre_commit

# Heal and verify commit catches up.
partitioned.clear
10.times do
nodes[1_u64].tick
deliver_all(nodes)
end
sms[1_u64].depth.should eq 1
sms[2_u64].depth.should eq 1
sms[3_u64].depth.should eq 1
ensure
cleanup(nodes, dirs)
end
end
end
125 changes: 125 additions & 0 deletions spec/fuzz/message_from_io_fuzz_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# AUTO-GENERATED from test plan: docs/testing-plans/raft-cr-project-stability.md
# Scenario: S12 serialization_and_message_parser_fuzz
# Falsifies: C31 (max payload check); surfaces M6 (per-entry T#from_io exception isolation)
# Plan sha at generation: c58aefdaa18581a3382c6594af7e995e6bc2af4b
#
# REVIEW BEFORE TREATING AS A PERMANENT REGRESSION:
# this file was generated by executing-distributed-system-tests in author mode.
# Verify the workload, faults, and oracle below match the plan's intent.

require "../spec_helper"

# Bounded random-input fuzz of the wire-format parsers. The invariant under test:
# parsers never panic on arbitrary input — they either succeed with a valid value
# or raise a typed I/O / overflow / argument error.

ALLOWED_ERROR_CLASSES = [
IO::EOFError,
IO::Error,
ArgumentError,
OverflowError,
IndexError,
] of Class

def assert_only_typed_errors(input_label : String, &block)
begin
block.call
rescue ex : IO::EOFError | IO::Error | ArgumentError | OverflowError | IndexError
# Typed parser error — acceptable.
rescue ex
raise "FUZZ FAIL (#{input_label}): unexpected exception #{ex.class}: #{ex.message}"
end
end

describe "S12: serialization_and_message_parser_fuzz (plan §7 S12)" do
it "Message.from_io never panics on random bytes within max_payload" do
max_payload = 1_u32 * 1024_u32 * 1024_u32 # 1 MB
rng = Random.new(42_u64) # deterministic seed for reproducibility
iters = 2_000

iters.times do |i|
# Pick a random length in [0, 256] — most useful coverage is at short
# boundary lengths; a 1 MB random buffer would mostly produce header-size
# rejection without exploring parser branches.
len = rng.rand(0..256)
buf = Bytes.new(len)
rng.random_bytes(buf)
assert_only_typed_errors("Message.from_io iter=#{i} len=#{len}") do
Raft::Message.from_io(IO::Memory.new(buf), max_payload)
end
end
end

it "Message.from_io rejects oversized entries_data payload (C31 — exact bound)" do
# Constructed payload that lies about its entries_data size: claims max+1
# but the buffer is short. from_io should raise before allocating.
max_payload = 1024_u32
io = IO::Memory.new
io.write_bytes(1_u8, IO::ByteFormat::LittleEndian) # protocol_version
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # group_id
io.write_bytes(0_u8, IO::ByteFormat::LittleEndian) # type AppendEntries
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # from
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # term
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # prev_log_index
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # prev_log_term
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # commit_index
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # last_log_index
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # last_log_term
io.write_bytes(0_u8, IO::ByteFormat::LittleEndian) # success
io.write_bytes(0_u64, IO::ByteFormat::LittleEndian) # reject_hint
io.write_bytes(0_u32, IO::ByteFormat::LittleEndian) # entries_count
io.write_bytes(max_payload + 1_u32, IO::ByteFormat::LittleEndian) # data_size > max_payload
io.rewind

ex = expect_raises(IO::Error) do
Raft::Message.from_io(io, max_payload)
end
ex.message.not_nil!.should contain "max_message_payload_bytes"
end

it "LogEntry.from_io never panics on random bytes" do
rng = Random.new(43_u64)
iters = 2_000
iters.times do |i|
len = rng.rand(0..128)
buf = Bytes.new(len)
rng.random_bytes(buf)
assert_only_typed_errors("LogEntry.from_io iter=#{i} len=#{len}") do
Raft::LogEntry(TestData).from_io(IO::Memory.new(buf))
end
end
end

it "Peer.from_io never panics on random bytes" do
rng = Random.new(44_u64)
iters = 1_000
iters.times do |i|
len = rng.rand(0..64)
buf = Bytes.new(len)
rng.random_bytes(buf)
assert_only_typed_errors("Peer.from_io iter=#{i} len=#{len}") do
Raft::Peer.from_io(IO::Memory.new(buf))
end
end
end

it "Message round-trips for a corpus of well-formed inputs" do
msgs = [
Raft::Message.new(type: Raft::MessageType::AppendEntries, from: 1_u64, term: 0_u64),
Raft::Message.new(type: Raft::MessageType::PreVote, from: 2_u64, term: 1_u64, last_log_index: 5_u64, last_log_term: 1_u64),
Raft::Message.new(type: Raft::MessageType::RequestVoteResponse, from: 3_u64, term: 1_u64, success: true),
Raft::Message.new(type: Raft::MessageType::AppendEntries, from: 1_u64, term: 2_u64,
entries_data: Bytes.new(8), entries_count: 1_u32),
]
msgs.each_with_index do |m, i|
buf = IO::Memory.new
m.to_io(buf)
buf.rewind
decoded = Raft::Message.from_io(buf, 1_u32 << 20)
decoded.type.should eq m.type
decoded.from.should eq m.from
decoded.term.should eq m.term
decoded.entries_count.should eq m.entries_count
end
end
end
Loading