Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions .schema/users.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,23 @@
}
]
},
"ServerAuth": {
"description": "Backend authentication mode used by PgDog for server connections.",
"oneOf": [
{
"description": "Use configured static password.",
"type": "string",
"const": "password"
},
{
"description": "Generate an AWS RDS IAM auth token per connection attempt.",
"type": "string",
"const": "rds_iam"
}
]
},
"User": {
"description": "A user entry in `users.toml`, controlling which users are allowed to connect to PgDog.\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/",
"description": "User allowed to connect to pgDog.\nA user entry in `users.toml`, controlling which users are allowed to connect to PgDog.\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/",
"type": "object",
"properties": {
"all_databases": {
Expand Down Expand Up @@ -170,6 +185,18 @@
"type": "boolean",
"default": false
},
"server_auth": {
"description": "Backend auth mode for server connections.",
"$ref": "#/$defs/ServerAuth",
"default": "password"
},
"server_iam_region": {
"description": "Optional region override for RDS IAM token generation.",
"type": [
"string",
"null"
]
},
"server_lifetime": {
"description": "Server connections older than this (in milliseconds) will be closed when returned to the pool.",
"type": [
Expand All @@ -194,7 +221,7 @@
]
},
"statement_timeout": {
"description": "Sets the `statement_timeout` on all server connections at connection creation. This allows you to set a reasonable default for each user without modifying `postgresql.conf` or using `ALTER USER`.\n\n**Note:** Nothing is preventing the user from manually changing this setting at runtime, e.g., by running `SET statement_timeout TO 0`;\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#statement_timeout",
"description": "Statement timeout.\n\nSets the `statement_timeout` on all server connections at connection creation. This allows you to set a reasonable default for each user without modifying `postgresql.conf` or using `ALTER USER`.\n\n**Note:** Nothing is preventing the user from manually changing this setting at runtime, e.g., by running `SET statement_timeout TO 0`;\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#statement_timeout",
"type": [
"integer",
"null"
Expand Down
62 changes: 62 additions & 0 deletions integration/go/go_pq/go_pq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,65 @@ func TestPqCrud(t *testing.T) {
}
}
}

func TestAdvisoryLockWithTransaction(t *testing.T) {
adminConn, err := sql.Open("postgres", "postgres://admin:pgdog@127.0.0.1:6432/admin?sslmode=disable")
assert.Nil(t, err)
defer adminConn.Close()

_, err = adminConn.Exec("SET prepared_statements TO 'extended_anonymous'")
assert.Nil(t, err)

_, err = adminConn.Exec("SET query_parser TO 'on'")
assert.Nil(t, err)

conn, err := sql.Open("postgres", "postgres://pgdog:pgdog@127.0.0.1:6432/pgdog?sslmode=disable")
assert.Nil(t, err)
defer conn.Close()

conn.SetMaxOpenConns(1)

lockKey := rand.Int63()

// 1. Take an advisory lock
var lockResult bool
err = conn.QueryRow("SELECT pg_advisory_lock($1) IS NOT NULL", lockKey).Scan(&lockResult)
assert.Nil(t, err)
assert.True(t, lockResult)

// 2. Start a transaction
tx, err := conn.Begin()
assert.Nil(t, err)

// 3. Run a couple queries inside the transaction
var one int
err = tx.QueryRow("SELECT 1").Scan(&one)
assert.Nil(t, err)
assert.Equal(t, 1, one)

var two int
err = tx.QueryRow("SELECT 2").Scan(&two)
assert.Nil(t, err)
assert.Equal(t, 2, two)

// 4. Commit the transaction
err = tx.Commit()
assert.Nil(t, err)

// 5. Run some queries outside of the transaction
var three int
err = conn.QueryRow("SELECT 3").Scan(&three)
assert.Nil(t, err)
assert.Equal(t, 3, three)

var four int
err = conn.QueryRow("SELECT 4").Scan(&four)
assert.Nil(t, err)
assert.Equal(t, 4, four)

// 6. Release the advisory lock
var unlockResult bool
err = conn.QueryRow("SELECT pg_advisory_unlock($1)", lockKey).Scan(&unlockResult)
assert.Nil(t, err)
assert.True(t, unlockResult)
}
6 changes: 6 additions & 0 deletions pgdog/src/backend/pool/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ impl Connection {
}
}

/// Check if this connection is locked to a client.
#[cfg(test)]
pub(crate) fn locked(&self) -> bool {
self.locked
}

/// Get connected servers addresses.
pub(crate) fn addr(&self) -> Result<Vec<&Address>, Error> {
Ok(match self.binding {
Expand Down
5 changes: 3 additions & 2 deletions pgdog/src/frontend/client/query_engine/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ impl QueryEngine {
//
// Used in case the client runs an advisory lock
// or another leaky transaction mode abstraction.
self.backend
.lock(context.client_request.route().is_lock_session());
if let Some(true) = context.client_request.route().lock_session() {
self.backend.lock(true);
}

self.debug_connected(context, false);

Expand Down
34 changes: 19 additions & 15 deletions pgdog/src/frontend/client/query_engine/start_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@ impl QueryEngine {
) -> Result<(), Error> {
context.transaction = Some(transaction_type);

let bytes_sent = if extended {
self.extended_transaction_reply(context, true, false)
.await?
if self.backend.connected() {
self.execute(context).await?;
} else {
context
.stream
.send_many(&[
CommandComplete::new_begin()
.message()?
.backend(BackendKeyData::default()),
ReadyForQuery::in_transaction(context.in_transaction()).message()?,
])
.await?
};
let bytes_sent = if extended {
self.extended_transaction_reply(context, true, false)
.await?
} else {
context
.stream
.send_many(&[
CommandComplete::new_begin()
.message()?
.backend(BackendKeyData::default()),
ReadyForQuery::in_transaction(context.in_transaction()).message()?,
])
.await?
};

self.stats.sent(bytes_sent);
self.begin_stmt = Some(begin);
self.stats.sent(bytes_sent);
self.begin_stmt = Some(begin);
}

Ok(())
}
Expand Down
50 changes: 50 additions & 0 deletions pgdog/src/frontend/client/query_engine/test/lock_session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use super::prelude::*;

#[tokio::test]
async fn test_lock_session_advisory_lock() {
let mut test_client = TestClient::new_sharded(Parameters::default()).await;

// Regular query should not lock the backend
test_client.send_simple(Query::new("SELECT 1")).await;
test_client.read_until('Z').await.unwrap();

assert!(!test_client.backend_locked());

// Advisory lock should lock the backend
test_client
.send_simple(Query::new("SELECT pg_advisory_lock(12345)"))
.await;
test_client.read_until('Z').await.unwrap();

assert!(test_client.backend_locked());
}

#[tokio::test]
async fn test_regular_query_not_locked() {
let mut test_client = TestClient::new_sharded(Parameters::default()).await;

test_client.send_simple(Query::new("SELECT 1")).await;
test_client.read_until('Z').await.unwrap();

assert!(!test_client.backend_locked());
}

#[tokio::test]
async fn test_transaction_not_locked() {
let mut test_client = TestClient::new_sharded(Parameters::default()).await;

test_client.send_simple(Query::new("BEGIN")).await;
test_client.read_until('Z').await.unwrap();

assert!(!test_client.backend_locked());

test_client.send_simple(Query::new("SELECT 1")).await;
test_client.read_until('Z').await.unwrap();

assert!(!test_client.backend_locked());

test_client.send_simple(Query::new("COMMIT")).await;
test_client.read_until('Z').await.unwrap();

assert!(!test_client.backend_locked());
}
1 change: 1 addition & 0 deletions pgdog/src/frontend/client/query_engine/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
net::{Parameters, Stream},
};

mod lock_session;
mod omni;
pub mod prelude;
mod rewrite_extended;
Expand Down
6 changes: 6 additions & 0 deletions pgdog/src/frontend/client/query_engine/test/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ async fn test_set() {
test_client.client().params.get("application_name").unwrap(),
&ParameterValue::String("test_set".into()),
);

assert!(!test_client.backend_locked());
}

#[tokio::test]
Expand Down Expand Up @@ -68,6 +70,8 @@ async fn test_set_inside_transaction() {
'T'
);

assert!(!test_client.backend_locked());

test_client
.send_simple(Query::new("SET search_path TO acustomer, public"))
.await;
Expand Down Expand Up @@ -96,6 +100,8 @@ async fn test_set_inside_transaction() {
test_client.client().params.get("search_path").unwrap(),
&ParameterValue::Tuple(vec!["acustomer".into(), "public".into()]),
);

assert!(!test_client.backend_locked());
}

#[tokio::test]
Expand Down
5 changes: 5 additions & 0 deletions pgdog/src/frontend/client/test/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ impl TestClient {
self.engine.backend().connected()
}

/// Check if the backend is locked to this client.
pub(crate) fn backend_locked(&mut self) -> bool {
self.engine.backend().locked()
}

/// Generate a random ID for a given shard.
pub(crate) fn random_id_for_shard(&mut self, shard: usize) -> i64 {
let cluster = self.engine.backend().cluster().unwrap().clone();
Expand Down
7 changes: 7 additions & 0 deletions pgdog/src/frontend/router/parser/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ static WRITE_ONLY: Lazy<HashMap<&'static str, LockingBehavior>> = Lazy::new(|| {
("pg_try_advisory_lock_shared", LockingBehavior::Lock),
("pg_try_advisory_xact_lock_shared", LockingBehavior::None),
("pg_advisory_unlock_all", LockingBehavior::Unlock),
("pg_advisory_unlock", LockingBehavior::Unlock), // TODO: we don't track multiple advisory locks.
("nextval", LockingBehavior::None),
("setval", LockingBehavior::None),
])
Expand Down Expand Up @@ -92,6 +93,12 @@ impl<'a> TryFrom<&'a Node> for Function<'a> {
}
}

