diff --git a/crates/database/src/patroni_discovery.rs b/crates/database/src/patroni_discovery.rs index f88e4ab84..dd9795410 100644 --- a/crates/database/src/patroni_discovery.rs +++ b/crates/database/src/patroni_discovery.rs @@ -44,8 +44,43 @@ pub struct ClusterMember { pub timeline: Option, } +/// Patroni lists a node as a member as soon as it registers in the DCS — which +/// happens *before* it publishes its `conn_url`. So a replica mid-creation (or +/// an uninitialized node) appears without `host`/`port`/`api_url`. Those fields +/// are required on `ClusterMember`, so a strict `Vec<ClusterMember>` parse fails +/// the ENTIRE `/cluster` response with `missing field \`host\`` over one +/// half-registered member — freezing topology discovery for every consumer +/// (cloud-api + chat-api) whenever a new postgres instance is being added. +/// +/// Parse each member independently and drop any that don't fully deserialize; a +/// member with no connection info is not a usable leader/replica target anyway, +/// and the next refresh picks it up once it finishes registering. Same +/// resilience philosophy as `deserialize_lag` — one bad member must not poison +/// the whole cluster view. 
+fn deserialize_members<'de, D>(deserializer: D) -> Result<Vec<ClusterMember>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let raw = Vec::<serde_json::Value>::deserialize(deserializer)?; // a non-array `members` is still a hard error
+    let mut members = Vec::with_capacity(raw.len());
+    for value in raw {
+        match ClusterMember::deserialize(&value) { // `&Value` is itself a Deserializer: no per-member clone
+            Ok(member) => members.push(member),
+            Err(e) => {
+                let name = value
+                    .get("name")
+                    .and_then(serde_json::Value::as_str)
+                    .unwrap_or("<unknown>"); // visible placeholder when the entry has no string `name`
+                warn!("Skipping not-yet-ready cluster member {name}: {e}");
+            }
+        }
+    }
+    Ok(members)
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct ClusterInfo {
+    #[serde(deserialize_with = "deserialize_members")]
     pub members: Vec<ClusterMember>,
     #[serde(default)]
     pub scope: Option<String>,
@@ -369,6 +404,49 @@ mod tests {
         assert!(member.lag.is_none());
     }
 
+    #[test]
+    fn test_initializing_member_does_not_poison_parse() {
+        // Regression: a replica mid-creation is registered in the DCS before it
+        // publishes its conn_url, so Patroni omits host/port/api_url. Previously
+        // this failed the whole parse with `missing field \`host\``, breaking
+        // discovery for cloud-api + chat-api whenever a postgres instance was
+        // added. The not-yet-ready member must be dropped, not poison the rest.
+        let json = r#"{"members": [
+            {"name":"leader1","role":"leader","state":"running","host":"postgres-a.dstack.internal","port":5432,"timeline":3},
+            {"name":"newrep","role":"replica","state":"creating replica","timeline":3}
+        ],"scope":"pg-cluster"}"#;
+        let info: ClusterInfo =
+            serde_json::from_str(json).expect("must parse despite half-registered member");
+        assert_eq!(
+            info.members.len(),
+            1,
+            "the not-yet-ready member should be dropped"
+        );
+        assert_eq!(info.members[0].name, "leader1");
+        assert_eq!(info.members[0].role, "leader");
+    }
+
+    #[test]
+    fn test_full_cluster_with_initializing_replica_parses() {
+        // Real staging /cluster shape with an extra member mid-creation appended
+        // (no host/port/api_url). 
The 3 ready members must still parse and the + // leader must still be discoverable. + let json = r#"{"members": [ + {"name":"a","role":"replica","state":"streaming","api_url":"http://[postgres-staging-5hbt5t4n.dstack.internal:8008]:8008/patroni","host":"postgres-staging-5hbt5t4n.dstack.internal","port":5432,"timeline":3,"lag":0}, + {"name":"b","role":"replica","state":"streaming","host":"postgres-yr6k7rmo.dstack.internal","port":5432,"timeline":3,"lag":0}, + {"name":"newrep","role":"replica","state":"creating replica","timeline":3}, + {"name":"leader","role":"leader","state":"running","host":"postgres-ew3zj5pk.dstack.internal","port":5432,"timeline":3} + ],"scope":"pg-cluster"}"#; + let info: ClusterInfo = serde_json::from_str(json).expect("must parse"); + assert_eq!( + info.members.len(), + 3, + "only the 3 fully-registered members survive" + ); + assert!(info.members.iter().any(|m| m.role == "leader")); + assert!(info.members.iter().all(|m| m.name != "newrep")); + } + #[test] fn test_cluster_with_stopped_member_string_lag() { // Regression: a single stopped replica reporting `"lag": "unknown"` must