Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .schema/pgdog.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"connection_recovery": "recover",
"cross_shard_disabled": false,
"cutover_last_transaction_delay": 1000,
"cutover_replication_lag_threshold": 1000,
"cutover_replication_lag_threshold": 0,
"cutover_save_config": false,
"cutover_timeout": 30000,
"cutover_timeout_action": "abort",
Expand Down Expand Up @@ -627,38 +627,38 @@
"default": false
},
"cutover_last_transaction_delay": {
"description": "How long to wait after the last transaction drains before completing cutover, in milliseconds.",
"description": "Time (in milliseconds) since the last transaction on any table in the publication before PgDog will swap the configuration during a cutover.\n\n_Default:_ `1000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_last_transaction_delay",
"type": "integer",
"format": "uint64",
"default": 1000,
"minimum": 0
},
"cutover_replication_lag_threshold": {
"description": "Maximum replication lag (in bytes) allowed before proceeding with cutover.",
"description": "Replication lag (in bytes) that must be reached before PgDog will swap the configuration during a cutover.\n\n_Default:_ `0`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_replication_lag_threshold",
"type": "integer",
"format": "uint64",
"default": 1000,
"default": 0,
"minimum": 0
},
"cutover_save_config": {
"description": "Persist the post-cutover configuration to `pgdog.toml` and `users.toml` on disk.",
"description": "Save the swapped configuration to disk after a traffic cutover. When enabled, PgDog will backup both configuration files as `pgdog.bak.toml` and `users.bak.toml`, and write the new configuration to `pgdog.toml` and `users.toml`.\n\n_Default:_ `false`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_save_config",
"type": "boolean",
"default": false
},
"cutover_timeout": {
"description": "How long to wait for cutover conditions to be met before taking the timeout action, in milliseconds.",
"description": "Maximum amount of time (in milliseconds) to wait for the cutover thresholds to be met. If exceeded, PgDog will take the action specified by `cutover_timeout_action`.\n\n_Default:_ `30000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_timeout",
"type": "integer",
"format": "uint64",
"default": 30000,
"minimum": 0
},
"cutover_timeout_action": {
"description": "What to do when the cutover timeout is reached.",
"description": "Action to take when `cutover_timeout` is exceeded.\n\n_Default:_ `abort`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_timeout_action",
"$ref": "#/$defs/CutoverTimeoutAction",
"default": "abort"
},
"cutover_traffic_stop_threshold": {
"description": "Maximum number of active transactions allowed before cutover pauses traffic, in milliseconds.",
"description": "Replication lag threshold (in bytes) at which PgDog will pause traffic automatically during a traffic cutover.\n\n_Default:_ `1000000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_traffic_stop_threshold",
"type": "integer",
"format": "uint64",
"default": 1000000,
Expand Down
40 changes: 32 additions & 8 deletions pgdog-config/src/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,27 +537,51 @@ pub struct General {
#[serde(default = "General::load_schema")]
pub load_schema: LoadSchema,

/// Maximum number of active transactions allowed before cutover pauses traffic, in milliseconds.
/// Replication lag threshold (in bytes) at which PgDog will pause traffic automatically during a traffic cutover.
///
/// _Default:_ `1000000`
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_traffic_stop_threshold
#[serde(default = "General::cutover_traffic_stop_threshold")]
pub cutover_traffic_stop_threshold: u64,

/// Maximum replication lag (in bytes) allowed before proceeding with cutover.
/// Replication lag (in bytes) that must be reached before PgDog will swap the configuration during a cutover.
///
/// _Default:_ `0`
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_replication_lag_threshold
#[serde(default = "General::cutover_replication_lag_threshold")]
pub cutover_replication_lag_threshold: u64,

/// How long to wait after the last transaction drains before completing cutover, in milliseconds.
/// Time (in milliseconds) since the last transaction on any table in the publication before PgDog will swap the configuration during a cutover.
///
/// _Default:_ `1000`
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_last_transaction_delay
#[serde(default = "General::cutover_last_transaction_delay")]
pub cutover_last_transaction_delay: u64,

/// How long to wait for cutover conditions to be met before taking the timeout action, in milliseconds.
/// Maximum amount of time (in milliseconds) to wait for the cutover thresholds to be met. If exceeded, PgDog will take the action specified by `cutover_timeout_action`.
///
/// _Default:_ `30000`
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_timeout
#[serde(default = "General::cutover_timeout")]
pub cutover_timeout: u64,

/// What to do when the cutover timeout is reached.
/// Action to take when `cutover_timeout` is exceeded.
///
/// _Default:_ `abort`
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_timeout_action
#[serde(default = "General::cutover_timeout_action")]
pub cutover_timeout_action: CutoverTimeoutAction,

/// Persist the post-cutover configuration to `pgdog.toml` and `users.toml` on disk.
/// Save the swapped configuration to disk after a traffic cutover. When enabled, PgDog will backup both configuration files as `pgdog.bak.toml` and `users.bak.toml`, and write the new configuration to `pgdog.toml` and `users.toml`.
///
/// _Default:_ `false`
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_save_config
#[serde(default)]
pub cutover_save_config: bool,
}
Expand Down Expand Up @@ -742,8 +766,8 @@ impl General {
}

fn cutover_replication_lag_threshold() -> u64 {
Self::env_or_default("PGDOG_CUTOVER_REPLICATION_LAG_THRESHOLD", 1_000)
// 1KB
Self::env_or_default("PGDOG_CUTOVER_REPLICATION_LAG_THRESHOLD", 0)
// 0 bytes
}

fn cutover_traffic_stop_threshold() -> u64 {
Expand Down
73 changes: 54 additions & 19 deletions pgdog/src/backend/replication/logical/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,19 @@ enum CutoverReason {
LastTransaction,
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum CutoverAction {
Go(CutoverReason),
NoGo(CutoverData),
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct CutoverData {
lag: u64,
last_transaction: Option<Duration>,
elapsed: Duration,
}

impl Display for CutoverReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -289,7 +302,7 @@ impl ReplicationWaiter {
Ok(())
}

async fn should_cutover(&self, elapsed: Duration) -> Option<CutoverReason> {
async fn should_cutover(&self, elapsed: Duration) -> CutoverAction {
let cutover_timeout = Duration::from_millis(self.config.config.general.cutover_timeout);
let cutover_threshold = self.config.config.general.cutover_replication_lag_threshold;
let last_transaction_delay =
Expand All @@ -300,13 +313,17 @@ impl ReplicationWaiter {
let cutover_timeout_exceeded = elapsed >= cutover_timeout;

if cutover_timeout_exceeded {
Some(CutoverReason::Timeout)
CutoverAction::Go(CutoverReason::Timeout)
} else if lag <= cutover_threshold {
Some(CutoverReason::Lag)
CutoverAction::Go(CutoverReason::Lag)
} else if last_transaction.is_none_or(|t| t > last_transaction_delay) {
Some(CutoverReason::LastTransaction)
CutoverAction::Go(CutoverReason::LastTransaction)
} else {
None
CutoverAction::NoGo(CutoverData {
lag,
last_transaction,
elapsed,
})
}
}

Expand All @@ -331,16 +348,25 @@ impl ReplicationWaiter {
// Abort clock starts now.
let start = Instant::now();

let mut cutover_data = None;

loop {
select! {
_ = check.tick() => {}

_ = log.tick() => {
info!("[cutover] lag={}, last_transaction={}, timeout={}",
human_duration(cutover_timeout),
human_duration(last_transaction_delay),
format_bytes(cutover_threshold)
);
if let Some(CutoverData { lag, last_transaction, elapsed }) = cutover_data {
info!("[cutover] lag={}, last_transaction={}, timeout={}",
format_bytes(lag),
if let Some(last_transaction) = last_transaction {
human_duration(last_transaction)
} else {
"none".into()
},
human_duration(elapsed),
);
}

}

// In case replication breaks now.
Expand All @@ -352,16 +378,25 @@ impl ReplicationWaiter {
let elapsed = start.elapsed();
let cutover_reason = self.should_cutover(elapsed).await;
match cutover_reason {
Some(CutoverReason::Timeout) => {
CutoverAction::Go(CutoverReason::Timeout) => {
if cutover_timeout_action == CutoverTimeoutAction::Abort {
maintenance_mode::stop();
warn!("[cutover] abort timeout reached, resuming traffic");
return Err(Error::AbortTimeout);
} else {
info!(
"[cutover] performing cutover now, reason: {}",
CutoverReason::Timeout
);
break;
}
}

None => continue,
Some(reason) => {
CutoverAction::NoGo(data) => {
cutover_data = Some(data);
continue;
}
CutoverAction::Go(reason) => {
info!("[cutover] performing cutover now, reason: {}", reason);
break;
}
Expand Down Expand Up @@ -514,7 +549,7 @@ mod tests {

// should_cutover returns Lag when lag is below threshold
let result = waiter.should_cutover(Duration::from_millis(100)).await;
assert_eq!(result, Some(CutoverReason::Lag));
assert_eq!(result, CutoverAction::Go(CutoverReason::Lag));

// Should exit immediately since lag (50) <= threshold (100)
let result = waiter.wait_for_cutover().await;
Expand Down Expand Up @@ -543,7 +578,7 @@ mod tests {

// should_cutover returns LastTransaction when last transaction is old
let result = waiter.should_cutover(Duration::from_millis(100)).await;
assert_eq!(result, Some(CutoverReason::LastTransaction));
assert_eq!(result, CutoverAction::Go(CutoverReason::LastTransaction));

// Should exit because last_transaction (200ms) > threshold (100ms)
let result = waiter.wait_for_cutover().await;
Expand Down Expand Up @@ -572,7 +607,7 @@ mod tests {

// should_cutover returns LastTransaction when there's no transaction
let result = waiter.should_cutover(Duration::from_millis(100)).await;
assert_eq!(result, Some(CutoverReason::LastTransaction));
assert_eq!(result, CutoverAction::Go(CutoverReason::LastTransaction));
}

#[tokio::test]
Expand All @@ -597,7 +632,7 @@ mod tests {

// Not timed out (100ms elapsed, timeout is 10000ms)
let result = waiter.should_cutover(Duration::from_millis(100)).await;
assert_eq!(result, None);
assert!(matches!(result, CutoverAction::NoGo { .. }));
}

#[tokio::test]
Expand All @@ -622,7 +657,7 @@ mod tests {

// Elapsed is 999ms, timeout is 1000ms - should not trigger timeout
let result = waiter.should_cutover(Duration::from_millis(999)).await;
assert_eq!(result, None);
assert!(matches!(result, CutoverAction::NoGo { .. }));
}

#[tokio::test]
Expand All @@ -646,6 +681,6 @@ mod tests {
let waiter = ReplicationWaiter::new_test(orchestrator, config);

let result = waiter.should_cutover(Duration::from_millis(100)).await;
assert_eq!(result, None);
assert!(matches!(result, CutoverAction::NoGo { .. }));
}
}
Loading