Some(NodeEnum::NullTest(test)) => {
if let Some(node) = test.arg.as_ref() {
return Self::try_from(node.as_ref());
}
}

_ => (),
}

Expand Down
12 changes: 12 additions & 0 deletions pgdog/src/frontend/router/parser/query/test/test_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ fn test_write_function_advisory_lock() {
assert!(command.route().is_lock_session());
}

#[test]
fn test_write_functions_prepared() {
let mut test = QueryParserTest::new();
let command = test.execute(vec![Parse::named(
"test",
"SELECT pg_advisory_lock($1) IS NOT NULL",
)
.into()]);
assert!(command.route().is_write());
assert!(command.route().is_lock_session());
}

#[test]
fn test_write_function_nextval() {
let mut test = QueryParserTest::new();
Expand Down
16 changes: 14 additions & 2 deletions pgdog/src/frontend/router/parser/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct Route {
order_by: Vec<OrderBy>,
aggregate: Aggregate,
limit: Limit,
lock_session: bool,
lock_session: Option<bool>,
distinct: Option<DistinctBy>,
maintenance: bool,
rewrite_plan: AggregateRewritePlan,
Expand Down Expand Up @@ -284,13 +284,25 @@ impl Route {
locking_behavior,
} = write;
self.read = !writes;
self.lock_session = matches!(locking_behavior, LockingBehavior::Lock);
self.lock_session = match locking_behavior {
LockingBehavior::Lock => Some(true),
LockingBehavior::Unlock => Some(false),
LockingBehavior::None => None,
};
}

pub fn is_lock_session(&self) -> bool {
self.lock_session == Some(true)
}

pub fn lock_session(&self) -> Option<bool> {
self.lock_session
}

pub fn is_unlock_session(&self) -> bool {
self.lock_session == Some(false)
}

pub fn distinct(&self) -> &Option<DistinctBy> {
&self.distinct
}
Expand Down
Loading