diff --git a/examples/kv/src/main.cr b/examples/kv/src/main.cr index 03fb655..ee2695a 100644 --- a/examples/kv/src/main.cr +++ b/examples/kv/src/main.cr @@ -127,10 +127,11 @@ end transport.start # HTTP server -raft_handler = Raft::HTTP::Handler(KVCommand).new(meta_node, transport, raft_advertise_address) +raft_status_handler = Raft::HTTP::StatusHandler(KVCommand).new(meta_node, transport, raft_advertise_address) +raft_admin_handler = Raft::HTTP::AdminHandler(KVCommand).new(meta_node, transport) kv_handler = KVHttpHandler.new(meta_node, meta_sm, nodes, value_machines) -server = ::HTTP::Server.new([kv_handler, raft_handler]) do |context| +server = ::HTTP::Server.new([kv_handler, raft_status_handler, raft_admin_handler]) do |context| context.response.status_code = 404 context.response.print "Not found" end diff --git a/examples/queue/src/main.cr b/examples/queue/src/main.cr index 3476ccd..934dcba 100644 --- a/examples/queue/src/main.cr +++ b/examples/queue/src/main.cr @@ -133,10 +133,11 @@ end transport.start -raft_handler = Raft::HTTP::Handler(QueueCommand).new(meta_node, transport, raft_advertise_address) +raft_status_handler = Raft::HTTP::StatusHandler(QueueCommand).new(meta_node, transport, raft_advertise_address) +raft_admin_handler = Raft::HTTP::AdminHandler(QueueCommand).new(meta_node, transport) queue_handler = QueueHttpHandler.new(meta_node, meta_sm, nodes, state_machines, transport) -server = ::HTTP::Server.new([queue_handler, raft_handler]) do |context| +server = ::HTTP::Server.new([queue_handler, raft_status_handler, raft_admin_handler]) do |context| context.response.status_code = 404 context.response.print "Not found" end diff --git a/spec/raft/http/admin_handler_spec.cr b/spec/raft/http/admin_handler_spec.cr new file mode 100644 index 0000000..169e523 --- /dev/null +++ b/spec/raft/http/admin_handler_spec.cr @@ -0,0 +1,124 @@ +require "../../spec_helper" +require "http/server" +require "http/client" + +private def make_node(dir : String, peers : Array(Raft::NodeID) = [2_u64, 3_u64]) : Raft::Node(TestData) + Dir.mkdir_p(dir) + config = Raft::Config.new + config.data_dir = dir + config.election_timeout_min_ticks = 100_u32 + config.election_timeout_max_ticks = 100_u32 + Raft::Node(TestData).new(id: 1_u64, peers: peers, config: config, state_machine: TestStateMachine.new) +end + +describe Raft::HTTP::AdminHandler do + it "bootstraps a single-node cluster via POST" do + dir = File.tempname("raft_admin") + node = make_node(dir, peers: [] of Raft::NodeID) + + handler = Raft::HTTP::AdminHandler(TestData).new(node) + server = ::HTTP::Server.new([handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + response = ::HTTP::Client.post("http://127.0.0.1:#{address.port}/raft/admin/bootstrap") + response.status_code.should eq 200 + response.body.should contain("\"status\":\"bootstrapped\"") + node.role.should eq Raft::Role::Leader + + server.close + node.close + FileUtils.rm_rf(dir) + end + + it "returns 404 for unknown admin actions" do + dir = File.tempname("raft_admin") + node = make_node(dir) + + handler = Raft::HTTP::AdminHandler(TestData).new(node) + server = ::HTTP::Server.new([handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + response = ::HTTP::Client.post("http://127.0.0.1:#{address.port}/raft/admin/nonexistent") + response.status_code.should eq 404 + + server.close + node.close + FileUtils.rm_rf(dir) + end + + it "does not respond to GET /raft/status (falls through)" do + dir = File.tempname("raft_admin") + node = make_node(dir) + + handler = Raft::HTTP::AdminHandler(TestData).new(node) + server = ::HTTP::Server.new([handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + response = ::HTTP::Client.get("http://127.0.0.1:#{address.port}/raft/status") + response.status_code.should eq 404 + + server.close + node.close + FileUtils.rm_rf(dir) + end + + {% if flag?(:raft_debug) %} + it "pauses and resumes node via debug admin endpoints" do + dir = File.tempname("raft_admin") + node = make_node(dir) + + handler = Raft::HTTP::AdminHandler(TestData).new(node) + server = ::HTTP::Server.new([handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + response = ::HTTP::Client.post("http://127.0.0.1:#{address.port}/raft/admin/pause") + response.status_code.should eq 200 + node.paused.should be_true + + response = ::HTTP::Client.post("http://127.0.0.1:#{address.port}/raft/admin/resume") + response.status_code.should eq 200 + node.paused.should be_false + + server.close + node.close + FileUtils.rm_rf(dir) + end + {% end %} +end + +describe "Raft::HTTP::StatusHandler + AdminHandler chained" do + it "serves both route sets when both handlers are mounted together" do + dir = File.tempname("raft_chain") + Dir.mkdir_p(dir) + config = Raft::Config.new + config.data_dir = dir + config.election_timeout_min_ticks = 100_u32 + config.election_timeout_max_ticks = 100_u32 + node = Raft::Node(TestData).new(id: 1_u64, peers: [] of Raft::NodeID, config: config, state_machine: TestStateMachine.new) + + status_handler = Raft::HTTP::StatusHandler(TestData).new(node) + admin_handler = Raft::HTTP::AdminHandler(TestData).new(node) + server = ::HTTP::Server.new([status_handler, admin_handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + # Status path hits StatusHandler. + status_response = ::HTTP::Client.get("http://127.0.0.1:#{address.port}/raft/status") + status_response.status_code.should eq 200 + status_response.body.should contain("\"id\":1") + + # Admin path falls through StatusHandler, lands on AdminHandler. + bootstrap_response = ::HTTP::Client.post("http://127.0.0.1:#{address.port}/raft/admin/bootstrap") + bootstrap_response.status_code.should eq 200 + bootstrap_response.body.should contain("\"status\":\"bootstrapped\"") + node.role.should eq Raft::Role::Leader + + server.close + node.close + FileUtils.rm_rf(dir) + end +end diff --git a/spec/raft/http/handler_spec.cr b/spec/raft/http/handler_spec.cr deleted file mode 100644 index 77cafe5..0000000 --- a/spec/raft/http/handler_spec.cr +++ /dev/null @@ -1,92 +0,0 @@ -require "../../spec_helper" -require "http/server" -require "http/client" - -describe Raft::HTTP::Handler do - it "returns node status as JSON" do - dir = File.tempname("raft_http") - Dir.mkdir_p(dir) - config = Raft::Config.new - config.data_dir = dir - config.election_timeout_min_ticks = 100_u32 - config.election_timeout_max_ticks = 100_u32 - - sm = TestStateMachine.new - node = Raft::Node(TestData).new(id: 1_u64, peers: [2_u64, 3_u64], config: config, state_machine: sm) - - handler = Raft::HTTP::Handler(TestData).new(node) - server = ::HTTP::Server.new([handler]) - address = server.bind_tcp("127.0.0.1", 0) - spawn server.listen - - response = ::HTTP::Client.get("http://127.0.0.1:#{address.port}/raft/status") - response.status_code.should eq 200 - body = response.body - body.should contain("\"id\":1") - body.should contain("\"role\":\"follower\"") - - server.close - node.close - FileUtils.rm_rf(dir) - end - - it "returns metrics in prometheus format" do - dir = File.tempname("raft_http") - Dir.mkdir_p(dir) - config = Raft::Config.new - config.data_dir = dir - config.election_timeout_min_ticks = 100_u32 - config.election_timeout_max_ticks = 100_u32 - - sm = TestStateMachine.new - metrics = Raft::Metrics.new(node_id: 1_u64) - node = Raft::Node(TestData).new(id: 1_u64, peers: [2_u64, 3_u64], config: config, state_machine: sm, metrics: metrics) - - handler = Raft::HTTP::Handler(TestData).new(node) - server = ::HTTP::Server.new([handler]) - address = server.bind_tcp("127.0.0.1", 0) - spawn server.listen - - response = ::HTTP::Client.get("http://127.0.0.1:#{address.port}/raft/metrics") - response.status_code.should eq 200 - response.body.should contain("raft_node_term") - # Fresh follower with no leader yet: is_leader=0, leader_id=0 (nil → 0). - response.body.should match /raft_node_is_leader\{[^}]*\} 0\n/ - response.body.should match /raft_node_leader_id\{[^}]*\} 0\n/ - - server.close - node.close - FileUtils.rm_rf(dir) - end - - {% if flag?(:raft_debug) %} - it "pauses and resumes node via admin endpoints" do - dir = File.tempname("raft_http") - Dir.mkdir_p(dir) - config = Raft::Config.new - config.data_dir = dir - config.election_timeout_min_ticks = 100_u32 - config.election_timeout_max_ticks = 100_u32 - - sm = TestStateMachine.new - node = Raft::Node(TestData).new(id: 1_u64, peers: [2_u64, 3_u64], config: config, state_machine: sm) - - handler = Raft::HTTP::Handler(TestData).new(node) - server = ::HTTP::Server.new([handler]) - address = server.bind_tcp("127.0.0.1", 0) - spawn server.listen - - response = ::HTTP::Client.post("http://127.0.0.1:#{address.port}/raft/admin/pause") - response.status_code.should eq 200 - node.paused.should be_true - - response = ::HTTP::Client.post("http://127.0.0.1:#{address.port}/raft/admin/resume") - response.status_code.should eq 200 - node.paused.should be_false - - server.close - node.close - FileUtils.rm_rf(dir) - end - {% end %} -end diff --git a/spec/raft/http/status_handler_spec.cr b/spec/raft/http/status_handler_spec.cr new file mode 100644 index 0000000..caaadc3 --- /dev/null +++ b/spec/raft/http/status_handler_spec.cr @@ -0,0 +1,89 @@ +require "../../spec_helper" +require "http/server" +require "http/client" + +private def make_node(dir : String) : Raft::Node(TestData) + Dir.mkdir_p(dir) + config = Raft::Config.new + config.data_dir = dir + config.election_timeout_min_ticks = 100_u32 + config.election_timeout_max_ticks = 100_u32 + Raft::Node(TestData).new(id: 1_u64, peers: [2_u64, 3_u64], config: config, state_machine: TestStateMachine.new) +end + +private def make_node_with_metrics(dir : String) : Raft::Node(TestData) + Dir.mkdir_p(dir) + config = Raft::Config.new + config.data_dir = dir + config.election_timeout_min_ticks = 100_u32 + config.election_timeout_max_ticks = 100_u32 + metrics = Raft::Metrics.new(node_id: 1_u64) + Raft::Node(TestData).new(id: 1_u64, peers: [2_u64, 3_u64], config: config, state_machine: TestStateMachine.new, metrics: metrics) +end + +describe Raft::HTTP::StatusHandler do + it "returns node status as JSON" do + dir = File.tempname("raft_status") + node = make_node(dir) + + handler = Raft::HTTP::StatusHandler(TestData).new(node) + server = ::HTTP::Server.new([handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + response = ::HTTP::Client.get("http://127.0.0.1:#{address.port}/raft/status") + response.status_code.should eq 200 + body = response.body + body.should contain("\"id\":1") + body.should contain("\"role\":\"follower\"") + + server.close + node.close + FileUtils.rm_rf(dir) + end + + it "returns metrics in prometheus format" do + dir = File.tempname("raft_status") + node = make_node_with_metrics(dir) + + handler = Raft::HTTP::StatusHandler(TestData).new(node) + server = ::HTTP::Server.new([handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + response = ::HTTP::Client.get("http://127.0.0.1:#{address.port}/raft/metrics") + response.status_code.should eq 200 + response.body.should contain("raft_node_term") + # Fresh follower with no leader yet: is_leader=0, leader_id=0 (nil → 0). + response.body.should match /raft_node_is_leader\{[^}]*\} 0\n/ + response.body.should match /raft_node_leader_id\{[^}]*\} 0\n/ + + server.close + node.close + FileUtils.rm_rf(dir) + end + + it "does not respond to POST /raft/admin/* (falls through to next handler)" do + # StatusHandler alone — admin posts should fall through to the bare chain's + # 404. This is the property that lets the metrics port stay safe from + # cluster-mutating requests. + dir = File.tempname("raft_status") + node = make_node(dir) + + handler = Raft::HTTP::StatusHandler(TestData).new(node) + server = ::HTTP::Server.new([handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + response = ::HTTP::Client.post("http://127.0.0.1:#{address.port}/raft/admin/bootstrap") + response.status_code.should eq 404 + + # Node was not mutated — still a follower with the initial peer set. + node.role.should eq Raft::Role::Follower + node.current_term.should eq 0_u64 + + server.close + node.close + FileUtils.rm_rf(dir) + end +end diff --git a/src/raft.cr b/src/raft.cr index 0ff3947..c8b3bfa 100644 --- a/src/raft.cr +++ b/src/raft.cr @@ -15,4 +15,5 @@ require "./raft/transport/memory_transport" require "./raft/transport/tcp_transport" require "./raft/node" require "./raft/server" -require "./raft/http/handler" +require "./raft/http/status_handler" +require "./raft/http/admin_handler" diff --git a/src/raft/http/handler.cr b/src/raft/http/admin_handler.cr similarity index 58% rename from src/raft/http/handler.cr rename to src/raft/http/admin_handler.cr index ee50f59..216a89b 100644 --- a/src/raft/http/handler.cr +++ b/src/raft/http/admin_handler.cr @@ -5,107 +5,43 @@ require "../transport/tcp_transport" module Raft module HTTP - class Handler(T) + # Mutating HTTP handler exposing cluster administration over `POST + # /raft/admin/*`. Mount behind authentication — every route mutates + # cluster state (membership, leadership, or under `-Draft_debug`, the + # chaos primitives). + # + # Routes (all `POST`): + # - `/raft/admin/bootstrap` — turn an empty node into a single-node cluster + # - `/raft/admin/register_peer` — register a peer's address with the transport + # - `/raft/admin/add_server/{id}` — add a learner with optional `{"address": "..."}` body + # - `/raft/admin/remove_server/{id}` — remove a server from the configuration + # - `/raft/admin/promote_learner/{id}` — promote a learner to voter + # - `/raft/admin/transfer_leadership/{id}` — initiate leadership transfer + # - `-Draft_debug` only: `/raft/admin/pause|resume|partition|heal|reset` + # + # Anything else falls through to the next handler in the chain via + # `call_next`. Pair with `StatusHandler` to expose the read-only surface + # on a separate endpoint. + class AdminHandler(T) include ::HTTP::Handler @node : Node(T) @transport : TCPTransport? - @raft_address : String? - def initialize(@node : Node(T), @transport : TCPTransport? = nil, @raft_address : String? = nil) + def initialize(@node : Node(T), @transport : TCPTransport? = nil) end def call(context : ::HTTP::Server::Context) method = context.request.method path = context.request.path - case {method, path} - when {"GET", "/raft/status"} - handle_status(context) - when {"GET", "/raft/log"} - handle_log(context) - when {"GET", "/raft/metrics"} - handle_metrics(context) + if method == "POST" && path.starts_with?("/raft/admin/") + handle_admin(context, path) else - if method == "POST" && path.starts_with?("/raft/admin/") - handle_admin(context, path) - return - end call_next(context) end end - private def handle_status(context) - leader_id = @node.leader_id - json = JSON.build do |j| - j.object do - j.field "id", @node.id - j.field "role", @node.role.to_s.downcase - j.field "term", @node.current_term - j.field "raft_address", @raft_address if @raft_address - j.field "leader_id", leader_id - j.field "commit_index", @node.commit_index - j.field "last_log_index", @node.log.last_index - j.field "peers" do - j.array do - @node.peers.each do |p| - j.object do - j.field "id", p.id - j.field "role", p.role.to_s.downcase - j.field "address", p.address unless p.address.empty? - end - end - end - end - {% if flag?(:raft_debug) %} - j.field "paused", @node.paused - {% end %} - end - end - context.response.content_type = "application/json" - context.response.status_code = 200 - context.response.print json - end - - private def handle_log(context) - json = JSON.build do |j| - j.object do - j.field "last_index", @node.log.last_index - j.field "last_term", @node.log.last_term - j.field "segment_count", @node.log.segment_count - j.field "commit_index", @node.commit_index - end - end - context.response.content_type = "application/json" - context.response.status_code = 200 - context.response.print json - end - - private def handle_metrics(context) - if metrics = @node.metrics - # Update gauges from current node state - metrics.set_gauge("raft_node_role", @node.role.value.to_i64) - metrics.set_gauge("raft_node_term", @node.current_term.to_i64) - metrics.set_gauge("raft_node_commit_index", @node.commit_index.to_i64) - metrics.set_gauge("raft_node_last_log_index", @node.log.last_index.to_i64) - metrics.set_gauge("raft_node_first_log_index", @node.log.first_index.to_i64) - metrics.set_gauge("raft_node_segment_count", @node.log.segment_count.to_i64) - metrics.set_gauge("raft_node_snapshot_index", @node.snapshot_index.to_i64) - metrics.set_gauge("raft_node_snapshot_size_bytes", @node.snapshot_size_bytes) - metrics.set_gauge("raft_node_peers", @node.peers.size.to_i64) - metrics.set_gauge("raft_node_is_leader", @node.role.leader? ? 1_i64 : 0_i64) - metrics.set_gauge("raft_node_leader_id", (@node.leader_id || 0_u64).to_i64) - - context.response.content_type = "text/plain; version=0.0.4" - context.response.status_code = 200 - context.response.print metrics.to_prometheus - @transport.try(&.to_prometheus(context.response)) - else - context.response.status_code = 503 - context.response.print "Metrics not configured" - end - end - private def handle_admin(context, path) handle_admin_inner(context, path) rescue ex : JSON::ParseException | KeyError | ArgumentError diff --git a/src/raft/http/status_handler.cr b/src/raft/http/status_handler.cr new file mode 100644 index 0000000..4086b08 --- /dev/null +++ b/src/raft/http/status_handler.cr @@ -0,0 +1,118 @@ +require "http/server/handler" +require "json" +require "../node" +require "../transport/tcp_transport" + +module Raft + module HTTP + # Read-only HTTP handler exposing node status, log metadata, and Prometheus + # metrics. Safe to mount on an unauthenticated metrics port — does not + # mutate cluster state. + # + # Routes: + # - `GET /raft/status` — JSON snapshot of node role, term, leader, peers + # - `GET /raft/log` — JSON snapshot of log/commit indexes + # - `GET /raft/metrics` — Prometheus text format + # + # Anything else (including `POST /raft/admin/*`) falls through to the next + # handler in the chain via `call_next`. Pair with `AdminHandler` to expose + # the mutating surface on a separate (typically authenticated) endpoint. + class StatusHandler(T) + include ::HTTP::Handler + + @node : Node(T) + @transport : TCPTransport? + @raft_address : String? + + def initialize(@node : Node(T), @transport : TCPTransport? = nil, @raft_address : String? = nil) + end + + def call(context : ::HTTP::Server::Context) + method = context.request.method + path = context.request.path + + case {method, path} + when {"GET", "/raft/status"} + handle_status(context) + when {"GET", "/raft/log"} + handle_log(context) + when {"GET", "/raft/metrics"} + handle_metrics(context) + else + call_next(context) + end + end + + private def handle_status(context) + leader_id = @node.leader_id + json = JSON.build do |j| + j.object do + j.field "id", @node.id + j.field "role", @node.role.to_s.downcase + j.field "term", @node.current_term + j.field "raft_address", @raft_address if @raft_address + j.field "leader_id", leader_id + j.field "commit_index", @node.commit_index + j.field "last_log_index", @node.log.last_index + j.field "peers" do + j.array do + @node.peers.each do |p| + j.object do + j.field "id", p.id + j.field "role", p.role.to_s.downcase + j.field "address", p.address unless p.address.empty? + end + end + end + end + {% if flag?(:raft_debug) %} + j.field "paused", @node.paused + {% end %} + end + end + context.response.content_type = "application/json" + context.response.status_code = 200 + context.response.print json + end + + private def handle_log(context) + json = JSON.build do |j| + j.object do + j.field "last_index", @node.log.last_index + j.field "last_term", @node.log.last_term + j.field "segment_count", @node.log.segment_count + j.field "commit_index", @node.commit_index + end + end + context.response.content_type = "application/json" + context.response.status_code = 200 + context.response.print json + end + + private def handle_metrics(context) + if metrics = @node.metrics + # Update gauges from current node state + metrics.set_gauge("raft_node_role", @node.role.value.to_i64) + metrics.set_gauge("raft_node_term", @node.current_term.to_i64) + metrics.set_gauge("raft_node_commit_index", @node.commit_index.to_i64) + metrics.set_gauge("raft_node_last_log_index", @node.log.last_index.to_i64) + metrics.set_gauge("raft_node_first_log_index", @node.log.first_index.to_i64) + metrics.set_gauge("raft_node_segment_count", @node.log.segment_count.to_i64) + metrics.set_gauge("raft_node_snapshot_index", @node.snapshot_index.to_i64) + metrics.set_gauge("raft_node_snapshot_size_bytes", @node.snapshot_size_bytes) + metrics.set_gauge("raft_node_peers", @node.peers.size.to_i64) + metrics.set_gauge("raft_node_is_leader", @node.role.leader? ? 1_i64 : 0_i64) + metrics.set_gauge("raft_node_leader_id", (@node.leader_id || 0_u64).to_i64) + + context.response.content_type = "text/plain; version=0.0.4" + context.response.status_code = 200 + context.response.print metrics.to_prometheus + @transport.try(&.to_prometheus(context.response)) + else + context.response.status_code = 503 + context.response.print "Metrics not configured" + end + end + end + end +end