Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9f59208
chore: bump workspace version to 0.0.4
farhan-syah Apr 16, 2026
ce3dc9d
feat(swim): add MembershipSubscriber hook for state transition events
farhan-syah Apr 16, 2026
c1ab17d
feat(cluster): invalidate routing leader hints on SWIM liveness events
farhan-syah Apr 16, 2026
7056676
feat(cluster): add graceful node decommission
farhan-syah Apr 16, 2026
794a7ca
feat(cluster): add reachability-driven circuit-breaker recovery
farhan-syah Apr 16, 2026
256ebd3
docs(cluster): remove stale phase-label references from swim comments
farhan-syah Apr 16, 2026
e9472dc
feat(cluster): add load-based rebalancer with metrics, planning, and …
farhan-syah Apr 16, 2026
3d7f26a
fix(cluster): correct migration learner promotion and replicated Phas…
farhan-syah Apr 16, 2026
cf92257
feat(cluster): trigger rebalancer sweep on SWIM membership changes
farhan-syah Apr 16, 2026
239f657
feat(cluster): add SHOW RANGES, SHOW ROUTING, and SHOW SCHEMA VERSION…
farhan-syah Apr 16, 2026
ff6c817
feat(http): add /health/live and /health/drain endpoints
farhan-syah Apr 16, 2026
b8f6618
fix(pgwire): wire pg_catalog dispatch into query handler and fix reba…
farhan-syah Apr 16, 2026
a2e87e5
feat(rebalancer): add CPU backpressure gate to pause sweeps under load
farhan-syah Apr 16, 2026
a01f73c
feat(pgwire): add nodedb.read_consistency session parameter
farhan-syah Apr 16, 2026
9af5664
feat(pgwire): implement pg_catalog virtual table handler
farhan-syah Apr 16, 2026
7097421
feat(cluster): add closed timestamp tracker and follower read gate
farhan-syah Apr 16, 2026
6da002f
fix(pgwire): guard against out-of-bounds access in DDL command parsers
farhan-syah Apr 16, 2026
87926fc
feat(nodedb-sql): add typed DDL AST module
farhan-syah Apr 16, 2026
72eaecb
feat(nodedb-cluster): add MetadataEntry::Batch for atomic DDL replica…
farhan-syah Apr 16, 2026
a451bc5
feat(pgwire): implement transactional DDL with AST-based dispatch
farhan-syah Apr 16, 2026
89071f5
test(nodedb): add DDL replication correctness tests
farhan-syah Apr 16, 2026
45c33d1
fix(raft): deduplicate votes by peer ID and make election timeout con…
farhan-syah Apr 16, 2026
dbe01c9
feat(cluster): start health monitor and harden topology convergence
farhan-syah Apr 16, 2026
5f6abeb
test(cluster): replace fixed sleeps with readiness polling and fix SW…
farhan-syah Apr 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ members = [
resolver = "2"

[workspace.package]
version = "0.0.3"
version = "0.0.4"
edition = "2024"
rust-version = "1.94"
license = "BUSL-1.1"
Expand Down
6 changes: 5 additions & 1 deletion nodedb-cluster/src/bootstrap/bootstrap_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub(super) fn bootstrap(config: &ClusterConfig, catalog: &ClusterCatalog) -> Res
);

