diff --git a/.config/nextest.toml b/.config/nextest.toml
new file mode 100644
index 00000000..7f6a0f39
--- /dev/null
+++ b/.config/nextest.toml
@@ -0,0 +1,79 @@
+# nextest configuration. Run with: cargo nextest run --all-features
+#
+# Why nextest over `cargo test`:
+# - Each test runs in its own process → no in-process state contention.
+#   Integration tests that spawn 3-node clusters used to hang under
+#   `cargo test`'s default within-binary parallelism because multiple
+#   clusters in the same process exhausted ports / file descriptors.
+# - Per-test timeouts make hangs fail fast instead of stalling CI.
+# - Better failure output, retry support, and JUnit XML for CI.
+
+[profile.default]
+# Flag a test as slow after 30 s; terminate it after 4 periods (2 min).
+# Anything past that ceiling is a hang, not a slow test.
+slow-timeout = { period = "30s", terminate-after = 4 }
+
+# Use every available core for cheap unit tests. Heavy cluster tests
+# are kept from starving by `threads-required` overrides below — they
+# claim ALL slots so nothing else runs alongside them, regardless of
+# whether you're on a 24-core dev box or a 2-core CI runner.
+test-threads = "num-cpus"
+
+# Heavy cluster tests: each one brings up 3 servers + per-node Tokio
+# runtimes. Two things keep them stable across machine sizes:
+#
+# 1. `test-group = "cluster"` with `max-threads = 1` ensures at
+#    most ONE cluster test runs at a time (no two clusters share
+#    ports / file descriptors / thread pools).
+# 2. `threads-required = "num-test-threads"` makes the running
+#    cluster test claim every available test slot, which evicts
+#    every other test from the run-queue while it's executing.
+#    That's what prevents a 24-core dev box from scheduling 23
+#    unit tests alongside the cluster and starving its Raft
+#    heartbeats.
+#
+# The combined effect: cluster tests run strictly serially AND
+# strictly alone, and the rest of the suite gets full parallelism
+# the moment the cluster test finishes.
+[[profile.default.overrides]]
+filter = '''
+binary(/cluster/)
+| binary(/cross_node/)
+| binary(/_lease_/)
+| binary(descriptor_lease_drain)
+| binary(descriptor_lease_forwarding_and_renewal)
+| binary(descriptor_lease_planner_integration)
+| binary(descriptor_versioning_cross_node)
+| binary(prepared_cache_invalidation)
+| binary(sql_cluster_cross_node_dml)
+'''
+test-group = 'cluster'
+threads-required = 'num-test-threads'
+# Cluster tests bring up real Raft nodes and racy multi-node
+# convergence checks. They're flaky enough that two retries catch
+# legitimate startup jitter without hiding real regressions — a
+# genuinely broken test fails three times in a row.
+retries = { backoff = "fixed", count = 2, delay = "1s" }
+
+[test-groups]
+cluster = { max-threads = 1 }
+
+[profile.ci]
+# CI inherits the default profile (cluster group, threads-required,
+# slow-timeout) and adds:
+# - more retries: CI runners are ~2× slower per-core than dev
+#   workstations, so the cluster tests' in-test `wait_for`
+#   budgets are proportionally tighter. Three retries (four total
+#   attempts) buys headroom for jitter without papering over real
+#   regressions — a genuinely broken test fails four times in a row.
+# - JUnit XML: picked up by the workflow's artifact upload.
+#
+# NOTE: we deliberately do NOT bump `slow-timeout` here. The
+# slow-timeout only controls when nextest gives up on a stuck
+# *process*; it does NOT extend the test's internal `wait_for`
+# budgets. Once a `wait_for` panics, the test has already failed —
+# making nextest wait longer just wastes CI minutes on cleanup.
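+#
+# Illustrative local equivalent of the CI run (assumes cargo-nextest is
+# installed; `-E` narrows the run to the serialized cluster group):
+#   cargo nextest run --all-features --profile ci -E 'binary(/cluster/)'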
+retries = { backoff = "fixed", count = 3, delay = "2s" }
+fail-fast = false
+
+[profile.ci.junit]
+path = "junit.xml"
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index d0345d2e..c3ac51ea 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -55,5 +55,21 @@ jobs:
           sudo apt-get install -y --no-install-recommends \
             cmake clang libclang-dev pkg-config protobuf-compiler perl \
             libcurl4-openssl-dev libsasl2-dev
+      # nextest is required — `.config/nextest.toml` defines the
+      # `cluster` test-group that serializes 3-node integration tests
+      # and the `ci` profile that adds retries for flaky cluster tests
+      # and writes a JUnit report. Plain `cargo test` ignores all of
+      # that and will hang/fail on the cluster suite.
+      - name: Install cargo-nextest
+        uses: taiki-e/install-action@v2
+        with:
+          tool: nextest
       - name: Run tests
-        run: cargo test --all-features --profile ci
+        run: cargo nextest run --all-features --cargo-profile ci --profile ci
+      - name: Upload JUnit report
+        if: always()
+        uses: actions/upload-artifact@v4
+        with:
+          name: junit-report
+          path: target/nextest/ci/junit.xml
+          if-no-files-found: ignore
diff --git a/Dockerfile b/Dockerfile
index 62ef70c6..ee517fb4 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -36,9 +36,11 @@ FROM debian:bookworm-slim AS runtime
 
 # ca-certificates: needed for JWKS fetch, OTLP export, S3 archival
 # curl: needed for HEALTHCHECK
+# gosu: drop privileges from root after fixing data-dir ownership in entrypoint
 RUN apt-get update && apt-get install -y --no-install-recommends \
     ca-certificates \
     curl \
+    gosu \
     && rm -rf /var/lib/apt/lists/*
 
 # Non-root user
@@ -51,12 +53,18 @@ RUN mkdir -p /var/lib/nodedb /etc/nodedb \
 
 COPY --from=builder /build/target/release/nodedb /usr/local/bin/nodedb
 
+# Entrypoint: when started as root, fix data-dir ownership and drop to the
+# nodedb user. When already started as a non-root user (e.g. `--user 10001`),
+# exec directly. This makes `-v <volume>:/var/lib/nodedb` work even
+# when Docker initialises the volume as root-owned (common on Linux hosts).
+COPY docker-entrypoint.sh /usr/local/bin/docker-entrypoint.sh
+RUN chmod +x /usr/local/bin/docker-entrypoint.sh
+
 # Bind to all interfaces (required for Docker port mapping)
 # Point data dir at the declared volume
 ENV NODEDB_HOST=0.0.0.0 \
     NODEDB_DATA_DIR=/var/lib/nodedb
 
-USER nodedb
 WORKDIR /var/lib/nodedb
 
 # pgwire | native protocol | HTTP API | WebSocket sync | OTLP gRPC | OTLP HTTP
@@ -67,4 +75,5 @@ VOLUME ["/var/lib/nodedb"]
 HEALTHCHECK --interval=10s --timeout=3s --start-period=5s \
     CMD curl -f http://localhost:6480/health || exit 1
 
-ENTRYPOINT ["/usr/local/bin/nodedb"]
+ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"]
+CMD ["/usr/local/bin/nodedb"]
diff --git a/README.md b/README.md
index 16a9b7bd..c2b276cc 100644
--- a/README.md
+++ b/README.md
@@ -148,7 +148,8 @@ For development or contributing:
 git clone https://github.com/NodeDB-Lab/nodedb.git
 cd nodedb
 cargo build --release
-cargo test --all-features
+cargo install cargo-nextest --locked  # one-time
+cargo nextest run --all-features
 ```
 
 ## Status
diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh
new file mode 100755
index 00000000..20d67508
--- /dev/null
+++ b/docker-entrypoint.sh
@@ -0,0 +1,47 @@
+#!/bin/sh
+# NodeDB container entrypoint.
+#
+# When invoked as root (the default for `docker run` with no --user), fix
+# ownership of NODEDB_DATA_DIR and drop privileges to the unprivileged
+# `nodedb` user before exec'ing the server. When invoked as any other UID
+# (e.g. `--user 10001` or via Kubernetes runAsUser), exec directly and
+# leave the data directory alone.
+#
+# This makes `-v <volume>:/var/lib/nodedb` work even when Docker
+# initialises the named volume as root-owned (common on Linux hosts where
+# the volume is created out-of-band before the container's first run).
+
+set -e
+
+DATA_DIR="${NODEDB_DATA_DIR:-/var/lib/nodedb}"
+
+if [ "$(id -u)" = "0" ]; then
+    # Running as root: ensure the data dir exists and is owned by nodedb,
+    # then drop privileges. mkdir is a no-op for the declared VOLUME but
+    # protects against custom NODEDB_DATA_DIR overrides.
+    mkdir -p "$DATA_DIR"
+    chown -R nodedb:nodedb "$DATA_DIR"
+    exec gosu nodedb "$@"
+fi
+
+# Already non-root: ensure we can actually write to the data dir, otherwise
+# fail fast with a clear message instead of the cryptic WAL "Permission
+# denied (os error 13)" the user sees on a misconfigured volume mount.
+if [ ! -w "$DATA_DIR" ]; then
+    cat >&2 <<EOF
+NodeDB: data directory '$DATA_DIR' is not writable by UID $(id -u).
+Fix the ownership of the mounted volume on the host, or start the
+container as root so this entrypoint can chown it before dropping
+privileges to the nodedb user.
+EOF
+    exit 1
+fi
+
+exec "$@"

+> For a specific version or to browse changelogs, see the release page:
+> <https://github.com/NodeDB-Lab/nodedb/releases>. The SQL surface is
+> still pre-1.0 and changes between tags, so pin a version in production.
+
+---
+
 ## Build from Source
 
 ```bash
 cd nodedb
 
 # Release build (all crates)
 cargo build --release
 
-# Run tests
-cargo test --all-features
+# Run tests (use nextest — see .config/nextest.toml)
+cargo install cargo-nextest --locked  # one-time
+cargo nextest run --all-features
 ```
 
 Requires Rust 1.94+ and Linux (the Data Plane uses io_uring). The build produces two binaries:
@@ -84,8 +175,22 @@ Requires Rust 1.94+ and Linux (the Data Plane uses io_uring). The build produces
 
 ```bash
 # Single-node, default ports
 ./target/release/nodedb
+
+# Or with a config file
+./target/release/nodedb --config nodedb.toml
 ```
 
+---
+
+## Configuration
+
+This section applies to **every** install method — Docker, prebuilt binary, and source builds all read the same TOML schema and respond to the same environment variables. Pick whichever is convenient:
+
+- **TOML file** — pass `--config /path/to/nodedb.toml` on the command line. Best for production / systemd / pre-baked images.
+- **Environment variables** — prefix `NODEDB_*`. Best for Docker (`-e`), Compose (`environment:`), and Kubernetes. Env vars **override** values from the TOML file when both are set.
+
+### Default ports
+
 By default, NodeDB listens on:
 
 - **6432** — PostgreSQL wire protocol (pgwire)
@@ -98,11 +203,11 @@ Two additional protocols are available but **disabled by default**:
 - **RESP** (Redis-compatible KV protocol) — `GET`/`SET`/`DEL`/`EXPIRE`/`SCAN`/`SUBSCRIBE`
 - **ILP** (InfluxDB Line Protocol) — high-throughput timeseries ingest
 
-Enable them by setting a listen address in config or via env var (see below).
+Enable them by setting a listen address in the config or via env var (see below).
 
-### Configuration
+### Example config file
 
-All protocols share one bind address (`host`). Only the port differs per protocol. Env vars take precedence over the TOML file.
+All protocols share one bind address (`host`). Only the port differs per protocol.
 
 ```toml
 # nodedb.toml
@@ -133,19 +238,19 @@ ilp = false        # Example: disable TLS for ILP ingest
 
 **Server settings:**
 
-| Config field       | Environment variable      | Default          |
-| ------------------ | ------------------------- | ---------------- |
-| `host`             | `NODEDB_HOST`             | `127.0.0.1`      |
-| `ports.native`     | `NODEDB_PORT_NATIVE`      | `6433`           |
-| `ports.pgwire`     | `NODEDB_PORT_PGWIRE`      | `6432`           |
-| `ports.http`       | `NODEDB_PORT_HTTP`        | `6480`           |
-| `ports.resp`       | `NODEDB_PORT_RESP`        | disabled         |
-| `ports.ilp`        | `NODEDB_PORT_ILP`         | disabled         |
-| `data_dir`         | `NODEDB_DATA_DIR`         | `~/.nodedb/data` |
-| `memory_limit`     | `NODEDB_MEMORY_LIMIT`     | `1GiB`           |
-| `data_plane_cores` | `NODEDB_DATA_PLANE_CORES` | CPUs - 1         |
-| `max_connections`  | `NODEDB_MAX_CONNECTIONS`  | `1024`           |
-| `log_format`       | `NODEDB_LOG_FORMAT`       | `text`           |
+| Config field       | Environment variable      | Default                                                |
+| ------------------ | ------------------------- | ------------------------------------------------------ |
+| `host`             | `NODEDB_HOST`             | `127.0.0.1`                                            |
+| `ports.native`     | `NODEDB_PORT_NATIVE`      | `6433`                                                 |
+| `ports.pgwire`     | `NODEDB_PORT_PGWIRE`      | `6432`                                                 |
+| `ports.http`       | `NODEDB_PORT_HTTP`        | `6480`                                                 |
+| `ports.resp`       | `NODEDB_PORT_RESP`        | disabled                                               |
+| `ports.ilp`        | `NODEDB_PORT_ILP`         | disabled                                               |
+| `data_dir`         | `NODEDB_DATA_DIR`         | `~/.nodedb/data` (binary), `/var/lib/nodedb` (Docker)  |
+| `memory_limit`     | `NODEDB_MEMORY_LIMIT`     | `1GiB`                                                 |
+| `data_plane_cores` | `NODEDB_DATA_PLANE_CORES` | CPUs - 1                                               |
+| `max_connections`  | `NODEDB_MAX_CONNECTIONS`  | `1024`                                                 |
+| `log_format`       | `NODEDB_LOG_FORMAT`       | `text`                                                 |
 
 **Per-protocol TLS** (only applies when `[server.tls]` is configured):
diff --git a/nodedb-cluster/src/bootstrap/bootstrap_fn.rs b/nodedb-cluster/src/bootstrap/bootstrap_fn.rs
index 2a4c0260..2db01945 100644
--- a/nodedb-cluster/src/bootstrap/bootstrap_fn.rs
+++ b/nodedb-cluster/src/bootstrap/bootstrap_fn.rs
@@ -100,6 +100,7 @@ mod tests {
             replication_factor: 1,
             data_dir: _dir.path().to_path_buf(),
             force_bootstrap: false,
+            join_retry: Default::default(),
         };
 
         let state = bootstrap(&config, &catalog).unwrap();
diff --git a/nodedb-cluster/src/bootstrap/config.rs b/nodedb-cluster/src/bootstrap/config.rs
index ee5da894..eb2a8611 100644
--- a/nodedb-cluster/src/bootstrap/config.rs
+++ b/nodedb-cluster/src/bootstrap/config.rs
@@ -1,11 +1,61 @@
 //! Cluster configuration and post-start state.
 
 use std::net::SocketAddr;
+use std::time::Duration;
 
 use crate::multi_raft::MultiRaft;
 use crate::routing::RoutingTable;
 use crate::topology::ClusterTopology;
 
+/// Tunable retry policy for the join loop.
+///
+/// The schedule is computed by halving from the configured ceiling:
+/// for `max_attempts = 8` and `max_backoff_secs = 32`, the per-attempt
+/// delays are `0.25 s, 0.5 s, 1 s, 2 s, 4 s, 8 s, 16 s, 32 s` — i.e.
+/// each delay is `max_backoff_secs >> (max_attempts - attempt)`. This
+/// keeps the formula obvious from a single number while preserving
+/// exponential growth.
+///
+/// Defaults match the production schedule. Tests construct their own
+/// policy with a much smaller `max_backoff_secs` so the integration
+/// suite doesn't pay a ~minute backoff on every join failure path.
+#[derive(Debug, Clone, Copy)]
+pub struct JoinRetryPolicy {
+    /// Number of join attempts before the loop gives up.
+    pub max_attempts: u32,
+    /// Cap on the per-attempt backoff delay, in seconds. The schedule
+    /// is derived from this ceiling — see the struct doc comment.
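+    ///
+    /// For example (values computed from `backoff_for` below, shown
+    /// here for illustration): `max_backoff_secs = 2` with the default
+    /// 8 attempts yields 15 ms, 31 ms, 62 ms, 125 ms, 250 ms, 500 ms,
+    /// 1 s, 2 s — just under 4 s of sleep in total.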
+    pub max_backoff_secs: u64,
+}
+
+impl Default for JoinRetryPolicy {
+    fn default() -> Self {
+        Self {
+            max_attempts: 8,
+            max_backoff_secs: 32,
+        }
+    }
+}
+
+impl JoinRetryPolicy {
+    /// Backoff delay before `attempt` (1-indexed). Attempt 0 is the
+    /// initial try and never sleeps. Returns `Duration::ZERO` for
+    /// out-of-range attempts.
+    pub fn backoff_for(&self, attempt: u32) -> Duration {
+        if attempt == 0 || attempt > self.max_attempts {
+            return Duration::ZERO;
+        }
+        // Schedule grows exponentially toward `max_backoff_secs`. We
+        // compute in millis so small `max_backoff_secs` values (test
+        // configs) still produce non-zero delays for the early
+        // attempts instead of being floored to zero seconds.
+        let exp = self.max_attempts - attempt;
+        let max_ms = self.max_backoff_secs.saturating_mul(1_000);
+        let ms = max_ms >> exp;
+        Duration::from_millis(ms.max(1))
+    }
+}
+
 /// Configuration for cluster formation.
 #[derive(Debug, Clone)]
 pub struct ClusterConfig {
@@ -30,6 +80,10 @@ pub struct ClusterConfig {
     /// to be present in `seed_nodes` (enforced at the caller's config
     /// validation layer).
     pub force_bootstrap: bool,
+    /// Retry policy for the join loop. Defaults to production values
+    /// (`8` attempts, `32 s` ceiling). Tests override this with a
+    /// faster policy.
+    pub join_retry: JoinRetryPolicy,
 }
 
 /// Result of cluster startup — everything needed to run the Raft loop.
diff --git a/nodedb-cluster/src/bootstrap/join.rs b/nodedb-cluster/src/bootstrap/join.rs
index 1c35b682..cad1dfad 100644
--- a/nodedb-cluster/src/bootstrap/join.rs
+++ b/nodedb-cluster/src/bootstrap/join.rs
@@ -20,7 +20,6 @@
 use std::collections::HashSet;
 use std::net::SocketAddr;
-use std::time::Duration;
 
 use tracing::{debug, info, warn};
@@ -35,39 +34,11 @@ use crate::transport::NexarTransport;
 
 use super::config::{ClusterConfig, ClusterState};
 
-/// Maximum number of outer retry attempts before `join()` gives up and
-/// returns the last concrete error to its caller. With the backoff
-/// schedule below this gives a total window of roughly 32 seconds.
-const MAX_JOIN_ATTEMPTS: u32 = 8;
-
 /// Maximum number of leader-redirect hops inside a single join
 /// attempt. The redirect chain starts at whichever seed we first
 /// contact; each hop costs a round-trip, so keep this small.
 const MAX_REDIRECTS_PER_ATTEMPT: u32 = 3;
 
-/// Exponential-backoff delay between join attempts, capped at 16 s.
-///
-/// Attempt 0 is immediate. Subsequent attempts sleep 500 ms, 1 s, 2 s,
-/// 4 s, 8 s, then 16 s for every further attempt. Total window for
-/// the default `MAX_JOIN_ATTEMPTS = 8` is roughly 32 s.
-///
-/// Pure so it can be unit-tested in isolation — no time-source
-/// dependency.
-pub(crate) fn next_backoff(attempt: u32) -> Duration {
-    if attempt == 0 {
-        return Duration::ZERO;
-    }
-    let secs_millis: u64 = match attempt {
-        1 => 500,
-        2 => 1_000,
-        3 => 2_000,
-        4 => 4_000,
-        5 => 8_000,
-        _ => 16_000,
-    };
-    Duration::from_millis(secs_millis)
-}
-
 /// Parse a `JoinResponse::error` string as a leader redirect hint.
 ///
 /// The prefix is defined as a shared constant in `rpc_codec`
@@ -90,8 +61,8 @@ pub(crate) fn parse_leader_hint(error: &str) -> Option<SocketAddr> {
 ///
 /// The loop has two layers:
 ///
-/// - **Outer**: up to `MAX_JOIN_ATTEMPTS` retry passes with
-///   exponential backoff. Handles the "bootstrapper not up yet"
+/// - **Outer**: retry passes with exponential backoff per
+///   `config.join_retry`. Handles the "bootstrapper not up yet"
 ///   startup race.
 /// - **Inner**: walk the seed list plus any leader-redirect hops for
 ///   this attempt. A successful `JoinResponse` short-circuits the
@@ -123,12 +94,13 @@ pub(super) async fn join(
         wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
     };
 
+    let policy = config.join_retry;
     let mut last_err: Option<ClusterError> = None;
 
-    for attempt in 0..MAX_JOIN_ATTEMPTS {
+    for attempt in 0..policy.max_attempts {
         lifecycle.to_joining(attempt);
 
-        let delay = next_backoff(attempt);
+        let delay = policy.backoff_for(attempt);
         if !delay.is_zero() {
             debug!(
                 node_id = config.node_id,
@@ -153,8 +125,9 @@ pub(super) async fn join(
         }
     }
 
+    let max_attempts = policy.max_attempts;
     let err = last_err.unwrap_or_else(|| ClusterError::Transport {
-        detail: format!("join exhausted {MAX_JOIN_ATTEMPTS} attempts with no concrete error"),
+        detail: format!("join exhausted {max_attempts} attempts with no concrete error"),
    });
    lifecycle.to_failed(err.to_string());
    Err(err)
@@ -170,15 +143,30 @@ async fn try_join_once(
     transport: &NexarTransport,
     req_template: &JoinRequest,
 ) -> Result<ClusterState> {
-    // Work list: start with the configured seeds, prepend leader hints
-    // as they arrive. `HashSet` deduplicates so a redirect loop can't
-    // consume all attempts against the same address.
-    let mut work: Vec<SocketAddr> = config.seed_nodes.clone();
+    // Work list: try seeds in sorted order so the lexicographically
+    // smallest address — the designated bootstrapper under the
+    // single-elected-bootstrapper rule — is contacted first. This is
+    // critical during the initial 5-node race: every other seed points
+    // at a node that is itself still joining, so asking them first
+    // eats the full RPC timeout per non-bootstrapper before we reach
+    // the one peer that can actually answer. `HashSet` deduplicates
+    // so a redirect loop can't consume all attempts against the same
+    // address.
+    let mut work: std::collections::VecDeque<SocketAddr> =
+        config.seed_nodes.iter().copied().collect();
+    {
+        // Sort so the designated bootstrapper surfaces first. Leader
+        // redirects get prepended with push_front below, keeping the
+        // "most likely to answer" candidate at the head.
+        let mut sorted: Vec<SocketAddr> = work.drain(..).collect();
+        sorted.sort();
+        work.extend(sorted);
+    }
     let mut visited: HashSet<SocketAddr> = HashSet::new();
     let mut redirects: u32 = 0;
     let mut last_err: Option<ClusterError> = None;
 
-    while let Some(addr) = work.pop() {
+    while let Some(addr) = work.pop_front() {
         if !visited.insert(addr) {
             continue;
         }
@@ -199,7 +187,7 @@ async fn try_join_once(
                     "following leader redirect"
                 );
                 redirects += 1;
-                work.push(leader);
+                work.push_front(leader);
                 continue;
             }
             debug!(
@@ -351,6 +339,7 @@ fn apply_join_response(
 #[cfg(test)]
 mod tests {
     use super::super::bootstrap_fn::bootstrap;
+    use super::super::config::JoinRetryPolicy;
     use super::super::handle_join::handle_join_request;
     use super::*;
     use std::sync::{Arc, Mutex};
@@ -398,16 +387,43 @@ mod tests {
     }
 
     #[test]
-    fn next_backoff_schedule() {
-        assert_eq!(next_backoff(0), Duration::ZERO);
-        assert_eq!(next_backoff(1), Duration::from_millis(500));
-        assert_eq!(next_backoff(2), Duration::from_secs(1));
-        assert_eq!(next_backoff(3), Duration::from_secs(2));
-        assert_eq!(next_backoff(4), Duration::from_secs(4));
-        assert_eq!(next_backoff(5), Duration::from_secs(8));
-        assert_eq!(next_backoff(6), Duration::from_secs(16));
-        assert_eq!(next_backoff(7), Duration::from_secs(16));
-        assert_eq!(next_backoff(100), Duration::from_secs(16));
+    fn join_retry_policy_default_schedule() {
+        // Production default: 8 attempts, ceiling 32 s. Each delay is
+        // `32 s >> (8 - attempt)`, so the schedule halves down from
+        // the ceiling toward the first attempt.
+        let policy = JoinRetryPolicy::default();
+        assert_eq!(policy.backoff_for(0), Duration::ZERO);
+        assert_eq!(policy.backoff_for(1), Duration::from_millis(250));
+        assert_eq!(policy.backoff_for(2), Duration::from_millis(500));
+        assert_eq!(policy.backoff_for(3), Duration::from_secs(1));
+        assert_eq!(policy.backoff_for(4), Duration::from_secs(2));
+        assert_eq!(policy.backoff_for(5), Duration::from_secs(4));
+        assert_eq!(policy.backoff_for(6), Duration::from_secs(8));
+        assert_eq!(policy.backoff_for(7), Duration::from_secs(16));
+        assert_eq!(policy.backoff_for(8), Duration::from_secs(32));
+        // Out-of-range attempt → no backoff.
+        assert_eq!(policy.backoff_for(9), Duration::ZERO);
+    }
+
+    #[test]
+    fn join_retry_policy_test_schedule_is_subsecond() {
+        // A typical test config: still 8 attempts, but a 2 s ceiling
+        // produces a sub-5-second total backoff window.
+        let policy = JoinRetryPolicy {
+            max_attempts: 8,
+            max_backoff_secs: 2,
+        };
+        // With a 2 s ceiling the early attempts shrink to tens of
+        // milliseconds (15 ms, 31 ms, …); the 1 ms floor in
+        // `backoff_for` only matters for far smaller ceilings.
+        let total: Duration = (0..=policy.max_attempts)
+            .map(|a| policy.backoff_for(a))
+            .sum();
+        assert!(
+            total < Duration::from_secs(5),
+            "test schedule too slow: {total:?}"
+        );
+        // Final attempt sleeps the full ceiling.
+        assert_eq!(policy.backoff_for(8), Duration::from_secs(2));
     }
 
     // ── End-to-end bootstrap + join flow over QUIC ────────────────
@@ -432,6 +448,7 @@ mod tests {
             replication_factor: 1,
             data_dir: _dir1.path().to_path_buf(),
             force_bootstrap: false,
+            join_retry: Default::default(),
         };
 
         let state1 = bootstrap(&config1, &catalog1).unwrap();
@@ -479,6 +496,7 @@ mod tests {
             replication_factor: 1,
             data_dir: _dir2.path().to_path_buf(),
             force_bootstrap: false,
+            join_retry: Default::default(),
         };
 
         let lifecycle = ClusterLifecycleTracker::new();
diff --git a/nodedb-cluster/src/bootstrap/mod.rs b/nodedb-cluster/src/bootstrap/mod.rs
index 82556564..5723e79b 100644
--- a/nodedb-cluster/src/bootstrap/mod.rs
+++ b/nodedb-cluster/src/bootstrap/mod.rs
@@ -19,6 +19,6 @@ pub mod probe;
 pub mod restart;
 pub mod start;
 
-pub use config::{ClusterConfig, ClusterState};
+pub use config::{ClusterConfig, ClusterState, JoinRetryPolicy};
 pub use handle_join::handle_join_request;
 pub use start::start_cluster;
diff --git a/nodedb-cluster/src/bootstrap/probe.rs b/nodedb-cluster/src/bootstrap/probe.rs
index a3bc8baa..389f3994 100644
--- a/nodedb-cluster/src/bootstrap/probe.rs
+++ b/nodedb-cluster/src/bootstrap/probe.rs
@@ -221,6 +221,7 @@ mod tests {
             replication_factor: 1,
             data_dir: std::env::temp_dir(),
             force_bootstrap: false,
+            join_retry: Default::default(),
         }
     }
diff --git a/nodedb-cluster/src/bootstrap/restart.rs b/nodedb-cluster/src/bootstrap/restart.rs
index 5cd56fa9..a6e7577a 100644
--- a/nodedb-cluster/src/bootstrap/restart.rs
+++ b/nodedb-cluster/src/bootstrap/restart.rs
@@ -110,6 +110,7 @@ mod tests {
             replication_factor: 1,
             data_dir: _dir.path().to_path_buf(),
             force_bootstrap: false,
+            join_retry: Default::default(),
         };
 
         // Bootstrap first.
diff --git a/nodedb-cluster/src/lib.rs b/nodedb-cluster/src/lib.rs
index 08e8aa8a..ece709dc 100644
--- a/nodedb-cluster/src/lib.rs
+++ b/nodedb-cluster/src/lib.rs
@@ -36,7 +36,7 @@ pub mod transport;
 pub mod vshard_handler;
 pub mod wire;
 
-pub use bootstrap::{ClusterConfig, ClusterState, start_cluster};
+pub use bootstrap::{ClusterConfig, ClusterState, JoinRetryPolicy, start_cluster};
 pub use catalog::ClusterCatalog;
 pub use cluster_info::{
     ClusterInfoSnapshot, ClusterObserver, GroupSnapshot, GroupStatusProvider, PeerSnapshot,
diff --git a/nodedb-cluster/src/lifecycle_state.rs b/nodedb-cluster/src/lifecycle_state.rs
index 5c5ed2a0..73bcf56d 100644
--- a/nodedb-cluster/src/lifecycle_state.rs
+++ b/nodedb-cluster/src/lifecycle_state.rs
@@ -40,7 +40,7 @@ pub enum ClusterLifecycleState {
     /// Joining an existing cluster. `attempt` counts from 0.
     Joining {
         /// Current join attempt (0-indexed). See
-        /// `bootstrap::join::next_backoff` for the backoff schedule.
+        /// `bootstrap::config::JoinRetryPolicy` for the backoff schedule.
         attempt: u32,
     },
     /// Cluster init finished successfully. `nodes` is the number of
diff --git a/nodedb-cluster/src/raft_loop/loop_core.rs b/nodedb-cluster/src/raft_loop/loop_core.rs
index 4a6821d4..f39e3cbe 100644
--- a/nodedb-cluster/src/raft_loop/loop_core.rs
+++ b/nodedb-cluster/src/raft_loop/loop_core.rs
@@ -291,7 +291,7 @@ impl RaftLoop {
     pub async fn propose_to_metadata_group_via_leader(&self, data: Vec<u8>) -> Result<u64> {
         // Phase 1: try local propose.
         match self.propose_to_metadata_group(data.clone()) {
-            Ok(idx) => return Ok(idx),
+            Ok(idx) => Ok(idx),
             Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
                 leader_hint,
             })) => {
diff --git a/nodedb-cluster/src/transport/client.rs b/nodedb-cluster/src/transport/client.rs
index 69c34f24..71a7d5fd 100644
--- a/nodedb-cluster/src/transport/client.rs
+++ b/nodedb-cluster/src/transport/client.rs
@@ -267,7 +267,23 @@ impl NexarTransport {
     }
 
     /// Send an RPC to an address directly (for bootstrap/join before peer IDs are known).
+    ///
+    /// The **entire** operation — handshake, stream open, write, read — is
+    /// bounded by `self.rpc_timeout`. Previously only the response read was
+    /// timed out, which meant a QUIC handshake against an unreachable peer
+    /// could block for the transport's internal idle timeout (~30 s per
+    /// address). That was fatal to cluster join races where every non-
+    /// bootstrapper seed points at another non-bootstrapper: each attempt
+    /// would stall 30 s × (N-1) peers, compounding across retry passes.
     pub async fn send_rpc_to_addr(&self, addr: SocketAddr, rpc: RaftRpc) -> Result<RaftRpc> {
+        tokio::time::timeout(self.rpc_timeout, self.send_rpc_to_addr_inner(addr, rpc))
+            .await
+            .map_err(|_| ClusterError::Transport {
+                detail: format!("RPC timeout ({}ms) to {addr}", self.rpc_timeout.as_millis()),
+            })?
+    }
+
+    async fn send_rpc_to_addr_inner(&self, addr: SocketAddr, rpc: RaftRpc) -> Result<RaftRpc> {
         let frame = rpc_codec::encode(&rpc)?;
 
         let conn = self
@@ -295,12 +311,7 @@ impl NexarTransport {
             .finish()
             .map_err(|e| ClusterError::Transport {
                 detail: format!("finish send to {addr}: {e}"),
             })?;
 
-        let response_frame = tokio::time::timeout(self.rpc_timeout, server::read_frame(&mut recv))
-            .await
-            .map_err(|_| ClusterError::Transport {
-                detail: format!("RPC timeout ({}ms) to {addr}", self.rpc_timeout.as_millis()),
-            })??;
-
+        let response_frame = server::read_frame(&mut recv).await?;
         rpc_codec::decode(&response_frame)
     }
diff --git a/nodedb-cluster/tests/cluster_join_race.rs b/nodedb-cluster/tests/cluster_join_race.rs
index f6d2d4f1..feddda21 100644
--- a/nodedb-cluster/tests/cluster_join_race.rs
+++ b/nodedb-cluster/tests/cluster_join_race.rs
@@ -28,7 +28,7 @@ async fn five_nodes_race_on_full_seed_list_form_one_cluster() {
     let mut transports: Vec<Arc<NexarTransport>> = Vec::with_capacity(NODE_COUNT as usize);
     for id in 1..=NODE_COUNT {
         transports.push(Arc::new(
-            NexarTransport::new(id, "127.0.0.1:0".parse().unwrap()).expect("bind transport"),
+            common::test_transport(id).expect("bind transport"),
         ));
     }
     let seeds: Vec<SocketAddr> = transports.iter().map(|t| t.local_addr()).collect();
diff --git a/nodedb-cluster/tests/common/mod.rs b/nodedb-cluster/tests/common/mod.rs
index 614b6116..1e4f8dbe 100644
--- a/nodedb-cluster/tests/common/mod.rs
+++ b/nodedb-cluster/tests/common/mod.rs
@@ -37,6 +37,19 @@ use nodedb_cluster::{
     CacheApplier, ClusterCatalog, ClusterConfig, ClusterLifecycleState, ClusterLifecycleTracker,
     ClusterTopology, MetadataCache, NexarTransport, NoopForwarder, RaftLoop, start_cluster,
 };
+
+/// Build a `NexarTransport` with a tighter-than-production RPC
+/// timeout for tests. Production default is 5 s × 3 retries = ~15 s
+/// per failed peer contact. 4 s leaves enough headroom for legitimate
+/// Raft RPCs under contention while still cutting the join-failure
+/// tests (which retry against a dead seed) substantially.
+pub fn test_transport(node_id: u64) -> Result<NexarTransport> {
+    NexarTransport::with_timeout(
+        node_id,
+        "127.0.0.1:0".parse().unwrap(),
+        Duration::from_secs(4),
+    )
+}
 use nodedb_raft::message::LogEntry;
 use tempfile::TempDir;
 use tokio::sync::watch;
@@ -100,10 +113,7 @@ impl TestNode {
         node_id: u64,
         seed_nodes: Vec<SocketAddr>,
     ) -> Result<Arc<Self>> {
-        let transport = Arc::new(NexarTransport::new(
-            node_id,
-            "127.0.0.1:0".parse().unwrap(),
-        )?);
+        let transport = Arc::new(test_transport(node_id)?);
         Self::spawn_with_transport(node_id, transport, seed_nodes).await
     }
@@ -139,10 +149,7 @@ impl TestNode {
         data_dir: &Path,
         seed_nodes: Vec<SocketAddr>,
     ) -> Result<Arc<Self>> {
-        let transport = Arc::new(NexarTransport::new(
-            node_id,
-            "127.0.0.1:0".parse().unwrap(),
-        )?);
+        let transport = Arc::new(test_transport(node_id)?);
         Self::spawn_inner(node_id, transport, seed_nodes, data_dir.to_path_buf(), None).await
     }
@@ -172,6 +179,13 @@ impl TestNode {
             replication_factor: 3,
             data_dir: data_dir_path.clone(),
             force_bootstrap: false,
+            // Fast retry policy: 2 s ceiling keeps the join-failure
+            // tests (especially `cluster_join_leader_crash`) under
+            // ~5 s of sleeping instead of the production ~64 s.
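+            // (For reference, summing both schedules: production is
+            // 0.25 + 0.5 + 1 + 2 + 4 + 8 + 16 + 32 ≈ 64 s of sleep
+            // across 8 attempts; the 2 s ceiling sums to ≈ 4 s.)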
+            join_retry: nodedb_cluster::JoinRetryPolicy {
+                max_attempts: 8,
+                max_backoff_secs: 2,
+            },
         };
 
         let lifecycle = ClusterLifecycleTracker::new();
diff --git a/nodedb/src/control/catalog_entry/post_apply/collection.rs b/nodedb/src/control/catalog_entry/post_apply/collection.rs
index 5bf4fda8..4d1f40c3 100644
--- a/nodedb/src/control/catalog_entry/post_apply/collection.rs
+++ b/nodedb/src/control/catalog_entry/post_apply/collection.rs
@@ -7,19 +7,11 @@ use tracing::debug;
 use crate::control::security::catalog::{StoredCollection, StoredOwner};
 use crate::control::state::SharedState;
 
-pub async fn put(stored: StoredCollection, shared: Arc<SharedState>) {
-    // Tell this node's Data Plane about the new collection so the
-    // first cross-node INSERT doesn't need to rediscover the
-    // storage mode.
-    crate::control::server::pgwire::ddl::collection::create::dispatch_register_from_stored(
-        &shared, &stored,
-    )
-    .await;
-    debug!(
-        collection = %stored.name,
-        "catalog_entry: Register dispatched to local Data Plane"
-    );
-
+/// Synchronous half of `PutCollection` post-apply: install the owner
+/// record into the in-memory `PermissionStore`. Called inline by the
+/// metadata applier BEFORE the applied-index watcher bump so readers
+/// of `applied_index` observe the ownership consistently.
+pub fn put_owner_sync(stored: &StoredCollection, shared: Arc<SharedState>) {
     // Replicate the owner record on every node so cluster-wide
     // `is_owner` / `check` evaluations succeed. Handlers no longer
     // call `set_owner` directly — ownership is entirely a side
     // effect of the catalog entry.
     });
 }
 
+/// Asynchronous half: dispatch a `Register` request to this node's
+/// Data Plane so the first cross-node INSERT doesn't need to
+/// rediscover the storage mode. Spawned as a best-effort task —
+/// correctness does not depend on it completing before the
+/// `applied_index` watcher bumps, only performance does.
+pub async fn put_async(stored: StoredCollection, shared: Arc<SharedState>) {
+    crate::control::server::pgwire::ddl::collection::create::dispatch_register_from_stored(
+        &shared, &stored,
+    )
+    .await;
+    debug!(
+        collection = %stored.name,
+        "catalog_entry: Register dispatched to local Data Plane"
+    );
+}
+
 pub fn deactivate(tenant_id: u32, name: String, shared: Arc<SharedState>) {
     // Remove the ownership record so `is_owner` checks return false
     // after drop — the in-memory map would otherwise keep a stale
diff --git a/nodedb/src/control/catalog_entry/post_apply/mod.rs b/nodedb/src/control/catalog_entry/post_apply/mod.rs
index 4d8a7aa6..88814339 100644
--- a/nodedb/src/control/catalog_entry/post_apply/mod.rs
+++ b/nodedb/src/control/catalog_entry/post_apply/mod.rs
@@ -1,12 +1,28 @@
-//! Asynchronous post-apply side effects for a [`CatalogEntry`] —
-//! dispatched by DDL family.
+//! Post-apply side effects for a [`CatalogEntry`] — dispatched by
+//! DDL family.
 //!
-//! The top-level [`spawn_post_apply_side_effects`] is one
-//! `tokio::spawn` containing an exhaustive match that routes each
-//! variant to a typed function in a per-family sibling file.
-//! Adding a new variant forces this file to grow by one line and
-//! the corresponding family file by one function — never grows
-//! unboundedly.
+//! Split into two phases so readers of `applied_index` observe a
+//! consistent view:
+//!
+//! - [`apply_post_apply_side_effects_sync`] runs the synchronous
+//!   in-memory cache updates (install_replicated_user,
+//!   install_replicated_role, etc.) **inline** on the raft applier
+//!   thread, BEFORE the metadata applier bumps the
+//!   `AppliedIndexWatcher`. Once `applied_index = N`, readers are
+//!   guaranteed to see every sync side-effect of every entry up to
+//!   N — no tokio spawn race.
+//! - [`spawn_post_apply_async_side_effects`] spawns a tokio task for
+//!   the genuinely async work — today that is only Data Plane
+//!   dispatches for `PutCollection`. Readers of the Data Plane
+//!   register state still race with this, but no test relies on
+//!   that synchronisation.
+//!
+//! Previously both were combined into a single `tokio::spawn`, so
+//! a freshly-applied `PutUser` could bump the watcher while its
+//! `install_replicated_user` task was still queued on the
+//! scheduler. Tests that waited on `applied_index` and then
+//! immediately polled `credentials.get_user` would flake whenever
+//! the scheduler ran them in that order.
 
 pub mod api_key;
 pub mod change_stream;
@@ -29,117 +45,147 @@ use std::sync::Arc;
 
 use crate::control::catalog_entry::entry::CatalogEntry;
 use crate::control::state::SharedState;
 
-/// Spawn the post-apply side effects of `entry`. Best-effort: any
-/// failure inside the spawned task logs a warning but does not
-/// unwind the raft apply path.
-pub fn spawn_post_apply_side_effects(entry: CatalogEntry, shared: Arc<SharedState>) {
-    tokio::spawn(async move {
-        match entry {
-            CatalogEntry::PutCollection(stored) => {
-                collection::put(*stored, shared).await;
-            }
-            CatalogEntry::DeactivateCollection { tenant_id, name } => {
-                collection::deactivate(tenant_id, name, shared);
-            }
-            CatalogEntry::PutSequence(stored) => {
-                sequence::put(*stored, shared);
-            }
-            CatalogEntry::DeleteSequence { tenant_id, name } => {
-                sequence::delete(tenant_id, name, shared);
-            }
-            CatalogEntry::PutSequenceState(state) => {
-                sequence::put_state(*state, shared);
-            }
-            CatalogEntry::PutTrigger(stored) => {
-                trigger::put(*stored, shared);
-            }
-            CatalogEntry::DeleteTrigger { tenant_id, name } => {
-                trigger::delete(tenant_id, name, shared);
-            }
-            CatalogEntry::PutFunction(stored) => {
-                function::put(*stored, shared);
-            }
-            CatalogEntry::DeleteFunction { tenant_id, name } => {
-                function::delete(tenant_id, name, shared);
-            }
-            CatalogEntry::PutProcedure(stored) => {
-                procedure::put(*stored, shared);
-            }
-            CatalogEntry::DeleteProcedure { tenant_id, name } => {
-                procedure::delete(tenant_id, name, shared);
-            }
-            CatalogEntry::PutSchedule(stored) => {
-                schedule::put(*stored, shared);
-            }
-            CatalogEntry::DeleteSchedule { tenant_id, name } => {
-                schedule::delete(tenant_id, name, shared);
-            }
-            CatalogEntry::PutChangeStream(stored) => {
-                change_stream::put(*stored, shared);
-            }
-            CatalogEntry::DeleteChangeStream { tenant_id, name } => {
-                change_stream::delete(tenant_id, name, shared);
-            }
-            CatalogEntry::PutUser(stored) => {
-                user::put(*stored, shared);
-            }
-            CatalogEntry::DeactivateUser { username } => {
-                user::deactivate(username, shared);
-            }
-            CatalogEntry::PutRole(stored) => {
-                role::put(*stored, shared);
-            }
-            CatalogEntry::DeleteRole { name } => {
-                role::delete(name, shared);
-            }
-            CatalogEntry::PutApiKey(stored) => {
-                api_key::put(*stored, shared);
-            }
-            CatalogEntry::RevokeApiKey { key_id } => {
-                api_key::revoke(key_id, shared);
-            }
-            CatalogEntry::PutMaterializedView(stored) => {
-                materialized_view::put(*stored, shared);
-            }
-            CatalogEntry::DeleteMaterializedView { tenant_id, name } => {
-                materialized_view::delete(tenant_id, name, shared);
-            }
-            CatalogEntry::PutTenant(stored) => {
-                tenant::put(*stored, shared);
-            }
-            CatalogEntry::DeleteTenant { tenant_id } => {
-                tenant::delete(tenant_id, shared);
-            }
-            CatalogEntry::PutRlsPolicy(stored) => {
-                rls::put(*stored, shared);
-            }
-            CatalogEntry::DeleteRlsPolicy {
-                tenant_id,
-                collection,
-                name,
-            } => {
-                rls::delete(tenant_id, collection, name, shared);
-            }
-            CatalogEntry::PutPermission(stored) => {
-                permission::put(*stored, shared);
-            }
-            CatalogEntry::DeletePermission {
-                target,
-                grantee,
-                permission: perm,
-            } => {
-                permission::delete(target, grantee, perm, shared);
-            }
-            CatalogEntry::PutOwner(stored) => {
-                owner::put(*stored, shared);
-            }
-            CatalogEntry::DeleteOwner {
-                object_type,
-                tenant_id,
-                object_name,
-            } => {
-                owner::delete(object_type, tenant_id, object_name, shared);
-            }
-        }
-    });
-}
+/// Run every **synchronous** post-apply side effect inline. Must be
+/// called from the metadata applier BEFORE the watcher bump so
+/// readers of the applied index see every in-memory cache update
+/// that entry triggered. Best-effort per variant: the whole thing
+/// is infallible today (all typed functions log on failure and
+/// return).
+pub fn apply_post_apply_side_effects_sync(entry: &CatalogEntry, shared: &Arc<SharedState>) {
+    match entry {
+        CatalogEntry::PutCollection(stored) => {
+            // Owner record install is sync; Data Plane register is
+            // the async part, handled by `spawn_post_apply_async_side_effects`.
+            collection::put_owner_sync(stored, Arc::clone(shared));
+        }
+        CatalogEntry::DeactivateCollection { tenant_id, name } => {
+            collection::deactivate(*tenant_id, name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutSequence(stored) => {
+            sequence::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteSequence { tenant_id, name } => {
+            sequence::delete(*tenant_id, name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutSequenceState(state) => {
+            sequence::put_state((**state).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutTrigger(stored) => {
+            trigger::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteTrigger { tenant_id, name } => {
+            trigger::delete(*tenant_id, name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutFunction(stored) => {
+            function::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteFunction { tenant_id, name } => {
+            function::delete(*tenant_id, name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutProcedure(stored) => {
+            procedure::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteProcedure { tenant_id, name } => {
+            procedure::delete(*tenant_id, name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutSchedule(stored) => {
+            schedule::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteSchedule { tenant_id, name } => {
+            schedule::delete(*tenant_id, name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutChangeStream(stored) => {
+            change_stream::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteChangeStream { tenant_id, name } => {
+            change_stream::delete(*tenant_id, name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutUser(stored) => {
+            user::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeactivateUser { username } => {
+            user::deactivate(username.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutRole(stored) => {
+            role::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteRole { name } => {
+            role::delete(name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutApiKey(stored) => {
+            api_key::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::RevokeApiKey { key_id } => {
+            api_key::revoke(key_id.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutMaterializedView(stored) => {
+            materialized_view::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteMaterializedView { tenant_id, name } => {
+            materialized_view::delete(*tenant_id, name.clone(), Arc::clone(shared));
+        }
+        CatalogEntry::PutTenant(stored) => {
+            tenant::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteTenant { tenant_id } => {
+            tenant::delete(*tenant_id, Arc::clone(shared));
+        }
+        CatalogEntry::PutRlsPolicy(stored) => {
+            rls::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteRlsPolicy {
+            tenant_id,
+            collection,
+            name,
+        } => {
+            rls::delete(
+                *tenant_id,
+                collection.clone(),
+                name.clone(),
+                Arc::clone(shared),
+            );
+        }
+        CatalogEntry::PutPermission(stored) => {
+            permission::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeletePermission {
+            target,
+            grantee,
+            permission: perm,
+        } => {
+            permission::delete(
+                target.clone(),
+                grantee.clone(),
+                perm.clone(),
+                Arc::clone(shared),
+            );
+        }
+        CatalogEntry::PutOwner(stored) => {
+            owner::put((**stored).clone(), Arc::clone(shared));
+        }
+        CatalogEntry::DeleteOwner {
+            object_type,
+            tenant_id,
+            object_name,
+        } => {
+            owner::delete(
+                object_type.clone(),
+                *tenant_id,
+                object_name.clone(),
+                Arc::clone(shared),
+            );
+        }
+    }
+}
+
+/// Spawn the async post-apply side effects of `entry`. Today this is
+/// limited to Data Plane dispatches for `PutCollection` (the only
+/// genuinely `.await`-carrying variant). Best-effort: failures log
+/// and drop.
+pub fn spawn_post_apply_async_side_effects(entry: CatalogEntry, shared: Arc<SharedState>) {
+    if let CatalogEntry::PutCollection(stored) = entry {
+        tokio::spawn(async move {
+            collection::put_async(*stored, shared).await;
+        });
+    }
+}
diff --git a/nodedb/src/control/cluster/init.rs b/nodedb/src/control/cluster/init.rs
index af47b215..056baa0d 100644
--- a/nodedb/src/control/cluster/init.rs
+++ b/nodedb/src/control/cluster/init.rs
@@ -73,6 +73,7 @@ pub async fn init_cluster_with_transport(
         replication_factor: config.replication_factor,
         data_dir: data_dir.to_path_buf(),
         force_bootstrap: config.force_bootstrap,
+        join_retry: join_retry_policy_from_env(),
     };
 
     let lifecycle = nodedb_cluster::ClusterLifecycleTracker::new();
@@ -105,3 +106,30 @@ pub async fn init_cluster_with_transport(
         multi_raft: Mutex::new(Some(state.multi_raft)),
     })
 }
+
+/// Build the join retry policy, honouring two optional environment
+/// variables for test/CI overrides:
+///
+/// - `NODEDB_JOIN_RETRY_MAX_ATTEMPTS` — total attempts (default 8)
+/// - `NODEDB_JOIN_RETRY_MAX_BACKOFF_SECS` — per-attempt ceiling
+///   (default 32 s)
+///
+/// Production deployments leave both unset and get the production
+/// schedule. The integration test harness sets both to small values
+/// so a join-retry path doesn't spend ~1 minute sleeping in CI.
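+///
+/// e.g. `NODEDB_JOIN_RETRY_MAX_ATTEMPTS=8 NODEDB_JOIN_RETRY_MAX_BACKOFF_SECS=2`
+/// reproduces the fast schedule the test harness policy uses (a 2 s
+/// ceiling, ≈ 4 s of total sleep).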
+fn join_retry_policy_from_env() -> nodedb_cluster::JoinRetryPolicy {
+    let mut policy = nodedb_cluster::JoinRetryPolicy::default();
+    if let Ok(v) = std::env::var("NODEDB_JOIN_RETRY_MAX_ATTEMPTS")
+        && let Ok(n) = v.parse::<u32>()
+        && n > 0
+    {
+        policy.max_attempts = n;
+    }
+    if let Ok(v) = std::env::var("NODEDB_JOIN_RETRY_MAX_BACKOFF_SECS")
+        && let Ok(n) = v.parse::<u64>()
+        && n > 0
+    {
+        policy.max_backoff_secs = n;
+    }
+    policy
+}
diff --git a/nodedb/src/control/cluster/metadata_applier.rs b/nodedb/src/control/cluster/metadata_applier.rs
index 1c61164d..b4fb2360 100644
--- a/nodedb/src/control/cluster/metadata_applier.rs
+++ b/nodedb/src/control/cluster/metadata_applier.rs
@@ -157,7 +157,7 @@ impl MetadataCommitApplier {
             if compat {
                 catalog_entry
             } else {
-                catalog_entry::descriptor_stamp::stamp(catalog_entry, &shared.hlc_clock, &catalog)
+                catalog_entry::descriptor_stamp::stamp(catalog_entry, &shared.hlc_clock, catalog)
             }
         } else {
             // Unit tests construct the applier without a SharedState.
@@ -183,7 +183,19 @@ impl MetadataCommitApplier {
             {
                 shared.lease_drain.install_end(&drained_id);
             }
-            catalog_entry::post_apply::spawn_post_apply_side_effects(stamped, shared);
+            // Run synchronous post-apply side effects INLINE so every
+            // in-memory cache update (install_replicated_user,
+            // install_replicated_owner, etc.) is visible before the
+            // watcher bump. Any reader that observes `applied_index`
+            // moving past `last` is guaranteed to see the sync side
+            // effects of every entry up to `last`.
+            //
+            // The async tail (today: Data Plane Register dispatches
+            // for PutCollection) is spawned separately and is NOT
+            // part of the applied-index contract — it's a
+            // performance optimisation, not a correctness gate.
+            catalog_entry::post_apply::apply_post_apply_side_effects_sync(&stamped, &shared);
+            catalog_entry::post_apply::spawn_post_apply_async_side_effects(stamped, shared);
         }
     }
 }
diff --git a/nodedb/src/control/lease/renewal.rs b/nodedb/src/control/lease/renewal.rs
index 6601a02a..abb25666 100644
--- a/nodedb/src/control/lease/renewal.rs
+++ b/nodedb/src/control/lease/renewal.rs
@@ -163,26 +163,9 @@ impl LeaseRenewalLoop {
             "descriptor lease renewal: re-acquiring near-expiry leases"
         );
         for (id, held_version) in near_expiry {
-            // Look up the CURRENT persisted version from the
-            // local catalog before re-acquiring. If the
-            // descriptor has been altered since we last took
-            // the lease, we need to advance to the new version
-            // — otherwise the old lease sticks around forever
-            // and blocks drain on any ALTER that wants to bump
-            // past it.
-            //
-            // If the descriptor has been dropped we release the
-            // lease instead of renewing: renewing a lease on a
-            // non-existent descriptor would leak it.
             let current_version = lookup_current_version(&shared, &id);
             match current_version {
                 Some(v) => {
-                    // Re-acquire at whichever version is higher:
-                    // the persisted version, or the one we
-                    // already hold (defensive — a concurrent
-                    // PutCollection apply between cache read
-                    // and propose could leave us briefly
-                    // observing an older version).
                     let version = v.max(held_version);
                     if let Err(e) = super::propose::force_refresh_lease(
                         &shared,
@@ -199,8 +182,6 @@ impl LeaseRenewalLoop {
                     }
                 }
                 None => {
-                    // Descriptor dropped — release our lease so
-                    // drain on the drop path can make progress.
                     if let Err(e) = super::release::release_leases(&shared, vec![id.clone()]) {
                         warn!(
                             descriptor = ?id,
@@ -328,8 +309,10 @@ mod tests {
 
     #[test]
     fn threshold_clamped_at_100() {
-        let mut tuning = ClusterTransportTuning::default();
-        tuning.descriptor_lease_renewal_threshold_pct = 250;
+        let tuning = ClusterTransportTuning {
+            descriptor_lease_renewal_threshold_pct: 250,
+            ..ClusterTransportTuning::default()
+        };
         let config = LeaseRenewalConfig::from_tuning(&tuning);
         assert_eq!(config.threshold_pct, 100);
     }
diff --git a/nodedb/src/control/security/catalog/system_catalog.rs b/nodedb/src/control/security/catalog/system_catalog.rs
index 9a640000..f59817e3 100644
--- a/nodedb/src/control/security/catalog/system_catalog.rs
+++ b/nodedb/src/control/security/catalog/system_catalog.rs
@@ -107,6 +107,27 @@ impl SystemCatalog {
             let _ = write_txn
                 .open_table(super::rls::RLS_POLICIES)
                 .map_err(|e| catalog_err("init rls_policies table", e))?;
+            let _ = write_txn
+                .open_table(ALERT_RULES)
+                .map_err(|e| catalog_err("init alert_rules table", e))?;
+            let _ = write_txn
+                .open_table(RETENTION_POLICIES)
+                .map_err(|e| catalog_err("init retention_policies table", e))?;
+            let _ = write_txn
+                .open_table(SEQUENCES)
+                .map_err(|e| catalog_err("init sequences table", e))?;
+            let _ = write_txn
+                .open_table(SEQUENCE_STATE)
+                .map_err(|e| catalog_err("init sequence_state table", e))?;
+            let _ = write_txn
+                .open_table(COLUMN_STATS)
+                .map_err(|e| catalog_err("init column_stats table", e))?;
+            let _ = write_txn
+                .open_table(VECTOR_MODEL_METADATA)
+                .map_err(|e| catalog_err("init vector_model_metadata table", e))?;
+            let _ = write_txn
+                .open_table(CHECKPOINTS)
+                .map_err(|e| catalog_err("init checkpoints table", e))?;
         }
         write_txn
             .commit()
diff --git a/nodedb/src/control/security/jwt.rs b/nodedb/src/control/security/jwt.rs
index 02a3743e..aeac9dd2 100644
--- a/nodedb/src/control/security/jwt.rs
+++ b/nodedb/src/control/security/jwt.rs
@@ -339,6 +339,14 @@ mod tests {
         base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(data)
     }
 
+    /// Bit length used for RSA keys generated in this test module.
+    /// Production validates whatever the operator configures; the
+    /// signing/verification logic doesn't care about strength, so we
+    /// use 1024 here to keep `RsaPrivateKey::new` from dominating the
+    /// test runtime. RSA-1024 keygen is ~10x faster than RSA-2048
+    /// without changing what these tests actually exercise.
+    const TEST_RSA_BITS: usize = 1024;
+
     #[test]
     fn rs256_roundtrip() {
         use rsa::pkcs1v15::SigningKey;
@@ -346,7 +354,7 @@ mod tests {
 
         // Generate a test RSA key pair.
         let mut rng = rsa::rand_core::OsRng;
-        let private_key = rsa::RsaPrivateKey::new(&mut rng, 2048).unwrap();
+        let private_key = rsa::RsaPrivateKey::new(&mut rng, TEST_RSA_BITS).unwrap();
         let public_key = rsa::RsaPublicKey::from(&private_key);
 
         // Export public key as DER (PKCS#8).
@@ -387,8 +395,8 @@ mod tests {
         use rsa::signature::{SignatureEncoding, Signer};
 
         let mut rng = rsa::rand_core::OsRng;
-        let key1 = rsa::RsaPrivateKey::new(&mut rng, 2048).unwrap();
-        let key2 = rsa::RsaPrivateKey::new(&mut rng, 2048).unwrap();
+        let key1 = rsa::RsaPrivateKey::new(&mut rng, TEST_RSA_BITS).unwrap();
+        let key2 = rsa::RsaPrivateKey::new(&mut rng, TEST_RSA_BITS).unwrap();
         let pub2 = rsa::RsaPublicKey::from(&key2);
 
         let pub2_der = {
diff --git a/nodedb/src/control/server/pgwire/ddl/user.rs b/nodedb/src/control/server/pgwire/ddl/user.rs
index b9d27a8b..73eaf665 100644
--- a/nodedb/src/control/server/pgwire/ddl/user.rs
+++ b/nodedb/src/control/server/pgwire/ddl/user.rs
@@ -121,15 +121,41 @@ pub fn create_user(
     let entry = crate::control::catalog_entry::CatalogEntry::PutUser(Box::new(stored.clone()));
     let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry)
         .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;
-    if log_index == 0
-        && let Some(catalog) = state.credentials.catalog()
-    {
-        // Single-node / no-cluster fallback: write the record
-        // directly and install it into the in-memory cache.
-        catalog
-            .put_user(&stored)
-            .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
+    if log_index == 0 {
+        // Single-node / no-cluster fallback: install into the
+        // in-memory cache so subsequent reads see the user.
+        // Persist to redb when a catalog is wired up — the
+        // catalog write is best-effort durability, not a gate
+        // on the cache update. Test fixtures (and any future
+        // fully-in-memory deployment) can run without a redb
+        // catalog and still get correct read-after-write.
+        if let Some(catalog) = state.credentials.catalog() {
+            catalog
+                .put_user(&stored)
+                .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
+        }
         state.credentials.install_replicated_user(&stored);
+    } else {
+        // Cluster mode: `propose_catalog_entry` waits for the
+        // entry to be applied on THIS node, which runs the
+        // synchronous post_apply (`install_replicated_user`)
+        // inline BEFORE the applied-index watermark bumps. So if
+        // our entry really committed, `get_user` must see it now.
+        //
+        // If `get_user` returns None, the Raft log entry at the
+        // index our leader assigned has been truncated and
+        // overwritten with a noop from a new leader term (a known
+        // Raft subtlety: `propose` returns the assigned log index
+        // without waiting for commit; if leadership changes
+        // before the quorum ack, the entry is dropped). Return a
+        // retryable error so `exec_ddl_on_any_leader` re-proposes
+        // on the next attempt against whoever is now leader.
+        if state.credentials.get_user(username).is_none() {
+            return Err(sqlstate_error(
+                "40001",
+                "transient: metadata entry truncated by leader change, retry",
+            ));
+        }
     }
 
     state.audit_record(
@@ -190,12 +216,12 @@ pub fn alter_user(
     let entry =
         crate::control::catalog_entry::CatalogEntry::PutUser(Box::new(stored.clone()));
     let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry)
         .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;
-    if log_index == 0
-        && let Some(catalog) = state.credentials.catalog()
-    {
-        catalog
-            .put_user(&stored)
-            .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
+    if log_index == 0 {
+        if let Some(catalog) = state.credentials.catalog() {
+            catalog
+                .put_user(&stored)
+                .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
+        }
         state.credentials.install_replicated_user(&stored);
     }
@@ -228,12 +254,12 @@ pub fn alter_user(
     let entry =
         crate::control::catalog_entry::CatalogEntry::PutUser(Box::new(stored.clone()));
     let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry)
         .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;
-    if log_index == 0
-        && let Some(catalog) = state.credentials.catalog()
-    {
-        catalog
-            .put_user(&stored)
-            .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
+    if log_index == 0 {
+        if let Some(catalog) = state.credentials.catalog() {
+            catalog
+                .put_user(&stored)
+                .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
+        }
         state.credentials.install_replicated_user(&stored);
     }
diff --git a/nodedb/src/control/server/pgwire/handler/retry.rs b/nodedb/src/control/server/pgwire/handler/retry.rs
index d468cb89..051b84a9 100644
--- a/nodedb/src/control/server/pgwire/handler/retry.rs
+++ b/nodedb/src/control/server/pgwire/handler/retry.rs
@@ -65,8 +65,8 @@ where
                     "pgwire: retrying plan after schema change"
                 );
                 last_err = Some(Error::RetryableSchemaChanged { descriptor });
-                if attempt + 1 < MAX_ATTEMPTS {
-                    tokio::time::sleep(BACKOFFS[attempt]).await;
+                if let Some(backoff) = BACKOFFS.get(attempt) {
+                    tokio::time::sleep(*backoff).await;
                 }
             }
             Err(other) => return Err(other),
@@ -108,8 +108,8 @@ where
                     leader_node,
                     leader_addr,
                 });
-                if attempt + 1 < MAX_ATTEMPTS {
-                    tokio::time::sleep(BACKOFFS[attempt]).await;
+                if let Some(backoff) = BACKOFFS.get(attempt) {
+                    tokio::time::sleep(*backoff).await;
                 }
             }
             Err(other) => return Err(other),
diff --git a/nodedb/src/control/server/session.rs b/nodedb/src/control/server/session.rs
index 06df6c22..5c7968b5 100644
--- a/nodedb/src/control/server/session.rs
+++ b/nodedb/src/control/server/session.rs
@@ -477,7 +477,10 @@ mod tests {
         let core_handle = tokio::task::spawn_blocking(move || {
             let mut core =
                 CoreLoop::open(0, data_side.request_rx, data_side.response_tx, &core_dir).unwrap();
-            while core_stop_rx.try_recv().is_err() {
+            while matches!(
+                core_stop_rx.try_recv(),
+                Err(std::sync::mpsc::TryRecvError::Empty)
+            ) {
                 core.tick();
                 std::thread::sleep(Duration::from_millis(1));
             }
diff --git a/nodedb/src/control/state/methods.rs b/nodedb/src/control/state/methods.rs
index a1f28ad2..a8f54b8f 100644
--- a/nodedb/src/control/state/methods.rs
+++ b/nodedb/src/control/state/methods.rs
@@ -306,7 +306,9 @@ impl SharedState {
     }
 
     /// Poll responses from all Data Plane cores and route them to waiting sessions.
-    pub fn poll_and_route_responses(&self) {
+    /// Returns the number of responses routed — callers use this for adaptive
+    /// backoff (zero ⇒ idle, sleep longer; non-zero ⇒ active, stay hot).
+    pub fn poll_and_route_responses(&self) -> usize {
         let responses = match self.dispatcher.lock() {
             Ok(mut d) => d.poll_responses(),
             Err(poisoned) => {
@@ -314,11 +316,13 @@ impl SharedState {
                 poisoned.into_inner().poll_responses()
             }
         };
+        let count = responses.len();
         for resp in responses {
             if !self.tracker.complete(resp) {
                 warn!("response for unknown or cancelled request");
             }
         }
+        count
     }
 
     /// Acquire (or re-confirm) a descriptor lease at the given
diff --git a/nodedb/src/event/consumer.rs b/nodedb/src/event/consumer.rs
index b9e1b15e..8c1725b6 100644
--- a/nodedb/src/event/consumer.rs
+++ b/nodedb/src/event/consumer.rs
@@ -34,8 +34,17 @@ use super::consumer_helpers::{
     detect_sequence_gap, flush_watermark, maybe_flush_watermark, record_event,
 };
 
-/// How often to poll the ring buffer when empty (milliseconds).
-const EMPTY_POLL_INTERVAL: Duration = Duration::from_millis(1);
+/// Initial sleep when the ring buffer is empty. Adaptive backoff ramps
+/// up to `EMPTY_POLL_MAX` after `EMPTY_POLL_RAMP` consecutive empty polls
+/// so an idle Event Plane consumer does not wake every 1ms forever.
+const EMPTY_POLL_MIN: Duration = Duration::from_millis(1);
+/// Cap on the empty-poll sleep. 50ms keeps trigger / CDC dispatch latency
+/// bounded for the first event after an idle period while limiting idle
+/// CPU to ~20 wakes/sec per core.
+const EMPTY_POLL_MAX: Duration = Duration::from_millis(50);
+/// After this many consecutive empty polls (~32ms of idleness at 1ms),
+/// switch to the long sleep.
+const EMPTY_POLL_RAMP: u32 = 32;
 
 /// Maximum events to process per ring buffer drain before yielding.
 const DRAIN_BATCH_LIMIT: u32 = 1024;
@@ -137,6 +146,7 @@ async fn consumer_loop(config: ConsumerConfig, metrics: Arc) {
     debug!(core_id, "event plane consumer started");
 
     let mut wal_retry_count: u32 = 0;
+    let mut empty_polls: u32 = 0;
 
     loop {
         if *shutdown.borrow() {
@@ -161,6 +171,7 @@ async fn consumer_loop(config: ConsumerConfig, metrics: Arc) {
 
         let batch_count = events.len();
         if batch_count > 0 {
+            empty_polls = 0;
             dirty_watermark = true;
 
             let mut trigger_collector =
@@ -329,8 +340,15 @@ async fn consumer_loop(config: ConsumerConfig, metrics: Arc) {
             &mut last_watermark_flush,
         );
 
+        empty_polls = empty_polls.saturating_add(1);
+        let poll_sleep = if empty_polls < EMPTY_POLL_RAMP {
+            EMPTY_POLL_MIN
+        } else {
+            EMPTY_POLL_MAX
+        };
+
         tokio::select! {
-            _ = tokio::time::sleep(EMPTY_POLL_INTERVAL) => {}
+            _ = tokio::time::sleep(poll_sleep) => {}
             _ = shutdown.changed() => {
                 if dirty_watermark {
                     flush_watermark(&watermark_store, core_id, last_lsn);
diff --git a/nodedb/src/main.rs b/nodedb/src/main.rs
index 13309f6b..5502eb27 100644
--- a/nodedb/src/main.rs
+++ b/nodedb/src/main.rs
@@ -357,24 +357,46 @@ async fn main() -> anyhow::Result<()> {
     );
 
     // Start response poller: routes Data Plane responses to
-    // waiting sessions. Uses `yield_now()` instead of `sleep()`
-    // because tokio's timer wheel has 1ms minimum granularity —
-    // sleep(100us) actually sleeps ~1ms, adding 1ms to every
-    // request's latency. `yield_now()` yields to the scheduler
-    // without a timer, polling on every scheduler cycle
-    // (microsecond-level).
+    // waiting sessions.
+ // + // Adaptive backoff strategy: under load we use `yield_now()` + // for microsecond-level responsiveness (tokio's timer wheel + // has 1ms granularity, so sleep(100us) actually sleeps ~1ms, + // adding 1ms to every request's latency). When the poller + // observes an idle streak we ramp the wait up so an idle + // server does not peg an entire tokio worker at 100% CPU. + // + // - Active (response just routed OR within the last ~256 yields): + // yield_now() — sub-millisecond latency for bursts. + // - Idle for 256+ iterations: sleep 1ms (still responsive, + // matches the timer wheel minimum). + // - Idle for 1024+ iterations (~1s of true idleness): sleep + // 10ms — bounds idle CPU to ~0.1% of one core. let shared_poller = Arc::clone(&shared); nodedb::control::shutdown::spawn_loop( &shared.loop_registry, &shared.shutdown, "response_poller", move |shutdown| async move { + let mut idle_iters: u32 = 0; loop { if shutdown.is_cancelled() { break; } - shared_poller.poll_and_route_responses(); - tokio::task::yield_now().await; + let routed = shared_poller.poll_and_route_responses(); + if routed > 0 { + idle_iters = 0; + tokio::task::yield_now().await; + continue; + } + idle_iters = idle_iters.saturating_add(1); + if idle_iters <= 256 { + tokio::task::yield_now().await; + } else if idle_iters <= 1024 { + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + } else { + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } } }, ); @@ -669,15 +691,19 @@ async fn main() -> anyhow::Result<()> { shared.cluster_transport.as_ref(), shared.cluster_topology.as_ref(), ) { - let topo_guard = topology.read().unwrap_or_else(|p| p.into_inner()); + // Clone the topology snapshot so the read guard is dropped + // before awaiting — clippy::await_holding_lock. + let topo_snapshot = { + let guard = topology.read().unwrap_or_else(|p| p.into_inner()); + guard.clone() + }; let warm_report = nodedb::control::cluster::warm_known_peers( transport, - &topo_guard, + &topo_snapshot, shared.node_id, Duration::from_secs(2), ) .await; - drop(topo_guard); if warm_report.attempted > 0 { info!(report = %warm_report, "peer cache warm-up complete"); if !warm_report.is_complete() { diff --git a/nodedb/tests/common/cluster_harness/cluster.rs b/nodedb/tests/common/cluster_harness/cluster.rs index 2d3002c7..d9e6c0f3 100644 --- a/nodedb/tests/common/cluster_harness/cluster.rs +++ b/nodedb/tests/common/cluster_harness/cluster.rs @@ -49,6 +49,43 @@ impl TestCluster { ) .await; + // CRITICAL: wait for every node to exit rolling-upgrade + // compat mode before letting the test issue any DDL. + // + // `metadata_proposer::propose_catalog_entry` consults + // `cluster_version_view().can_activate_feature(DISTRIBUTED_CATALOG_VERSION)` + // and, while even one node still reports a lower wire + // version, returns `Ok(0)` without going through the raft + // group. The pgwire DDL handlers (CREATE USER, etc.) then + // fall through to a LEGACY path that writes the record + // directly on the proposing node — **with zero + // replication** to followers. Any subsequent + // `has_active_user` check on a follower returns false and + // the test flakes. + // + // Topology has three members the moment the join request + // completes, but the `wire_version` field on each node's + // topology entry is updated asynchronously by the gossip + // path. That's why `topology_size == 3` converges fast yet + // `can_activate_feature(...)` can still be false for + // several hundred milliseconds afterwards. 
Waiting here
+        // closes the window deterministically — no retries, no
+        // flakes, no compat-mode fallback silently breaking
+        // replication.
+        wait_for(
+            "all 3 nodes exit rolling-upgrade compat mode",
+            Duration::from_secs(10),
+            Duration::from_millis(20),
+            || {
+                cluster.nodes.iter().all(|n| {
+                    n.shared.cluster_version_view().can_activate_feature(
+                        nodedb::control::rolling_upgrade::DISTRIBUTED_CATALOG_VERSION,
+                    )
+                })
+            },
+        )
+        .await;
+
         Ok(cluster)
     }
 
@@ -57,13 +94,28 @@ impl TestCluster {
     /// `not metadata-group leader` errors via the pgwire error path;
     /// the retry loop tries the next node on failure so the test
     /// doesn't have to discover the leader explicitly.
+    ///
+    /// After the DDL is accepted, **blocks until every node's
+    /// metadata applier has caught up to the proposer's applied
+    /// index**. `propose_catalog_entry` already waits for the entry
+    /// to be applied on the proposing node before returning, but
+    /// followers apply asynchronously — without this barrier a
+    /// subsequent `wait_for("x visible on every node")` would race
+    /// the follower appliers and trip its timeout on the cold-start
+    /// attempt. Polling the watermark is a cheap O(1) read per node,
+    /// and the loop converges as soon as the followers drain their
+    /// commit queues, so it's both more reliable and faster than
+    /// waiting on the visibility check itself.
     pub async fn exec_ddl_on_any_leader(&self, sql: &str) -> Result {
         let deadline = std::time::Instant::now() + Duration::from_secs(10);
         let mut last_err = String::new();
         while std::time::Instant::now() < deadline {
             for (idx, node) in self.nodes.iter().enumerate() {
                 match node.exec(sql).await {
-                    Ok(()) => return Ok(idx),
+                    Ok(()) => {
+                        self.wait_for_applied_index_convergence(idx).await;
+                        return Ok(idx);
+                    }
                     Err(e) => last_err = e,
                 }
             }
@@ -74,6 +126,36 @@ impl TestCluster {
         ))
     }
 
+    /// Block until every node's metadata applier has caught up to the
+    /// proposer's current applied index. Called after every successful
+    /// DDL by `exec_ddl_on_any_leader`.
+    async fn wait_for_applied_index_convergence(&self, proposer_idx: usize) {
+        let target = self.nodes[proposer_idx]
+            .shared
+            .applied_index_watcher()
+            .current();
+        if target == 0 {
+            return;
+        }
+        let deadline = std::time::Instant::now() + Duration::from_secs(10);
+        loop {
+            let all_caught_up = self
+                .nodes
+                .iter()
+                .all(|n| n.shared.applied_index_watcher().current() >= target);
+            if all_caught_up {
+                return;
+            }
+            if std::time::Instant::now() >= deadline {
+                // Don't panic — the caller's own `wait_for` assertion
+                // will report the specific visibility failure with a
+                // better error than "convergence timed out".
+                return;
+            }
+            tokio::time::sleep(Duration::from_millis(5)).await;
+        }
+    }
+
     /// Cooperatively shut down every node. Reverse order so peers
     /// observe their neighbours' drop without rejecting inbound
     /// traffic on an already-closed transport.
diff --git a/nodedb/tests/common/cluster_harness/node.rs b/nodedb/tests/common/cluster_harness/node.rs
index db6feb2a..b1da9210 100644
--- a/nodedb/tests/common/cluster_harness/node.rs
+++ b/nodedb/tests/common/cluster_harness/node.rs
@@ -155,7 +155,20 @@ impl TestClusterNode {
                 CoreLoop::open(0, data_side.request_rx, data_side.response_tx, &core_dir)
                     .expect("core open");
             core.set_event_producer(event_producer);
-            while core_stop_rx.try_recv().is_err() {
+            // Continue ticking only while the channel is Empty.
+            // `Ok(())` means we got an explicit stop signal;
+            // `Disconnected` means the sender was dropped (e.g.
the + // owning `TestClusterNode` was dropped mid-panic). In + // both cases we must exit — `spawn_blocking` threads + // cannot be aborted, so a loop that continued on + // `Disconnected` would block tokio runtime shutdown + // indefinitely and force nextest to kill the test + // process at `slow-timeout` (~2 minutes of wasted CI + // time per flaky cluster test). + while matches!( + core_stop_rx.try_recv(), + Err(std::sync::mpsc::TryRecvError::Empty) + ) { core.tick(); std::thread::sleep(Duration::from_millis(1)); } @@ -642,6 +655,42 @@ impl TestClusterNode { } } +/// Panic-safe teardown. Without this, a test that panics (e.g. a +/// `wait_for` tripping its budget) would drop `TestClusterNode` +/// without ever calling the async `shutdown()`, leaving every +/// background task still running: +/// +/// - `watch::Sender`s close on drop but DO NOT transmit their last +/// value, so the raft / pgwire / poller loops block on +/// `select { shutdown.changed() }` forever. +/// - `JoinHandle`s on drop DETACH the task instead of cancelling it. +/// - Those detached tasks keep the tempdir's redb files open, so +/// `TempDir::drop` either hangs or the whole test process sticks +/// around until nextest kills it at `slow-timeout` (previously +/// ~2 minutes of wasted CI time per flaky cluster test). +/// +/// The Drop here fires the watch senders synchronously and aborts +/// every JoinHandle we own. `abort()` is non-blocking: the next time +/// the task hits an `.await` it gets cancelled and releases its +/// resources, including the redb handles. Combined with the +/// already-present `core_stop_tx` drop (which disconnects the +/// blocking Data Plane loop), this guarantees the node tears down +/// in milliseconds instead of minutes. +impl Drop for TestClusterNode { + fn drop(&mut self) { + let _ = self.pg_shutdown_tx.send(true); + let _ = self.cluster_shutdown_tx.send(true); + let _ = self.poller_shutdown_tx.send(true); + // `core_stop_tx` is a std mpsc Sender; dropping it disconnects + // the receiver the spawn_blocking data-plane loop polls, so + // no explicit signal needed here. + self._conn_handle.abort(); + self._pg_handle.abort(); + self._poller_handle.abort(); + self._core_handle.abort(); + } +} + fn pg_error_detail(e: &tokio_postgres::Error) -> String { if let Some(db_err) = e.as_db_error() { format!( diff --git a/nodedb/tests/common/pgwire_harness.rs b/nodedb/tests/common/pgwire_harness.rs index 48a7c6c3..101b0ef3 100644 --- a/nodedb/tests/common/pgwire_harness.rs +++ b/nodedb/tests/common/pgwire_harness.rs @@ -56,7 +56,10 @@ impl TestServer { let mut core = CoreLoop::open(0, data_side.request_rx, data_side.response_tx, &core_dir).unwrap(); core.set_event_producer(event_producer); - while core_stop_rx.try_recv().is_err() { + while matches!( + core_stop_rx.try_recv(), + Err(std::sync::mpsc::TryRecvError::Empty) + ) { core.tick(); std::thread::sleep(Duration::from_millis(1)); } diff --git a/nodedb/tests/descriptor_lease_forwarding_and_renewal.rs b/nodedb/tests/descriptor_lease_forwarding_and_renewal.rs index 0a5b3198..032519a5 100644 --- a/nodedb/tests/descriptor_lease_forwarding_and_renewal.rs +++ b/nodedb/tests/descriptor_lease_forwarding_and_renewal.rs @@ -9,13 +9,12 @@ //! the leader. All 3 leases land in `MetadataCache.leases` on //! every node. Without forwarding, the followers would panic //! with `not leader`. -//! //! 2. **lease_renews_before_expiry** — short lease (3 seconds) -//! + short renewal interval (250ms) + 50% threshold (renew when -//! < 1.5s remaining). 
Acquire on the leader, wait long enough
-//!    for at least one renewal cycle, assert the lease's
-//!    `expires_at` advanced (it was re-acquired with a fresh
-//!    expiry).
+//!    plus a short renewal check interval (1s, the minimum
+//!    granularity) and an 80% threshold (renew when < 2.4s
+//!    remaining). Acquire on the leader, wait long enough for
+//!    at least one renewal cycle, assert the lease's
+//!    `expires_at` advanced (it was re-acquired with a fresh
+//!    expiry).
 
 mod common;
 
@@ -53,7 +52,7 @@ async fn follower_acquire_forwards_to_leader() {
     // 3 distinct (descriptor_id, node_id) keys.
     wait_for(
         "every node observes all 3 forwarded leases",
-        Duration::from_secs(5),
+        Duration::from_secs(10),
         Duration::from_millis(20),
         || {
             cluster.nodes.iter().all(|n| {
@@ -75,14 +74,39 @@ async fn lease_renews_before_expiry() {
-    // Custom tuning: 3-second lease, check every 250ms, renew at
-    // 50% remaining (< 1.5s left). Within a 2.5-second test wait
-    // we should observe at least one renewal.
-    let mut tuning = ClusterTransportTuning::default();
-    tuning.descriptor_lease_duration_secs = 3;
-    tuning.descriptor_lease_renewal_check_interval_secs = 1; // min granularity
-    tuning.descriptor_lease_renewal_threshold_pct = 80;
+    // Custom tuning: 3-second lease, check every 1s (the minimum
+    // granularity), renew at 80% remaining (< 2.4s left). Within a
+    // 2.5-second test wait we should observe at least one renewal.
+    let tuning = ClusterTransportTuning {
+        descriptor_lease_duration_secs: 3,
+        descriptor_lease_renewal_check_interval_secs: 1, // min granularity
+        descriptor_lease_renewal_threshold_pct: 80,
+        ..ClusterTransportTuning::default()
+    };
     let cluster = TestCluster::spawn_three_with_tuning(tuning)
         .await
         .expect("3-node cluster");
+
+    // Create the collection so the renewal loop's
+    // `lookup_current_version` finds it in the local catalog.
+    // Without this the renewal logic treats the lease as orphaned
+    // and releases it before our 1.5 s observation window — see
+    // `control::lease::renewal::lookup_current_version`.
+    cluster
+        .exec_ddl_on_any_leader("CREATE COLLECTION renewable (id BIGINT PRIMARY KEY, label TEXT)")
+        .await
+        .expect("create renewable collection");
+    common::cluster_harness::wait_for(
+        "renewable visible on every node",
+        Duration::from_secs(10),
+        Duration::from_millis(50),
+        || {
+            cluster
+                .nodes
+                .iter()
+                .all(|n| n.collection_descriptor(TENANT, "renewable").is_some())
+        },
+    )
+    .await;
+
     let leader = &cluster.nodes[0];
 
     // Acquire on the leader. Lease has ~3s expiry from now.
diff --git a/nodedb/tests/descriptor_versioning_cross_node.rs b/nodedb/tests/descriptor_versioning_cross_node.rs
index 0f458a11..da90f5ad 100644
--- a/nodedb/tests/descriptor_versioning_cross_node.rs
+++ b/nodedb/tests/descriptor_versioning_cross_node.rs
@@ -33,7 +33,7 @@ async fn create_collection_stamps_version_one_on_every_node() {
 
     wait_for(
         "all 3 nodes stamp orders @ version 1 with non-zero HLC",
-        Duration::from_secs(5),
+        Duration::from_secs(10),
         Duration::from_millis(50),
         || {
             cluster.nodes.iter().all(|n| {
@@ -93,7 +93,7 @@ async fn alter_collection_bumps_version_monotonically() {
     // Wait for v1.
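+    // For reference: this diff bumps many `wait_for` budgets but never
+    // shows the helper itself. From its call sites (label, budget, poll
+    // interval, predicate) and the panic-on-timeout behavior described
+    // in node.rs above, the helper in tests/common/cluster_harness is
+    // assumed to look roughly like this sketch:
+    //
+    //   pub async fn wait_for(
+    //       what: &str,
+    //       budget: Duration,
+    //       poll: Duration,
+    //       mut pred: impl FnMut() -> bool,
+    //   ) {
+    //       let deadline = std::time::Instant::now() + budget;
+    //       while !pred() {
+    //           assert!(
+    //               std::time::Instant::now() < deadline,
+    //               "wait_for `{what}` tripped its budget"
+    //           );
+    //           tokio::time::sleep(poll).await;
+    //       }
+    //   }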
wait_for( "v1 stamped on every node", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -118,7 +118,7 @@ async fn alter_collection_bumps_version_monotonically() { let expected_version = (i + 2) as u64; wait_for( &format!("all nodes observe assets @ v{expected_version}"), - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster.nodes.iter().all(|n| { @@ -162,7 +162,7 @@ async fn distinct_collections_get_independent_versions() { wait_for( "all 3 collections present on all nodes", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster.nodes.iter().all(|n| { diff --git a/nodedb/tests/pgwire_connect.rs b/nodedb/tests/pgwire_connect.rs index e080e4b4..588b8d18 100644 --- a/nodedb/tests/pgwire_connect.rs +++ b/nodedb/tests/pgwire_connect.rs @@ -27,7 +27,10 @@ async fn pgwire_connect_and_query() { let core_handle = tokio::task::spawn_blocking(move || { let mut core = CoreLoop::open(0, data_side.request_rx, data_side.response_tx, &core_dir).unwrap(); - while core_stop_rx.try_recv().is_err() { + while matches!( + core_stop_rx.try_recv(), + Err(std::sync::mpsc::TryRecvError::Empty) + ) { core.tick(); std::thread::sleep(Duration::from_millis(1)); } diff --git a/nodedb/tests/prepared_cache_invalidation.rs b/nodedb/tests/prepared_cache_invalidation.rs index cb0d922e..31c5f844 100644 --- a/nodedb/tests/prepared_cache_invalidation.rs +++ b/nodedb/tests/prepared_cache_invalidation.rs @@ -17,7 +17,13 @@ use std::time::Duration; use common::cluster_harness::{TestCluster, wait_for}; const TENANT: u32 = 1; -const WAIT_BUDGET: Duration = Duration::from_secs(5); +// 10 s — every visibility check in this test rides on the metadata +// Raft commit + apply + post-apply cache update path. Five seconds +// (the original budget) was tight enough that fresh-cluster startup +// jitter occasionally tripped a false timeout. Ten seconds is still +// strict enough to catch real regressions but tolerant of cold-start +// election lag. +const WAIT_BUDGET: Duration = Duration::from_secs(10); const POLL: Duration = Duration::from_millis(20); #[tokio::test(flavor = "multi_thread", worker_threads = 6)] diff --git a/nodedb/tests/sql_cluster_cross_node_dml.rs b/nodedb/tests/sql_cluster_cross_node_dml.rs index 280db5a3..8da26e95 100644 --- a/nodedb/tests/sql_cluster_cross_node_dml.rs +++ b/nodedb/tests/sql_cluster_cross_node_dml.rs @@ -76,7 +76,7 @@ async fn create_on_any_node_is_visible_on_every_node() { // Every node's replicated cache must see the new collection. wait_for( "all 3 nodes see the replicated collection", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -98,7 +98,7 @@ async fn create_on_any_node_is_visible_on_every_node() { // The replicated-cache view removes the descriptor on Drop. 
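+    // Negative visibility (the descriptor disappearing everywhere)
+    // rides the same Raft commit + apply + cache-update path as the
+    // create, so it gets the same doubled budget.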
wait_for( "all 3 nodes no longer see the collection", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -131,7 +131,7 @@ async fn sequence_create_visible_on_every_node() { wait_for( "all 3 nodes see the replicated sequence in their in-memory registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_sequence(1, "order_id")), ) @@ -147,7 +147,7 @@ async fn sequence_create_visible_on_every_node() { wait_for( "all 3 nodes see sequence counter == 500", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -165,7 +165,7 @@ async fn sequence_create_visible_on_every_node() { wait_for( "all 3 nodes remove the sequence from their registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_sequence(1, "order_id")), ) @@ -189,7 +189,7 @@ async fn trigger_create_visible_on_every_node() { wait_for( "collection visible on every node", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -209,7 +209,7 @@ async fn trigger_create_visible_on_every_node() { wait_for( "all 3 nodes see the replicated trigger in trigger_registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_trigger(1, "audit_ins")), ) @@ -222,7 +222,7 @@ async fn trigger_create_visible_on_every_node() { wait_for( "all 3 nodes unregister the trigger", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_trigger(1, "audit_ins")), ) @@ -245,7 +245,7 @@ async fn procedure_create_visible_on_every_node() { wait_for( "all 3 nodes see the procedure in local SystemCatalog redb", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -263,7 +263,7 @@ async fn procedure_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the procedure", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -293,7 +293,7 @@ async fn schedule_create_visible_on_every_node() { wait_for( "all 3 nodes see the schedule in schedule_registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -311,7 +311,7 @@ async fn schedule_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the schedule", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -340,7 +340,7 @@ async fn change_stream_create_visible_on_every_node() { wait_for( "collection visible on every node", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -358,7 +358,7 @@ async fn change_stream_create_visible_on_every_node() { wait_for( "all 3 nodes see the stream in stream_registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -376,7 +376,7 @@ async fn change_stream_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the stream", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -404,7 +404,7 @@ async fn user_create_visible_on_every_node() { wait_for( "all 3 nodes see the replicated user in credentials", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_active_user("alice")), ) @@ -417,7 
+417,7 @@ async fn user_create_visible_on_every_node() { wait_for( "all 3 nodes see alice as deactivated", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_active_user("alice")), ) @@ -440,7 +440,7 @@ async fn role_create_visible_on_every_node() { wait_for( "all 3 nodes see the replicated role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_role("data_analyst")), ) @@ -453,7 +453,7 @@ async fn role_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_role("data_analyst")), ) @@ -476,7 +476,7 @@ async fn alter_user_role_replicates() { wait_for( "all 3 nodes see bob with read_only role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -494,7 +494,7 @@ async fn alter_user_role_replicates() { wait_for( "all 3 nodes see bob with read_write role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -523,7 +523,7 @@ async fn api_key_create_and_revoke_replicates() { wait_for( "all 3 nodes see charlie", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_active_user("charlie")), ) @@ -550,7 +550,7 @@ async fn api_key_create_and_revoke_replicates() { wait_for( "all 3 nodes see a replicated API key for charlie", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || all_nodes_have_key(&cluster), ) @@ -572,7 +572,7 @@ async fn api_key_create_and_revoke_replicates() { wait_for( "all 3 nodes see the key as revoked", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_active_api_key(&key_id)), ) @@ -597,7 +597,7 @@ async fn function_create_visible_on_every_node() { wait_for( "all 3 nodes see the function in local SystemCatalog redb", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_function(1, "add_one")), ) @@ -610,7 +610,7 @@ async fn function_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the function", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_function(1, "add_one")), ) @@ -630,7 +630,7 @@ async fn tenant_create_visible_on_every_node() { wait_for( "all 3 nodes see tenant 4242", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_tenant(4242)), ) @@ -643,7 +643,7 @@ async fn tenant_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see tenant 4242", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_tenant(4242)), ) @@ -670,7 +670,7 @@ async fn rls_policy_create_visible_on_every_node() { wait_for( "all 3 nodes see the RLS policy", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -688,7 +688,7 @@ async fn rls_policy_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the RLS policy", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -726,7 +726,7 @@ async fn grant_permission_visible_on_every_node() { let target = "collection:1:documents"; wait_for( 
"all 3 nodes see the grant", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -744,7 +744,7 @@ async fn grant_permission_visible_on_every_node() { wait_for( "all 3 nodes no longer see the grant", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -776,7 +776,7 @@ async fn grant_role_visible_on_every_node() { wait_for( "all 3 nodes see ops_user has monitor role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -794,7 +794,7 @@ async fn grant_role_visible_on_every_node() { wait_for( "all 3 nodes see ops_user no longer has monitor role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -825,6 +825,25 @@ async fn ownership_transfer_visible_on_every_node() { .await .expect("create new owner user"); + // ALTER ... OWNER TO validates the target user against the + // executing node's local credential cache, which is updated + // asynchronously by the metadata applier when the Raft entry + // commits. Wait for every node to observe the user before + // referencing it in the next DDL — otherwise the ALTER races + // the apply on whichever node `exec_ddl_on_any_leader` picks. + wait_for( + "all 3 nodes observe new_owner_user", + Duration::from_secs(10), + Duration::from_millis(50), + || { + cluster + .nodes + .iter() + .all(|n| n.has_active_user("new_owner_user")) + }, + ) + .await; + cluster .exec_ddl_on_any_leader("ALTER COLLECTION assets OWNER TO new_owner_user") .await @@ -832,7 +851,7 @@ async fn ownership_transfer_visible_on_every_node() { wait_for( "all 3 nodes see new_owner_user as owner of assets", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -864,7 +883,7 @@ async fn materialized_view_create_visible_on_every_node() { wait_for( "all 3 nodes see the materialized view", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -882,7 +901,7 @@ async fn materialized_view_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the materialized view", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster