Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/kv/src/main.cr
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ end
transport.start

# HTTP server
raft_status_handler = Raft::HTTP::StatusHandler(KVCommand).new(meta_node, transport, raft_advertise_address)
raft_admin_handler = Raft::HTTP::AdminHandler(KVCommand).new(meta_node, transport)
raft_status_handler = Raft::HTTP::StatusHandler.new(meta_node, transport, raft_advertise_address)
raft_admin_handler = Raft::HTTP::AdminHandler.new(meta_node, transport)
kv_handler = KVHttpHandler.new(meta_node, meta_sm, nodes, value_machines)

server = ::HTTP::Server.new([kv_handler, raft_status_handler, raft_admin_handler]) do |context|
Expand Down
4 changes: 2 additions & 2 deletions examples/queue/src/main.cr
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ end

transport.start

raft_status_handler = Raft::HTTP::StatusHandler(QueueCommand).new(meta_node, transport, raft_advertise_address)
raft_admin_handler = Raft::HTTP::AdminHandler(QueueCommand).new(meta_node, transport)
raft_status_handler = Raft::HTTP::StatusHandler.new(meta_node, transport, raft_advertise_address)
raft_admin_handler = Raft::HTTP::AdminHandler.new(meta_node, transport)
queue_handler = QueueHttpHandler.new(meta_node, meta_sm, nodes, state_machines, transport)

server = ::HTTP::Server.new([queue_handler, raft_status_handler, raft_admin_handler]) do |context|
Expand Down
117 changes: 117 additions & 0 deletions spec/raft/http/admin_client_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
require "../../spec_helper"
require "http/server"
require "uri"

# Records the last request and replies with a canned status, so AdminClient
# specs can assert the exact wire format without a live Raft node.
private class RecordingHandler
include ::HTTP::Handler

property last_method : String? = nil
property last_path : String? = nil
property last_body : String? = nil
property last_content_type : String? = nil
property last_authorization : String? = nil
property reply_status : Int32 = 200

def call(context : ::HTTP::Server::Context)
@last_method = context.request.method
@last_path = context.request.path
@last_body = context.request.body.try(&.gets_to_end)
@last_content_type = context.request.headers["Content-Type"]?
@last_authorization = context.request.headers["Authorization"]?
context.response.status_code = @reply_status
end
end

