diff --git a/.schema/users.schema.json b/.schema/users.schema.json index 8d26975e7..cb28d6096 100644 --- a/.schema/users.schema.json +++ b/.schema/users.schema.json @@ -68,8 +68,23 @@ } ] }, + "ServerAuth": { + "description": "Backend authentication mode used by PgDog for server connections.", + "oneOf": [ + { + "description": "Use configured static password.", + "type": "string", + "const": "password" + }, + { + "description": "Generate an AWS RDS IAM auth token per connection attempt.", + "type": "string", + "const": "rds_iam" + } + ] + }, "User": { - "description": "A user entry in `users.toml`, controlling which users are allowed to connect to PgDog.\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/", + "description": "User allowed to connect to pgDog.\nA user entry in `users.toml`, controlling which users are allowed to connect to PgDog.\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/", "type": "object", "properties": { "all_databases": { @@ -170,6 +185,18 @@ "type": "boolean", "default": false }, + "server_auth": { + "description": "Backend auth mode for server connections.", + "$ref": "#/$defs/ServerAuth", + "default": "password" + }, + "server_iam_region": { + "description": "Optional region override for RDS IAM token generation.", + "type": [ + "string", + "null" + ] + }, "server_lifetime": { "description": "Server connections older than this (in milliseconds) will be closed when returned to the pool.", "type": [ @@ -194,7 +221,7 @@ ] }, "statement_timeout": { - "description": "Sets the `statement_timeout` on all server connections at connection creation. This allows you to set a reasonable default for each user without modifying `postgresql.conf` or using `ALTER USER`.\n\n**Note:** Nothing is preventing the user from manually changing this setting at runtime, e.g., by running `SET statement_timeout TO 0`;\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#statement_timeout", + "description": "Statement timeout.\n\nSets the `statement_timeout` on all server connections at connection creation. This allows you to set a reasonable default for each user without modifying `postgresql.conf` or using `ALTER USER`.\n\n**Note:** Nothing is preventing the user from manually changing this setting at runtime, e.g., by running `SET statement_timeout TO 0`;\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#statement_timeout", "type": [ "integer", "null" diff --git a/integration/go/go_pq/go_pq_test.go b/integration/go/go_pq/go_pq_test.go index 9744dc153..a9a178d70 100644 --- a/integration/go/go_pq/go_pq_test.go +++ b/integration/go/go_pq/go_pq_test.go @@ -127,3 +127,65 @@ func TestPqCrud(t *testing.T) { } } } + +func TestAdvisoryLockWithTransaction(t *testing.T) { + adminConn, err := sql.Open("postgres", "postgres://admin:pgdog@127.0.0.1:6432/admin?sslmode=disable") + assert.Nil(t, err) + defer adminConn.Close() + + _, err = adminConn.Exec("SET prepared_statements TO 'extended_anonymous'") + assert.Nil(t, err) + + _, err = adminConn.Exec("SET query_parser TO 'on'") + assert.Nil(t, err) + + conn, err := sql.Open("postgres", "postgres://pgdog:pgdog@127.0.0.1:6432/pgdog?sslmode=disable") + assert.Nil(t, err) + defer conn.Close() + + conn.SetMaxOpenConns(1) + + lockKey := rand.Int63() + + // 1. Take an advisory lock + var lockResult bool + err = conn.QueryRow("SELECT pg_advisory_lock($1) IS NOT NULL", lockKey).Scan(&lockResult) + assert.Nil(t, err) + assert.True(t, lockResult) + + // 2. Start a transaction + tx, err := conn.Begin() + assert.Nil(t, err) + + // 3. Run a couple queries inside the transaction + var one int + err = tx.QueryRow("SELECT 1").Scan(&one) + assert.Nil(t, err) + assert.Equal(t, 1, one) + + var two int + err = tx.QueryRow("SELECT 2").Scan(&two) + assert.Nil(t, err) + assert.Equal(t, 2, two) + + // 4. Commit the transaction + err = tx.Commit() + assert.Nil(t, err) + + // 5. Run some queries outside of the transaction + var three int + err = conn.QueryRow("SELECT 3").Scan(&three) + assert.Nil(t, err) + assert.Equal(t, 3, three) + + var four int + err = conn.QueryRow("SELECT 4").Scan(&four) + assert.Nil(t, err) + assert.Equal(t, 4, four) + + // 6. Release the advisory lock + var unlockResult bool + err = conn.QueryRow("SELECT pg_advisory_unlock($1)", lockKey).Scan(&unlockResult) + assert.Nil(t, err) + assert.True(t, unlockResult) +} diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index 3933d7f4b..58ed98ed0 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -384,6 +384,12 @@ impl Connection { } } + /// Check if this connection is locked to a client. + #[cfg(test)] + pub(crate) fn locked(&self) -> bool { + self.locked + } + /// Get connected servers addresses. pub(crate) fn addr(&self) -> Result, Error> { Ok(match self.binding { diff --git a/pgdog/src/frontend/client/query_engine/connect.rs b/pgdog/src/frontend/client/query_engine/connect.rs index 240fdbbf5..6e8db3879 100644 --- a/pgdog/src/frontend/client/query_engine/connect.rs +++ b/pgdog/src/frontend/client/query_engine/connect.rs @@ -45,8 +45,9 @@ impl QueryEngine { // // Used in case the client runs an advisory lock // or another leaky transaction mode abstraction. - self.backend - .lock(context.client_request.route().is_lock_session()); + if let Some(true) = context.client_request.route().lock_session() { + self.backend.lock(true); + } self.debug_connected(context, false); diff --git a/pgdog/src/frontend/client/query_engine/start_transaction.rs b/pgdog/src/frontend/client/query_engine/start_transaction.rs index bce40fa1e..f525466d8 100644 --- a/pgdog/src/frontend/client/query_engine/start_transaction.rs +++ b/pgdog/src/frontend/client/query_engine/start_transaction.rs @@ -19,23 +19,27 @@ impl QueryEngine { ) -> Result<(), Error> { context.transaction = Some(transaction_type); - let bytes_sent = if extended { - self.extended_transaction_reply(context, true, false) - .await? + if self.backend.connected() { + self.execute(context).await?; } else { - context - .stream - .send_many(&[ - CommandComplete::new_begin() - .message()? - .backend(BackendKeyData::default()), - ReadyForQuery::in_transaction(context.in_transaction()).message()?, - ]) - .await? - }; + let bytes_sent = if extended { + self.extended_transaction_reply(context, true, false) + .await? + } else { + context + .stream + .send_many(&[ + CommandComplete::new_begin() + .message()? + .backend(BackendKeyData::default()), + ReadyForQuery::in_transaction(context.in_transaction()).message()?, + ]) + .await? + }; - self.stats.sent(bytes_sent); - self.begin_stmt = Some(begin); + self.stats.sent(bytes_sent); + self.begin_stmt = Some(begin); + } Ok(()) } diff --git a/pgdog/src/frontend/client/query_engine/test/lock_session.rs b/pgdog/src/frontend/client/query_engine/test/lock_session.rs new file mode 100644 index 000000000..3ba8131a6 --- /dev/null +++ b/pgdog/src/frontend/client/query_engine/test/lock_session.rs @@ -0,0 +1,50 @@ +use super::prelude::*; + +#[tokio::test] +async fn test_lock_session_advisory_lock() { + let mut test_client = TestClient::new_sharded(Parameters::default()).await; + + // Regular query should not lock the backend + test_client.send_simple(Query::new("SELECT 1")).await; + test_client.read_until('Z').await.unwrap(); + + assert!(!test_client.backend_locked()); + + // Advisory lock should lock the backend + test_client + .send_simple(Query::new("SELECT pg_advisory_lock(12345)")) + .await; + test_client.read_until('Z').await.unwrap(); + + assert!(test_client.backend_locked()); +} + +#[tokio::test] +async fn test_regular_query_not_locked() { + let mut test_client = TestClient::new_sharded(Parameters::default()).await; + + test_client.send_simple(Query::new("SELECT 1")).await; + test_client.read_until('Z').await.unwrap(); + + assert!(!test_client.backend_locked()); +} + +#[tokio::test] +async fn test_transaction_not_locked() { + let mut test_client = TestClient::new_sharded(Parameters::default()).await; + + test_client.send_simple(Query::new("BEGIN")).await; + test_client.read_until('Z').await.unwrap(); + + assert!(!test_client.backend_locked()); + + test_client.send_simple(Query::new("SELECT 1")).await; + test_client.read_until('Z').await.unwrap(); + + assert!(!test_client.backend_locked()); + + test_client.send_simple(Query::new("COMMIT")).await; + test_client.read_until('Z').await.unwrap(); + + assert!(!test_client.backend_locked()); +} diff --git a/pgdog/src/frontend/client/query_engine/test/mod.rs b/pgdog/src/frontend/client/query_engine/test/mod.rs index de93b592b..089b7147f 100644 --- a/pgdog/src/frontend/client/query_engine/test/mod.rs +++ b/pgdog/src/frontend/client/query_engine/test/mod.rs @@ -7,6 +7,7 @@ use crate::{ net::{Parameters, Stream}, }; +mod lock_session; mod omni; pub mod prelude; mod rewrite_extended; diff --git a/pgdog/src/frontend/client/query_engine/test/set.rs b/pgdog/src/frontend/client/query_engine/test/set.rs index 84765e5c7..26a4cfeb0 100644 --- a/pgdog/src/frontend/client/query_engine/test/set.rs +++ b/pgdog/src/frontend/client/query_engine/test/set.rs @@ -26,6 +26,8 @@ async fn test_set() { test_client.client().params.get("application_name").unwrap(), &ParameterValue::String("test_set".into()), ); + + assert!(!test_client.backend_locked()); } #[tokio::test] @@ -68,6 +70,8 @@ async fn test_set_inside_transaction() { 'T' ); + assert!(!test_client.backend_locked()); + test_client .send_simple(Query::new("SET search_path TO acustomer, public")) .await; @@ -96,6 +100,8 @@ async fn test_set_inside_transaction() { test_client.client().params.get("search_path").unwrap(), &ParameterValue::Tuple(vec!["acustomer".into(), "public".into()]), ); + + assert!(!test_client.backend_locked()); } #[tokio::test] diff --git a/pgdog/src/frontend/client/test/test_client.rs b/pgdog/src/frontend/client/test/test_client.rs index fd912e914..9a360da0c 100644 --- a/pgdog/src/frontend/client/test/test_client.rs +++ b/pgdog/src/frontend/client/test/test_client.rs @@ -199,6 +199,11 @@ impl TestClient { self.engine.backend().connected() } + /// Check if the backend is locked to this client. + pub(crate) fn backend_locked(&mut self) -> bool { + self.engine.backend().locked() + } + /// Generate a random ID for a given shard. pub(crate) fn random_id_for_shard(&mut self, shard: usize) -> i64 { let cluster = self.engine.backend().cluster().unwrap().clone(); diff --git a/pgdog/src/frontend/router/parser/function.rs b/pgdog/src/frontend/router/parser/function.rs index 9e9710600..56cdb169c 100644 --- a/pgdog/src/frontend/router/parser/function.rs +++ b/pgdog/src/frontend/router/parser/function.rs @@ -14,6 +14,7 @@ static WRITE_ONLY: Lazy> = Lazy::new(|| { ("pg_try_advisory_lock_shared", LockingBehavior::Lock), ("pg_try_advisory_xact_lock_shared", LockingBehavior::None), ("pg_advisory_unlock_all", LockingBehavior::Unlock), + ("pg_advisory_unlock", LockingBehavior::Unlock), // TODO: we don't track multiple advisory locks. ("nextval", LockingBehavior::None), ("setval", LockingBehavior::None), ]) @@ -92,6 +93,12 @@ impl<'a> TryFrom<&'a Node> for Function<'a> { } } + Some(NodeEnum::NullTest(test)) => { + if let Some(node) = test.arg.as_ref() { + return Self::try_from(node.as_ref()); + } + } + _ => (), } diff --git a/pgdog/src/frontend/router/parser/query/test/test_functions.rs b/pgdog/src/frontend/router/parser/query/test/test_functions.rs index 0b37fab37..3823e50b8 100644 --- a/pgdog/src/frontend/router/parser/query/test/test_functions.rs +++ b/pgdog/src/frontend/router/parser/query/test/test_functions.rs @@ -10,6 +10,18 @@ fn test_write_function_advisory_lock() { assert!(command.route().is_lock_session()); } +#[test] +fn test_write_functions_prepared() { + let mut test = QueryParserTest::new(); + let command = test.execute(vec![Parse::named( + "test", + "SELECT pg_advisory_lock($1) IS NOT NULL", + ) + .into()]); + assert!(command.route().is_write()); + assert!(command.route().is_lock_session()); +} + #[test] fn test_write_function_nextval() { let mut test = QueryParserTest::new(); diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index ed717bf8d..4ff723fa6 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -81,7 +81,7 @@ pub struct Route { order_by: Vec, aggregate: Aggregate, limit: Limit, - lock_session: bool, + lock_session: Option, distinct: Option, maintenance: bool, rewrite_plan: AggregateRewritePlan, @@ -284,13 +284,25 @@ impl Route { locking_behavior, } = write; self.read = !writes; - self.lock_session = matches!(locking_behavior, LockingBehavior::Lock); + self.lock_session = match locking_behavior { + LockingBehavior::Lock => Some(true), + LockingBehavior::Unlock => Some(false), + LockingBehavior::None => None, + }; } pub fn is_lock_session(&self) -> bool { + self.lock_session == Some(true) + } + + pub fn lock_session(&self) -> Option { self.lock_session } + pub fn is_unlock_session(&self) -> bool { + self.lock_session == Some(false) + } + pub fn distinct(&self) -> &Option { &self.distinct }