diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d59e3f3..bb804c5 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -5,7 +5,7 @@ This document describes how the Raft library is structured, how the pieces fit t It serves three audiences at once: - **New contributors** — what each file does, where to make changes, and what invariants to preserve. -- **Library users / integrators** (e.g. embedding this in LavinMQ) — what the library guarantees, what it does not, and which extension points to use. +- **Library users / integrators** (e.g. embedding this in a broker or other consensus-backed service) — what the library guarantees, what it does not, and which extension points to use. - **Future you** — the rationale behind the trade-offs, so you don't have to re-derive them. ## 1. Overview & goals @@ -198,8 +198,10 @@ A small struct of tunables read at construction time: | `election_timeout_max_ticks` | 20 | Election timeout upper bound (1000ms) | | `max_segment_size` | 64 MB | Log segment rollover threshold | | `max_append_entries_size` | 1 MB | Cap on `entries_data` per AppendEntries message | +| `max_message_payload_bytes` | 64 MB | Per-message payload cap on the wire (transport `from_io` rejects larger frames) | | `snapshot_chunk_size` | 1 MB | Chunk size for InstallSnapshot RPC payloads | | `snapshot_interval_entries` | 1000 | Trigger snapshot after this many committed entries | +| `read_index_timeout_ticks` | 100 | ReadIndex callback fires with `nil` if leadership confirmation doesn't arrive within this many ticks (5s at default tick) | | `data_dir` | `"data"` | Where logs and metadata live | `Config` is not modified after a `Node` is constructed. @@ -248,7 +250,7 @@ These classes do touch the filesystem and the network. They are kept dependency- ### Segment storage: append-mode files + stdlib buffered writes -Log segments are plain regular files opened with POSIX append mode (`File.new(path, "a+")`). Every write atomically lands at EOF — the kernel maintains the write offset and updates it atomically per `write(2)` syscall, so no explicit positioning is needed in user space. Crystal's `IO::Buffered` accumulates small writes (the per-`LogEntry` header + payload pieces) in a per-File user-space buffer; one `flush` after `entry.to_io(@file)` issues a single `write(2)` syscall per appended entry. +Log segments are plain regular files opened with POSIX append mode (`File.new(path, "a+")`). Every write atomically lands at EOF — the kernel maintains the write offset and updates it atomically per `write(2)` syscall, so no explicit positioning is needed in user space. Crystal's `IO::Buffered` accumulates small writes (the per-`LogEntry` header + payload pieces) in a per-File user-space buffer; `Segment#append` deliberately does **not** flush. The flush is deferred to `Segment#sync` (called at Raft's durability boundaries), so a batch of N appends between two boundaries collapses into a single `write(2)` syscall followed by one `fsync` — instead of `O(N)` syscalls for `O(N)` appends. **Reads** go through stdlib `File#read_at` (which uses `pread(2)` internally) — concurrent-read-safe, no shared file-position state with the write path. Multiple followers can have the same segment file being read at different offsets simultaneously without contention. @@ -258,7 +260,7 @@ Log segments are plain regular files opened with POSIX append mode (`File.new(pa ### `Raft::Log::Segment(T)` -One log segment file. Owns a `File` (in `"a+"` mode), an in-memory offset array `@offsets : Array(UInt64)` mapping `index → byte offset within the file`, and a `@size : Int64` tracking total written bytes. A segment can be read by index (O(1) offset lookup, then `File#read_at` to deserialize), appended to (via `entry.to_io(@file) + flush`, which leverages `IO::Buffered` coalescing), or truncated (via `File#truncate` to shrink the file in place). +One log segment file. Owns a `File` (in `"a+"` mode), an in-memory offset array `@offsets : Array(UInt64)` mapping `index → byte offset within the file`, and a `@size : Int64` tracking total written bytes. A segment can be read by index (O(1) offset lookup, then `File#read_at` to deserialize), appended to (via `entry.to_io(@file)` — the flush is deferred to the next `sync` call so a batch of appends coalesces into one `write(2)`), or truncated (via `File#truncate` to shrink the file in place). The segment filename is the zero-padded first index it contains: `0000000000000001.log`, `0000000000010001.log`, etc. This makes lexicographic directory listing equal to chronological order. diff --git a/shard.yml b/shard.yml index 8fe6ba2..bbef914 100644 --- a/shard.yml +++ b/shard.yml @@ -1,9 +1,13 @@ name: raft version: 0.1.0 +description: A fast, embeddable Raft consensus library for Crystal — multi-raft, segmented log, pre-vote, learners, snapshots, linearizable reads. authors: - Anton Dalgren +repository: https://github.com/84codes/raft.cr +documentation: https://github.com/84codes/raft.cr/blob/main/ARCHITECTURE.md + targets: raft-tui: main: src/raft/tui/main.cr diff --git a/spec/raft/snapshot_spec.cr b/spec/raft/snapshot_spec.cr index c7ee6c1..ff30f3f 100644 --- a/spec/raft/snapshot_spec.cr +++ b/spec/raft/snapshot_spec.cr @@ -6,23 +6,26 @@ describe "Raft::Node snapshot persistence" do Dir.mkdir_p(dir) cfg = Raft::Config.new cfg.data_dir = dir + cfg.snapshot_interval_entries = 2_u64 # trigger via natural path sm1 = TestStateMachine.new - sm1.apply(TestData.new("a")) - sm1.apply(TestData.new("b")) - node1 = Raft::Node(TestData).new(id: 1_u64, peers: [] of UInt64, config: cfg, state_machine: sm1) - # Manually persist a snapshot at index=2, term=1 - node1.persist_snapshot_for_test(2_u64, 1_u64) + node1.bootstrap + node1.propose(TestData.new("a")) + node1.propose(TestData.new("b")) + # Single-node cluster commits + applies immediately; two Normal entries + # past the bootstrap config crosses the snapshot threshold. + sm1.applied.map(&.value).should eq ["a", "b"] + node1.snapshot_index.should be > 0_u64 + snapshotted_index = node1.snapshot_index + snapshotted_term = node1.snapshot_term node1.close sm2 = TestStateMachine.new node2 = Raft::Node(TestData).new(id: 1_u64, peers: [] of UInt64, config: cfg, state_machine: sm2) - node2.snapshot_index.should eq 2_u64 - node2.snapshot_term.should eq 1_u64 - sm2.applied.size.should eq 2 - sm2.applied[0].value.should eq "a" - sm2.applied[1].value.should eq "b" + node2.snapshot_index.should eq snapshotted_index + node2.snapshot_term.should eq snapshotted_term + sm2.applied.map(&.value).should eq ["a", "b"] node2.close FileUtils.rm_rf(dir) @@ -56,25 +59,26 @@ describe "Raft::Node snapshot + log tail recovery" do Dir.mkdir_p(dir) cfg = Raft::Config.new cfg.data_dir = dir + # Trigger snapshot after 2 applies — so "a" and "b" produce a snapshot, + # "c" then lands in the log tail past snapshot_index. + cfg.snapshot_interval_entries = 2_u64 sm1 = TestStateMachine.new node1 = Raft::Node(TestData).new(id: 1_u64, peers: [] of UInt64, config: cfg, state_machine: sm1) node1.bootstrap node1.propose(TestData.new("a")) node1.propose(TestData.new("b")) - # Single-node cluster commits immediately; sm1 now has ["a", "b"]. sm1.applied.map(&.value).should eq ["a", "b"] + snapshot_at = node1.snapshot_index + snapshot_at.should be > 0_u64 - # Snapshot at current last_applied (covers bootstrap config + "a" + "b"). - # sm1.applied == ["a", "b"] at this moment — the snapshot encodes exactly that. - node1.persist_snapshot_for_test(node1.last_applied, node1.current_term) - - # Now append "c" — it lands in the log tail AFTER the snapshot index. + # "c" lands AFTER the snapshot — it's only in the log tail, not the snapshot. node1.propose(TestData.new("c")) sm1.applied.map(&.value).should eq ["a", "b", "c"] + node1.log.last_index.should be > snapshot_at node1.close - # Restart: recover from snapshot (restores "a"+"b"), then replay "c" from log tail. + # Restart: snapshot restores ["a","b"], then log replay applies "c". sm2 = TestStateMachine.new node2 = Raft::Node(TestData).new(id: 1_u64, peers: [] of UInt64, config: cfg, state_machine: sm2) sm2.applied.map(&.value).should eq ["a", "b", "c"] diff --git a/src/raft/node.cr b/src/raft/node.cr index 6e15b68..c94ddb2 100644 --- a/src/raft/node.cr +++ b/src/raft/node.cr @@ -1,4 +1,24 @@ module Raft + # Implements the Raft consensus protocol for a single peer in a single + # group. Generic over `T`, the application's command type. + # + # ## Threading contract + # + # **A Node is single-fiber.** All five public entry points — `tick`, + # `step`, `propose`, `take_messages`, and `read_index` — and all + # registration methods (`on_role_change`, `on_configuration_change`, + # `on_configuration_applied`) must be called from the same fiber that + # owns the node's loop. Internal state (`@current_term`, `@log`, + # `@peers`, `@outbox`, `@pending_reads`, `@pending_apply`) is mutated + # without locks because of this invariant. + # + # Callbacks fire on that same fiber and must therefore be non-blocking + # — channel a signal to your own fiber if you need to do real work. + # + # If you need to call `propose` from a different fiber (e.g. an HTTP + # handler), send the request over a channel and have the loop fiber + # do the actual call. The KV and queue examples demonstrate this + # pattern in `start_group_loop`. class Node(T) getter role : Role = Role::Follower getter current_term : UInt64 = 0_u64 @@ -319,13 +339,25 @@ module Raft messages end + # Fires when a Configuration entry is **committed** (apply phase). Block + # receives the new peer list. Use this for actions that should only run + # after a membership change is durable across a majority — e.g. cascading + # data-group reconfiguration in a multi-raft setup. + # + # Runs on the Raft fiber; keep it non-blocking. See class docs for the + # full threading contract. def on_configuration_change(&block : Array(Peer) ->) @on_configuration_change = block end - # Called whenever the peer list changes — both when config entries are stored - # in the log (followers) and when they're committed (leader). Use this to - # register transport peer addresses from config entries. + # Fires whenever the peer list changes — both when config entries are + # **stored** in the log (followers, before commit) and when they're + # committed (leader). Use this to register transport peer addresses + # from config entries; followers can route to new peers as soon as + # they see the entry, without waiting for the cluster commit cycle. + # + # Runs on the Raft fiber; keep it non-blocking. See class docs for the + # full threading contract. def on_configuration_applied(&block : Array(Peer) ->) @on_configuration_applied = block end @@ -1051,12 +1083,6 @@ module Raft @on_configuration_applied.try(&.call(@peers)) end - # Test helper — drives persist_snapshot from outside while - # take_snapshot doesn't exist yet (added in Task 3). - def persist_snapshot_for_test(index : UInt64, term : UInt64) - persist_snapshot(index, term) - end - private def take_snapshot return if @last_applied <= @snapshot_index term = @log.term_at(@last_applied)