// Create MultiRaft with all groups (single-node, no peers).
let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone());
let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone())
.with_election_timeout(config.election_timeout_min, config.election_timeout_max);
for group_id in routing.group_ids() {
multi_raft.add_group(group_id, vec![])?;
}
Expand Down Expand Up @@ -81,6 +82,7 @@ fn generate_cluster_id() -> u64 {
mod tests {
use super::*;
use crate::catalog::ClusterCatalog;
use std::time::Duration;

fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
let dir = tempfile::tempdir().unwrap();
Expand All @@ -102,6 +104,8 @@ mod tests {
force_bootstrap: false,
join_retry: Default::default(),
swim_udp_addr: None,
election_timeout_min: Duration::from_millis(150),
election_timeout_max: Duration::from_millis(300),
};

let state = bootstrap(&config, &catalog).unwrap();
Expand Down
4 changes: 4 additions & 0 deletions nodedb-cluster/src/bootstrap/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ pub struct ClusterConfig {
/// [`crate::spawn_swim`] after the cluster is up and feed the
/// seed list from `seed_nodes`.
pub swim_udp_addr: Option<SocketAddr>,
/// Raft election timeout range. Controls how long a follower waits
/// before starting an election after losing contact with the leader.
pub election_timeout_min: Duration,
pub election_timeout_max: Duration,
}

/// Result of cluster startup — everything needed to run the Raft loop.
Expand Down
7 changes: 6 additions & 1 deletion nodedb-cluster/src/bootstrap/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ fn apply_join_response(
// learners). A learner-started group boots in the `Learner`
// role and will not run an election until a subsequent
// `PromoteLearner` conf change is applied.
let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone());
let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone())
.with_election_timeout(config.election_timeout_min, config.election_timeout_max);
for g in &resp.groups {
let is_voter = g.members.contains(&config.node_id);
let is_learner = g.learners.contains(&config.node_id);
Expand Down Expand Up @@ -450,6 +451,8 @@ mod tests {
force_bootstrap: false,
join_retry: Default::default(),
swim_udp_addr: None,
election_timeout_min: Duration::from_millis(150),
election_timeout_max: Duration::from_millis(300),
};
let state1 = bootstrap(&config1, &catalog1).unwrap();

Expand Down Expand Up @@ -499,6 +502,8 @@ mod tests {
force_bootstrap: false,
join_retry: Default::default(),
swim_udp_addr: None,
election_timeout_min: Duration::from_millis(150),
election_timeout_max: Duration::from_millis(300),
};

let lifecycle = ClusterLifecycleTracker::new();
Expand Down
2 changes: 2 additions & 0 deletions nodedb-cluster/src/bootstrap/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ mod tests {
force_bootstrap: false,
join_retry: Default::default(),
swim_udp_addr: None,
election_timeout_min: Duration::from_millis(150),
election_timeout_max: Duration::from_millis(300),
}
}

Expand Down
6 changes: 5 additions & 1 deletion nodedb-cluster/src/bootstrap/restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub(super) fn restart(
// as a learner on restart; dropping the group entirely would
// leave the node permanently without any copy of it and
// silently broken.
let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone());
let mut multi_raft = MultiRaft::new(config.node_id, routing.clone(), config.data_dir.clone())
.with_election_timeout(config.election_timeout_min, config.election_timeout_max);
for (group_id, info) in routing.group_members() {
let is_voter = info.members.contains(&config.node_id);
let is_learner = info.learners.contains(&config.node_id);
Expand Down Expand Up @@ -91,6 +92,7 @@ mod tests {
use super::super::bootstrap_fn::bootstrap;
use super::*;
use crate::catalog::ClusterCatalog;
use std::time::Duration;

fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
let dir = tempfile::tempdir().unwrap();
Expand All @@ -112,6 +114,8 @@ mod tests {
force_bootstrap: false,
join_retry: Default::default(),
swim_udp_addr: None,
election_timeout_min: Duration::from_millis(150),
election_timeout_max: Duration::from_millis(300),
};

// Bootstrap first.
Expand Down
20 changes: 20 additions & 0 deletions nodedb-cluster/src/circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,26 @@ impl CircuitBreaker {
.unwrap_or(CircuitState::Closed)
}

/// List the peer ids whose circuit breaker sits in the `Open` state.
///
/// The reachability driver calls this to decide which peers deserve
/// an active probe: an Open breaker only re-evaluates its cooldown
/// inside `check()`, and `check()` only runs when traffic flows — so
/// a peer with no traffic would otherwise stay Open forever and never
/// reach HalfOpen.
pub fn open_peers(&self) -> Vec<u64> {
    // Recover the map even if a writer panicked while holding the lock;
    // stale breaker state is still useful for probing decisions.
    let guard = self.peers.read().unwrap_or_else(|poisoned| poisoned.into_inner());
    guard
        .iter()
        .filter(|(_, breaker)| breaker.state == CircuitState::Open)
        .map(|(peer_id, _)| *peer_id)
        .collect()
}

/// Get consecutive failure count for a peer.
pub fn failure_count(&self, peer: u64) -> u32 {
let peers = self.peers.read().unwrap_or_else(|p| p.into_inner());
Expand Down
Loading
Loading