diff --git a/nodedb/src/control/event_trigger.rs b/nodedb/src/control/event_trigger.rs index 80700f5e..d7303858 100644 --- a/nodedb/src/control/event_trigger.rs +++ b/nodedb/src/control/event_trigger.rs @@ -124,7 +124,7 @@ async fn execute_then_action( .replace("$collection", &event.collection) .replace("$operation", event.operation.as_str()); - let query_ctx = QueryContext::for_state(shared, 1); + let query_ctx = QueryContext::for_state(shared); match query_ctx.plan_sql(&sql, event.tenant_id).await { Ok(tasks) => { diff --git a/nodedb/src/control/planner/context.rs b/nodedb/src/control/planner/context.rs index c2dc949b..e238b687 100644 --- a/nodedb/src/control/planner/context.rs +++ b/nodedb/src/control/planner/context.rs @@ -50,29 +50,33 @@ pub struct QueryContext { } /// Inputs needed to construct an `OriginCatalog` per plan call. +/// +/// Tenant is intentionally **not** stored here: every plan call passes the +/// effective tenant to `build_adapter`, so a single `QueryContext` shared +/// across a pgwire handler can serve queries from connections belonging to +/// different tenants without cross-tenant catalog resolution. 
#[derive(Clone)] struct CatalogInputs { credentials: Arc, shared: Option>, - tenant_id: u32, retention_policy_registry: Option>, } impl CatalogInputs { - fn build_adapter(&self) -> super::catalog_adapter::OriginCatalog { + fn build_adapter(&self, tenant_id: u32) -> super::catalog_adapter::OriginCatalog { if let Some(weak) = &self.shared && let Some(shared) = weak.upgrade() { super::catalog_adapter::OriginCatalog::new_with_lease( &shared, - self.tenant_id, + tenant_id, self.retention_policy_registry.clone(), ) } else { super::catalog_adapter::OriginCatalog::new( Arc::clone(&self.credentials), - self.tenant_id, + tenant_id, self.retention_policy_registry.clone(), ) } @@ -97,10 +101,9 @@ impl QueryContext { /// path would return instantly anyway, but going through the /// sub-planner without a direct `Arc` reference /// would require threading one through every call site. - pub fn for_state(state: &crate::control::state::SharedState, tenant_id: u32) -> Self { + pub fn for_state(state: &crate::control::state::SharedState) -> Self { Self::with_catalog( Arc::clone(&state.credentials), - tenant_id, Some(Arc::clone(&state.retention_policy_registry)), ) } @@ -110,16 +113,12 @@ impl QueryContext { /// query's plan acquires descriptor leases before execution. /// Callers must hold an `Arc` — the adapter /// downgrades to `Weak` internally. - pub fn for_state_with_lease( - state: &Arc, - tenant_id: u32, - ) -> Self { + pub fn for_state_with_lease(state: &Arc) -> Self { let retention = Some(Arc::clone(&state.retention_policy_registry)); Self { catalog_inputs: Some(CatalogInputs { credentials: Arc::clone(&state.credentials), shared: Some(Arc::downgrade(state)), - tenant_id, retention_policy_registry: retention.clone(), }), retention_registry: retention, @@ -136,7 +135,6 @@ impl QueryContext { /// that construct a context without an `Arc`. 
pub fn with_catalog( credentials: Arc, - tenant_id: u32, retention_policy_registry: Option< Arc, >, @@ -144,7 +142,6 @@ impl QueryContext { let catalog_inputs = Some(CatalogInputs { credentials, shared: None, - tenant_id, retention_policy_registry: retention_policy_registry.clone(), }); @@ -193,7 +190,7 @@ impl QueryContext { // `recorded_versions` field is per-plan state, and // two concurrent plans through a shared QueryContext // would otherwise interleave their recorded sets. - let catalog = inputs.build_adapter(); + let catalog = inputs.build_adapter(tenant_id.as_u32()); let plans = nodedb_sql::plan_sql(sql, &catalog).map_err(|e| match e { nodedb_sql::SqlError::RetryableSchemaChanged { descriptor } => { crate::Error::RetryableSchemaChanged { descriptor } @@ -292,7 +289,7 @@ impl QueryContext { // through a different cache key), but constructing the // adapter fresh keeps the adapter's state per-plan and // allows future extension. - let catalog = inputs.build_adapter(); + let catalog = inputs.build_adapter(tenant_id.as_u32()); let plans = nodedb_sql::plan_sql_with_params(sql, params, &catalog).map_err(|e| { crate::Error::PlanError { detail: format!("{e}"), diff --git a/nodedb/src/control/planner/procedural/executor/core/dispatch.rs b/nodedb/src/control/planner/procedural/executor/core/dispatch.rs index 3684b768..c51ad949 100644 --- a/nodedb/src/control/planner/procedural/executor/core/dispatch.rs +++ b/nodedb/src/control/planner/procedural/executor/core/dispatch.rs @@ -44,10 +44,7 @@ impl<'a> StatementExecutor<'a> { pub(super) async fn execute_dml(&self, sql: &str, bindings: &RowBindings) -> crate::Result<()> { let bound_sql = fold_literal_string_concat(&bindings.substitute(sql)); - let ctx = crate::control::planner::context::QueryContext::for_state( - self.state, - self.tenant_id.as_u32(), - ); + let ctx = crate::control::planner::context::QueryContext::for_state(self.state); let tasks = ctx.plan_sql(&bound_sql, self.tenant_id).await?; if let Some(ref tx_ctx) 
= self.tx_ctx { diff --git a/nodedb/src/control/scatter_gather.rs b/nodedb/src/control/scatter_gather.rs index 714a65e0..55ceaba3 100644 --- a/nodedb/src/control/scatter_gather.rs +++ b/nodedb/src/control/scatter_gather.rs @@ -343,7 +343,6 @@ pub async fn coordinate_cross_shard_hop( // (same pattern as QueryContext::for_state but without &SharedState). let plan_ctx = crate::control::planner::context::QueryContext::with_catalog( std::sync::Arc::clone(&credentials_clone), - tenant_id_u32, Some(std::sync::Arc::clone(&retention_clone)), ); diff --git a/nodedb/src/control/server/http/server.rs b/nodedb/src/control/server/http/server.rs index 934449cd..1a7e8d28 100644 --- a/nodedb/src/control/server/http/server.rs +++ b/nodedb/src/control/server/http/server.rs @@ -159,7 +159,7 @@ pub async fn run_with_listener( let mut shutdown_rx = bus.handle().flat_watch().raw_receiver(); let query_ctx = Arc::new(crate::control::planner::context::QueryContext::for_state( - &shared, 1, + &shared, )); let state = AppState { shared, @@ -198,7 +198,7 @@ pub async fn run( let mut shutdown_rx = bus.handle().flat_watch().raw_receiver(); let query_ctx = Arc::new(crate::control::planner::context::QueryContext::for_state( - &shared, 1, + &shared, )); let state = AppState { shared, diff --git a/nodedb/src/control/server/native/session.rs b/nodedb/src/control/server/native/session.rs index e158a145..d143859d 100644 --- a/nodedb/src/control/server/native/session.rs +++ b/nodedb/src/control/server/native/session.rs @@ -48,7 +48,7 @@ impl NativeSession { state: Arc, auth_mode: AuthMode, ) -> Self { - let query_ctx = QueryContext::for_state(&state, 1); // default tenant + let query_ctx = QueryContext::for_state(&state); Self { stream, peer_addr, diff --git a/nodedb/src/control/server/pgwire/ddl/collection/check_constraint.rs b/nodedb/src/control/server/pgwire/ddl/collection/check_constraint.rs index 2d416f6f..da6d37e5 100644 --- a/nodedb/src/control/server/pgwire/ddl/collection/check_constraint.rs +++ 
b/nodedb/src/control/server/pgwire/ddl/collection/check_constraint.rs @@ -102,8 +102,7 @@ async fn enforce_subquery_check( // General fallback: wrap in subselect. let restructured = restructure_subquery_check(&substituted); - let query_ctx = - crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32()); + let query_ctx = crate::control::planner::context::QueryContext::for_state(state); let tasks = match query_ctx.plan_sql(&restructured.sql, tenant_id).await { Ok(t) => t, diff --git a/nodedb/src/control/server/pgwire/ddl/collection/insert_parse.rs b/nodedb/src/control/server/pgwire/ddl/collection/insert_parse.rs index a9cb815c..25555a52 100644 --- a/nodedb/src/control/server/pgwire/ddl/collection/insert_parse.rs +++ b/nodedb/src/control/server/pgwire/ddl/collection/insert_parse.rs @@ -304,8 +304,7 @@ pub(super) async fn plan_and_dispatch( tenant_id: nodedb_types::TenantId, sql: &str, ) -> PgWireResult<()> { - let query_ctx = - crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32()); + let query_ctx = crate::control::planner::context::QueryContext::for_state(state); let tasks = query_ctx .plan_sql(sql, tenant_id) .await diff --git a/nodedb/src/control/server/pgwire/ddl/maintenance/analyze.rs b/nodedb/src/control/server/pgwire/ddl/maintenance/analyze.rs index 666392d8..5cec092e 100644 --- a/nodedb/src/control/server/pgwire/ddl/maintenance/analyze.rs +++ b/nodedb/src/control/server/pgwire/ddl/maintenance/analyze.rs @@ -69,7 +69,7 @@ pub async fn handle_analyze( // Dispatch a scan to the Data Plane to collect all rows. 
let scan_sql = format!("SELECT * FROM {collection}"); - let query_ctx = crate::control::planner::context::QueryContext::for_state(state, tenant_id); + let query_ctx = crate::control::planner::context::QueryContext::for_state(state); let rows = match query_ctx.plan_sql(&scan_sql, identity.tenant_id).await { Ok(tasks) => { let mut json_rows = Vec::new(); diff --git a/nodedb/src/control/server/pgwire/ddl/typeguard/validate.rs b/nodedb/src/control/server/pgwire/ddl/typeguard/validate.rs index ab0566e9..0ea2429d 100644 --- a/nodedb/src/control/server/pgwire/ddl/typeguard/validate.rs +++ b/nodedb/src/control/server/pgwire/ddl/typeguard/validate.rs @@ -57,8 +57,7 @@ pub async fn validate_typeguard( // Scan all documents. let scan_sql = format!("SELECT * FROM {coll_name}"); - let query_ctx = - crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32()); + let query_ctx = crate::control::planner::context::QueryContext::for_state(state); let tasks = query_ctx .plan_sql(&scan_sql, tenant_id) .await diff --git a/nodedb/src/control/server/pgwire/handler/core.rs b/nodedb/src/control/server/pgwire/handler/core.rs index ecc8ce46..daeedbce 100644 --- a/nodedb/src/control/server/pgwire/handler/core.rs +++ b/nodedb/src/control/server/pgwire/handler/core.rs @@ -19,6 +19,7 @@ use pgwire::api::store::PortalStore; use pgwire::api::{ClientInfo, ClientPortalStore}; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use pgwire::messages::PgWireBackendMessage; +use pgwire::messages::PgWireFrontendMessage; use crate::bridge::envelope::PhysicalPlan; use crate::config::auth::AuthMode; @@ -61,7 +62,7 @@ impl NodeDbPgHandler { // Sub-planners (check constraints, type guards, ANALYZE, // procedural DML) build their own no-lease `QueryContext` // via `for_state`. 
- let query_ctx = QueryContext::for_state_with_lease(&state, 1); + let query_ctx = QueryContext::for_state_with_lease(&state); let query_parser = Arc::new(NodeDbQueryParser::new(Arc::clone(&state))); Self { state, @@ -90,22 +91,20 @@ impl NodeDbPgHandler { match self.auth_mode { AuthMode::Trust => { - if let Some(identity) = self - .state + // Strict resolution: `post_startup` has already ensured the + // user exists (either because it was already in the store + // or via the bootstrap auto-create path on an empty store), + // so any miss here is a genuine unknown user. + self.state .credentials .to_identity(&username, AuthMethod::Trust) - { - Ok(identity) - } else { - Ok(AuthenticatedIdentity { - user_id: 0, - username, - tenant_id: TenantId::new(1), - auth_method: AuthMethod::Trust, - roles: vec![Role::Superuser], - is_superuser: true, + .ok_or_else(|| { + PgWireError::UserError(Box::new(ErrorInfo::new( + "FATAL".to_owned(), + "28000".to_owned(), + format!("trust auth: user '{username}' does not exist"), + ))) }) - } } AuthMode::Password | AuthMode::Md5Password | AuthMode::Certificate => self .state @@ -310,5 +309,66 @@ impl ExtendedQueryHandler for NodeDbPgHandler { } } -// Trust mode: NoopStartupHandler (no authentication). -impl NoopStartupHandler for NodeDbPgHandler {} +// Trust mode: NoopStartupHandler skips password verification but still +// resolves the connecting username against the credential store, matching +// PostgreSQL's `trust` method semantics. Unknown users are rejected before +// the server sends ReadyForQuery. 
+#[async_trait] +impl NoopStartupHandler for NodeDbPgHandler { + async fn post_startup<C>( + &self, + client: &mut C, + _message: PgWireFrontendMessage, + ) -> PgWireResult<()> + where + C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send, + C::Error: Debug, + PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>, + { + if !matches!(self.auth_mode, AuthMode::Trust) { + return Ok(()); + } + + let username = client + .metadata() + .get("user") + .cloned() + .unwrap_or_else(|| "unknown".to_string()); + + if self + .state + .credentials + .to_identity(&username, AuthMethod::Trust) + .is_some() + { + return Ok(()); + } + + // Bootstrap: an empty credential store admits the first connecting + // user as a tenant-1 superuser and persists them so subsequent + // queries on the same connection (and any reconnect) resolve + // through the normal strict path. + if self.state.credentials.is_empty() { + let _ = self.state.credentials.create_user( + &username, + "", + TenantId::new(1), + vec![Role::Superuser], + ); + return Ok(()); + } + + let source = client.socket_addr().to_string(); + self.state.audit_record( + AuditEvent::AuthFailure, + None, + &source, + &format!("trust auth: user '{username}' does not exist"), + ); + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "FATAL".to_owned(), + "28000".to_owned(), + format!("trust auth: user '{username}' does not exist"), + )))) + } +} diff --git a/nodedb/src/control/server/pgwire/handler/prepared/parser.rs b/nodedb/src/control/server/pgwire/handler/prepared/parser.rs index 6ad50f4d..d24f5a52 100644 --- a/nodedb/src/control/server/pgwire/handler/prepared/parser.rs +++ b/nodedb/src/control/server/pgwire/handler/prepared/parser.rs @@ -29,22 +29,40 @@ impl NodeDbQueryParser { Self { state } } - /// Infer parameter and result types using nodedb-sql catalog. + /// Infer parameter and result types using nodedb-sql catalog, scoped to + /// the connecting user's tenant so a tenant-N user's Parse message + /// resolves against tenant-N's catalog (not tenant 1). 
fn try_infer_types( &self, sql: &str, client_types: &[Option], + tenant_id: u32, ) -> (Vec>, Vec) { let catalog = crate::control::planner::catalog_adapter::OriginCatalog::new( Arc::clone(&self.state.credentials), - 1, // default tenant for parse-time inference + tenant_id, Some(Arc::clone(&self.state.retention_policy_registry)), ); - // Parse and plan to get collection info for result schema. + // Placeholder inference runs unconditionally so an unplannable + // SQL string (e.g. `WHERE id = $1` where the planner needs bound + // params to typecheck) still reports the right number of + // parameter slots in Describe. + let param_count = count_placeholders(sql); + let mut param_types = vec![None; param_count.max(client_types.len())]; + for (i, ct) in client_types.iter().enumerate() { + if let Some(t) = ct { + param_types[i] = Some(t.clone()); + } + } + + // Parse and plan to get collection info for result schema. A plan + // failure here is not fatal — Describe callers only need the + // parameter count to bind, and execution re-plans with bound + // params anyway. let plans = match nodedb_sql::plan_sql(sql, &catalog) { Ok(p) => p, - Err(_) => return (client_types.to_vec(), Vec::new()), + Err(_) => return (param_types, Vec::new()), }; // Infer result fields from the first plan. @@ -54,15 +72,6 @@ impl NodeDbQueryParser { Vec::new() }; - // Placeholder inference: count $N placeholders in SQL. 
- let param_count = count_placeholders(sql); - let mut param_types = vec![None; param_count.max(client_types.len())]; - for (i, ct) in client_types.iter().enumerate() { - if let Some(t) = ct { - param_types[i] = Some(t.clone()); - } - } - (param_types, result_fields) } } @@ -73,14 +82,35 @@ impl QueryParser for NodeDbQueryParser { async fn parse_sql<C>( &self, - _client: &C, + client: &C, sql: &str, types: &[Option<Type>], ) -> PgWireResult<ParsedStatement> where C: ClientInfo + Unpin + Send + Sync, { - let (param_types, result_fields) = self.try_infer_types(sql, types); + // Resolve the connecting user's tenant from pgwire metadata so + // parse-time catalog lookups are scoped to the right tenant. + // Unknown users fall back to tenant 1 unconditionally; this only + // affects parse-time metadata — a catalog miss there degrades + // inference to placeholder counts, which is the safe default. + let tenant_id = client + .metadata() + .get("user") + .and_then(|u| { + self.state + .credentials + .to_identity(u, crate::control::security::identity::AuthMethod::Trust) + .or_else(|| { + self.state.credentials.to_identity( + u, + crate::control::security::identity::AuthMethod::ScramSha256, + ) + }) + }) + .map(|id| id.tenant_id.as_u32()) + .unwrap_or(1); + let (param_types, result_fields) = self.try_infer_types(sql, types, tenant_id); Ok(ParsedStatement { sql: sql.to_owned(), diff --git a/nodedb/src/control/server/pgwire/handler/sql_exec.rs b/nodedb/src/control/server/pgwire/handler/sql_exec.rs index 9ff44675..ef469e04 100644 --- a/nodedb/src/control/server/pgwire/handler/sql_exec.rs +++ b/nodedb/src/control/server/pgwire/handler/sql_exec.rs @@ -375,10 +375,8 @@ impl NodeDbPgHandler { identity: &AuthenticatedIdentity, ) -> PgWireResult> { let tenant_id = identity.tenant_id; - let query_ctx = crate::control::planner::context::QueryContext::for_state_with_lease( - &self.state, - tenant_id.as_u32(), - ); + let query_ctx = + crate::control::planner::context::QueryContext::for_state_with_lease(&self.state); if let 
Some(mode) = self.sessions.get_parameter(_addr, "rounding_mode") { query_ctx.set_rounding_mode(&mode); diff --git a/nodedb/src/event/cdc/consume.rs b/nodedb/src/event/cdc/consume.rs index 0f725a9c..784db844 100644 --- a/nodedb/src/event/cdc/consume.rs +++ b/nodedb/src/event/cdc/consume.rs @@ -203,7 +203,7 @@ pub async fn consume_remote( trace_id: 0, }; - let query_ctx = crate::control::planner::context::QueryContext::for_state(state, tenant_id); + let query_ctx = crate::control::planner::context::QueryContext::for_state(state); let payloads = gateway .execute_sql(&gw_ctx, &sql, &[], || { diff --git a/nodedb/src/event/topic/publish.rs b/nodedb/src/event/topic/publish.rs index ece9bb6a..f58ce737 100644 --- a/nodedb/src/event/topic/publish.rs +++ b/nodedb/src/event/topic/publish.rs @@ -154,7 +154,7 @@ pub async fn publish_remote( trace_id: 0, }; - let query_ctx = crate::control::planner::context::QueryContext::for_state(state, tenant_id); + let query_ctx = crate::control::planner::context::QueryContext::for_state(state); gateway .execute_sql(&gw_ctx, &sql, &[], || { diff --git a/nodedb/tests/common/pgwire_harness.rs b/nodedb/tests/common/pgwire_harness.rs index 64a36e52..92a70301 100644 --- a/nodedb/tests/common/pgwire_harness.rs +++ b/nodedb/tests/common/pgwire_harness.rs @@ -17,6 +17,7 @@ use nodedb::wal::WalManager; /// A running test server with a connected pgwire client. pub struct TestServer { pub client: tokio_postgres::Client, + pub pg_port: u16, _conn_handle: tokio::task::JoinHandle<()>, shutdown_bus: nodedb::control::shutdown::ShutdownBus, poller_shutdown_tx: tokio::sync::watch::Sender, @@ -45,6 +46,16 @@ impl TestServer { nodedb::control::security::credential::store::CredentialStore::open(&catalog_path) .unwrap(), ); + // Provision the harness superuser `nodedb` so Trust-mode strict + // identity resolution accepts the default test connection. 
The + // bootstrap exception in the handler only fires when the store + // is empty, which would break as soon as any DDL creates a user. + let _ = credentials.create_user( + "nodedb", + "nodedb", + nodedb::types::TenantId::new(1), + vec![nodedb::control::security::identity::Role::Superuser], + ); let shared = SharedState::new_with_credentials(dispatcher, Arc::clone(&wal), credentials); // Data Plane core. @@ -139,6 +150,7 @@ impl TestServer { Self { client, + pg_port: pg_addr.port(), _conn_handle: conn_handle, shutdown_bus, poller_shutdown_tx, @@ -175,6 +187,26 @@ impl TestServer { } } + /// Open a second pgwire connection on the same listener under a different + /// username. Returns a client and its background connection task handle. + pub async fn connect_as( + &self, + user: &str, + password: &str, + ) -> Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), String> { + let conn_str = format!( + "host=127.0.0.1 port={} user={} password={} dbname=nodedb", + self.pg_port, user, password + ); + let (client, connection) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls) + .await + .map_err(|e| pg_error_detail(&e))?; + let handle = tokio::spawn(async move { + let _ = connection.await; + }); + Ok((client, handle)) + } + /// Execute a SQL statement expecting an error containing the given substring. pub async fn expect_error(&self, sql: &str, expected_substring: &str) { match self.client.simple_query(sql).await { diff --git a/nodedb/tests/pgwire_tenant_scoping.rs b/nodedb/tests/pgwire_tenant_scoping.rs new file mode 100644 index 00000000..64ad2bb1 --- /dev/null +++ b/nodedb/tests/pgwire_tenant_scoping.rs @@ -0,0 +1,259 @@ +//! pgwire wire-level tests for per-connection tenant scoping and Trust-mode +//! identity resolution. +//! +//! Covers the class of bug where the pgwire handler's planning context is +//! built once with a fixed tenant id, so queries from tenant-scoped users +//! plan against the wrong tenant's catalog. 
Also covers Trust-mode accepting +//! usernames that were never created as if they were superusers. + +mod common; + +use common::pgwire_harness::TestServer; + +/// Helper: superuser-side bootstrap — create a tenant and a tenant-scoped +/// user, plus a collection owned by that tenant. The harness's default +/// connection is Trust/superuser (tenant 1) and is used for setup. +async fn bootstrap_tenant_user(server: &TestServer, user: &str, collection: &str) { + server + .exec("CREATE TENANT acme ID 2") + .await + .expect("CREATE TENANT"); + server + .exec(&format!( + "CREATE USER {user} WITH PASSWORD 'x' ROLE readwrite TENANT 2" + )) + .await + .expect("CREATE USER"); + // Create the collection as the tenant-scoped user so ownership lands on + // tenant 2. This itself exercises the DDL path, which already reads + // identity.tenant_id correctly. + let (svc, _h) = server + .connect_as(user, "x") + .await + .expect("tenant user connect"); + svc.simple_query(&format!( + "CREATE COLLECTION {collection} TYPE DOCUMENT STRICT \ + (id TEXT PRIMARY KEY, content TEXT NOT NULL)" + )) + .await + .expect("tenant user CREATE COLLECTION"); + drop(svc); +} + +/// Run a simple query under a freshly opened tenant-user connection and +/// return either the rows (first column) or the server error message. 
+async fn query_as(server: &TestServer, user: &str, sql: &str) -> Result, String> { + let (client, _h) = server.connect_as(user, "x").await?; + match client.simple_query(sql).await { + Ok(msgs) => { + let mut rows = Vec::new(); + for msg in msgs { + if let tokio_postgres::SimpleQueryMessage::Row(row) = msg { + rows.push(row.get(0).unwrap_or("").to_string()); + } + } + Ok(rows) + } + Err(e) => Err(pg_err(&e)), + } +} + +fn pg_err(e: &tokio_postgres::Error) -> String { + if let Some(db) = e.as_db_error() { + format!("{}: {}", db.code().code(), db.message()) + } else { + format!("{e:?}") + } +} + +// ── #29: tenant-scoped planning via shared query_ctx ─────────────────── + +#[tokio::test] +async fn tenant_user_can_select_own_collection() { + let server = TestServer::start().await; + bootstrap_tenant_user(&server, "svc_sel", "t2_sel").await; + + let (svc, _h) = server.connect_as("svc_sel", "x").await.unwrap(); + svc.simple_query("INSERT INTO t2_sel (id, content) VALUES ('a', 'alpha')") + .await + .expect("INSERT under tenant user"); + + let rows = query_as(&server, "svc_sel", "SELECT id FROM t2_sel") + .await + .expect("SELECT under tenant user must not fail with 'unknown table'"); + assert_eq!(rows.len(), 1, "expected 1 row, got {rows:?}"); + assert!( + rows[0].contains("\"a\""), + "row should contain id 'a': {rows:?}" + ); +} + +#[tokio::test] +async fn tenant_user_can_insert_into_own_collection() { + let server = TestServer::start().await; + bootstrap_tenant_user(&server, "svc_ins", "t2_ins").await; + + let (svc, _h) = server.connect_as("svc_ins", "x").await.unwrap(); + svc.simple_query("INSERT INTO t2_ins (id, content) VALUES ('k', 'v')") + .await + .expect("INSERT must succeed for tenant-owned collection"); +} + +#[tokio::test] +async fn tenant_user_can_update_own_collection() { + let server = TestServer::start().await; + bootstrap_tenant_user(&server, "svc_upd", "t2_upd").await; + + let (svc, _h) = server.connect_as("svc_upd", "x").await.unwrap(); + 
svc.simple_query("INSERT INTO t2_upd (id, content) VALUES ('a', 'old')") + .await + .unwrap(); + svc.simple_query("UPDATE t2_upd SET content = 'new' WHERE id = 'a'") + .await + .expect("UPDATE must not fail with 'unknown table'"); + + let rows = query_as( + &server, + "svc_upd", + "SELECT content FROM t2_upd WHERE id = 'a'", + ) + .await + .unwrap(); + assert_eq!(rows.len(), 1); + assert!( + rows[0].contains("\"new\""), + "row should reflect updated content: {rows:?}" + ); +} + +#[tokio::test] +async fn tenant_user_can_delete_from_own_collection() { + let server = TestServer::start().await; + bootstrap_tenant_user(&server, "svc_del", "t2_del").await; + + let (svc, _h) = server.connect_as("svc_del", "x").await.unwrap(); + svc.simple_query("INSERT INTO t2_del (id, content) VALUES ('a', 'x')") + .await + .unwrap(); + svc.simple_query("DELETE FROM t2_del WHERE id = 'a'") + .await + .expect("DELETE must not fail with 'unknown table'"); + + let rows = query_as(&server, "svc_del", "SELECT id FROM t2_del") + .await + .unwrap(); + assert!(rows.is_empty(), "row should be deleted, got {rows:?}"); +} + +#[tokio::test] +async fn tenant_user_prepared_select_resolves_own_collection() { + // Extended protocol goes through NodeDbQueryParser::parse_sql, which + // builds its own OriginCatalog with a hardcoded tenant id. A tenant-2 + // user must still be able to prepare and execute a statement against + // a tenant-2 collection. + let server = TestServer::start().await; + bootstrap_tenant_user(&server, "svc_prep", "t2_prep").await; + + let (svc, _h) = server.connect_as("svc_prep", "x").await.unwrap(); + svc.simple_query("INSERT INTO t2_prep (id, content) VALUES ('a', 'alpha')") + .await + .unwrap(); + + // `prepare` drives Parse + Describe through NodeDbQueryParser::parse_sql, + // which constructs an OriginCatalog. Before the fix this catalog was + // hardcoded to tenant 1 and could not see tenant-2 collections — + // `prepare` would surface the server's "unknown table" error. 
+ svc.prepare("SELECT content FROM t2_prep WHERE id = 'a'") + .await + .expect("prepare must resolve tenant-owned collection via parser.rs"); +} + +#[tokio::test] +async fn tenant_user_cannot_see_other_tenants_collection_as_empty() { + // Asymmetric-isolation guard: a tenant-2 user issuing SELECT against a + // tenant-1 collection must NOT silently return an empty result set. + // The correct behavior is the same "unknown table" a cross-tenant + // planner produces the other direction. Silent empty is a data-shape + // leak vector even though no rows cross. + let server = TestServer::start().await; + server + .exec( + "CREATE COLLECTION t1_only TYPE DOCUMENT STRICT \ + (id TEXT PRIMARY KEY, secret TEXT NOT NULL)", + ) + .await + .unwrap(); + server + .exec("INSERT INTO t1_only (id, secret) VALUES ('a', 'classified')") + .await + .unwrap(); + + server.exec("CREATE TENANT acme ID 2").await.unwrap(); + server + .exec("CREATE USER svc_xtn WITH PASSWORD 'x' ROLE readwrite TENANT 2") + .await + .unwrap(); + + let result = query_as(&server, "svc_xtn", "SELECT secret FROM t1_only").await; + match result { + Err(msg) => { + assert!( + msg.to_lowercase().contains("unknown table"), + "expected 'unknown table' error, got: {msg}" + ); + } + Ok(rows) => panic!( + "tenant-2 user must not see tenant-1 collection via silent empty result; got rows={rows:?}" + ), + } +} + +// ── #30: Trust-mode identity resolution ──────────────────────────────── + +#[tokio::test] +async fn trust_mode_rejects_unknown_username() { + // Under Trust mode NodeDB skips password verification, but it must + // still resolve the connecting username against the credential store. + // Accepting a fabricated username silently promotes arbitrary clients + // to tenant-1 superuser (see core.rs resolve_identity fallback). 
+ let server = TestServer::start().await; + + let result = server + .connect_as("nosuchuser_ever_created", "anything") + .await; + assert!( + result.is_err(), + "Trust mode must reject a username that was never CREATE USER'd; got an accepted connection" + ); +} + +#[tokio::test] +async fn trust_mode_unknown_user_cannot_run_superuser_ddl() { + // Defense-in-depth regression guard for the same root cause: even if + // a connection is somehow permitted, an unknown username MUST NOT be + // silently fabricated as a superuser identity capable of tenant/user + // management DDL. Today core.rs sets `is_superuser: true` in the + // fallback branch, so `CREATE USER` from a fabricated name succeeds. + let server = TestServer::start().await; + + let Ok((client, _h)) = server.connect_as("ghost_admin", "anything").await else { + // If connect_as is already rejected by the prior test's fix, that + // is a strictly stronger guarantee — the bug is still captured. + return; + }; + + let result = client + .simple_query("CREATE USER mallory WITH PASSWORD 'y' ROLE readwrite TENANT 1") + .await; + assert!( + result.is_err(), + "unknown Trust-mode user must not be granted superuser privileges; CREATE USER succeeded" + ); + if let Err(e) = result { + let msg = pg_err(&e); + assert!( + msg.contains("42501") || msg.to_lowercase().contains("permission"), + "expected a permission-denied error, got: {msg}" + ); + } +}