diff --git a/integration/rust/tests/integration/auth.rs b/integration/rust/tests/integration/auth.rs index 8556137e8..c2edb74b7 100644 --- a/integration/rust/tests/integration/auth.rs +++ b/integration/rust/tests/integration/auth.rs @@ -91,3 +91,34 @@ async fn test_passthrough_auth() { user.execute("SELECT 1").await.unwrap(); original.execute("SELECT 1").await.unwrap(); } + +#[tokio::test] +#[serial] +async fn test_user_without_password_passthrough_auth() { + let admin = admin_sqlx().await; + + admin.execute("RELOAD").await.unwrap(); + admin.execute("SET auth_type TO 'scram'").await.unwrap(); + assert_setting_str("auth_type", "scram").await; + + let user = "postgres://pgdog2:pgdog@127.0.0.1:6432/pgdog"; + + let no_password_err = PgConnection::connect(user).await.err().unwrap(); + + assert!( + no_password_err + .to_string() + .contains("password for user \"pgdog2\" and database \"pgdog\" is wrong") + ); + + admin + .execute("SET passthrough_auth TO 'enabled_plain'") + .await + .unwrap(); + assert_setting_str("passthrough_auth", "enabled_plain").await; + + let mut user = PgConnection::connect(user).await.unwrap(); + + user.execute("SELECT 1").await.unwrap(); + user.close().await.unwrap(); +} diff --git a/integration/users.toml b/integration/users.toml index 87273f7c6..4f818edc2 100644 --- a/integration/users.toml +++ b/integration/users.toml @@ -3,6 +3,10 @@ name = "pgdog" database = "pgdog" password = "pgdog" +[[users]] +name = "pgdog2" +database = "pgdog" + [[users]] name = "pgdog_session" database = "pgdog" diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index 7607ea975..0576f149a 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -20,7 +20,7 @@ use crate::frontend::router::sharding::Mapping; use crate::frontend::PreparedStatements; use crate::{ backend::pool::PoolConfig, - config::{config, load, ConfigAndUsers, ManualQuery, Role}, + config::{config, load, AuthType, ConfigAndUsers, ManualQuery, Role}, net::{messages::BackendKeyData, tls}, }; @@ -307,11 +307,24 @@ impl Databases { } /// Check if a cluster exists, quickly. - pub fn exists(&self, user: impl ToUser) -> bool { - if let Some(cluster) = self.databases.get(&user.to_user()) { - !cluster.password().is_empty() + pub(crate) fn exists(&self, user: impl ToUser) -> bool { + self.databases.contains_key(&user.to_user()) + } + + /// Check if a cluster exists, and has a non-empty password. + pub(crate) fn has_password(&self, user: impl ToUser) -> bool { + self.databases + .get(&user.to_user()) + .is_some_and(|cluster| !cluster.password().is_empty()) + } + + /// Check if backend authentication can work for this user. + pub fn is_backend_auth_ready(&self, user: impl ToUser, authtype: &AuthType) -> bool { + // Trust auth doesn't need a password, so the cluster merely has to exist. + if authtype.trust() { + self.exists(user) } else { - false + self.has_password(user) } } @@ -533,12 +546,21 @@ fn new_pool(user: &crate::config::User, config: &crate::config::Config) -> Optio &config.rewrite, ); + let cluster = Cluster::new(cluster_config); + + if config.general.passthrough_auth() + && user.password().is_empty() + && !config.general.auth_type.trust() + { + cluster.pause(); + } + Some(( User { user: user.name.clone(), database: user.database.clone(), }, - Cluster::new(cluster_config), + cluster, )) } @@ -690,7 +712,7 @@ pub fn from_config(config: &ConfigAndUsers) -> Databases { #[cfg(test)] mod tests { use super::*; - use crate::config::{Config, ConfigAndUsers, Database, Role}; + use crate::config::{AuthType, Config, ConfigAndUsers, Database, PassthoughAuth, Role}; #[test] fn test_mirror_user_isolation() { @@ -1738,4 +1760,191 @@ password = "testpass" assert_eq!(new_users.users[0].name, "testuser"); assert_eq!(new_users.users[0].database, "destination_db"); } + + #[test] + fn test_passthrough_empty_password_starts_paused() { + let mut config = Config::default(); + config.general.passthrough_auth = PassthoughAuth::EnabledPlain; + config.databases = vec![Database { + name: "pgdog".to_string(), + host: "localhost".to_string(), + port: 5432, + role: Role::Primary, + ..Default::default() + }]; + + let users = crate::config::Users { + users: vec![crate::config::User { + name: "pgdog".to_string(), + database: "pgdog".to_string(), + password: None, + ..Default::default() + }], + ..Default::default() + }; + + let databases = from_config(&ConfigAndUsers { + config, + users, + config_path: std::path::PathBuf::new(), + users_path: std::path::PathBuf::new(), + }); + + let key = User { + user: "pgdog".to_string(), + database: "pgdog".to_string(), + }; + + let cluster = databases.all().get(&key).expect("cluster should exist"); + + for shard in cluster.shards() { + for pool in shard.pools() { + assert!(pool.state().paused); + } + } + } + + #[test] + fn test_passthrough_user_with_password_unpaused() { + let mut config = Config::default(); + config.general.passthrough_auth = PassthoughAuth::EnabledPlain; + config.databases = vec![Database { + name: "pgdog".to_string(), + host: "localhost".to_string(), + port: 5432, + role: Role::Primary, + ..Default::default() + }]; + + let users = crate::config::Users { + users: vec![crate::config::User { + name: "pgdog".to_string(), + database: "pgdog".to_string(), + password: Some("pgdog".to_string()), + ..Default::default() + }], + ..Default::default() + }; + + let databases = from_config(&ConfigAndUsers { + config, + users, + config_path: std::path::PathBuf::new(), + users_path: std::path::PathBuf::new(), + }); + + let key = User { + user: "pgdog".to_string(), + database: "pgdog".to_string(), + }; + + let cluster = databases.all().get(&key).expect("cluster should exist"); + + for shard in cluster.shards() { + for pool in shard.pools() { + assert!(!pool.state().paused); + } + } + } + + #[test] + fn test_passthrough_empty_password_trust_starts_unpaused() { + let mut config = Config::default(); + config.general.passthrough_auth = PassthoughAuth::EnabledPlain; + config.general.auth_type = AuthType::Trust; + config.databases = vec![Database { + name: "pgdog".to_string(), + host: "localhost".to_string(), + port: 5432, + role: Role::Primary, + ..Default::default() + }]; + + let users = crate::config::Users { + users: vec![crate::config::User { + name: "pgdog".to_string(), + database: "pgdog".to_string(), + password: None, + ..Default::default() + }], + ..Default::default() + }; + + let databases = from_config(&ConfigAndUsers { + config, + users, + config_path: std::path::PathBuf::new(), + users_path: std::path::PathBuf::new(), + }); + + let key = User { + user: "pgdog".to_string(), + database: "pgdog".to_string(), + }; + let cluster = databases.all().get(&key).expect("cluster should exist"); + + for shard in cluster.shards() { + for pool in shard.pools() { + assert!(!pool.state().paused); + } + } + } + + #[test] + fn test_replace_empty_password_cluster_with_passthrough_password() { + let mut config = Config::default(); + config.general.passthrough_auth = PassthoughAuth::EnabledPlain; + config.databases = vec![Database { + name: "pgdog".to_string(), + host: "localhost".to_string(), + port: 5432, + role: Role::Primary, + ..Default::default() + }]; + + let users = crate::config::Users { + users: vec![crate::config::User { + name: "pgdog".to_string(), + database: "pgdog".to_string(), + password: None, + ..Default::default() + }], + ..Default::default() + }; + + let databases = from_config(&ConfigAndUsers { + config: config.clone(), + users, + config_path: std::path::PathBuf::new(), + users_path: std::path::PathBuf::new(), + }); + + let passthrough_user = crate::config::User { + name: "pgdog".to_string(), + database: "pgdog".to_string(), + password: Some("secret".to_string()), + ..Default::default() + }; + + let (user, cluster) = new_pool(&passthrough_user, &config).expect("cluster should exist"); + let (added, databases) = databases.add(user, cluster); + + assert!(added); + assert!(databases.has_password(("pgdog", "pgdog"))); + + let key = User { + user: "pgdog".to_string(), + database: "pgdog".to_string(), + }; + + let cluster = databases.all().get(&key).expect("cluster should exist"); + + assert_eq!(cluster.password(), "secret"); + + for shard in cluster.shards() { + for pool in shard.pools() { + assert!(!pool.state().paused); + } + } + } } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index f86632e75..9b267048f 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -571,6 +571,16 @@ impl Cluster { self.readiness.online.store(true, Ordering::Relaxed); } + /// Pause all pools in this cluster. + pub fn pause(&self) { + self.shards().iter().for_each(|shard| shard.pause()) + } + + /// Resume all pools in this cluster. + pub fn resume(&self) { + self.shards().iter().for_each(|shard| shard.resume()) + } + /// Shutdown the connection pools. pub(crate) fn shutdown(&self) { for shard in self.shards() { @@ -837,6 +847,26 @@ mod test { assert!(!cluster.online()); } + #[test] + fn test_pause_resume_toggles_all_pools() { + let config = ConfigAndUsers::default(); + let cluster = Cluster::new_test(&config); + + cluster.pause(); + for shard in cluster.shards() { + for pool in shard.pools() { + assert!(pool.state().paused); + } + } + + cluster.resume(); + for shard in cluster.shards() { + for pool in shard.pools() { + assert!(!pool.state().paused); + } + } + } + #[tokio::test] async fn test_launch_schema_loading_idempotent() { use std::sync::atomic::Ordering; diff --git a/pgdog/src/backend/pool/connection/mirror/mod.rs b/pgdog/src/backend/pool/connection/mirror/mod.rs index a162bbe1a..4811ef04b 100644 --- a/pgdog/src/backend/pool/connection/mirror/mod.rs +++ b/pgdog/src/backend/pool/connection/mirror/mod.rs @@ -251,14 +251,14 @@ mod test { 3, "mirror buffer should have 3 requests" ); - sleep(Duration::from_millis(50)).await; + sleep(Duration::from_millis(100)).await; // Nothing happens until we flush. assert!( conn.execute("DROP TABLE pgdog.test_mirror").await.is_err(), "table pgdog.test_mirror shouldn't exist yet" ); assert!(mirror.flush(), "mirror didn't flush"); - sleep(Duration::from_millis(50)).await; + sleep(Duration::from_millis(100)).await; assert!( conn.execute("DROP TABLE pgdog.test_mirror").await.is_ok(), "pgdog.test_mirror should exist" diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index 3933d7f4b..0642b4e09 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -324,8 +324,11 @@ impl Connection { match self.binding { Binding::Direct(_) | Binding::MultiShard(_, _) => { let user = (self.user.as_str(), self.database.as_str()); + let config = config(); // Check passthrough auth. - if config().config.general.passthrough_auth() && !databases().exists(user) { + if config.config.general.passthrough_auth() + && !databases().is_backend_auth_ready(user, &config.config.general.auth_type) + { if let Some(ref passthrough_password) = self.passthrough_password { let new_user = User::new(&self.user, passthrough_password, &self.database); databases::add(new_user); @@ -359,6 +362,13 @@ impl Connection { Ok(()) } + /// Resume pools for the currently bound cluster. + pub(crate) fn resume_cluster_pools(&self) { + if let Some(cluster) = &self.cluster { + cluster.resume(); + } + } + pub(crate) fn bind(&mut self, bind: &Bind) -> Result<(), Error> { match self.binding { Binding::MultiShard(_, ref mut state) => { diff --git a/pgdog/src/backend/pool/lb/mod.rs b/pgdog/src/backend/pool/lb/mod.rs index 784f192e7..d3d7fc0ef 100644 --- a/pgdog/src/backend/pool/lb/mod.rs +++ b/pgdog/src/backend/pool/lb/mod.rs @@ -193,6 +193,16 @@ impl LoadBalancer { Monitor::spawn(self); } + /// Pause all target pools. + pub fn pause(&self) { + self.targets.iter().for_each(|target| target.pool.pause()); + } + + /// Resume all target pools. + pub fn resume(&self) { + self.targets.iter().for_each(|target| target.pool.resume()); + } + /// Get a live connection from the pool. pub async fn get(&self, request: &Request) -> Result { match timeout(self.checkout_timeout, self.get_internal(request)).await { diff --git a/pgdog/src/backend/pool/shard/mod.rs b/pgdog/src/backend/pool/shard/mod.rs index 79fc7eee1..d1090507a 100644 --- a/pgdog/src/backend/pool/shard/mod.rs +++ b/pgdog/src/backend/pool/shard/mod.rs @@ -148,6 +148,16 @@ impl Shard { } } + /// Pause every pool in this shard. + pub fn pause(&self) { + self.lb.pause(); + } + + /// Resume every pool in this shard. + pub fn resume(&self) { + self.lb.resume(); + } + /// Returns true if the shard has a primary database. pub fn has_primary(&self) -> bool { self.lb.primary().is_some() diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index b868a6c1d..3b3c320da 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -156,7 +156,6 @@ impl Client { let comms = ClientComms::new(&id); // Auto database. - let exists = databases::databases().exists((user, database)); let passthrough_password = if config.config.general.passthrough_auth() && !admin { let password = if auth_type.trust() { // Use empty password. @@ -172,7 +171,7 @@ impl Client { Password::from_bytes(password.to_bytes()?)? }; - if !exists { + if !databases::databases().is_backend_auth_ready((user, database), auth_type) { let user = user_from_params(¶ms, &password).ok(); if let Some(user) = user { databases::add(user); @@ -251,6 +250,10 @@ impl Client { stream.send(&Authentication::Ok).await?; } + if passthrough_password.is_some() && conn.cluster()?.password().is_empty() { + conn.resume_cluster_pools(); + } + // Check if the pooler is shutting down. if comms.offline() && !admin { stream.fatal(ErrorResponse::shutting_down()).await?;