Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nodedb/src/control/event_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn execute_then_action(
.replace("$collection", &event.collection)
.replace("$operation", event.operation.as_str());

let query_ctx = QueryContext::for_state(shared, 1);
let query_ctx = QueryContext::for_state(shared);

match query_ctx.plan_sql(&sql, event.tenant_id).await {
Ok(tasks) => {
Expand Down
27 changes: 12 additions & 15 deletions nodedb/src/control/planner/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,33 @@ pub struct QueryContext {
}

/// Inputs needed to construct an `OriginCatalog` per plan call.
///
/// Tenant is intentionally **not** stored here: every plan call passes the
/// effective tenant to `build_adapter`, so a single `QueryContext` shared
/// across a pgwire handler can serve queries from connections belonging to
/// different tenants without cross-tenant catalog resolution.
#[derive(Clone)]
struct CatalogInputs {
credentials: Arc<CredentialStore>,
shared: Option<std::sync::Weak<crate::control::state::SharedState>>,
tenant_id: u32,
retention_policy_registry:
Option<Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>>,
}

impl CatalogInputs {
fn build_adapter(&self) -> super::catalog_adapter::OriginCatalog {
fn build_adapter(&self, tenant_id: u32) -> super::catalog_adapter::OriginCatalog {
if let Some(weak) = &self.shared
&& let Some(shared) = weak.upgrade()
{
super::catalog_adapter::OriginCatalog::new_with_lease(
&shared,
self.tenant_id,
tenant_id,
self.retention_policy_registry.clone(),
)
} else {
super::catalog_adapter::OriginCatalog::new(
Arc::clone(&self.credentials),
self.tenant_id,
tenant_id,
self.retention_policy_registry.clone(),
)
}
Expand All @@ -97,10 +101,9 @@ impl QueryContext {
/// path would return instantly anyway, but going through the
/// sub-planner without a direct `Arc<SharedState>` reference
/// would require threading one through every call site.
pub fn for_state(state: &crate::control::state::SharedState, tenant_id: u32) -> Self {
pub fn for_state(state: &crate::control::state::SharedState) -> Self {
Self::with_catalog(
Arc::clone(&state.credentials),
tenant_id,
Some(Arc::clone(&state.retention_policy_registry)),
)
}
Expand All @@ -110,16 +113,12 @@ impl QueryContext {
/// query's plan acquires descriptor leases before execution.
/// Callers must hold an `Arc<SharedState>` — the adapter
/// downgrades to `Weak` internally.
pub fn for_state_with_lease(
state: &Arc<crate::control::state::SharedState>,
tenant_id: u32,
) -> Self {
pub fn for_state_with_lease(state: &Arc<crate::control::state::SharedState>) -> Self {
let retention = Some(Arc::clone(&state.retention_policy_registry));
Self {
catalog_inputs: Some(CatalogInputs {
credentials: Arc::clone(&state.credentials),
shared: Some(Arc::downgrade(state)),
tenant_id,
retention_policy_registry: retention.clone(),
}),
retention_registry: retention,
Expand All @@ -136,15 +135,13 @@ impl QueryContext {
/// that construct a context without an `Arc<SharedState>`.
pub fn with_catalog(
credentials: Arc<CredentialStore>,
tenant_id: u32,
retention_policy_registry: Option<
Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>,
>,
) -> Self {
let catalog_inputs = Some(CatalogInputs {
credentials,
shared: None,
tenant_id,
retention_policy_registry: retention_policy_registry.clone(),
});

Expand Down Expand Up @@ -193,7 +190,7 @@ impl QueryContext {
// `recorded_versions` field is per-plan state, and
// two concurrent plans through a shared QueryContext
// would otherwise interleave their recorded sets.
let catalog = inputs.build_adapter();
let catalog = inputs.build_adapter(tenant_id.as_u32());
let plans = nodedb_sql::plan_sql(sql, &catalog).map_err(|e| match e {
nodedb_sql::SqlError::RetryableSchemaChanged { descriptor } => {
crate::Error::RetryableSchemaChanged { descriptor }
Expand Down Expand Up @@ -292,7 +289,7 @@ impl QueryContext {
// through a different cache key), but constructing the
// adapter fresh keeps the adapter's state per-plan and
// allows future extension.
let catalog = inputs.build_adapter();
let catalog = inputs.build_adapter(tenant_id.as_u32());
let plans = nodedb_sql::plan_sql_with_params(sql, params, &catalog).map_err(|e| {
crate::Error::PlanError {
detail: format!("{e}"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ impl<'a> StatementExecutor<'a> {
pub(super) async fn execute_dml(&self, sql: &str, bindings: &RowBindings) -> crate::Result<()> {
let bound_sql = fold_literal_string_concat(&bindings.substitute(sql));

let ctx = crate::control::planner::context::QueryContext::for_state(
self.state,
self.tenant_id.as_u32(),
);
let ctx = crate::control::planner::context::QueryContext::for_state(self.state);
let tasks = ctx.plan_sql(&bound_sql, self.tenant_id).await?;

if let Some(ref tx_ctx) = self.tx_ctx {
Expand Down
1 change: 0 additions & 1 deletion nodedb/src/control/scatter_gather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ pub async fn coordinate_cross_shard_hop(
// (same pattern as QueryContext::for_state but without &SharedState).
let plan_ctx = crate::control::planner::context::QueryContext::with_catalog(
std::sync::Arc::clone(&credentials_clone),
tenant_id_u32,
Some(std::sync::Arc::clone(&retention_clone)),
);

Expand Down
4 changes: 2 additions & 2 deletions nodedb/src/control/server/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub async fn run_with_listener(
let mut shutdown_rx = bus.handle().flat_watch().raw_receiver();

let query_ctx = Arc::new(crate::control::planner::context::QueryContext::for_state(
&shared, 1,
&shared,
));
let state = AppState {
shared,
Expand Down Expand Up @@ -198,7 +198,7 @@ pub async fn run(
let mut shutdown_rx = bus.handle().flat_watch().raw_receiver();

let query_ctx = Arc::new(crate::control::planner::context::QueryContext::for_state(
&shared, 1,
&shared,
));
let state = AppState {
shared,
Expand Down
2 changes: 1 addition & 1 deletion nodedb/src/control/server/native/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl NativeSession {
state: Arc<SharedState>,
auth_mode: AuthMode,
) -> Self {
let query_ctx = QueryContext::for_state(&state, 1); // default tenant
let query_ctx = QueryContext::for_state(&state);
Self {
stream,
peer_addr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ async fn enforce_subquery_check(
// General fallback: wrap in subselect.
let restructured = restructure_subquery_check(&substituted);

let query_ctx =
crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32());
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);

let tasks = match query_ctx.plan_sql(&restructured.sql, tenant_id).await {
Ok(t) => t,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ pub(super) async fn plan_and_dispatch(
tenant_id: nodedb_types::TenantId,
sql: &str,
) -> PgWireResult<()> {
let query_ctx =
crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32());
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
let tasks = query_ctx
.plan_sql(sql, tenant_id)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn handle_analyze(

// Dispatch a scan to the Data Plane to collect all rows.
let scan_sql = format!("SELECT * FROM {collection}");
let query_ctx = crate::control::planner::context::QueryContext::for_state(state, tenant_id);
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
let rows = match query_ctx.plan_sql(&scan_sql, identity.tenant_id).await {
Ok(tasks) => {
let mut json_rows = Vec::new();
Expand Down
3 changes: 1 addition & 2 deletions nodedb/src/control/server/pgwire/ddl/typeguard/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ pub async fn validate_typeguard(

// Scan all documents.
let scan_sql = format!("SELECT * FROM {coll_name}");
let query_ctx =
crate::control::planner::context::QueryContext::for_state(state, tenant_id.as_u32());
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
let tasks = query_ctx
.plan_sql(&scan_sql, tenant_id)
.await
Expand Down
92 changes: 76 additions & 16 deletions nodedb/src/control/server/pgwire/handler/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use pgwire::api::store::PortalStore;
use pgwire::api::{ClientInfo, ClientPortalStore};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pgwire::messages::PgWireBackendMessage;
use pgwire::messages::PgWireFrontendMessage;

use crate::bridge::envelope::PhysicalPlan;
use crate::config::auth::AuthMode;
Expand Down Expand Up @@ -61,7 +62,7 @@ impl NodeDbPgHandler {
// Sub-planners (check constraints, type guards, ANALYZE,
// procedural DML) build their own no-lease `QueryContext`
// via `for_state`.
let query_ctx = QueryContext::for_state_with_lease(&state, 1);
let query_ctx = QueryContext::for_state_with_lease(&state);
let query_parser = Arc::new(NodeDbQueryParser::new(Arc::clone(&state)));
Self {
state,
Expand Down Expand Up @@ -90,22 +91,20 @@ impl NodeDbPgHandler {

match self.auth_mode {
AuthMode::Trust => {
if let Some(identity) = self
.state
// Strict resolution: `post_startup` has already ensured the
// user exists (either because it was already in the store
// or via the bootstrap auto-create path on an empty store),
// so any miss here is a genuine unknown user.
self.state
.credentials
.to_identity(&username, AuthMethod::Trust)
{
Ok(identity)
} else {
Ok(AuthenticatedIdentity {
user_id: 0,
username,
tenant_id: TenantId::new(1),
auth_method: AuthMethod::Trust,
roles: vec![Role::Superuser],
is_superuser: true,
.ok_or_else(|| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"FATAL".to_owned(),
"28000".to_owned(),
format!("trust auth: user '{username}' does not exist"),
)))
})
}
}
AuthMode::Password | AuthMode::Md5Password | AuthMode::Certificate => self
.state
Expand Down Expand Up @@ -310,5 +309,66 @@ impl ExtendedQueryHandler for NodeDbPgHandler {
}
}

// Trust mode: NoopStartupHandler (no authentication).
impl NoopStartupHandler for NodeDbPgHandler {}
// Trust mode: NoopStartupHandler skips password verification but still
// resolves the connecting username against the credential store, matching
// PostgreSQL's `trust` method semantics. Unknown users are rejected before
// the server sends ReadyForQuery.
#[async_trait]
impl NoopStartupHandler for NodeDbPgHandler {
async fn post_startup<C>(
&self,
client: &mut C,
_message: PgWireFrontendMessage,
) -> PgWireResult<()>
where
C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send,
C::Error: Debug,
PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
{
if !matches!(self.auth_mode, AuthMode::Trust) {
return Ok(());
}

let username = client
.metadata()
.get("user")
.cloned()
.unwrap_or_else(|| "unknown".to_string());

if self
.state
.credentials
.to_identity(&username, AuthMethod::Trust)
.is_some()
{
return Ok(());
}

// Bootstrap: an empty credential store admits the first connecting
// user as a tenant-1 superuser and persists them so subsequent
// queries on the same connection (and any reconnect) resolve
// through the normal strict path.
if self.state.credentials.is_empty() {
let _ = self.state.credentials.create_user(
&username,
"",
TenantId::new(1),
vec![Role::Superuser],
);
return Ok(());
}

let source = client.socket_addr().to_string();
self.state.audit_record(
AuditEvent::AuthFailure,
None,
&source,
&format!("trust auth: user '{username}' does not exist"),
);
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"FATAL".to_owned(),
"28000".to_owned(),
format!("trust auth: user '{username}' does not exist"),
))))
}
}
Loading
Loading