private def with_recording_server(&)
handler = RecordingHandler.new
server = ::HTTP::Server.new([handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
begin
yield handler, address.port
ensure
server.close
end
end

describe Raft::HTTP::AdminClient do
it "posts the add_server wire format and returns the status" do
with_recording_server do |handler, port|
uri = URI.parse("http://127.0.0.1:#{port}")
status = Raft::HTTP::AdminClient.add_server(uri, 2_u64, "node-2:9000")

status.should eq ::HTTP::Status::OK
handler.last_method.should eq "POST"
handler.last_path.should eq "/raft/admin/add_server/2"
handler.last_content_type.should eq "application/json"
handler.last_body.should eq %({"address":"node-2:9000"})
end
end

it "returns non-200 statuses so callers can distinguish rejection from unavailability" do
with_recording_server do |handler, port|
uri = URI.parse("http://127.0.0.1:#{port}")

handler.reply_status = 400
Raft::HTTP::AdminClient.add_server(uri, 2_u64).should eq ::HTTP::Status::BAD_REQUEST

handler.reply_status = 503
Raft::HTTP::AdminClient.add_server(uri, 2_u64).should eq ::HTTP::Status::SERVICE_UNAVAILABLE
end
end

it "sends basic auth from URI userinfo" do
with_recording_server do |handler, port|
uri = URI.parse("http://admin:secret@127.0.0.1:#{port}")
Raft::HTTP::AdminClient.add_server(uri, 3_u64)

auth = handler.last_authorization
auth.should_not be_nil
auth.to_s.should start_with("Basic ")
end
end

it "respects a path prefix in the URI" do
with_recording_server do |handler, port|
uri = URI.parse("http://127.0.0.1:#{port}/api/v1")
Raft::HTTP::AdminClient.add_server(uri, 4_u64)

handler.last_path.should eq "/api/v1/raft/admin/add_server/4"
end
end

it "raises on connection errors" do
# Port 1 is essentially guaranteed closed.
uri = URI.parse("http://127.0.0.1:1")
expect_raises(Socket::ConnectError) do
Raft::HTTP::AdminClient.add_server(uri, 2_u64)
end
end

it "round-trips against the real AdminHandler" do
dir = File.tempname("raft_admin_client")
Dir.mkdir_p(dir)
config = Raft::Config.new
config.data_dir = dir
config.election_timeout_min_ticks = 100_u32
config.election_timeout_max_ticks = 100_u32
node = Raft::Node(TestData).new(id: 1_u64, peers: [] of Raft::NodeID, config: config, state_machine: TestStateMachine.new)
node.bootstrap.should be_true

admin_handler = Raft::HTTP::AdminHandler.new(node)
server = ::HTTP::Server.new([admin_handler] of ::HTTP::Handler)
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen

uri = URI.parse("http://127.0.0.1:#{address.port}")
status = Raft::HTTP::AdminClient.add_server(uri, 2_u64, "node-2:9000")
status.should eq ::HTTP::Status::OK
node.peers.any? { |p| p.id == 2_u64 }.should be_true

server.close
node.close
FileUtils.rm_rf(dir)
end
end
12 changes: 6 additions & 6 deletions spec/raft/http/admin_handler_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe Raft::HTTP::AdminHandler do
dir = File.tempname("raft_admin")
node = make_node(dir, peers: [] of Raft::NodeID)

handler = Raft::HTTP::AdminHandler(TestData).new(node)
handler = Raft::HTTP::AdminHandler.new(node)
server = ::HTTP::Server.new([handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
Expand All @@ -35,7 +35,7 @@ describe Raft::HTTP::AdminHandler do
dir = File.tempname("raft_admin")
node = make_node(dir)

handler = Raft::HTTP::AdminHandler(TestData).new(node)
handler = Raft::HTTP::AdminHandler.new(node)
server = ::HTTP::Server.new([handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
Expand All @@ -52,7 +52,7 @@ describe Raft::HTTP::AdminHandler do
dir = File.tempname("raft_admin")
node = make_node(dir)

handler = Raft::HTTP::AdminHandler(TestData).new(node)
handler = Raft::HTTP::AdminHandler.new(node)
server = ::HTTP::Server.new([handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
Expand All @@ -70,7 +70,7 @@ describe Raft::HTTP::AdminHandler do
dir = File.tempname("raft_admin")
node = make_node(dir)

handler = Raft::HTTP::AdminHandler(TestData).new(node)
handler = Raft::HTTP::AdminHandler.new(node)
server = ::HTTP::Server.new([handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
Expand Down Expand Up @@ -100,8 +100,8 @@ describe "Raft::HTTP::StatusHandler + AdminHandler chained" do
config.election_timeout_max_ticks = 100_u32
node = Raft::Node(TestData).new(id: 1_u64, peers: [] of Raft::NodeID, config: config, state_machine: TestStateMachine.new)

status_handler = Raft::HTTP::StatusHandler(TestData).new(node)
admin_handler = Raft::HTTP::AdminHandler(TestData).new(node)
status_handler = Raft::HTTP::StatusHandler.new(node)
admin_handler = Raft::HTTP::AdminHandler.new(node)
server = ::HTTP::Server.new([status_handler, admin_handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
Expand Down
6 changes: 3 additions & 3 deletions spec/raft/http/status_handler_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe Raft::HTTP::StatusHandler do
dir = File.tempname("raft_status")
node = make_node(dir)

handler = Raft::HTTP::StatusHandler(TestData).new(node)
handler = Raft::HTTP::StatusHandler.new(node)
server = ::HTTP::Server.new([handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
Expand All @@ -46,7 +46,7 @@ describe Raft::HTTP::StatusHandler do
dir = File.tempname("raft_status")
node = make_node_with_metrics(dir)

handler = Raft::HTTP::StatusHandler(TestData).new(node)
handler = Raft::HTTP::StatusHandler.new(node)
server = ::HTTP::Server.new([handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
Expand All @@ -70,7 +70,7 @@ describe Raft::HTTP::StatusHandler do
dir = File.tempname("raft_status")
node = make_node(dir)

handler = Raft::HTTP::StatusHandler(TestData).new(node)
handler = Raft::HTTP::StatusHandler.new(node)
server = ::HTTP::Server.new([handler])
address = server.bind_tcp("127.0.0.1", 0)
spawn server.listen
Expand Down
3 changes: 3 additions & 0 deletions src/raft.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ require "./raft/log/segment"
require "./raft/transport"
require "./raft/transport/memory_transport"
require "./raft/transport/tcp_transport"
require "./raft/status_source"
require "./raft/admin_ops"
require "./raft/node"
require "./raft/server"
require "./raft/http/status_handler"
require "./raft/http/admin_handler"
require "./raft/http/admin_client"
28 changes: 28 additions & 0 deletions src/raft/admin_ops.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
require "./config"

module Raft
# The mutating cluster-administration surface an admin endpoint needs
# from a node. T-free by construction — none of these depend on the
# state-machine command type, which lets `HTTP::AdminHandler` be a
# concrete (non-generic) class.
#
# Included by `Raft::Node(T)`. The threading contract still applies:
# these mutate node state without locks, so callers outside the node's
# owning fiber must arrange their own safety (e.g. run the HTTP server
# in the same execution context, or marshal calls onto the node's fiber).
module AdminOps
abstract def bootstrap : Bool
abstract def add_server(node_id : NodeID, address : String = "") : Bool
abstract def remove_server(node_id : NodeID) : Bool
abstract def promote_learner(node_id : NodeID) : Bool
abstract def transfer_leadership(to target : NodeID) : Bool

{% if flag?(:raft_debug) %}
abstract def pause
abstract def resume
abstract def partition
abstract def heal
abstract def reset
{% end %}
end
end
36 changes: 36 additions & 0 deletions src/raft/http/admin_client.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require "http/client"
require "json"
require "uri"
require "../config"

module Raft
module HTTP
# Client side of the `/raft/admin/*` wire format. Keeps request shapes in
# one place so embedders don't hand-roll them.
#
# Single attempt per call, no retry loop — embedders own retry policy.
# Returns the response `::HTTP::Status` so callers can distinguish
# rejection (400, e.g. already a member — stop retrying) from
# unavailability (5xx — retry later). Raises only on connection errors.
module AdminClient
# POST `<uri>/raft/admin/add_server/<node_id>` with a JSON body
# `{"address": address}`. Basic auth is taken from the URI's userinfo
# when present.
def self.add_server(uri : URI, node_id : NodeID, address : String = "") : ::HTTP::Status
client = ::HTTP::Client.new(uri)
begin
if user = uri.user
client.basic_auth(user, uri.password || "")
end
path = uri.path.rstrip('/') + "/raft/admin/add_server/#{node_id}"
headers = ::HTTP::Headers{"Content-Type" => "application/json"}
body = {address: address}.to_json
response = client.post(path, headers: headers, body: body)
response.status
ensure
client.close
end
end
end
end
end
31 changes: 17 additions & 14 deletions src/raft/http/admin_handler.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require "http/server/handler"
require "json"
require "../node"
require "../admin_ops"
require "../transport/tcp_transport"

module Raft
Expand All @@ -22,13 +22,16 @@ module Raft
# Anything else falls through to the next handler in the chain via
# `call_next`. Pair with `StatusHandler` to expose the read-only surface
# on a separate endpoint.
class AdminHandler(T)
#
# Concrete (non-generic) — takes any `AdminOps`, which `Node(T)`
# includes.
class AdminHandler
include ::HTTP::Handler

@node : Node(T)
@ops : AdminOps
@transport : TCPTransport?

def initialize(@node : Node(T), @transport : TCPTransport? = nil)
def initialize(@ops : AdminOps, @transport : TCPTransport? = nil)
end

def call(context : ::HTTP::Server::Context)
Expand All @@ -51,7 +54,7 @@ module Raft
private def handle_admin_inner(context, path)
case path
when "/raft/admin/bootstrap"
if @node.bootstrap
if @ops.bootstrap
json_response(context, 200, {"status" => "bootstrapped"})
else
json_response(context, 400, {"error" => "failed to bootstrap (node may already have peers)"})
Expand Down Expand Up @@ -82,28 +85,28 @@ module Raft
address = data["address"]?.try(&.as_s) || ""
end
end
if @node.add_server(node_id, address)
if @ops.add_server(node_id, address)
json_response(context, 200, {"status" => "added", "node_id" => node_id.to_s})
else
json_response(context, 400, {"error" => "failed to add server"})
end
elsif path.starts_with?("/raft/admin/remove_server/")
node_id = path.split("/").last.to_u64
if @node.remove_server(node_id)
if @ops.remove_server(node_id)
json_response(context, 200, {"status" => "removed", "node_id" => node_id.to_s})
else
json_response(context, 400, {"error" => "failed to remove server"})
end
elsif path.starts_with?("/raft/admin/promote_learner/")
node_id = path.split("/").last.to_u64
if @node.promote_learner(node_id)
if @ops.promote_learner(node_id)
json_response(context, 200, {"status" => "promoted", "node_id" => node_id.to_s})
else
json_response(context, 400, {"error" => "failed to promote learner"})
end
elsif path.starts_with?("/raft/admin/transfer_leadership/")
node_id = path.split("/").last.to_u64
if @node.transfer_leadership(to: node_id)
if @ops.transfer_leadership(to: node_id)
json_response(context, 200, {"status" => "transferring", "target" => node_id.to_s})
else
json_response(context, 400, {"error" => "failed to transfer leadership"})
Expand All @@ -123,19 +126,19 @@ module Raft
private def handle_debug_admin(context, path)
case path
when "/raft/admin/pause"
@node.pause
@ops.pause
json_response(context, 200, {"status" => "paused"})
when "/raft/admin/resume"
@node.resume
@ops.resume
json_response(context, 200, {"status" => "resumed"})
when "/raft/admin/partition"
@node.partition
@ops.partition
json_response(context, 200, {"status" => "partitioned"})
when "/raft/admin/heal"
@node.heal
@ops.heal
json_response(context, 200, {"status" => "healed"})
when "/raft/admin/reset"
@node.reset
@ops.reset
json_response(context, 200, {"status" => "reset"})
else
context.response.status_code = 404
Expand Down
Loading