diff --git a/integration/setup.sh b/integration/setup.sh old mode 100644 new mode 100755 diff --git a/integration/wildcard/pgdog.toml b/integration/wildcard/pgdog.toml new file mode 100644 index 000000000..b2c9e55c2 --- /dev/null +++ b/integration/wildcard/pgdog.toml @@ -0,0 +1,33 @@ +# Wildcard integration test config. +# Only "pgdog" is explicitly configured. All other database names +# are handled by the wildcard "*" template pointing at the same +# Postgres instance. +# +# Uses passthrough auth: pgdog forwards the client's credentials +# to Postgres for verification. This lets wildcard users connect +# with their own Postgres username and password. + +[general] +host = "0.0.0.0" +port = 6432 +query_timeout = 5_000 +checkout_timeout = 5_000 +connect_timeout = 5_000 +idle_timeout = 30_000 +min_pool_size = 0 +default_pool_size = 5 +pooler_mode = "transaction" +passthrough_auth = "enabled_plain" + +# Explicit database — always available. +[[databases]] +name = "pgdog" +host = "127.0.0.1" +port = 5432 + +# Wildcard template — any database name not explicitly listed +# will spawn a pool using these connection settings. +[[databases]] +name = "*" +host = "127.0.0.1" +port = 5432 diff --git a/integration/wildcard/run.sh b/integration/wildcard/run.sh new file mode 100644 index 000000000..fceeacf00 --- /dev/null +++ b/integration/wildcard/run.sh @@ -0,0 +1,15 @@ +#!/bin/bash +set -euo pipefail +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source ${SCRIPT_DIR}/../common.sh + +run_pgdog ${SCRIPT_DIR} +wait_for_pgdog + +pushd ${SCRIPT_DIR} + +python3 test_wildcard.py + +popd + +stop_pgdog diff --git a/integration/wildcard/test_wildcard.py b/integration/wildcard/test_wildcard.py new file mode 100644 index 000000000..08a40e9bd --- /dev/null +++ b/integration/wildcard/test_wildcard.py @@ -0,0 +1,244 @@ +""" +Integration test for wildcard database routing with passthrough auth. 
+ +Tests that pgdog dynamically creates pools when a client connects with +a (user, database) pair not explicitly listed in the config — using +the wildcard "*" template — and forwards the client's actual +credentials to Postgres for verification. + +Setup (run before this test): + CREATE USER wildcard_tester WITH PASSWORD 'Xk9mP2vLq7w'; + CREATE DATABASE wildcard_test_db OWNER wildcard_tester; + -- plus a table: + CREATE TABLE items (id serial PRIMARY KEY, name text NOT NULL); + INSERT INTO items (name) VALUES ('alpha'), ('beta'), ('gamma'); +""" + +import psycopg +import sys + +PGDOG_HOST = "127.0.0.1" +PGDOG_PORT = 6432 + +# Existing user configured explicitly in users.toml. +EXPLICIT_USER = "pgdog" +EXPLICIT_PASS = "pgdog" + +# New user unknown to pgdog — only exists in Postgres. +WILDCARD_USER = "wildcard_tester" +WILDCARD_PASS = "Xk9mP2vLq7w" +WILDCARD_DB = "wildcard_test_db" + + +def connect(dbname, user, password): + return psycopg.connect( + host=PGDOG_HOST, + port=PGDOG_PORT, + dbname=dbname, + user=user, + password=password, + autocommit=True, + ) + + +# ------------------------------------------------------------------ # +# 1. Baseline: explicit pool still works +# ------------------------------------------------------------------ # + +def test_explicit_pool(): + """The explicit (pgdog, pgdog) pool works as before.""" + conn = connect("pgdog", EXPLICIT_USER, EXPLICIT_PASS) + cur = conn.cursor() + cur.execute("SELECT current_user, current_database()") + row = cur.fetchone() + assert row[0] == "pgdog", f"expected user pgdog, got {row[0]}" + assert row[1] == "pgdog", f"expected db pgdog, got {row[1]}" + conn.close() + print(" PASS explicit pool (pgdog/pgdog)") + + +# ------------------------------------------------------------------ # +# 2. 
Wildcard: known user (pgdog) → unknown database +# ------------------------------------------------------------------ # + +def test_known_user_wildcard_db(): + """User 'pgdog' connects to 'wildcard_test_db' — a database + pgdog doesn't know about. Passthrough auth forwards pgdog's + credentials to Postgres.""" + conn = connect(WILDCARD_DB, EXPLICIT_USER, EXPLICIT_PASS) + cur = conn.cursor() + cur.execute("SELECT current_database()") + db = cur.fetchone()[0] + assert db == WILDCARD_DB, f"expected {WILDCARD_DB}, got {db}" + conn.close() + print(f" PASS known user → wildcard db ({WILDCARD_DB})") + + +# ------------------------------------------------------------------ # +# 3. Wildcard: unknown user + unknown database (the main scenario) +# ------------------------------------------------------------------ # + +def test_unknown_user_wildcard_db(): + """User 'wildcard_tester' (unknown to pgdog) connects to + 'wildcard_test_db' (also unknown). Both user and database are + resolved via the wildcard template, and passthrough auth + forwards the real credentials to Postgres.""" + conn = connect(WILDCARD_DB, WILDCARD_USER, WILDCARD_PASS) + cur = conn.cursor() + cur.execute("SELECT current_user, current_database()") + row = cur.fetchone() + assert row[0] == WILDCARD_USER, f"expected user {WILDCARD_USER}, got {row[0]}" + assert row[1] == WILDCARD_DB, f"expected db {WILDCARD_DB}, got {row[1]}" + conn.close() + print(f" PASS unknown user ({WILDCARD_USER}) → wildcard db ({WILDCARD_DB})") + + +def test_unknown_user_read_existing_data(): + """wildcard_tester reads the pre-seeded 'items' table through + the wildcard pool.""" + conn = connect(WILDCARD_DB, WILDCARD_USER, WILDCARD_PASS) + cur = conn.cursor() + cur.execute("SELECT name FROM items ORDER BY id") + rows = [r[0] for r in cur.fetchall()] + assert rows == ["alpha", "beta", "gamma"], f"unexpected: {rows}" + conn.close() + print(" PASS unknown user → read existing data") + + +def test_unknown_user_write_and_read(): + 
"""wildcard_tester creates a table, writes, reads, and drops it + — full lifecycle through the wildcard pool.""" + conn = connect(WILDCARD_DB, WILDCARD_USER, WILDCARD_PASS) + cur = conn.cursor() + cur.execute("DROP TABLE IF EXISTS wc_lifecycle") + cur.execute("CREATE TABLE wc_lifecycle (id int, val text)") + cur.execute("INSERT INTO wc_lifecycle VALUES (1, 'x'), (2, 'y')") + cur.execute("SELECT val FROM wc_lifecycle ORDER BY id") + rows = [r[0] for r in cur.fetchall()] + assert rows == ["x", "y"], f"unexpected: {rows}" + cur.execute("DROP TABLE wc_lifecycle") + conn.close() + print(" PASS unknown user → full DDL+DML lifecycle") + + +# ------------------------------------------------------------------ # +# 4. Wrong password — pgdog should relay the Postgres auth error +# ------------------------------------------------------------------ # + +def test_wrong_password_rejected(): + """wildcard_tester with a wrong password is rejected. + Passthrough auth should forward the bad password to Postgres + and relay the auth failure back.""" + try: + conn = connect(WILDCARD_DB, WILDCARD_USER, "WRONG_PASSWORD") + cur = conn.cursor() + cur.execute("SELECT 1") + conn.close() + raise AssertionError("expected auth failure, but connection succeeded") + except psycopg.OperationalError as e: + err = str(e).lower() + ok = ("password" in err or "authentication" in err + or "auth" in err or "fatal" in err) + assert ok, f"unexpected error: {e}" + print(" PASS wrong password → rejected") + + +# ------------------------------------------------------------------ # +# 5. Unknown user + unknown db — nonexistent database +# ------------------------------------------------------------------ # + +def test_nonexistent_database(): + """wildcard_tester tries to connect to a database that doesn't + exist in Postgres. 
The error should come from Postgres.""" + try: + conn = connect("nope_db_xyz", WILDCARD_USER, WILDCARD_PASS) + cur = conn.cursor() + cur.execute("SELECT 1") + conn.close() + raise AssertionError("expected error for nonexistent db") + except psycopg.OperationalError as e: + err = str(e).lower() + ok = "does not exist" in err or "fatal" in err or "down" in err + assert ok, f"unexpected error: {e}" + print(" PASS nonexistent db → correct error") + + +# ------------------------------------------------------------------ # +# 6. Multiple wildcard users concurrently +# ------------------------------------------------------------------ # + +def test_concurrent_wildcard_users(): + """Both pgdog and wildcard_tester connect to wildcard_test_db + at the same time — each gets their own pool.""" + conn1 = connect(WILDCARD_DB, EXPLICIT_USER, EXPLICIT_PASS) + conn2 = connect(WILDCARD_DB, WILDCARD_USER, WILDCARD_PASS) + + cur1 = conn1.cursor() + cur2 = conn2.cursor() + + cur1.execute("SELECT current_user") + cur2.execute("SELECT current_user") + + assert cur1.fetchone()[0] == EXPLICIT_USER + assert cur2.fetchone()[0] == WILDCARD_USER + + conn1.close() + conn2.close() + print(" PASS concurrent wildcard users (pgdog + wildcard_tester)") + + +# ------------------------------------------------------------------ # +# 7. 
Unknown user connects to multiple databases +# ------------------------------------------------------------------ # + +def test_wildcard_user_multiple_dbs(): + """wildcard_tester connects to wildcard_test_db and also to pgdog + (the pgdog database grants connect to all users by default).""" + for dbname in [WILDCARD_DB, "pgdog"]: + conn = connect(dbname, WILDCARD_USER, WILDCARD_PASS) + cur = conn.cursor() + cur.execute("SELECT current_database()") + db = cur.fetchone()[0] + assert db == dbname, f"expected {dbname}, got {db}" + conn.close() + print(" PASS wildcard user → multiple databases") + + +# ------------------------------------------------------------------ # + +def main(): + print("=== Wildcard Passthrough Auth Integration Tests ===") + print(f" user: {WILDCARD_USER}, db: {WILDCARD_DB}") + print() + failures = 0 + total = 0 + + tests = [ + ("explicit pool", test_explicit_pool), + ("known user → wc db", test_known_user_wildcard_db), + ("unknown user → wc db", test_unknown_user_wildcard_db), + ("read existing data", test_unknown_user_read_existing_data), + ("write+read lifecycle", test_unknown_user_write_and_read), + ("wrong password", test_wrong_password_rejected), + ("nonexistent db", test_nonexistent_database), + ("concurrent users", test_concurrent_wildcard_users), + ("user → multiple dbs", test_wildcard_user_multiple_dbs), + ] + + for name, test_fn in tests: + total += 1 + try: + test_fn() + except Exception as e: + print(f" FAIL {name}: {e}") + failures += 1 + + passed = total - failures + print(f"\n=== Results: {passed}/{total} passed, {failures} failed ===") + if failures > 0: + sys.exit(1) + print("All wildcard passthrough auth tests passed!") + + +if __name__ == "__main__": + main() diff --git a/integration/wildcard/users.toml b/integration/wildcard/users.toml new file mode 100644 index 000000000..00633dbb0 --- /dev/null +++ b/integration/wildcard/users.toml @@ -0,0 +1,13 @@ +# Explicit user for the explicit database. 
+[[users]] +name = "pgdog" +database = "pgdog" +password = "pgdog" + +# Wildcard user — any (user, database) pair not matched above. +# No server_user/server_password: pgdog passes through the +# client-provided credentials to Postgres directly. +[[users]] +name = "*" +database = "*" +min_pool_size = 0 diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index 64199dbb5..076400800 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -214,6 +214,34 @@ impl Config { databases } + /// Get wildcard database entries (name = "*"), organized by shard. + /// Returns None if no wildcard databases are configured. + pub fn wildcard_databases(&self) -> Option<Vec<Vec<EnumeratedDatabase>>> { + let wildcard_dbs: Vec<&Database> = + self.databases.iter().filter(|d| d.is_wildcard()).collect(); + if wildcard_dbs.is_empty() { + return None; + } + + let mut shards: Vec<Vec<EnumeratedDatabase>> = Vec::new(); + for (number, database) in self.databases.iter().enumerate() { + if database.is_wildcard() { + while shards.len() <= database.shard { + shards.push(vec![]); + } + shards + .get_mut(database.shard) + .unwrap() + .push(EnumeratedDatabase { + number, + database: database.clone(), + }); + } + } + + Some(shards) + } + /// Organize sharded tables by database name. pub fn sharded_tables(&self) -> HashMap<String, Vec<ShardedTable>> { let mut tables = HashMap::new(); @@ -321,7 +349,17 @@ impl Config { pub fn check(&mut self) { // Check databases. 
let mut duplicate_dbs = HashSet::new(); + let mut wildcard_db_count = 0usize; for database in self.databases.clone() { + if database.is_wildcard() { + wildcard_db_count += 1; + if database.shard > 0 { + warn!( + r#"wildcard database "*" with shard={} is not supported, use shard=0 only"#, + database.shard + ); + } + } let id = ( database.name.clone(), database.role, @@ -338,6 +376,13 @@ impl Config { } } + if wildcard_db_count > 2 { + warn!( + r#"multiple wildcard "*" database entries detected ({} entries), only one primary and one replica are expected"#, + wildcard_db_count + ); + } + struct Check { pooler_mode: Option, role: Role, diff --git a/pgdog-config/src/database.rs b/pgdog-config/src/database.rs index a8683d528..82cccd876 100644 --- a/pgdog-config/src/database.rs +++ b/pgdog-config/src/database.rs @@ -137,6 +137,28 @@ impl Database { fn port() -> u16 { 5432 } + + /// Whether this database entry is a wildcard template (`name = "*"`). + pub fn is_wildcard(&self) -> bool { + self.name == "*" + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_database_is_wildcard() { + let mut db = Database::default(); + assert!(!db.is_wildcard()); + + db.name = "mydb".to_string(); + assert!(!db.is_wildcard()); + + db.name = "*".to_string(); + assert!(db.is_wildcard()); + } } #[derive( diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index cf7b5ed45..1cc1a588d 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -238,6 +238,17 @@ pub struct General { /// Cutover save config to disk. #[serde(default)] pub cutover_save_config: bool, + /// Maximum number of dynamically-created wildcard pools (0 = unlimited). + /// Once this limit is reached further wildcard connections are rejected with + /// a "no such database" error until an existing wildcard pool is evicted + /// (e.g. via a SIGHUP config reload). 
+ #[serde(default)] + pub max_wildcard_pools: usize, + /// Seconds a dynamically-created wildcard pool must have zero connections + /// before it is automatically removed. 0 disables automatic eviction; + /// pools are only cleaned up on SIGHUP or restart. + #[serde(default)] + pub wildcard_pool_idle_timeout: u64, } impl Default for General { @@ -320,6 +331,8 @@ impl Default for General { cutover_timeout: Self::cutover_timeout(), cutover_timeout_action: Self::cutover_timeout_action(), cutover_save_config: bool::default(), + max_wildcard_pools: 0, + wildcard_pool_idle_timeout: 0, } } } diff --git a/pgdog-config/src/users.rs b/pgdog-config/src/users.rs index ab36f5af0..da6948f43 100644 --- a/pgdog-config/src/users.rs +++ b/pgdog-config/src/users.rs @@ -146,6 +146,16 @@ impl User { ..Default::default() } } + + /// Whether this user entry has a wildcard name (`name = "*"`). + pub fn is_wildcard_name(&self) -> bool { + self.name == "*" + } + + /// Whether this user entry has a wildcard database (`database = "*"`). + pub fn is_wildcard_database(&self) -> bool { + self.database == "*" + } } /// Admin database settings. 
@@ -253,4 +263,28 @@ mod tests { .unwrap(); assert_eq!(bob_source.password(), "pass4"); } + + #[test] + fn test_user_wildcard_name() { + let mut user = User::default(); + assert!(!user.is_wildcard_name()); + + user.name = "alice".to_string(); + assert!(!user.is_wildcard_name()); + + user.name = "*".to_string(); + assert!(user.is_wildcard_name()); + } + + #[test] + fn test_user_wildcard_database() { + let mut user = User::default(); + assert!(!user.is_wildcard_database()); + + user.database = "mydb".to_string(); + assert!(!user.is_wildcard_database()); + + user.database = "*".to_string(); + assert!(user.is_wildcard_database()); + } } diff --git a/pgdog-plugin/src/bindings.rs b/pgdog-plugin/src/bindings.rs index 351aeb3b5..fdabc97b7 100644 --- a/pgdog-plugin/src/bindings.rs +++ b/pgdog-plugin/src/bindings.rs @@ -1,214 +1,339 @@ /* automatically generated by rust-bindgen 0.71.1 */ -pub const _STDINT_H: u32 = 1; -pub const _FEATURES_H: u32 = 1; -pub const _DEFAULT_SOURCE: u32 = 1; -pub const __GLIBC_USE_ISOC2Y: u32 = 0; -pub const __GLIBC_USE_ISOC23: u32 = 0; -pub const __USE_ISOC11: u32 = 1; -pub const __USE_ISOC99: u32 = 1; -pub const __USE_ISOC95: u32 = 1; -pub const __USE_POSIX_IMPLICITLY: u32 = 1; -pub const _POSIX_SOURCE: u32 = 1; -pub const _POSIX_C_SOURCE: u32 = 202405; -pub const __USE_POSIX: u32 = 1; -pub const __USE_POSIX2: u32 = 1; -pub const __USE_POSIX199309: u32 = 1; -pub const __USE_POSIX199506: u32 = 1; -pub const __USE_XOPEN2K: u32 = 1; -pub const __USE_XOPEN2K8: u32 = 1; -pub const _ATFILE_SOURCE: u32 = 1; -pub const __USE_XOPEN2K24: u32 = 1; pub const __WORDSIZE: u32 = 64; -pub const __WORDSIZE_TIME64_COMPAT32: u32 = 1; -pub const __SYSCALL_WORDSIZE: u32 = 64; -pub const __TIMESIZE: u32 = 64; -pub const __USE_TIME_BITS64: u32 = 1; -pub const __USE_MISC: u32 = 1; -pub const __USE_ATFILE: u32 = 1; -pub const __USE_FORTIFY_LEVEL: u32 = 0; -pub const __GLIBC_USE_DEPRECATED_GETS: u32 = 0; -pub const __GLIBC_USE_DEPRECATED_SCANF: u32 = 0; -pub const 
__GLIBC_USE_C23_STRTOL: u32 = 0; -pub const _STDC_PREDEF_H: u32 = 1; -pub const __STDC_IEC_559__: u32 = 1; -pub const __STDC_IEC_60559_BFP__: u32 = 201404; -pub const __STDC_IEC_559_COMPLEX__: u32 = 1; -pub const __STDC_IEC_60559_COMPLEX__: u32 = 201404; -pub const __STDC_ISO_10646__: u32 = 201706; -pub const __GNU_LIBRARY__: u32 = 6; -pub const __GLIBC__: u32 = 2; -pub const __GLIBC_MINOR__: u32 = 43; -pub const _SYS_CDEFS_H: u32 = 1; -pub const __glibc_c99_flexarr_available: u32 = 1; -pub const __LDOUBLE_REDIRECTS_TO_FLOAT128_ABI: u32 = 0; -pub const __HAVE_GENERIC_SELECTION: u32 = 1; -pub const __GLIBC_USE_LIB_EXT2: u32 = 0; -pub const __GLIBC_USE_IEC_60559_BFP_EXT: u32 = 0; -pub const __GLIBC_USE_IEC_60559_BFP_EXT_C23: u32 = 0; -pub const __GLIBC_USE_IEC_60559_EXT: u32 = 0; -pub const __GLIBC_USE_IEC_60559_FUNCS_EXT: u32 = 0; -pub const __GLIBC_USE_IEC_60559_FUNCS_EXT_C23: u32 = 0; -pub const __GLIBC_USE_IEC_60559_TYPES_EXT: u32 = 0; -pub const _BITS_TYPES_H: u32 = 1; -pub const _BITS_TYPESIZES_H: u32 = 1; -pub const __OFF_T_MATCHES_OFF64_T: u32 = 1; -pub const __INO_T_MATCHES_INO64_T: u32 = 1; -pub const __RLIM_T_MATCHES_RLIM64_T: u32 = 1; -pub const __STATFS_MATCHES_STATFS64: u32 = 1; -pub const __KERNEL_OLD_TIMEVAL_MATCHES_TIMEVAL64: u32 = 1; -pub const __FD_SETSIZE: u32 = 1024; -pub const _BITS_TIME64_H: u32 = 1; -pub const _BITS_WCHAR_H: u32 = 1; -pub const _BITS_STDINT_INTN_H: u32 = 1; -pub const _BITS_STDINT_UINTN_H: u32 = 1; -pub const _BITS_STDINT_LEAST_H: u32 = 1; -pub const INT8_MIN: i32 = -128; -pub const INT16_MIN: i32 = -32768; -pub const INT32_MIN: i32 = -2147483648; +pub const __has_safe_buffers: u32 = 1; +pub const __DARWIN_ONLY_64_BIT_INO_T: u32 = 1; +pub const __DARWIN_ONLY_UNIX_CONFORMANCE: u32 = 1; +pub const __DARWIN_ONLY_VERS_1050: u32 = 1; +pub const __DARWIN_UNIX03: u32 = 1; +pub const __DARWIN_64_BIT_INO_T: u32 = 1; +pub const __DARWIN_VERS_1050: u32 = 1; +pub const __DARWIN_NON_CANCELABLE: u32 = 0; +pub const __DARWIN_SUF_EXTSN: &[u8; 
14] = b"$DARWIN_EXTSN\0"; +pub const __DARWIN_C_ANSI: u32 = 4096; +pub const __DARWIN_C_FULL: u32 = 900000; +pub const __DARWIN_C_LEVEL: u32 = 900000; +pub const __STDC_WANT_LIB_EXT1__: u32 = 1; +pub const __DARWIN_NO_LONG_LONG: u32 = 0; +pub const _DARWIN_FEATURE_64_BIT_INODE: u32 = 1; +pub const _DARWIN_FEATURE_ONLY_64_BIT_INODE: u32 = 1; +pub const _DARWIN_FEATURE_ONLY_VERS_1050: u32 = 1; +pub const _DARWIN_FEATURE_ONLY_UNIX_CONFORMANCE: u32 = 1; +pub const _DARWIN_FEATURE_UNIX_CONFORMANCE: u32 = 3; +pub const __has_ptrcheck: u32 = 0; +pub const __has_bounds_safety_attributes: u32 = 0; +pub const USE_CLANG_TYPES: u32 = 0; +pub const __PTHREAD_SIZE__: u32 = 8176; +pub const __PTHREAD_ATTR_SIZE__: u32 = 56; +pub const __PTHREAD_MUTEXATTR_SIZE__: u32 = 8; +pub const __PTHREAD_MUTEX_SIZE__: u32 = 56; +pub const __PTHREAD_CONDATTR_SIZE__: u32 = 8; +pub const __PTHREAD_COND_SIZE__: u32 = 40; +pub const __PTHREAD_ONCE_SIZE__: u32 = 8; +pub const __PTHREAD_RWLOCK_SIZE__: u32 = 192; +pub const __PTHREAD_RWLOCKATTR_SIZE__: u32 = 16; pub const INT8_MAX: u32 = 127; pub const INT16_MAX: u32 = 32767; pub const INT32_MAX: u32 = 2147483647; +pub const INT64_MAX: u64 = 9223372036854775807; +pub const INT8_MIN: i32 = -128; +pub const INT16_MIN: i32 = -32768; +pub const INT32_MIN: i32 = -2147483648; +pub const INT64_MIN: i64 = -9223372036854775808; pub const UINT8_MAX: u32 = 255; pub const UINT16_MAX: u32 = 65535; pub const UINT32_MAX: u32 = 4294967295; +pub const UINT64_MAX: i32 = -1; pub const INT_LEAST8_MIN: i32 = -128; pub const INT_LEAST16_MIN: i32 = -32768; pub const INT_LEAST32_MIN: i32 = -2147483648; +pub const INT_LEAST64_MIN: i64 = -9223372036854775808; pub const INT_LEAST8_MAX: u32 = 127; pub const INT_LEAST16_MAX: u32 = 32767; pub const INT_LEAST32_MAX: u32 = 2147483647; +pub const INT_LEAST64_MAX: u64 = 9223372036854775807; pub const UINT_LEAST8_MAX: u32 = 255; pub const UINT_LEAST16_MAX: u32 = 65535; pub const UINT_LEAST32_MAX: u32 = 4294967295; +pub const 
UINT_LEAST64_MAX: i32 = -1; pub const INT_FAST8_MIN: i32 = -128; -pub const INT_FAST16_MIN: i64 = -9223372036854775808; -pub const INT_FAST32_MIN: i64 = -9223372036854775808; +pub const INT_FAST16_MIN: i32 = -32768; +pub const INT_FAST32_MIN: i32 = -2147483648; +pub const INT_FAST64_MIN: i64 = -9223372036854775808; pub const INT_FAST8_MAX: u32 = 127; -pub const INT_FAST16_MAX: u64 = 9223372036854775807; -pub const INT_FAST32_MAX: u64 = 9223372036854775807; +pub const INT_FAST16_MAX: u32 = 32767; +pub const INT_FAST32_MAX: u32 = 2147483647; +pub const INT_FAST64_MAX: u64 = 9223372036854775807; pub const UINT_FAST8_MAX: u32 = 255; -pub const UINT_FAST16_MAX: i32 = -1; -pub const UINT_FAST32_MAX: i32 = -1; -pub const INTPTR_MIN: i64 = -9223372036854775808; +pub const UINT_FAST16_MAX: u32 = 65535; +pub const UINT_FAST32_MAX: u32 = 4294967295; +pub const UINT_FAST64_MAX: i32 = -1; pub const INTPTR_MAX: u64 = 9223372036854775807; +pub const INTPTR_MIN: i64 = -9223372036854775808; pub const UINTPTR_MAX: i32 = -1; -pub const PTRDIFF_MIN: i64 = -9223372036854775808; -pub const PTRDIFF_MAX: u64 = 9223372036854775807; +pub const SIZE_MAX: i32 = -1; +pub const RSIZE_MAX: i32 = -1; +pub const WINT_MIN: i32 = -2147483648; +pub const WINT_MAX: u32 = 2147483647; pub const SIG_ATOMIC_MIN: i32 = -2147483648; pub const SIG_ATOMIC_MAX: u32 = 2147483647; -pub const SIZE_MAX: i32 = -1; -pub const WINT_MIN: u32 = 0; -pub const WINT_MAX: u32 = 4294967295; pub type wchar_t = ::std::os::raw::c_int; -#[repr(C)] -#[repr(align(16))] -#[derive(Debug, Copy, Clone)] -pub struct max_align_t { - pub __clang_max_align_nonce1: ::std::os::raw::c_longlong, - pub __bindgen_padding_0: u64, - pub __clang_max_align_nonce2: u128, -} -#[allow(clippy::unnecessary_operation, clippy::identity_op)] -const _: () = { - ["Size of max_align_t"][::std::mem::size_of::() - 32usize]; - ["Alignment of max_align_t"][::std::mem::align_of::() - 16usize]; - ["Offset of field: max_align_t::__clang_max_align_nonce1"] - 
[::std::mem::offset_of!(max_align_t, __clang_max_align_nonce1) - 0usize]; - ["Offset of field: max_align_t::__clang_max_align_nonce2"] - [::std::mem::offset_of!(max_align_t, __clang_max_align_nonce2) - 16usize]; -}; -pub type __u_char = ::std::os::raw::c_uchar; -pub type __u_short = ::std::os::raw::c_ushort; -pub type __u_int = ::std::os::raw::c_uint; -pub type __u_long = ::std::os::raw::c_ulong; +pub type max_align_t = f64; +pub type int_least8_t = i8; +pub type int_least16_t = i16; +pub type int_least32_t = i32; +pub type int_least64_t = i64; +pub type uint_least8_t = u8; +pub type uint_least16_t = u16; +pub type uint_least32_t = u32; +pub type uint_least64_t = u64; +pub type int_fast8_t = i8; +pub type int_fast16_t = i16; +pub type int_fast32_t = i32; +pub type int_fast64_t = i64; +pub type uint_fast8_t = u8; +pub type uint_fast16_t = u16; +pub type uint_fast32_t = u32; +pub type uint_fast64_t = u64; pub type __int8_t = ::std::os::raw::c_schar; pub type __uint8_t = ::std::os::raw::c_uchar; pub type __int16_t = ::std::os::raw::c_short; pub type __uint16_t = ::std::os::raw::c_ushort; pub type __int32_t = ::std::os::raw::c_int; pub type __uint32_t = ::std::os::raw::c_uint; -pub type __int64_t = ::std::os::raw::c_long; -pub type __uint64_t = ::std::os::raw::c_ulong; -pub type __int_least8_t = __int8_t; -pub type __uint_least8_t = __uint8_t; -pub type __int_least16_t = __int16_t; -pub type __uint_least16_t = __uint16_t; -pub type __int_least32_t = __int32_t; -pub type __uint_least32_t = __uint32_t; -pub type __int_least64_t = __int64_t; -pub type __uint_least64_t = __uint64_t; -pub type __quad_t = ::std::os::raw::c_long; -pub type __u_quad_t = ::std::os::raw::c_ulong; -pub type __intmax_t = ::std::os::raw::c_long; -pub type __uintmax_t = ::std::os::raw::c_ulong; -pub type __dev_t = ::std::os::raw::c_ulong; -pub type __uid_t = ::std::os::raw::c_uint; -pub type __gid_t = ::std::os::raw::c_uint; -pub type __ino_t = ::std::os::raw::c_ulong; -pub type __ino64_t = 
::std::os::raw::c_ulong; -pub type __mode_t = ::std::os::raw::c_uint; -pub type __nlink_t = ::std::os::raw::c_ulong; -pub type __off_t = ::std::os::raw::c_long; -pub type __off64_t = ::std::os::raw::c_long; -pub type __pid_t = ::std::os::raw::c_int; +pub type __int64_t = ::std::os::raw::c_longlong; +pub type __uint64_t = ::std::os::raw::c_ulonglong; +pub type __darwin_intptr_t = ::std::os::raw::c_long; +pub type __darwin_natural_t = ::std::os::raw::c_uint; +pub type __darwin_ct_rune_t = ::std::os::raw::c_int; +#[repr(C)] +#[derive(Copy, Clone)] +pub union __mbstate_t { + pub __mbstate8: [::std::os::raw::c_char; 128usize], + pub _mbstateL: ::std::os::raw::c_longlong, +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of __mbstate_t"][::std::mem::size_of::<__mbstate_t>() - 128usize]; + ["Alignment of __mbstate_t"][::std::mem::align_of::<__mbstate_t>() - 8usize]; + ["Offset of field: __mbstate_t::__mbstate8"] + [::std::mem::offset_of!(__mbstate_t, __mbstate8) - 0usize]; + ["Offset of field: __mbstate_t::_mbstateL"] + [::std::mem::offset_of!(__mbstate_t, _mbstateL) - 0usize]; +}; +pub type __darwin_mbstate_t = __mbstate_t; +pub type __darwin_ptrdiff_t = ::std::os::raw::c_long; +pub type __darwin_size_t = ::std::os::raw::c_ulong; +pub type __darwin_va_list = __builtin_va_list; +pub type __darwin_wchar_t = ::std::os::raw::c_int; +pub type __darwin_rune_t = __darwin_wchar_t; +pub type __darwin_wint_t = ::std::os::raw::c_int; +pub type __darwin_clock_t = ::std::os::raw::c_ulong; +pub type __darwin_socklen_t = __uint32_t; +pub type __darwin_ssize_t = ::std::os::raw::c_long; +pub type __darwin_time_t = ::std::os::raw::c_long; +pub type __darwin_blkcnt_t = __int64_t; +pub type __darwin_blksize_t = __int32_t; +pub type __darwin_dev_t = __int32_t; +pub type __darwin_fsblkcnt_t = ::std::os::raw::c_uint; +pub type __darwin_fsfilcnt_t = ::std::os::raw::c_uint; +pub type __darwin_gid_t = __uint32_t; +pub type __darwin_id_t = __uint32_t; +pub 
type __darwin_ino64_t = __uint64_t; +pub type __darwin_ino_t = __darwin_ino64_t; +pub type __darwin_mach_port_name_t = __darwin_natural_t; +pub type __darwin_mach_port_t = __darwin_mach_port_name_t; +pub type __darwin_mode_t = __uint16_t; +pub type __darwin_off_t = __int64_t; +pub type __darwin_pid_t = __int32_t; +pub type __darwin_sigset_t = __uint32_t; +pub type __darwin_suseconds_t = __int32_t; +pub type __darwin_uid_t = __uint32_t; +pub type __darwin_useconds_t = __uint32_t; +pub type __darwin_uuid_t = [::std::os::raw::c_uchar; 16usize]; +pub type __darwin_uuid_string_t = [::std::os::raw::c_char; 37usize]; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct __darwin_pthread_handler_rec { + pub __routine: ::std::option::Option, + pub __arg: *mut ::std::os::raw::c_void, + pub __next: *mut __darwin_pthread_handler_rec, +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of __darwin_pthread_handler_rec"] + [::std::mem::size_of::<__darwin_pthread_handler_rec>() - 24usize]; + ["Alignment of __darwin_pthread_handler_rec"] + [::std::mem::align_of::<__darwin_pthread_handler_rec>() - 8usize]; + ["Offset of field: __darwin_pthread_handler_rec::__routine"] + [::std::mem::offset_of!(__darwin_pthread_handler_rec, __routine) - 0usize]; + ["Offset of field: __darwin_pthread_handler_rec::__arg"] + [::std::mem::offset_of!(__darwin_pthread_handler_rec, __arg) - 8usize]; + ["Offset of field: __darwin_pthread_handler_rec::__next"] + [::std::mem::offset_of!(__darwin_pthread_handler_rec, __next) - 16usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_attr_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 56usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_attr_t"][::std::mem::size_of::<_opaque_pthread_attr_t>() - 64usize]; + ["Alignment of _opaque_pthread_attr_t"] + 
[::std::mem::align_of::<_opaque_pthread_attr_t>() - 8usize]; + ["Offset of field: _opaque_pthread_attr_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_attr_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_attr_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_attr_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_cond_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 40usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_cond_t"][::std::mem::size_of::<_opaque_pthread_cond_t>() - 48usize]; + ["Alignment of _opaque_pthread_cond_t"] + [::std::mem::align_of::<_opaque_pthread_cond_t>() - 8usize]; + ["Offset of field: _opaque_pthread_cond_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_cond_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_cond_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_cond_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_condattr_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 8usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_condattr_t"] + [::std::mem::size_of::<_opaque_pthread_condattr_t>() - 16usize]; + ["Alignment of _opaque_pthread_condattr_t"] + [::std::mem::align_of::<_opaque_pthread_condattr_t>() - 8usize]; + ["Offset of field: _opaque_pthread_condattr_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_condattr_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_condattr_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_condattr_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_mutex_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 56usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const 
_: () = { + ["Size of _opaque_pthread_mutex_t"][::std::mem::size_of::<_opaque_pthread_mutex_t>() - 64usize]; + ["Alignment of _opaque_pthread_mutex_t"] + [::std::mem::align_of::<_opaque_pthread_mutex_t>() - 8usize]; + ["Offset of field: _opaque_pthread_mutex_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_mutex_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_mutex_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_mutex_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_mutexattr_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 8usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_mutexattr_t"] + [::std::mem::size_of::<_opaque_pthread_mutexattr_t>() - 16usize]; + ["Alignment of _opaque_pthread_mutexattr_t"] + [::std::mem::align_of::<_opaque_pthread_mutexattr_t>() - 8usize]; + ["Offset of field: _opaque_pthread_mutexattr_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_mutexattr_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_mutexattr_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_mutexattr_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_once_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 8usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_once_t"][::std::mem::size_of::<_opaque_pthread_once_t>() - 16usize]; + ["Alignment of _opaque_pthread_once_t"] + [::std::mem::align_of::<_opaque_pthread_once_t>() - 8usize]; + ["Offset of field: _opaque_pthread_once_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_once_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_once_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_once_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct 
_opaque_pthread_rwlock_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 192usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_rwlock_t"] + [::std::mem::size_of::<_opaque_pthread_rwlock_t>() - 200usize]; + ["Alignment of _opaque_pthread_rwlock_t"] + [::std::mem::align_of::<_opaque_pthread_rwlock_t>() - 8usize]; + ["Offset of field: _opaque_pthread_rwlock_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_rwlock_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_rwlock_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_rwlock_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_rwlockattr_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 16usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_rwlockattr_t"] + [::std::mem::size_of::<_opaque_pthread_rwlockattr_t>() - 24usize]; + ["Alignment of _opaque_pthread_rwlockattr_t"] + [::std::mem::align_of::<_opaque_pthread_rwlockattr_t>() - 8usize]; + ["Offset of field: _opaque_pthread_rwlockattr_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_rwlockattr_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_rwlockattr_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_rwlockattr_t, __opaque) - 8usize]; +}; #[repr(C)] #[derive(Debug, Copy, Clone)] -pub struct __fsid_t { - pub __val: [::std::os::raw::c_int; 2usize], +pub struct _opaque_pthread_t { + pub __sig: ::std::os::raw::c_long, + pub __cleanup_stack: *mut __darwin_pthread_handler_rec, + pub __opaque: [::std::os::raw::c_char; 8176usize], } #[allow(clippy::unnecessary_operation, clippy::identity_op)] const _: () = { - ["Size of __fsid_t"][::std::mem::size_of::<__fsid_t>() - 8usize]; - ["Alignment of __fsid_t"][::std::mem::align_of::<__fsid_t>() - 4usize]; - ["Offset of field: 
__fsid_t::__val"][::std::mem::offset_of!(__fsid_t, __val) - 0usize]; + ["Size of _opaque_pthread_t"][::std::mem::size_of::<_opaque_pthread_t>() - 8192usize]; + ["Alignment of _opaque_pthread_t"][::std::mem::align_of::<_opaque_pthread_t>() - 8usize]; + ["Offset of field: _opaque_pthread_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_t::__cleanup_stack"] + [::std::mem::offset_of!(_opaque_pthread_t, __cleanup_stack) - 8usize]; + ["Offset of field: _opaque_pthread_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_t, __opaque) - 16usize]; }; -pub type __clock_t = ::std::os::raw::c_long; -pub type __rlim_t = ::std::os::raw::c_ulong; -pub type __rlim64_t = ::std::os::raw::c_ulong; -pub type __id_t = ::std::os::raw::c_uint; -pub type __time_t = ::std::os::raw::c_long; -pub type __useconds_t = ::std::os::raw::c_uint; -pub type __suseconds_t = ::std::os::raw::c_long; -pub type __suseconds64_t = ::std::os::raw::c_long; -pub type __daddr_t = ::std::os::raw::c_int; -pub type __key_t = ::std::os::raw::c_int; -pub type __clockid_t = ::std::os::raw::c_int; -pub type __timer_t = *mut ::std::os::raw::c_void; -pub type __blksize_t = ::std::os::raw::c_long; -pub type __blkcnt_t = ::std::os::raw::c_long; -pub type __blkcnt64_t = ::std::os::raw::c_long; -pub type __fsblkcnt_t = ::std::os::raw::c_ulong; -pub type __fsblkcnt64_t = ::std::os::raw::c_ulong; -pub type __fsfilcnt_t = ::std::os::raw::c_ulong; -pub type __fsfilcnt64_t = ::std::os::raw::c_ulong; -pub type __fsword_t = ::std::os::raw::c_long; -pub type __ssize_t = ::std::os::raw::c_long; -pub type __syscall_slong_t = ::std::os::raw::c_long; -pub type __syscall_ulong_t = ::std::os::raw::c_ulong; -pub type __loff_t = __off64_t; -pub type __caddr_t = *mut ::std::os::raw::c_char; -pub type __intptr_t = ::std::os::raw::c_long; -pub type __socklen_t = ::std::os::raw::c_uint; -pub type __sig_atomic_t = ::std::os::raw::c_int; -pub type int_least8_t = __int_least8_t; 
-pub type int_least16_t = __int_least16_t; -pub type int_least32_t = __int_least32_t; -pub type int_least64_t = __int_least64_t; -pub type uint_least8_t = __uint_least8_t; -pub type uint_least16_t = __uint_least16_t; -pub type uint_least32_t = __uint_least32_t; -pub type uint_least64_t = __uint_least64_t; -pub type int_fast8_t = ::std::os::raw::c_schar; -pub type int_fast16_t = ::std::os::raw::c_long; -pub type int_fast32_t = ::std::os::raw::c_long; -pub type int_fast64_t = ::std::os::raw::c_long; -pub type uint_fast8_t = ::std::os::raw::c_uchar; -pub type uint_fast16_t = ::std::os::raw::c_ulong; -pub type uint_fast32_t = ::std::os::raw::c_ulong; -pub type uint_fast64_t = ::std::os::raw::c_ulong; -pub type intmax_t = __intmax_t; -pub type uintmax_t = __uintmax_t; +pub type __darwin_pthread_attr_t = _opaque_pthread_attr_t; +pub type __darwin_pthread_cond_t = _opaque_pthread_cond_t; +pub type __darwin_pthread_condattr_t = _opaque_pthread_condattr_t; +pub type __darwin_pthread_key_t = ::std::os::raw::c_ulong; +pub type __darwin_pthread_mutex_t = _opaque_pthread_mutex_t; +pub type __darwin_pthread_mutexattr_t = _opaque_pthread_mutexattr_t; +pub type __darwin_pthread_once_t = _opaque_pthread_once_t; +pub type __darwin_pthread_rwlock_t = _opaque_pthread_rwlock_t; +pub type __darwin_pthread_rwlockattr_t = _opaque_pthread_rwlockattr_t; +pub type __darwin_pthread_t = *mut _opaque_pthread_t; +pub type intmax_t = ::std::os::raw::c_long; +pub type uintmax_t = ::std::os::raw::c_ulong; #[doc = " Wrapper around Rust's [`&str`], without allocating memory, unlike [`std::ffi::CString`].\n The caller must use it as a Rust string. 
This is not a C-string."] #[repr(C)] #[derive(Debug, Copy, Clone)] @@ -325,3 +450,4 @@ const _: () = { ["Offset of field: PdRoute::shard"][::std::mem::offset_of!(PdRoute, shard) - 0usize]; ["Offset of field: PdRoute::read_write"][::std::mem::offset_of!(PdRoute, read_write) - 8usize]; }; +pub type __builtin_va_list = *mut ::std::os::raw::c_char; diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index 7607ea975..0d9f4171a 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -1,8 +1,9 @@ //! Databases behind pgDog. -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::ops::Deref; use std::sync::Arc; +use std::time::Duration; use arc_swap::ArcSwap; use futures::future::try_join_all; @@ -34,6 +35,20 @@ use super::{ static DATABASES: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Databases::default())); static LOCK: Lazy> = Lazy::new(|| Mutex::new(())); +/// Spawns the wildcard-pool background eviction loop exactly once. +static WILDCARD_EVICTION: Lazy<()> = Lazy::new(|| { + tokio::spawn(async { + loop { + let timeout_secs = config().config.general.wildcard_pool_idle_timeout; + if timeout_secs == 0 { + tokio::time::sleep(Duration::from_secs(60)).await; + continue; + } + tokio::time::sleep(Duration::from_secs(timeout_secs)).await; + evict_idle_wildcard_pools(); + } + }); +}); /// Sync databases during modification. pub fn lock() -> MutexGuard<'static, RawMutex, ()> { @@ -75,8 +90,16 @@ pub fn reconnect() -> Result<(), Error> { Ok(()) } -/// Re-create databases from existing config, -/// preserving connections. +/// Re-create databases from existing config, preserving connections. +/// +/// **SIGHUP / config-reload behaviour for wildcard pools:** +/// Wildcard pools created on demand by [`add_wildcard_pool`] are *not* included +/// in the freshly built [`Databases`] produced by [`from_config`]. 
Because +/// [`replace_databases`] only moves connections whose key exists in the new +/// config, those connections are dropped and the pools are evicted. On the +/// next client login [`add_wildcard_pool`] will recreate the pool from the +/// (potentially updated) wildcard template, and the +/// [`General::max_wildcard_pools`] counter resets to zero. pub fn reload_from_existing() -> Result<(), Error> { let _lock = lock(); @@ -98,9 +121,48 @@ pub fn init() -> Result<(), Error> { // Start two-pc manager. let _monitor = Manager::get(); + // Start the wildcard pool eviction background task. + let _ = &*WILDCARD_EVICTION; + Ok(()) } +/// Remove dynamically-created wildcard pools that currently have zero connections. +/// +/// This is called periodically by the background eviction task started in +/// [`init`], and is also exposed as `pub(crate)` so unit tests can invoke it +/// directly without running a Tokio runtime loop. +pub(crate) fn evict_idle_wildcard_pools() { + let _lock = lock(); + let dbs = databases(); + + let to_evict: Vec = dbs + .dynamic_pools + .iter() + .filter(|user| { + dbs.databases + .get(*user) + .map_or(false, |c| c.total_connections() == 0) + }) + .cloned() + .collect(); + + if to_evict.is_empty() { + return; + } + + let mut new_dbs = (*dbs).clone(); + for user in &to_evict { + if let Some(cluster) = new_dbs.databases.remove(user) { + cluster.shutdown(); + new_dbs.dynamic_pools.remove(user); + new_dbs.wildcard_pool_count = new_dbs.wildcard_pool_count.saturating_sub(1); + } + } + DATABASES.store(Arc::new(new_dbs)); + info!("evicted {} idle wildcard pool(s)", to_evict.len()); +} + /// Shutdown all databases. pub fn shutdown() { databases().shutdown(); @@ -173,6 +235,162 @@ pub(crate) fn add(mut user: crate::config::User) { } } +/// Attempt to create a pool from wildcard templates for the given user/database. +/// Returns the Cluster if a wildcard match was found and the pool was created. 
+/// +/// When `passthrough_password` is provided (from passthrough auth), it overrides +/// the wildcard template's password so the pool can authenticate to PostgreSQL +/// and the login check can verify the client's credential. +pub(crate) fn add_wildcard_pool( + user: &str, + database: &str, + passthrough_password: Option<&str>, +) -> Result, Error> { + let _lock = lock(); + + // Double-check: another thread may have created it. + let dbs = databases(); + if dbs.exists((user, database)) { + return Ok(Some(dbs.cluster((user, database))?)); + } + + let wildcard_match = match dbs.find_wildcard_match(user, database) { + Some(m) => m, + None => return Ok(None), + }; + + let config_snapshot = match dbs.config_snapshot() { + Some(c) => c.clone(), + None => return Ok(None), + }; + + // Enforce the operator-configured pool limit before allocating a new pool. + let max = config_snapshot.config.general.max_wildcard_pools; + if max > 0 && dbs.wildcard_pool_count >= max { + warn!( + "max_wildcard_pools limit ({}) reached, rejecting wildcard pool \ + for user=\"{}\" database=\"{}\"", + max, user, database + ); + return Ok(None); + } + + // Build a synthetic user config from the wildcard template. + let template_user_key = User { + user: if wildcard_match.wildcard_user { + "*".to_string() + } else { + user.to_string() + }, + database: if wildcard_match.wildcard_database { + "*".to_string() + } else { + database.to_string() + }, + }; + + // Find the user template from wildcard_users or from the existing pool configs. + let user_config = if wildcard_match.wildcard_user { + // Look for a wildcard user template that matches. + let db_pattern = if wildcard_match.wildcard_database { + "*" + } else { + database + }; + dbs.wildcard_users() + .iter() + .find(|u| { + u.is_wildcard_name() && (u.database == db_pattern || u.is_wildcard_database()) + }) + .cloned() + } else { + // Use an existing user config's settings from a template pool. 
+ let template_cluster = dbs.databases.get(&template_user_key); + template_cluster.map(|_| { + // Use the snapshot so user lookups are consistent with the database + // config captured at the same instant (avoids a race if a SIGHUP + // reload changes the global config mid-call). + config_snapshot + .users + .users + .iter() + .find(|u| u.name == user && (u.database == "*" || u.is_wildcard_database())) + .cloned() + .unwrap_or_else(|| crate::config::User::new(user, "", database)) + }) + }; + + let mut user_config = match user_config { + Some(u) => u, + None => return Ok(None), + }; + + // Override the wildcard name/database with the actual values. + if user_config.is_wildcard_name() { + user_config.name = user.to_string(); + } + user_config.database = database.to_string(); + + // For passthrough auth, set the client's password so the backend pool can + // authenticate to PostgreSQL and the proxy-level credential check succeeds. + if let Some(pw) = passthrough_password { + user_config.password = Some(pw.to_string()); + } + + // Build a synthetic Config so we can substitute the real database name + // into the wildcard template before handing it to new_pool. + let mut synthetic_config = config_snapshot.config.clone(); + if wildcard_match.wildcard_database { + if let Some(templates) = dbs.wildcard_db_templates() { + let mut new_dbs: Vec = synthetic_config + .databases + .iter() + .filter(|d| !d.is_wildcard()) + .cloned() + .collect(); + + for shard_templates in templates { + for template in shard_templates { + let mut db = template.database.clone(); + db.name = database.to_string(); + // Respect explicit database_name; otherwise use the client-requested name. 
+ if db.database_name.is_none() { + db.database_name = Some(database.to_string()); + } + new_dbs.push(db); + } + } + + synthetic_config.databases = new_dbs; + } + } + + let pool = new_pool(&user_config, &synthetic_config); + if let Some((pool_user, cluster)) = pool { + debug!( + "created wildcard pool for user=\"{}\" database=\"{}\"", + user, database + ); + + let databases = (*databases()).clone(); + let (added, mut databases) = databases.add(pool_user.clone(), cluster.clone()); + if added { + databases.wildcard_pool_count += 1; + databases.dynamic_pools.insert(pool_user); + databases.launch(); + DATABASES.store(Arc::new(databases)); + } + + Ok(Some(cluster)) + } else { + warn!( + "wildcard match found but pool creation failed for user=\"{}\" database=\"{}\"", + user, database + ); + Ok(None) + } +} + /// Swap database configs between source and destination. /// Both databases keep their names, but their configs (host, port, etc.) are exchanged. /// User database references are also swapped. @@ -278,6 +496,13 @@ impl ToUser for (&str, Option<&str>) { } } +/// Describes which parts of a wildcard match were used. +#[derive(Debug, Clone, PartialEq)] +struct WildcardMatch { + wildcard_user: bool, + wildcard_database: bool, +} + /// Databases. #[derive(Default, Clone)] pub struct Databases { @@ -285,6 +510,20 @@ pub struct Databases { manual_queries: HashMap, mirrors: HashMap>, mirror_configs: HashMap<(String, String), crate::config::MirrorConfig>, + /// Wildcard database templates (databases with name = "*"), organized by shard. + wildcard_db_templates: Option>>, + /// Wildcard user templates (users with name = "*"). + wildcard_users: Vec, + /// Full config snapshot (both databases and users) captured at construction + /// time, needed to create pools lazily from wildcard templates without + /// racing against a concurrent config reload that might change `config()`. + config_snapshot: Option>, + /// Number of pools created dynamically via wildcard matching. 
+ /// Reset to zero on every config reload so the limit applies per-epoch. + wildcard_pool_count: usize, + /// Keys of pools that were created dynamically via wildcard matching. + /// Used by the background eviction task to identify eligible candidates. + dynamic_pools: HashSet, } impl Databases { @@ -315,6 +554,94 @@ impl Databases { } } + /// Check if any wildcard templates are configured. + pub fn has_wildcard(&self) -> bool { + self.wildcard_db_templates.is_some() || !self.wildcard_users.is_empty() + } + + /// Check if a cluster exists or could be created via wildcard matching. + pub fn exists_or_wildcard(&self, user: impl ToUser) -> bool { + let user = user.to_user(); + if self.exists((&*user.user, &*user.database)) { + return true; + } + self.has_wildcard() + && self + .find_wildcard_match(&user.user, &user.database) + .is_some() + } + + /// Find a wildcard match for a user/database pair. + /// Returns a tuple of (user_template, is_wildcard_user, is_wildcard_db). + fn find_wildcard_match(&self, user: &str, database: &str) -> Option { + // Priority 1: exact user, wildcard database + let user_key = User { + user: user.to_string(), + database: "*".to_string(), + }; + if self.databases.contains_key(&user_key) && self.wildcard_db_templates.is_some() { + return Some(WildcardMatch { + wildcard_user: false, + wildcard_database: true, + }); + } + + // Priority 2: wildcard user, exact database + let wildcard_user_key = User { + user: "*".to_string(), + database: database.to_string(), + }; + if self.databases.contains_key(&wildcard_user_key) { + return Some(WildcardMatch { + wildcard_user: true, + wildcard_database: false, + }); + } + + // Priority 3: both wildcard + let full_wildcard_key = User { + user: "*".to_string(), + database: "*".to_string(), + }; + if self.databases.contains_key(&full_wildcard_key) + || (!self.wildcard_users.is_empty() && self.wildcard_db_templates.is_some()) + { + return Some(WildcardMatch { + wildcard_user: true, + wildcard_database: true, + 
}); + } + + None + } + + /// Get wildcard database templates. + pub fn wildcard_db_templates(&self) -> Option<&Vec>> { + self.wildcard_db_templates.as_ref() + } + + /// Get wildcard user templates. + pub fn wildcard_users(&self) -> &[crate::config::User] { + &self.wildcard_users + } + + /// Get the full config snapshot used for creating wildcard pools. + pub fn config_snapshot(&self) -> Option<&crate::config::ConfigAndUsers> { + self.config_snapshot.as_deref() + } + + /// Number of pools currently created via wildcard matching. + #[cfg(test)] + pub(crate) fn wildcard_pool_count(&self) -> usize { + self.wildcard_pool_count + } + + /// Keys of dynamically-created wildcard pools. + #[cfg(test)] + pub(crate) fn dynamic_pools(&self) -> &HashSet { + &self.dynamic_pools + } + /// Get a cluster for the user/database pair if it's configured. pub fn cluster(&self, user: impl ToUser) -> Result { let user = user.to_user(); @@ -679,11 +1006,31 @@ pub fn from_config(config: &ConfigAndUsers) -> Databases { } } + let wildcard_db_templates = config.config.wildcard_databases(); + let wildcard_users: Vec = config + .users + .users + .iter() + .filter(|u| u.is_wildcard_name() || u.is_wildcard_database()) + .cloned() + .collect(); + + let config_snapshot = if wildcard_db_templates.is_some() || !wildcard_users.is_empty() { + Some(Arc::new(config.clone())) + } else { + None + }; + Databases { databases, manual_queries: config.config.manual_queries(), mirrors, mirror_configs, + wildcard_db_templates, + wildcard_users, + config_snapshot, + wildcard_pool_count: 0, + dynamic_pools: HashSet::new(), } } @@ -1738,4 +2085,168 @@ password = "testpass" assert_eq!(new_users.users[0].name, "testuser"); assert_eq!(new_users.users[0].database, "destination_db"); } + + #[test] + fn test_wildcard_db_templates_populated() { + let mut config = Config::default(); + config.databases = vec![ + Database { + name: "explicit_db".to_string(), + host: "host1".to_string(), + role: Role::Primary, + 
..Default::default() + }, + Database { + name: "*".to_string(), + host: "wildcard-host".to_string(), + role: Role::Primary, + ..Default::default() + }, + ]; + + let config_and_users = ConfigAndUsers { + config, + users: crate::config::Users { + users: vec![crate::config::User::new("alice", "pass", "explicit_db")], + ..Default::default() + }, + ..Default::default() + }; + + let databases = from_config(&config_and_users); + + assert!(databases.has_wildcard()); + assert!(databases.wildcard_db_templates().is_some()); + let templates = databases.wildcard_db_templates().unwrap(); + assert_eq!(templates.len(), 1); // shard 0 only + assert_eq!(templates[0].len(), 1); + assert_eq!(templates[0][0].host, "wildcard-host"); + } + + #[test] + fn test_no_wildcard_when_absent() { + let mut config = Config::default(); + config.databases = vec![Database { + name: "mydb".to_string(), + host: "host1".to_string(), + role: Role::Primary, + ..Default::default() + }]; + + let config_and_users = ConfigAndUsers { + config, + users: crate::config::Users { + users: vec![crate::config::User::new("alice", "pass", "mydb")], + ..Default::default() + }, + ..Default::default() + }; + + let databases = from_config(&config_and_users); + + assert!(!databases.has_wildcard()); + assert!(databases.wildcard_db_templates().is_none()); + assert!(databases.wildcard_users().is_empty()); + assert!(databases.config_snapshot().is_none()); + } + + #[test] + fn test_wildcard_users_populated() { + let mut config = Config::default(); + config.databases = vec![Database { + name: "*".to_string(), + host: "wildcard-host".to_string(), + role: Role::Primary, + ..Default::default() + }]; + + let config_and_users = ConfigAndUsers { + config, + users: crate::config::Users { + users: vec![crate::config::User { + name: "*".to_string(), + database: "*".to_string(), + password: Some("secret".to_string()), + ..Default::default() + }], + ..Default::default() + }, + ..Default::default() + }; + + let databases = 
from_config(&config_and_users); + + assert!(databases.has_wildcard()); + assert_eq!(databases.wildcard_users().len(), 1); + assert!(databases.wildcard_users()[0].is_wildcard_name()); + assert!(databases.wildcard_users()[0].is_wildcard_database()); + assert!(databases.config_snapshot().is_some()); + } + + #[test] + fn test_find_wildcard_match_priority() { + let mut config = Config::default(); + config.databases = vec![ + Database { + name: "explicit_db".to_string(), + host: "host1".to_string(), + role: Role::Primary, + ..Default::default() + }, + Database { + name: "*".to_string(), + host: "wildcard-host".to_string(), + role: Role::Primary, + ..Default::default() + }, + ]; + + let config_and_users = ConfigAndUsers { + config, + users: crate::config::Users { + users: vec![ + crate::config::User::new("alice", "pass", "explicit_db"), + crate::config::User { + name: "alice".to_string(), + database: "*".to_string(), + password: Some("pass".to_string()), + ..Default::default() + }, + crate::config::User { + name: "*".to_string(), + database: "*".to_string(), + password: Some("wild".to_string()), + ..Default::default() + }, + ], + ..Default::default() + }, + ..Default::default() + }; + + let databases = from_config(&config_and_users); + + // Exact match exists — no wildcard needed. + assert!(databases.cluster(("alice", "explicit_db")).is_ok()); + + // Wildcard database for known user (alice/*) — priority 1. + let m = databases.find_wildcard_match("alice", "unknown_db"); + assert_eq!( + m, + Some(WildcardMatch { + wildcard_user: false, + wildcard_database: true, + }) + ); + + // Wildcard user for unknown user — priority 3 (full wildcard). 
+ let m = databases.find_wildcard_match("unknown_user", "unknown_db"); + assert_eq!( + m, + Some(WildcardMatch { + wildcard_user: true, + wildcard_database: true, + }) + ); + } } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index f86632e75..786dd02b9 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -346,6 +346,16 @@ impl Cluster { &self.shards } + /// Total number of connections (idle + checked-out) across all shards. + /// Used by the wildcard-pool eviction task to decide whether a pool is idle. + pub fn total_connections(&self) -> usize { + self.shards + .iter() + .flat_map(|shard| shard.pools()) + .map(|pool| pool.state().total) + .sum() + } + /// Get the password the user should use to connect to the database. pub fn password(&self) -> &str { &self.password diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index 3933d7f4b..854dbd88d 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -333,15 +333,41 @@ impl Connection { } let databases = databases(); - let cluster = databases.cluster(user)?; + let cluster = match databases.cluster(user) { + Ok(c) => c, + Err(_) => { + // Drop the Arc before mutating global state. + drop(databases); + // Attempt wildcard pool creation. + match databases::add_wildcard_pool( + &self.user, + &self.database, + self.passthrough_password.as_deref(), + ) { + Ok(Some(c)) => c, + Ok(None) => { + return Err(Error::NoDatabase(databases::User { + user: self.user.clone(), + database: self.database.clone(), + })); + } + Err(e) => return Err(e), + } + } + }; self.cluster = Some(cluster.clone()); let source_db = cluster.name(); + + // Re-read databases after potential wildcard pool creation. + let databases = databases::databases(); self.mirrors = databases - .mirrors(user)? 
+ .mirrors(user) + .ok() + .flatten() .unwrap_or(&[]) .iter() - .map(|dest_cluster| { + .map(|dest_cluster: &Cluster| { let mirror_config = databases.mirror_config(source_db, dest_cluster.name()); Mirror::spawn(source_db, dest_cluster, mirror_config) }) diff --git a/pgdog/src/backend/pool/lb/test.rs b/pgdog/src/backend/pool/lb/test.rs index 31d29a48e..5ff5788fd 100644 --- a/pgdog/src/backend/pool/lb/test.rs +++ b/pgdog/src/backend/pool/lb/test.rs @@ -33,7 +33,7 @@ fn create_test_pool_config(host: &str, port: u16) -> PoolConfig { fn setup_test_replicas() -> LoadBalancer { let pool_config1 = create_test_pool_config("127.0.0.1", 5432); - let pool_config2 = create_test_pool_config("localhost", 5432); + let pool_config2 = create_test_pool_config("127.0.0.1", 5432); let replicas = LoadBalancer::new( &None, @@ -171,7 +171,7 @@ async fn test_primary_pool_banning() { let primary_pool = Pool::new(&primary_config); primary_pool.launch(); - let replica_configs = [create_test_pool_config("localhost", 5432)]; + let replica_configs = [create_test_pool_config("127.0.0.1", 5432)]; let replicas = LoadBalancer::new( &Some(primary_pool), @@ -325,7 +325,7 @@ async fn test_read_write_split_exclude_primary() { primary_pool.launch(); let replica_configs = [ - create_test_pool_config("localhost", 5432), + create_test_pool_config("127.0.0.1", 5432), create_test_pool_config("127.0.0.1", 5432), ]; @@ -363,7 +363,7 @@ async fn test_read_write_split_include_primary() { let primary_pool = Pool::new(&primary_config); primary_pool.launch(); - let replica_configs = [create_test_pool_config("localhost", 5432)]; + let replica_configs = [create_test_pool_config("127.0.0.1", 5432)]; let replicas = LoadBalancer::new( &Some(primary_pool), @@ -397,7 +397,7 @@ async fn test_read_write_split_include_primary() { async fn test_read_write_split_exclude_primary_no_primary() { // Test exclude primary setting when no primary exists let replica_configs = [ - create_test_pool_config("localhost", 5432), + 
create_test_pool_config("127.0.0.1", 5432), create_test_pool_config("127.0.0.1", 5432), ]; @@ -427,7 +427,7 @@ async fn test_read_write_split_exclude_primary_no_primary() { async fn test_read_write_split_include_primary_no_primary() { // Test include primary setting when no primary exists let replica_configs = [ - create_test_pool_config("localhost", 5432), + create_test_pool_config("127.0.0.1", 5432), create_test_pool_config("127.0.0.1", 5432), ]; @@ -460,7 +460,7 @@ async fn test_read_write_split_with_banned_primary() { let primary_pool = Pool::new(&primary_config); primary_pool.launch(); - let replica_configs = [create_test_pool_config("localhost", 5432)]; + let replica_configs = [create_test_pool_config("127.0.0.1", 5432)]; let replicas = LoadBalancer::new( &Some(primary_pool), @@ -500,7 +500,7 @@ async fn test_read_write_split_with_banned_replicas() { let primary_pool = Pool::new(&primary_config); primary_pool.launch(); - let replica_configs = [create_test_pool_config("localhost", 5432)]; + let replica_configs = [create_test_pool_config("127.0.0.1", 5432)]; let replicas = LoadBalancer::new( &Some(primary_pool), @@ -541,7 +541,7 @@ async fn test_read_write_split_exclude_primary_with_round_robin() { primary_pool.launch(); let replica_configs = [ - create_test_pool_config("localhost", 5432), + create_test_pool_config("127.0.0.1", 5432), create_test_pool_config("127.0.0.1", 5432), ]; @@ -588,7 +588,7 @@ async fn test_read_write_split_exclude_primary_with_round_robin() { #[tokio::test] async fn test_monitor_shuts_down_on_notify() { let pool_config1 = create_test_pool_config("127.0.0.1", 5432); - let pool_config2 = create_test_pool_config("localhost", 5432); + let pool_config2 = create_test_pool_config("127.0.0.1", 5432); let replicas = LoadBalancer::new( &None, @@ -708,7 +708,7 @@ async fn test_monitor_does_not_ban_with_zero_ban_timeout() { let pool_config2 = PoolConfig { address: Address { - host: "localhost".into(), + host: "127.0.0.1".into(), port: 5432, user: 
"pgdog".into(), password: "pgdog".into(), @@ -780,7 +780,7 @@ async fn test_include_primary_if_replica_banned_no_bans() { let primary_pool = Pool::new(&primary_config); primary_pool.launch(); - let replica_configs = [create_test_pool_config("localhost", 5432)]; + let replica_configs = [create_test_pool_config("127.0.0.1", 5432)]; let replicas = LoadBalancer::new( &Some(primary_pool), @@ -816,7 +816,7 @@ async fn test_include_primary_if_replica_banned_with_ban() { let primary_pool = Pool::new(&primary_config); primary_pool.launch(); - let replica_configs = [create_test_pool_config("localhost", 5432)]; + let replica_configs = [create_test_pool_config("127.0.0.1", 5432)]; let replicas = LoadBalancer::new( &Some(primary_pool), @@ -865,7 +865,7 @@ async fn test_has_replicas_with_primary_and_replicas() { let primary_pool = Pool::new(&primary_config); primary_pool.launch(); - let replica_configs = [create_test_pool_config("localhost", 5432)]; + let replica_configs = [create_test_pool_config("127.0.0.1", 5432)]; let lb = LoadBalancer::new( &Some(primary_pool), @@ -945,7 +945,7 @@ async fn test_set_role() { #[tokio::test] async fn test_can_move_conns_to_same_config() { let pool_config1 = create_test_pool_config("127.0.0.1", 5432); - let pool_config2 = create_test_pool_config("localhost", 5432); + let pool_config2 = create_test_pool_config("127.0.0.1", 5432); let lb1 = LoadBalancer::new( &None, @@ -967,7 +967,7 @@ async fn test_can_move_conns_to_same_config() { #[tokio::test] async fn test_can_move_conns_to_different_count() { let pool_config1 = create_test_pool_config("127.0.0.1", 5432); - let pool_config2 = create_test_pool_config("localhost", 5432); + let pool_config2 = create_test_pool_config("127.0.0.1", 5432); let lb1 = LoadBalancer::new( &None, @@ -989,7 +989,7 @@ async fn test_can_move_conns_to_different_count() { #[tokio::test] async fn test_can_move_conns_to_different_addresses() { let pool_config1 = create_test_pool_config("127.0.0.1", 5432); - let pool_config2 = 
create_test_pool_config("localhost", 5432); + let pool_config2 = create_test_pool_config("127.0.0.1", 5432); let pool_config3 = create_test_pool_config("127.0.0.1", 5433); let lb1 = LoadBalancer::new( @@ -1050,7 +1050,7 @@ async fn test_monitor_unbans_all_when_second_target_becomes_unhealthy_after_firs #[tokio::test] async fn test_least_active_connections_prefers_pool_with_fewer_checked_out() { let pool_config1 = create_test_pool_config("127.0.0.1", 5432); - let pool_config2 = create_test_pool_config("localhost", 5432); + let pool_config2 = create_test_pool_config("127.0.0.1", 5432); let replicas = LoadBalancer::new( &None, @@ -1086,7 +1086,7 @@ async fn test_least_active_connections_prefers_pool_with_fewer_checked_out() { fn setup_test_replicas_no_launch() -> LoadBalancer { let pool_config1 = create_test_pool_config("127.0.0.1", 5432); - let pool_config2 = create_test_pool_config("localhost", 5432); + let pool_config2 = create_test_pool_config("127.0.0.1", 5432); LoadBalancer::new( &None, @@ -1337,7 +1337,7 @@ fn test_ban_check_does_not_ban_with_zero_ban_timeout() { let pool_config2 = PoolConfig { address: Address { - host: "localhost".into(), + host: "127.0.0.1".into(), port: 5432, user: "pgdog".into(), password: "pgdog".into(), diff --git a/pgdog/src/config/mod.rs b/pgdog/src/config/mod.rs index c20aa4d37..1a6e141ad 100644 --- a/pgdog/src/config/mod.rs +++ b/pgdog/src/config/mod.rs @@ -16,7 +16,7 @@ pub mod sharding; pub mod users; pub use core::{Config, ConfigAndUsers}; -pub use database::{Database, Role}; +pub use database::{Database, EnumeratedDatabase, Role}; pub use error::Error; pub use general::General; pub use memory::*; @@ -314,3 +314,102 @@ pub fn load_test_sharded() { set(config).unwrap(); init().unwrap(); } + +/// Load a wildcard test configuration. +/// +/// Sets up a wildcard database template (`name = "*"`) pointing at a real +/// PostgreSQL server and a wildcard user (`name = "*", database = "*"`). 
/// An explicit pool for user=pgdog / database=pgdog is also created so
/// that tests can compare explicit vs. wildcard resolution.
#[cfg(test)]
pub fn load_test_wildcard() {
    use crate::backend::databases::init;

    let mut config = ConfigAndUsers::default();
    // No eagerly-opened connections: pools stay empty until a test
    // actually checks a connection out.
    config.config.general.min_pool_size = 0;

    config.config.databases = vec![
        // Explicit database — should always take priority.
        Database {
            name: "pgdog".into(),
            host: "127.0.0.1".into(),
            port: 5432,
            role: Role::Primary,
            database_name: Some("pgdog".into()),
            ..Default::default()
        },
        // Wildcard template — any other database name resolves here.
        // No `database_name`: the client-requested name is used as-is.
        Database {
            name: "*".into(),
            host: "127.0.0.1".into(),
            port: 5432,
            role: Role::Primary,
            ..Default::default()
        },
    ];

    config.users.users = vec![
        // Explicit user for the explicit database.
        User {
            name: "pgdog".into(),
            database: "pgdog".into(),
            password: Some("pgdog".into()),
            ..Default::default()
        },
        // Wildcard user — any user / any database.
        User {
            name: "*".into(),
            database: "*".into(),
            password: Some("pgdog".into()),
            ..Default::default()
        },
    ];

    // Install the config globally and (re)build the pool registry from it.
    set(config).unwrap();
    init().unwrap();
}
/// Like [`load_test_wildcard`] but also sets `max_wildcard_pools` so tests
/// can exercise the pool-count limit without modifying the global default.
///
/// NOTE(review): unlike [`load_test_wildcard`], this function is not gated
/// behind `#[cfg(test)]` — confirm whether that omission is intentional or
/// whether it should carry the same attribute.
+pub fn load_test_wildcard_with_limit(max_wildcard_pools: usize) { + use crate::backend::databases::init; + + let mut config = ConfigAndUsers::default(); + config.config.general.min_pool_size = 0; + config.config.general.max_wildcard_pools = max_wildcard_pools; + + config.config.databases = vec![ + Database { + name: "pgdog".into(), + host: "127.0.0.1".into(), + port: 5432, + role: Role::Primary, + database_name: Some("pgdog".into()), + ..Default::default() + }, + Database { + name: "*".into(), + host: "127.0.0.1".into(), + port: 5432, + role: Role::Primary, + ..Default::default() + }, + ]; + + config.users.users = vec![ + User { + name: "pgdog".into(), + database: "pgdog".into(), + password: Some("pgdog".into()), + ..Default::default() + }, + User { + name: "*".into(), + database: "*".into(), + password: Some("pgdog".into()), + ..Default::default() + }, + ]; + + set(config).unwrap(); + init().unwrap(); +} diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index b868a6c1d..e6124f3cc 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -156,7 +156,11 @@ impl Client { let comms = ClientComms::new(&id); // Auto database. - let exists = databases::databases().exists((user, database)); + let dbs = databases::databases(); + let exists = dbs.exists((user, database)); + let wildcard_available = !exists && dbs.exists_or_wildcard((user, database)); + drop(dbs); + let passthrough_password = if config.config.general.passthrough_auth() && !admin { let password = if auth_type.trust() { // Use empty password. @@ -172,7 +176,7 @@ impl Client { Password::from_bytes(password.to_bytes()?)? 
}; - if !exists { + if !exists && !wildcard_available { let user = user_from_params(¶ms, &password).ok(); if let Some(user) = user { databases::add(user); diff --git a/pgdog/src/frontend/client/query_engine/test/mod.rs b/pgdog/src/frontend/client/query_engine/test/mod.rs index de93b592b..292e06a57 100644 --- a/pgdog/src/frontend/client/query_engine/test/mod.rs +++ b/pgdog/src/frontend/client/query_engine/test/mod.rs @@ -18,6 +18,7 @@ mod set; mod set_schema_sharding; mod sharded; mod spliced; +mod wildcard; pub(super) fn test_client() -> Client { load_test(); diff --git a/pgdog/src/frontend/client/query_engine/test/omni.rs b/pgdog/src/frontend/client/query_engine/test/omni.rs index 1dfb4c1c6..eae4b31e9 100644 --- a/pgdog/src/frontend/client/query_engine/test/omni.rs +++ b/pgdog/src/frontend/client/query_engine/test/omni.rs @@ -9,13 +9,7 @@ use super::prelude::*; async fn test_omni_update_returns_single_shard_count() { let mut client = TestClient::new_sharded(Parameters::default()).await; - // Setup: create table and insert data on both shards - client - .send_simple(Query::new( - "CREATE TABLE IF NOT EXISTS sharded_omni (id BIGINT PRIMARY KEY, value TEXT)", - )) - .await; - client.read_until('Z').await.unwrap(); + // Setup: table is provisioned by integration/setup.sh client .send_simple(Query::new("DELETE FROM sharded_omni")) @@ -47,7 +41,7 @@ async fn test_omni_update_returns_single_shard_count() { // Cleanup client - .send_simple(Query::new("DROP TABLE IF EXISTS sharded_omni")) + .send_simple(Query::new("DELETE FROM sharded_omni")) .await; client.read_until('Z').await.unwrap(); } @@ -56,13 +50,7 @@ async fn test_omni_update_returns_single_shard_count() { async fn test_omni_delete_returns_single_shard_count() { let mut client = TestClient::new_sharded(Parameters::default()).await; - // Setup - client - .send_simple(Query::new( - "CREATE TABLE IF NOT EXISTS sharded_omni (id BIGINT PRIMARY KEY, value TEXT)", - )) - .await; - client.read_until('Z').await.unwrap(); + 
// Setup: table is provisioned by integration/setup.sh client .send_simple(Query::new("DELETE FROM sharded_omni")) @@ -92,7 +80,7 @@ async fn test_omni_delete_returns_single_shard_count() { // Cleanup client - .send_simple(Query::new("DROP TABLE IF EXISTS sharded_omni")) + .send_simple(Query::new("DELETE FROM sharded_omni")) .await; client.read_until('Z').await.unwrap(); } @@ -101,13 +89,7 @@ async fn test_omni_delete_returns_single_shard_count() { async fn test_omni_insert_returns_single_shard_count() { let mut client = TestClient::new_sharded(Parameters::default()).await; - // Setup - client - .send_simple(Query::new( - "CREATE TABLE IF NOT EXISTS sharded_omni (id BIGINT PRIMARY KEY, value TEXT)", - )) - .await; - client.read_until('Z').await.unwrap(); + // Setup: table is provisioned by integration/setup.sh client .send_simple(Query::new("DELETE FROM sharded_omni")) @@ -132,7 +114,7 @@ async fn test_omni_insert_returns_single_shard_count() { // Cleanup client - .send_simple(Query::new("DROP TABLE IF EXISTS sharded_omni")) + .send_simple(Query::new("DELETE FROM sharded_omni")) .await; client.read_until('Z').await.unwrap(); } @@ -141,13 +123,7 @@ async fn test_omni_insert_returns_single_shard_count() { async fn test_omni_update_returning_only_from_one_shard() { let mut client = TestClient::new_sharded(Parameters::default()).await; - // Setup - client - .send_simple(Query::new( - "CREATE TABLE IF NOT EXISTS sharded_omni (id BIGINT PRIMARY KEY, value TEXT)", - )) - .await; - client.read_until('Z').await.unwrap(); + // Setup: table is provisioned by integration/setup.sh client .send_simple(Query::new("DELETE FROM sharded_omni")) @@ -194,7 +170,7 @@ async fn test_omni_update_returning_only_from_one_shard() { // Cleanup client - .send_simple(Query::new("DROP TABLE IF EXISTS sharded_omni")) + .send_simple(Query::new("DELETE FROM sharded_omni")) .await; client.read_until('Z').await.unwrap(); } @@ -203,13 +179,7 @@ async fn test_omni_update_returning_only_from_one_shard() 
{ async fn test_omni_delete_returning_only_from_one_shard() { let mut client = TestClient::new_sharded(Parameters::default()).await; - // Setup - client - .send_simple(Query::new( - "CREATE TABLE IF NOT EXISTS sharded_omni (id BIGINT PRIMARY KEY, value TEXT)", - )) - .await; - client.read_until('Z').await.unwrap(); + // Setup: table is provisioned by integration/setup.sh client .send_simple(Query::new("DELETE FROM sharded_omni")) @@ -246,7 +216,7 @@ async fn test_omni_delete_returning_only_from_one_shard() { // Cleanup client - .send_simple(Query::new("DROP TABLE IF EXISTS sharded_omni")) + .send_simple(Query::new("DELETE FROM sharded_omni")) .await; client.read_until('Z').await.unwrap(); } @@ -255,13 +225,7 @@ async fn test_omni_delete_returning_only_from_one_shard() { async fn test_omni_insert_returning_only_from_one_shard() { let mut client = TestClient::new_sharded(Parameters::default()).await; - // Setup - client - .send_simple(Query::new( - "CREATE TABLE IF NOT EXISTS sharded_omni (id BIGINT PRIMARY KEY, value TEXT)", - )) - .await; - client.read_until('Z').await.unwrap(); + // Setup: table is provisioned by integration/setup.sh client .send_simple(Query::new("DELETE FROM sharded_omni")) @@ -293,7 +257,7 @@ async fn test_omni_insert_returning_only_from_one_shard() { // Cleanup client - .send_simple(Query::new("DROP TABLE IF EXISTS sharded_omni")) + .send_simple(Query::new("DELETE FROM sharded_omni")) .await; client.read_until('Z').await.unwrap(); } diff --git a/pgdog/src/frontend/client/query_engine/test/wildcard.rs b/pgdog/src/frontend/client/query_engine/test/wildcard.rs new file mode 100644 index 000000000..b988d4ba7 --- /dev/null +++ b/pgdog/src/frontend/client/query_engine/test/wildcard.rs @@ -0,0 +1,275 @@ +use crate::backend::databases::{add_wildcard_pool, databases, evict_idle_wildcard_pools}; +use crate::config::load_test_wildcard_with_limit; +use crate::frontend::client::test::test_client::TestClient; +use crate::net::{Parameters, Query}; + +/// 
Baseline sanity check under the wildcard configuration: connecting as the
/// explicitly configured (pgdog, pgdog) pair resolves through the explicit
/// pool — not the "*" template — and serves a simple query end-to-end.
/// Dynamic pool creation from the template is covered by the tests below.
#[tokio::test]
async fn test_wildcard_database_simple_query() {
    let mut params = Parameters::default();
    params.insert("user", "pgdog");
    params.insert("database", "pgdog");

    let mut client = TestClient::new_wildcard(params).await;

    // The explicit pool for (pgdog, pgdog) already exists, so this goes
    // through the explicit path. Verify basic connectivity.
    client.send_simple(Query::new("SELECT 1 AS result")).await;
    let messages = client.read_until('Z').await.unwrap();
    assert!(
        messages.len() >= 3,
        "expected DataRow + CommandComplete + ReadyForQuery"
    );
}

/// When a wildcard template is configured, `exists_or_wildcard` should
/// return true for database names that don't have an explicit pool but
/// match the wildcard pattern.
#[tokio::test]
async fn test_wildcard_exists_or_wildcard() {
    use crate::backend::databases::databases;
    use crate::config::load_test_wildcard;

    load_test_wildcard();

    let dbs = databases();

    // Explicit pool exists:
    assert!(dbs.exists(("pgdog", "pgdog")));
    assert!(dbs.exists_or_wildcard(("pgdog", "pgdog")));

    // No explicit pool, but wildcard matches:
    assert!(!dbs.exists(("pgdog", "some_other_db")));
    assert!(dbs.exists_or_wildcard(("pgdog", "some_other_db")));

    // Fully unknown user + database — wildcard user+db template covers it:
    assert!(!dbs.exists(("unknown_user", "unknown_db")));
    assert!(dbs.exists_or_wildcard(("unknown_user", "unknown_db")));
}

/// Dynamic pool creation via `add_wildcard_pool` for a database that has
/// no explicit pool but matches the wildcard template.
#[tokio::test]
async fn test_wildcard_add_pool_dynamic() {
    use crate::backend::databases::{add_wildcard_pool, databases};
    use crate::config::load_test_wildcard;

    load_test_wildcard();

    // Any name works here — pool creation alone doesn't open a connection
    // (see test_wildcard_nonexistent_pg_database below); "pgdog" is chosen
    // for realism.
    let target_db = "pgdog";

    // Before: no pool exists for ("wildcard_user", target_db). The only
    // explicit pool is (pgdog, pgdog), so this exercises the wildcard-user
    // resolution path.
    let dbs = databases();
    assert!(!dbs.exists(("wildcard_user", target_db)));
    drop(dbs);

    // Create pool dynamically.
    let result = add_wildcard_pool("wildcard_user", target_db, None);
    assert!(result.is_ok(), "add_wildcard_pool should succeed");
    let cluster = result.unwrap();
    assert!(cluster.is_some(), "wildcard match should produce a cluster");

    // After: pool exists.
    let dbs = databases();
    assert!(dbs.exists(("wildcard_user", target_db)));
}

/// Requesting a database that doesn't exist in Postgres should still
/// create a wildcard pool — the error only surfaces when a connection
/// attempt is actually made.
#[tokio::test]
async fn test_wildcard_nonexistent_pg_database() {
    use crate::backend::databases::{add_wildcard_pool, databases};
    use crate::config::load_test_wildcard;

    load_test_wildcard();

    let fake_db = "totally_fake_db_12345";

    let dbs = databases();
    assert!(!dbs.exists(("pgdog", fake_db)));
    assert!(dbs.exists_or_wildcard(("pgdog", fake_db)));
    drop(dbs);

    // Pool creation succeeds (it only creates the config, doesn't connect yet).
    let result = add_wildcard_pool("pgdog", fake_db, None);
    assert!(result.is_ok());
    assert!(result.unwrap().is_some());

    let dbs = databases();
    assert!(dbs.exists(("pgdog", fake_db)));
}

/// When `max_wildcard_pools` is set, pools beyond the limit are rejected
/// (returning `Ok(None)`) without panicking or erroring.
+#[tokio::test] +async fn test_max_wildcard_pools_limit_enforced() { + load_test_wildcard_with_limit(2); + + // First two pools succeed. + let r1 = add_wildcard_pool("user_a", "db_one", None); + assert!(r1.is_ok()); + assert!( + r1.unwrap().is_some(), + "first pool within limit should be created" + ); + + let r2 = add_wildcard_pool("user_b", "db_two", None); + assert!(r2.is_ok()); + assert!( + r2.unwrap().is_some(), + "second pool within limit should be created" + ); + + // Third pool must be rejected. + let r3 = add_wildcard_pool("user_c", "db_three", None); + assert!(r3.is_ok(), "should not error, just reject gracefully"); + assert!( + r3.unwrap().is_none(), + "pool creation beyond max_wildcard_pools must return None" + ); + + let dbs = databases(); + assert!( + !dbs.exists(("user_c", "db_three")), + "rejected pool must not be registered" + ); +} + +/// `max_wildcard_pools = 0` means unlimited: any number of pools may be +/// created without triggering the limit. +#[tokio::test] +async fn test_max_wildcard_pools_zero_means_unlimited() { + load_test_wildcard_with_limit(0); + + for i in 0..5usize { + let db = format!("unlimited_db_{i}"); + let user = format!("unlimited_user_{i}"); + let result = add_wildcard_pool(&user, &db, None); + assert!(result.is_ok()); + assert!( + result.unwrap().is_some(), + "pool {i} should be created when limit is 0" + ); + } +} + +/// After a config reload (simulated by calling `load_test_wildcard_with_limit` +/// again) the wildcard pool counter is reset to zero, so pools that were +/// previously rejected can now be created. +#[tokio::test] +async fn test_max_wildcard_pools_counter_resets_on_reload() { + load_test_wildcard_with_limit(1); + + // Fill the single slot. + let r1 = add_wildcard_pool("reload_user", "first_db", None); + assert!(r1.unwrap().is_some(), "slot 1 should be filled"); + + // Next pool is rejected. 
+ let r2 = add_wildcard_pool("reload_user", "second_db", None); + assert!(r2.unwrap().is_none(), "should be rejected at limit"); + + // Simulate SIGHUP / config reload — resets the counter and the pool map. + load_test_wildcard_with_limit(1); + + // The previously rejected database can now be created again. + let r3 = add_wildcard_pool("reload_user", "second_db", None); + assert!( + r3.unwrap().is_some(), + "should succeed after reload resets the counter" + ); +} + +/// Eviction removes an idle wildcard pool and clears it from the dynamic-pool +/// registry and the pool count. +#[tokio::test] +async fn test_evict_idle_wildcard_pools_removes_idle_pool() { + load_test_wildcard_with_limit(0); + + let result = add_wildcard_pool("evict_user", "evict_db", None); + assert!(result.unwrap().is_some(), "pool should be created"); + + let dbs = databases(); + assert!( + dbs.exists(("evict_user", "evict_db")), + "pool must exist before eviction" + ); + assert!( + dbs.dynamic_pools() + .iter() + .any(|u| u.user == "evict_user" && u.database == "evict_db"), + "pool must be tracked in dynamic_pools" + ); + assert!(dbs.wildcard_pool_count() >= 1, "counter must be positive"); + drop(dbs); + + // All freshly-created test pools have zero connections — eviction proceeds. + evict_idle_wildcard_pools(); + + let dbs = databases(); + assert!( + !dbs.exists(("evict_user", "evict_db")), + "evicted pool must no longer be registered" + ); + assert!( + !dbs.dynamic_pools() + .iter() + .any(|u| u.user == "evict_user" && u.database == "evict_db"), + "evicted pool must be removed from dynamic_pools" + ); +} + +/// Evicting a pool decrements `wildcard_pool_count` so that a new pool can be +/// created even when the limit was full before eviction. +#[tokio::test] +async fn test_evict_idle_wildcard_pools_decrements_count() { + load_test_wildcard_with_limit(1); + + // Fill the single slot. 
+ let r = add_wildcard_pool("count_user", "count_db", None); + assert!(r.unwrap().is_some(), "slot should be filled"); + assert_eq!(databases().wildcard_pool_count(), 1, "counter should be 1"); + + // Slot is full — a new pool is rejected. + let rejected = add_wildcard_pool("count_user", "other_db", None); + assert!( + rejected.unwrap().is_none(), + "must be rejected when at limit" + ); + + evict_idle_wildcard_pools(); + + assert_eq!( + databases().wildcard_pool_count(), + 0, + "counter must drop to 0 after eviction" + ); + + // Now a new pool can be created again without reloading config. + let r2 = add_wildcard_pool("count_user", "new_db_after_eviction", None); + assert!( + r2.unwrap().is_some(), + "pool creation must succeed once eviction freed a slot" + ); +} + +/// When there are no dynamic pools, calling `evict_idle_wildcard_pools` is a +/// safe no-op that doesn't disturb statically-configured pools. +#[tokio::test] +async fn test_evict_idle_wildcard_pools_noop_on_empty() { + load_test_wildcard_with_limit(0); + + let before = databases().all().len(); + evict_idle_wildcard_pools(); + let after = databases().all().len(); + + assert_eq!( + before, after, + "static pools must be unaffected by eviction when no dynamic pools exist" + ); +} diff --git a/pgdog/src/frontend/client/test/test_client.rs b/pgdog/src/frontend/client/test/test_client.rs index fd912e914..688a62def 100644 --- a/pgdog/src/frontend/client/test/test_client.rs +++ b/pgdog/src/frontend/client/test/test_client.rs @@ -1,6 +1,8 @@ use std::{fmt::Debug, ops::Deref}; use bytes::{BufMut, Bytes, BytesMut}; +use once_cell::sync::Lazy; +use parking_lot::{Mutex, MutexGuard}; use pgdog_config::RewriteMode; use rand::{rng, Rng}; use tokio::{ @@ -10,7 +12,7 @@ use tokio::{ use crate::{ backend::databases::{reload_from_existing, shutdown}, - config::{config, load_test_replicas, load_test_sharded, set}, + config::{config, load_test_replicas, load_test_sharded, load_test_wildcard, set}, frontend::{ 
client::query_engine::QueryEngine, router::{parser::Shard, sharding::ContextBuilder}, @@ -48,11 +50,14 @@ macro_rules! expect_message { /// Test client. #[derive(Debug)] pub struct TestClient { + _test_guard: MutexGuard<'static, ()>, pub(crate) client: Client, pub(crate) engine: QueryEngine, pub(crate) conn: TcpStream, } +static TEST_CLIENT_LOCK: Lazy> = Lazy::new(|| Mutex::new(())); + impl TestClient { /// Create new test client after the login phase /// is complete. @@ -60,6 +65,8 @@ impl TestClient { /// Config needs to be loaded. /// async fn new(params: Parameters) -> Self { + let test_guard = TEST_CLIENT_LOCK.lock(); + let addr = "127.0.0.1:0".to_string(); let conn_addr = addr.clone(); let stream = TcpListener::bind(&conn_addr).await.unwrap(); @@ -77,6 +84,7 @@ impl TestClient { let client = connect_handle.await.unwrap(); Self { + _test_guard: test_guard, conn, engine: QueryEngine::from_client(&client).expect("create query engine from client"), client, @@ -96,6 +104,13 @@ impl TestClient { Self::new(params).await } + /// New client with wildcard database configuration. + #[allow(dead_code)] + pub(crate) async fn new_wildcard(params: Parameters) -> Self { + load_test_wildcard(); + Self::new(params).await + } + /// New client with cross-shard-queries disabled. pub(crate) async fn new_cross_shard_disabled(params: Parameters) -> Self { load_test_sharded();