Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This document describes how the Raft library is structured, how the pieces fit t
It serves three audiences at once:

- **New contributors** — what each file does, where to make changes, and what invariants to preserve.
- **Library users / integrators** (e.g. embedding this in LavinMQ) — what the library guarantees, what it does not, and which extension points to use.
- **Library users / integrators** (e.g. embedding this in a broker or other consensus-backed service) — what the library guarantees, what it does not, and which extension points to use.
- **Future you** — the rationale behind the trade-offs, so you don't have to re-derive them.

## 1. Overview & goals
Expand Down Expand Up @@ -198,8 +198,10 @@ A small struct of tunables read at construction time:
| `election_timeout_max_ticks` | 20 | Election timeout upper bound (1000ms) |
| `max_segment_size` | 64 MB | Log segment rollover threshold |
| `max_append_entries_size` | 1 MB | Cap on `entries_data` per AppendEntries message |
| `max_message_payload_bytes` | 64 MB | Per-message payload cap on the wire (transport `from_io` rejects larger frames) |
| `snapshot_chunk_size` | 1 MB | Chunk size for InstallSnapshot RPC payloads |
| `snapshot_interval_entries` | 1000 | Trigger snapshot after this many committed entries |
| `read_index_timeout_ticks` | 100 | ReadIndex callback fires with `nil` if leadership confirmation doesn't arrive within this many ticks (5s at default tick) |
| `data_dir` | `"data"` | Where logs and metadata live |

`Config` is not modified after a `Node` is constructed.
Expand Down Expand Up @@ -248,7 +250,7 @@ These classes do touch the filesystem and the network. They are kept dependency-

### Segment storage: append-mode files + stdlib buffered writes

Log segments are plain regular files opened with POSIX append mode (`File.new(path, "a+")`). Every write atomically lands at EOF — the kernel maintains the write offset and updates it atomically per `write(2)` syscall, so no explicit positioning is needed in user space. Crystal's `IO::Buffered` accumulates small writes (the per-`LogEntry` header + payload pieces) in a per-File user-space buffer; one `flush` after `entry.to_io(@file)` issues a single `write(2)` syscall per appended entry.
Log segments are plain regular files opened with POSIX append mode (`File.new(path, "a+")`). Every write atomically lands at EOF — the kernel maintains the write offset and updates it atomically per `write(2)` syscall, so no explicit positioning is needed in user space. Crystal's `IO::Buffered` accumulates small writes (the per-`LogEntry` header + payload pieces) in a per-File user-space buffer; `Segment#append` deliberately does **not** flush. The flush is deferred to `Segment#sync` (called at Raft's durability boundaries), so a batch of N appends between two boundaries collapses into a single `write(2)` syscall followed by one `fsync` — instead of `O(N)` syscalls for `O(N)` appends.

**Reads** go through stdlib `File#read_at` (which uses `pread(2)` internally) — concurrent-read-safe, no shared file-position state with the write path. Multiple followers can have the same segment file being read at different offsets simultaneously without contention.

