diff --git a/examples/kv/src/main.cr b/examples/kv/src/main.cr index ee2695a..2d83ece 100644 --- a/examples/kv/src/main.cr +++ b/examples/kv/src/main.cr @@ -127,8 +127,8 @@ end transport.start # HTTP server -raft_status_handler = Raft::HTTP::StatusHandler(KVCommand).new(meta_node, transport, raft_advertise_address) -raft_admin_handler = Raft::HTTP::AdminHandler(KVCommand).new(meta_node, transport) +raft_status_handler = Raft::HTTP::StatusHandler.new(meta_node, transport, raft_advertise_address) +raft_admin_handler = Raft::HTTP::AdminHandler.new(meta_node, transport) kv_handler = KVHttpHandler.new(meta_node, meta_sm, nodes, value_machines) server = ::HTTP::Server.new([kv_handler, raft_status_handler, raft_admin_handler]) do |context| diff --git a/examples/queue/src/main.cr b/examples/queue/src/main.cr index 934dcba..8081f85 100644 --- a/examples/queue/src/main.cr +++ b/examples/queue/src/main.cr @@ -133,8 +133,8 @@ end transport.start -raft_status_handler = Raft::HTTP::StatusHandler(QueueCommand).new(meta_node, transport, raft_advertise_address) -raft_admin_handler = Raft::HTTP::AdminHandler(QueueCommand).new(meta_node, transport) +raft_status_handler = Raft::HTTP::StatusHandler.new(meta_node, transport, raft_advertise_address) +raft_admin_handler = Raft::HTTP::AdminHandler.new(meta_node, transport) queue_handler = QueueHttpHandler.new(meta_node, meta_sm, nodes, state_machines, transport) server = ::HTTP::Server.new([queue_handler, raft_status_handler, raft_admin_handler]) do |context| diff --git a/spec/raft/http/admin_client_spec.cr b/spec/raft/http/admin_client_spec.cr new file mode 100644 index 0000000..382764d --- /dev/null +++ b/spec/raft/http/admin_client_spec.cr @@ -0,0 +1,117 @@ +require "../../spec_helper" +require "http/server" +require "uri" + +# Records the last request and replies with a canned status, so AdminClient +# specs can assert the exact wire format without a live Raft node. +private class RecordingHandler + include ::HTTP::Handler + + property last_method : String? = nil + property last_path : String? = nil + property last_body : String? = nil + property last_content_type : String? = nil + property last_authorization : String? = nil + property reply_status : Int32 = 200 + + def call(context : ::HTTP::Server::Context) + @last_method = context.request.method + @last_path = context.request.path + @last_body = context.request.body.try(&.gets_to_end) + @last_content_type = context.request.headers["Content-Type"]? + @last_authorization = context.request.headers["Authorization"]? + context.response.status_code = @reply_status + end +end + +private def with_recording_server(&) + handler = RecordingHandler.new + server = ::HTTP::Server.new([handler]) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + begin + yield handler, address.port + ensure + server.close + end +end + +describe Raft::HTTP::AdminClient do + it "posts the add_server wire format and returns the status" do + with_recording_server do |handler, port| + uri = URI.parse("http://127.0.0.1:#{port}") + status = Raft::HTTP::AdminClient.add_server(uri, 2_u64, "node-2:9000") + + status.should eq ::HTTP::Status::OK + handler.last_method.should eq "POST" + handler.last_path.should eq "/raft/admin/add_server/2" + handler.last_content_type.should eq "application/json" + handler.last_body.should eq %({"address":"node-2:9000"}) + end + end + + it "returns non-200 statuses so callers can distinguish rejection from unavailability" do + with_recording_server do |handler, port| + uri = URI.parse("http://127.0.0.1:#{port}") + + handler.reply_status = 400 + Raft::HTTP::AdminClient.add_server(uri, 2_u64).should eq ::HTTP::Status::BAD_REQUEST + + handler.reply_status = 503 + Raft::HTTP::AdminClient.add_server(uri, 2_u64).should eq ::HTTP::Status::SERVICE_UNAVAILABLE + end + end + + it "sends basic auth from URI userinfo" do + with_recording_server do |handler, port| + uri = URI.parse("http://admin:secret@127.0.0.1:#{port}") + Raft::HTTP::AdminClient.add_server(uri, 3_u64) + + auth = handler.last_authorization + auth.should_not be_nil + auth.to_s.should start_with("Basic ") + end + end + + it "respects a path prefix in the URI" do + with_recording_server do |handler, port| + uri = URI.parse("http://127.0.0.1:#{port}/api/v1") + Raft::HTTP::AdminClient.add_server(uri, 4_u64) + + handler.last_path.should eq "/api/v1/raft/admin/add_server/4" + end + end + + it "raises on connection errors" do + # Port 1 is essentially guaranteed closed. + uri = URI.parse("http://127.0.0.1:1") + expect_raises(Socket::ConnectError) do + Raft::HTTP::AdminClient.add_server(uri, 2_u64) + end + end + + it "round-trips against the real AdminHandler" do + dir = File.tempname("raft_admin_client") + Dir.mkdir_p(dir) + config = Raft::Config.new + config.data_dir = dir + config.election_timeout_min_ticks = 100_u32 + config.election_timeout_max_ticks = 100_u32 + node = Raft::Node(TestData).new(id: 1_u64, peers: [] of Raft::NodeID, config: config, state_machine: TestStateMachine.new) + node.bootstrap.should be_true + + admin_handler = Raft::HTTP::AdminHandler.new(node) + server = ::HTTP::Server.new([admin_handler] of ::HTTP::Handler) + address = server.bind_tcp("127.0.0.1", 0) + spawn server.listen + + uri = URI.parse("http://127.0.0.1:#{address.port}") + status = Raft::HTTP::AdminClient.add_server(uri, 2_u64, "node-2:9000") + status.should eq ::HTTP::Status::OK + node.peers.any? { |p| p.id == 2_u64 }.should be_true + + server.close + node.close + FileUtils.rm_rf(dir) + end +end diff --git a/spec/raft/http/admin_handler_spec.cr b/spec/raft/http/admin_handler_spec.cr index 169e523..efa0466 100644 --- a/spec/raft/http/admin_handler_spec.cr +++ b/spec/raft/http/admin_handler_spec.cr @@ -16,7 +16,7 @@ describe Raft::HTTP::AdminHandler do dir = File.tempname("raft_admin") node = make_node(dir, peers: [] of Raft::NodeID) - handler = Raft::HTTP::AdminHandler(TestData).new(node) + handler = Raft::HTTP::AdminHandler.new(node) server = ::HTTP::Server.new([handler]) address = server.bind_tcp("127.0.0.1", 0) spawn server.listen @@ -35,7 +35,7 @@ describe Raft::HTTP::AdminHandler do dir = File.tempname("raft_admin") node = make_node(dir) - handler = Raft::HTTP::AdminHandler(TestData).new(node) + handler = Raft::HTTP::AdminHandler.new(node) server = ::HTTP::Server.new([handler]) address = server.bind_tcp("127.0.0.1", 0) spawn server.listen @@ -52,7 +52,7 @@ describe Raft::HTTP::AdminHandler do dir = File.tempname("raft_admin") node = make_node(dir) - handler = Raft::HTTP::AdminHandler(TestData).new(node) + handler = Raft::HTTP::AdminHandler.new(node) server = ::HTTP::Server.new([handler]) address = server.bind_tcp("127.0.0.1", 0) spawn server.listen @@ -70,7 +70,7 @@ describe Raft::HTTP::AdminHandler do dir = File.tempname("raft_admin") node = make_node(dir) - handler = Raft::HTTP::AdminHandler(TestData).new(node) + handler = Raft::HTTP::AdminHandler.new(node) server = ::HTTP::Server.new([handler]) address = server.bind_tcp("127.0.0.1", 0) spawn server.listen @@ -100,8 +100,8 @@ describe "Raft::HTTP::StatusHandler + AdminHandler chained" do config.election_timeout_max_ticks = 100_u32 node = Raft::Node(TestData).new(id: 1_u64, peers: [] of Raft::NodeID, config: config, state_machine: TestStateMachine.new) - status_handler = Raft::HTTP::StatusHandler(TestData).new(node) - admin_handler = Raft::HTTP::AdminHandler(TestData).new(node) + status_handler = Raft::HTTP::StatusHandler.new(node) + admin_handler = Raft::HTTP::AdminHandler.new(node) server = ::HTTP::Server.new([status_handler, admin_handler]) address = server.bind_tcp("127.0.0.1", 0) spawn server.listen diff --git a/spec/raft/http/status_handler_spec.cr b/spec/raft/http/status_handler_spec.cr index caaadc3..6ba54d9 100644 --- a/spec/raft/http/status_handler_spec.cr +++ b/spec/raft/http/status_handler_spec.cr @@ -26,7 +26,7 @@ describe Raft::HTTP::StatusHandler do dir = File.tempname("raft_status") node = make_node(dir) - handler = Raft::HTTP::StatusHandler(TestData).new(node) + handler = Raft::HTTP::StatusHandler.new(node) server = ::HTTP::Server.new([handler]) address = server.bind_tcp("127.0.0.1", 0) spawn server.listen @@ -46,7 +46,7 @@ describe Raft::HTTP::StatusHandler do dir = File.tempname("raft_status") node = make_node_with_metrics(dir) - handler = Raft::HTTP::StatusHandler(TestData).new(node) + handler = Raft::HTTP::StatusHandler.new(node) server = ::HTTP::Server.new([handler]) address = server.bind_tcp("127.0.0.1", 0) spawn server.listen @@ -70,7 +70,7 @@ describe Raft::HTTP::StatusHandler do dir = File.tempname("raft_status") node = make_node(dir) - handler = Raft::HTTP::StatusHandler(TestData).new(node) + handler = Raft::HTTP::StatusHandler.new(node) server = ::HTTP::Server.new([handler]) address = server.bind_tcp("127.0.0.1", 0) spawn server.listen diff --git a/src/raft.cr b/src/raft.cr index c8b3bfa..de1ece3 100644 --- a/src/raft.cr +++ b/src/raft.cr @@ -13,7 +13,10 @@ require "./raft/log/segment" require "./raft/transport" require "./raft/transport/memory_transport" require "./raft/transport/tcp_transport" +require "./raft/status_source" +require "./raft/admin_ops" require "./raft/node" require "./raft/server" require "./raft/http/status_handler" require "./raft/http/admin_handler" +require "./raft/http/admin_client" diff --git a/src/raft/admin_ops.cr b/src/raft/admin_ops.cr new file mode 100644 index 0000000..2cbf74b --- /dev/null +++ b/src/raft/admin_ops.cr @@ -0,0 +1,28 @@ +require "./config" + +module Raft + # The mutating cluster-administration surface an admin endpoint needs + # from a node. T-free by construction — none of these depend on the + # state-machine command type, which lets `HTTP::AdminHandler` be a + # concrete (non-generic) class. + # + # Included by `Raft::Node(T)`. The threading contract still applies: + # these mutate node state without locks, so callers outside the node's + # owning fiber must arrange their own safety (e.g. run the HTTP server + # in the same execution context, or marshal calls onto the node's fiber). + module AdminOps + abstract def bootstrap : Bool + abstract def add_server(node_id : NodeID, address : String = "") : Bool + abstract def remove_server(node_id : NodeID) : Bool + abstract def promote_learner(node_id : NodeID) : Bool + abstract def transfer_leadership(to target : NodeID) : Bool + + {% if flag?(:raft_debug) %} + abstract def pause + abstract def resume + abstract def partition + abstract def heal + abstract def reset + {% end %} + end +end diff --git a/src/raft/http/admin_client.cr b/src/raft/http/admin_client.cr new file mode 100644 index 0000000..a5f7695 --- /dev/null +++ b/src/raft/http/admin_client.cr @@ -0,0 +1,36 @@ +require "http/client" +require "json" +require "uri" +require "../config" + +module Raft + module HTTP + # Client side of the `/raft/admin/*` wire format. Keeps request shapes in + # one place so embedders don't hand-roll them. + # + # Single attempt per call, no retry loop — embedders own retry policy. + # Returns the response `::HTTP::Status` so callers can distinguish + # rejection (400, e.g. already a member — stop retrying) from + # unavailability (5xx — retry later). Raises only on connection errors. + module AdminClient + # POST `/raft/admin/add_server/` with a JSON body + # `{"address": address}`. Basic auth is taken from the URI's userinfo + # when present. + def self.add_server(uri : URI, node_id : NodeID, address : String = "") : ::HTTP::Status + client = ::HTTP::Client.new(uri) + begin + if user = uri.user + client.basic_auth(user, uri.password || "") + end + path = uri.path.rstrip('/') + "/raft/admin/add_server/#{node_id}" + headers = ::HTTP::Headers{"Content-Type" => "application/json"} + body = {address: address}.to_json + response = client.post(path, headers: headers, body: body) + response.status + ensure + client.close + end + end + end + end +end diff --git a/src/raft/http/admin_handler.cr b/src/raft/http/admin_handler.cr index 216a89b..13ee618 100644 --- a/src/raft/http/admin_handler.cr +++ b/src/raft/http/admin_handler.cr @@ -1,6 +1,6 @@ require "http/server/handler" require "json" -require "../node" +require "../admin_ops" require "../transport/tcp_transport" module Raft @@ -22,13 +22,16 @@ module Raft # Anything else falls through to the next handler in the chain via # `call_next`. Pair with `StatusHandler` to expose the read-only surface # on a separate endpoint. - class AdminHandler(T) + # + # Concrete (non-generic) — takes any `AdminOps`, which `Node(T)` + # includes. + class AdminHandler include ::HTTP::Handler - @node : Node(T) + @ops : AdminOps @transport : TCPTransport? - def initialize(@node : Node(T), @transport : TCPTransport? = nil) + def initialize(@ops : AdminOps, @transport : TCPTransport? = nil) end def call(context : ::HTTP::Server::Context) @@ -51,7 +54,7 @@ module Raft private def handle_admin_inner(context, path) case path when "/raft/admin/bootstrap" - if @node.bootstrap + if @ops.bootstrap json_response(context, 200, {"status" => "bootstrapped"}) else json_response(context, 400, {"error" => "failed to bootstrap (node may already have peers)"}) @@ -82,28 +85,28 @@ module Raft address = data["address"]?.try(&.as_s) || "" end end - if @node.add_server(node_id, address) + if @ops.add_server(node_id, address) json_response(context, 200, {"status" => "added", "node_id" => node_id.to_s}) else json_response(context, 400, {"error" => "failed to add server"}) end elsif path.starts_with?("/raft/admin/remove_server/") node_id = path.split("/").last.to_u64 - if @node.remove_server(node_id) + if @ops.remove_server(node_id) json_response(context, 200, {"status" => "removed", "node_id" => node_id.to_s}) else json_response(context, 400, {"error" => "failed to remove server"}) end elsif path.starts_with?("/raft/admin/promote_learner/") node_id = path.split("/").last.to_u64 - if @node.promote_learner(node_id) + if @ops.promote_learner(node_id) json_response(context, 200, {"status" => "promoted", "node_id" => node_id.to_s}) else json_response(context, 400, {"error" => "failed to promote learner"}) end elsif path.starts_with?("/raft/admin/transfer_leadership/") node_id = path.split("/").last.to_u64 - if @node.transfer_leadership(to: node_id) + if @ops.transfer_leadership(to: node_id) json_response(context, 200, {"status" => "transferring", "target" => node_id.to_s}) else json_response(context, 400, {"error" => "failed to transfer leadership"}) @@ -123,19 +126,19 @@ module Raft private def handle_debug_admin(context, path) case path when "/raft/admin/pause" - @node.pause + @ops.pause json_response(context, 200, {"status" => "paused"}) when "/raft/admin/resume" - @node.resume + @ops.resume json_response(context, 200, {"status" => "resumed"}) when "/raft/admin/partition" - @node.partition + @ops.partition json_response(context, 200, {"status" => "partitioned"}) when "/raft/admin/heal" - @node.heal + @ops.heal json_response(context, 200, {"status" => "healed"}) when "/raft/admin/reset" - @node.reset + @ops.reset json_response(context, 200, {"status" => "reset"}) else context.response.status_code = 404 diff --git a/src/raft/http/status_handler.cr b/src/raft/http/status_handler.cr index 4086b08..0729784 100644 --- a/src/raft/http/status_handler.cr +++ b/src/raft/http/status_handler.cr @@ -1,6 +1,6 @@ require "http/server/handler" require "json" -require "../node" +require "../status_source" require "../transport/tcp_transport" module Raft @@ -17,14 +17,17 @@ module Raft # Anything else (including `POST /raft/admin/*`) falls through to the next # handler in the chain via `call_next`. Pair with `AdminHandler` to expose # the mutating surface on a separate (typically authenticated) endpoint. - class StatusHandler(T) + # + # Concrete (non-generic) — takes any `StatusSource`, which `Node(T)` + # includes. + class StatusHandler include ::HTTP::Handler - @node : Node(T) + @node : StatusSource @transport : TCPTransport? @raft_address : String? - def initialize(@node : Node(T), @transport : TCPTransport? = nil, @raft_address : String? = nil) + def initialize(@node : StatusSource, @transport : TCPTransport? = nil, @raft_address : String? = nil) end def call(context : ::HTTP::Server::Context) @@ -53,7 +56,7 @@ module Raft j.field "raft_address", @raft_address if @raft_address j.field "leader_id", leader_id j.field "commit_index", @node.commit_index - j.field "last_log_index", @node.log.last_index + j.field "last_log_index", @node.last_log_index j.field "peers" do j.array do @node.peers.each do |p| @@ -78,9 +81,9 @@ module Raft private def handle_log(context) json = JSON.build do |j| j.object do - j.field "last_index", @node.log.last_index - j.field "last_term", @node.log.last_term - j.field "segment_count", @node.log.segment_count + j.field "last_index", @node.last_log_index + j.field "last_term", @node.last_log_term + j.field "segment_count", @node.segment_count j.field "commit_index", @node.commit_index end end @@ -95,9 +98,9 @@ module Raft metrics.set_gauge("raft_node_role", @node.role.value.to_i64) metrics.set_gauge("raft_node_term", @node.current_term.to_i64) metrics.set_gauge("raft_node_commit_index", @node.commit_index.to_i64) - metrics.set_gauge("raft_node_last_log_index", @node.log.last_index.to_i64) - metrics.set_gauge("raft_node_first_log_index", @node.log.first_index.to_i64) - metrics.set_gauge("raft_node_segment_count", @node.log.segment_count.to_i64) + metrics.set_gauge("raft_node_last_log_index", @node.last_log_index.to_i64) + metrics.set_gauge("raft_node_first_log_index", @node.first_log_index.to_i64) + metrics.set_gauge("raft_node_segment_count", @node.segment_count.to_i64) metrics.set_gauge("raft_node_snapshot_index", @node.snapshot_index.to_i64) metrics.set_gauge("raft_node_snapshot_size_bytes", @node.snapshot_size_bytes) metrics.set_gauge("raft_node_peers", @node.peers.size.to_i64) diff --git a/src/raft/node.cr b/src/raft/node.cr index a51cf21..eb9775e 100644 --- a/src/raft/node.cr +++ b/src/raft/node.cr @@ -5,6 +5,8 @@ require "./log_entry" require "./log" require "./state_machine" require "./metrics" +require "./status_source" +require "./admin_ops" module Raft # Implements the Raft consensus protocol for a single peer in a single @@ -28,6 +30,9 @@ module Raft # do the actual call. The KV and queue examples demonstrate this # pattern in `start_group_loop`. class Node(T) + include StatusSource + include AdminOps + getter role : Role = Role::Follower getter current_term : UInt64 = 0_u64 getter id : NodeID @@ -44,6 +49,24 @@ module Raft File.exists?(path) ? File.size(path).to_i64 : 0_i64 end + # T-free log scalars for StatusSource — let status/metrics consumers + # read log shape without holding a reference to the mutable Log(T). + def last_log_index : UInt64 + @log.last_index + end + + def last_log_term : UInt64 + @log.last_term + end + + def first_log_index : UInt64 + @log.first_index + end + + def segment_count : Int32 + @log.segment_count + end + getter peers : Array(Peer) getter metrics : Metrics? {% if flag?(:raft_debug) %} diff --git a/src/raft/status_source.cr b/src/raft/status_source.cr new file mode 100644 index 0000000..fcd31ac --- /dev/null +++ b/src/raft/status_source.cr @@ -0,0 +1,35 @@ +require "./config" +require "./message" +require "./peer" +require "./metrics" + +module Raft + # The read-only surface a status/metrics endpoint needs from a node. + # T-free by construction — every method here is independent of the + # state-machine command type, which lets `HTTP::StatusHandler` be a + # concrete (non-generic) class. + # + # Included by `Raft::Node(T)`. The threading contract still applies: + # these read mutable node state without locks, so callers outside the + # node's owning fiber must arrange their own safety (e.g. run the HTTP + # server in the same execution context). + module StatusSource + abstract def id : NodeID + abstract def role : Role + abstract def current_term : UInt64 + abstract def leader_id : NodeID? + abstract def commit_index : UInt64 + abstract def peers : Array(Peer) + abstract def metrics : Metrics? + abstract def snapshot_index : UInt64 + abstract def snapshot_size_bytes : Int64 + abstract def last_log_index : UInt64 + abstract def last_log_term : UInt64 + abstract def first_log_index : UInt64 + abstract def segment_count : Int32 + + {% if flag?(:raft_debug) %} + abstract def paused : Bool + {% end %} + end +end