Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions crates/database/src/patroni_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,43 @@ pub struct ClusterMember {
pub timeline: Option<i64>,
}

/// Patroni lists a node as a member as soon as it registers in the DCS — which
/// happens *before* it publishes its `conn_url`. So a replica mid-creation (or
/// an uninitialized node) appears without `host`/`port`/`api_url`. `host` (at
/// least) is required on `ClusterMember` — `api_url` is optional, as the tests
/// below show — so a strict `Vec<ClusterMember>` parse fails
/// the ENTIRE `/cluster` response with `missing field \`host\`` over one
/// half-registered member — freezing topology discovery for every consumer
/// (cloud-api + chat-api) whenever a new postgres instance is being added.
///
/// Parse each member independently and drop any that don't fully deserialize; a
/// member with no connection info is not a usable leader/replica target anyway,
/// and the next refresh picks it up once it finishes registering. Same
/// resilience philosophy as `deserialize_lag` — one bad member must not poison
/// the whole cluster view.
fn deserialize_members<'de, D>(deserializer: D) -> Result<Vec<ClusterMember>, D::Error>
where
    D: Deserializer<'de>,
{
    // Buffer every entry as untyped JSON first, so a member that fails to
    // deserialize can be skipped without aborting the whole sequence. Only a
    // malformed array itself is propagated as an error.
    let raw = Vec::<serde_json::Value>::deserialize(deserializer)?;
    let mut members = Vec::with_capacity(raw.len());
    for value in &raw {
        // `&serde_json::Value` is itself a `Deserializer`, so each member is
        // parsed by reference: no per-member clone of the JSON tree, and
        // `value` stays available for the warning below.
        match ClusterMember::deserialize(value) {
            Ok(member) => members.push(member),
            Err(e) => {
                // Best-effort label for the log line; the entry may lack a
                // string `name` field.
                let name = value
                    .get("name")
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or("<unknown>");
                warn!("Skipping not-yet-ready cluster member {name}: {e}");
            }
        }
    }
Comment on lines +66 to +77

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

By extracting the member's name before deserialization, we can avoid cloning the entire serde_json::Value on every iteration. serde_json::from_value can then take ownership of the Value directly, which allows it to reuse the internal allocations (like strings) when constructing the ClusterMember struct.

Suggested change
for value in raw {
match serde_json::from_value::<ClusterMember>(value.clone()) {
Ok(member) => members.push(member),
Err(e) => {
let name = value
.get("name")
.and_then(serde_json::Value::as_str)
.unwrap_or("<unknown>");
warn!("Skipping not-yet-ready cluster member {name}: {e}");
}
}
}
for value in raw {
let name = value
.get("name")
.and_then(serde_json::Value::as_str)
.map(String::from);
match serde_json::from_value::<ClusterMember>(value) {
Ok(member) => members.push(member),
Err(e) => {
let name = name.as_deref().unwrap_or("<unknown>");
warn!("Skipping not-yet-ready cluster member {name}: {e}");
}
}
}

Ok(members)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterInfo {
#[serde(deserialize_with = "deserialize_members")]
pub members: Vec<ClusterMember>,
#[serde(default)]
pub scope: Option<String>,
Expand Down Expand Up @@ -369,6 +404,49 @@ mod tests {
assert!(member.lag.is_none());
}

#[test]
fn test_initializing_member_does_not_poison_parse() {
    // Regression guard: a replica that is still being created shows up in the
    // member list without host/port/api_url. That used to abort the whole
    // parse with `missing field \`host\``; now the incomplete entry must be
    // skipped while the ready leader is kept.
    let payload = r#"{"members": [
{"name":"leader1","role":"leader","state":"running","host":"postgres-a.dstack.internal","port":5432,"timeline":3},
{"name":"newrep","role":"replica","state":"creating replica","timeline":3}
],"scope":"pg-cluster"}"#;

    let cluster = serde_json::from_str::<ClusterInfo>(payload)
        .expect("must parse despite half-registered member");

    let survivors = &cluster.members;
    assert_eq!(
        survivors.len(),
        1,
        "the not-yet-ready member should be dropped"
    );

    // The single surviving entry is the fully-registered leader.
    let leader = &survivors[0];
    assert_eq!(leader.name, "leader1");
    assert_eq!(leader.role, "leader");
}

#[test]
fn test_full_cluster_with_initializing_replica_parses() {
    // Staging-shaped /cluster payload with one extra member that is still
    // being created (no host/port/api_url). The three complete members must
    // parse, the leader must remain discoverable, and the incomplete member
    // must be absent from the result.
    let payload = r#"{"members": [
{"name":"a","role":"replica","state":"streaming","api_url":"http://[postgres-staging-5hbt5t4n.dstack.internal:8008]:8008/patroni","host":"postgres-staging-5hbt5t4n.dstack.internal","port":5432,"timeline":3,"lag":0},
{"name":"b","role":"replica","state":"streaming","host":"postgres-yr6k7rmo.dstack.internal","port":5432,"timeline":3,"lag":0},
{"name":"newrep","role":"replica","state":"creating replica","timeline":3},
{"name":"leader","role":"leader","state":"running","host":"postgres-ew3zj5pk.dstack.internal","port":5432,"timeline":3}
],"scope":"pg-cluster"}"#;

    let cluster = serde_json::from_str::<ClusterInfo>(payload).expect("must parse");
    let members = &cluster.members;

    assert_eq!(
        members.len(),
        3,
        "only the 3 fully-registered members survive"
    );

    let has_leader = members.iter().any(|member| member.role == "leader");
    assert!(has_leader);

    let newrep_present = members.iter().any(|member| member.name == "newrep");
    assert!(!newrep_present);
}

#[test]
fn test_cluster_with_stopped_member_string_lag() {
// Regression: a single stopped replica reporting `"lag": "unknown"` must
Expand Down
Loading