Expand All @@ -258,7 +260,7 @@ Log segments are plain regular files opened with POSIX append mode (`File.new(pa

### `Raft::Log::Segment(T)`

One log segment file. Owns a `File` (in `"a+"` mode), an in-memory offset array `@offsets : Array(UInt64)` mapping `index → byte offset within the file`, and a `@size : Int64` tracking total written bytes. A segment can be read by index (O(1) offset lookup, then `File#read_at` to deserialize), appended to (via `entry.to_io(@file) + flush`, which leverages `IO::Buffered` coalescing), or truncated (via `File#truncate` to shrink the file in place).
One log segment file. Owns a `File` (in `"a+"` mode), an in-memory offset array `@offsets : Array(UInt64)` mapping `index → byte offset within the file`, and a `@size : Int64` tracking total written bytes. A segment can be read by index (O(1) offset lookup, then `File#read_at` to deserialize), appended to (via `entry.to_io(@file)` — the flush is deferred to the next `sync` call so a batch of appends coalesces into one `write(2)`), or truncated (via `File#truncate` to shrink the file in place).

The segment filename is the zero-padded first index it contains: `0000000000000001.log`, `0000000000010001.log`, etc. This makes lexicographic directory listing equal to chronological order.

Expand Down
4 changes: 4 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
name: raft
version: 0.1.0
description: A fast, embeddable Raft consensus library for Crystal — multi-raft, segmented log, pre-vote, learners, snapshots, linearizable reads.

authors:
- Anton Dalgren <anton@84codes.com>

repository: https://github.com/84codes/raft.cr
documentation: https://github.com/84codes/raft.cr/blob/main/ARCHITECTURE.md

targets:
raft-tui:
main: src/raft/tui/main.cr
Expand Down
38 changes: 21 additions & 17 deletions spec/raft/snapshot_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@ describe "Raft::Node snapshot persistence" do
Dir.mkdir_p(dir)
cfg = Raft::Config.new
cfg.data_dir = dir
cfg.snapshot_interval_entries = 2_u64 # trigger via natural path

sm1 = TestStateMachine.new
sm1.apply(TestData.new("a"))
sm1.apply(TestData.new("b"))

node1 = Raft::Node(TestData).new(id: 1_u64, peers: [] of UInt64, config: cfg, state_machine: sm1)
# Manually persist a snapshot at index=2, term=1
node1.persist_snapshot_for_test(2_u64, 1_u64)
node1.bootstrap
node1.propose(TestData.new("a"))
node1.propose(TestData.new("b"))
# Single-node cluster commits + applies immediately; two Normal entries
# past the bootstrap config crosses the snapshot threshold.
sm1.applied.map(&.value).should eq ["a", "b"]
node1.snapshot_index.should be > 0_u64
snapshotted_index = node1.snapshot_index
snapshotted_term = node1.snapshot_term
node1.close

sm2 = TestStateMachine.new
node2 = Raft::Node(TestData).new(id: 1_u64, peers: [] of UInt64, config: cfg, state_machine: sm2)
node2.snapshot_index.should eq 2_u64
node2.snapshot_term.should eq 1_u64
sm2.applied.size.should eq 2
sm2.applied[0].value.should eq "a"
sm2.applied[1].value.should eq "b"
node2.snapshot_index.should eq snapshotted_index
node2.snapshot_term.should eq snapshotted_term
sm2.applied.map(&.value).should eq ["a", "b"]

node2.close
FileUtils.rm_rf(dir)
Expand Down Expand Up @@ -56,25 +59,26 @@ describe "Raft::Node snapshot + log tail recovery" do
Dir.mkdir_p(dir)
cfg = Raft::Config.new
cfg.data_dir = dir
# Trigger snapshot after 2 applies — so "a" and "b" produce a snapshot,
# "c" then lands in the log tail past snapshot_index.
cfg.snapshot_interval_entries = 2_u64

sm1 = TestStateMachine.new
node1 = Raft::Node(TestData).new(id: 1_u64, peers: [] of UInt64, config: cfg, state_machine: sm1)
node1.bootstrap
node1.propose(TestData.new("a"))
node1.propose(TestData.new("b"))
# Single-node cluster commits immediately; sm1 now has ["a", "b"].
sm1.applied.map(&.value).should eq ["a", "b"]
snapshot_at = node1.snapshot_index
snapshot_at.should be > 0_u64

# Snapshot at current last_applied (covers bootstrap config + "a" + "b").
# sm1.applied == ["a", "b"] at this moment — the snapshot encodes exactly that.
node1.persist_snapshot_for_test(node1.last_applied, node1.current_term)

# Now append "c" — it lands in the log tail AFTER the snapshot index.
# "c" lands AFTER the snapshot — it's only in the log tail, not the snapshot.
node1.propose(TestData.new("c"))
sm1.applied.map(&.value).should eq ["a", "b", "c"]
node1.log.last_index.should be > snapshot_at
node1.close

# Restart: recover from snapshot (restores "a"+"b"), then replay "c" from log tail.
# Restart: snapshot restores ["a","b"], then log replay applies "c".
sm2 = TestStateMachine.new
node2 = Raft::Node(TestData).new(id: 1_u64, peers: [] of UInt64, config: cfg, state_machine: sm2)
sm2.applied.map(&.value).should eq ["a", "b", "c"]
Expand Down
44 changes: 35 additions & 9 deletions src/raft/node.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,24 @@
module Raft
# Implements the Raft consensus protocol for a single peer in a single
# group. Generic over `T`, the application's command type.
#
# ## Threading contract
#
# **A Node is single-fiber.** All five public entry points — `tick`,
# `step`, `propose`, `take_messages`, and `read_index` — and all
# registration methods (`on_role_change`, `on_configuration_change`,
# `on_configuration_applied`) must be called from the same fiber that
# owns the node's loop. Internal state (`@current_term`, `@log`,
# `@peers`, `@outbox`, `@pending_reads`, `@pending_apply`) is mutated
# without locks because of this invariant.
#
# Callbacks fire on that same fiber and must therefore be non-blocking
# — channel a signal to your own fiber if you need to do real work.
#
# If you need to call `propose` from a different fiber (e.g. an HTTP
# handler), send the request over a channel and have the loop fiber
# do the actual call. The KV and queue examples demonstrate this
# pattern in `start_group_loop`.
class Node(T)
getter role : Role = Role::Follower
getter current_term : UInt64 = 0_u64
Expand Down Expand Up @@ -319,13 +339,25 @@ module Raft
messages
end

# Fires when a Configuration entry is **committed** (apply phase). Block
# receives the new peer list. Use this for actions that should only run
# after a membership change is durable across a majority — e.g. cascading
# data-group reconfiguration in a multi-raft setup.
#
# Runs on the Raft fiber; keep it non-blocking. See class docs for the
# full threading contract.
def on_configuration_change(&block : Array(Peer) ->)
@on_configuration_change = block
end

# Called whenever the peer list changes — both when config entries are stored
# in the log (followers) and when they're committed (leader). Use this to
# register transport peer addresses from config entries.
# Fires whenever the peer list changes — both when config entries are
# **stored** in the log (followers, before commit) and when they're
# committed (leader). Use this to register transport peer addresses
# from config entries; followers can route to new peers as soon as
# they see the entry, without waiting for the cluster commit cycle.
#
# Runs on the Raft fiber; keep it non-blocking. See class docs for the
# full threading contract.
def on_configuration_applied(&block : Array(Peer) ->)
@on_configuration_applied = block
end
Expand Down Expand Up @@ -1051,12 +1083,6 @@ module Raft
@on_configuration_applied.try(&.call(@peers))
end

# Test helper — drives persist_snapshot from outside while
# take_snapshot doesn't exist yet (added in Task 3).
def persist_snapshot_for_test(index : UInt64, term : UInt64)
persist_snapshot(index, term)
end

private def take_snapshot
return if @last_applied <= @snapshot_index
term = @log.term_at(@last_applied)
Expand Down