diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bade2b..5bd211a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,86 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Security + +- **Schema name validation at provider construction.** All constructors now + reject schema names that do not match `^[A-Za-z_][A-Za-z0-9_]*$`. + PostgreSQL identifiers cannot be bound as SQL parameters, so the schema + name is interpolated directly into the DDL and DML the provider issues. + Restricting the accepted character set up front eliminates the SQL + injection vector that would otherwise exist for callers that pass + attacker-controlled schema names. PostgreSQL's full identifier grammar + (including quoted identifiers) is broader; this validation is + intentionally conservative. + +### Changed + +- **BREAKING (unreleased API surface only):** `PostgresProvider::new_with_config`, + `new_with_schema`, and the deprecated Entra constructors now return an + error when `schema_name` contains characters outside + `[A-Za-z_][A-Za-z0-9_]*`. Previously such names were silently + interpolated into SQL. Callers passing only constants from their own code + (the common case) are unaffected. Already-shipped releases (`<= 0.1.33`) + are unaffected. + +- **BREAKING (unreleased API surface only):** Collapsed all `*_with_config` + and Entra-specific constructors into a single + `PostgresProvider::new_with_config(ProviderConfig)`. `ProviderConfig` now + carries the connection variant via a new `ConnectionConfig` enum + (`Url(String)` or `Entra { host, port, database, user, options }`), + the optional schema name, and the migration policy. Construct via + `ProviderConfig::url(database_url)` or + `ProviderConfig::entra(host, port, db, user, options)` and adjust fields + as needed. The previously unreleased `new_with_config(url, config)` and + `new_with_schema_and_config(url, schema, config)` constructors are + removed. `new(url)` and `new_with_schema(url, schema)` remain as + convenience wrappers. + +### Deprecated + +- `PostgresProvider::new_with_entra` and + `PostgresProvider::new_with_schema_and_entra` are deprecated in favor of + `new_with_config(ProviderConfig::entra(...))`. They continue to work and + delegate to the new path; they will be removed in a future release. +- `PostgresProvider::initialize_schema` is deprecated. Every constructor + already runs the migration runner; this back-compat shim will be removed + in a future release. + +### Added + +- **Reject schemas ahead of the running binary.** Both `MigrationPolicy::ApplyAll` + and `MigrationPolicy::VerifyOnly` now fail fast when the `_duroxide_migrations` + tracking table records migration versions that are not bundled with the + running binary. Under `ApplyAll` the check runs under the migration advisory + lock and short-circuits before any DDL is executed, so an older binary + cannot rewrite a schema that is ahead of its code. The error message names + the unknown versions and instructs the operator to update the code. + +- **Configurable migration policy at provider construction.** New + `MigrationPolicy` enum, `ProviderConfig` struct, and `ConnectionConfig` + enum, plus the single new constructor `PostgresProvider::new_with_config`. + The default policy is `MigrationPolicy::ApplyAll`, which preserves + pre-feature behavior — all existing constructors (`new`, + `new_with_schema`, and the deprecated `new_with_entra` / + `new_with_schema_and_entra`) continue to apply pending migrations on + startup. The new `MigrationPolicy::VerifyOnly` policy skips migration + application and instead verifies that the `_duroxide_migrations` tracking + table exists in the target schema and that every embedded migration has + already been applied, returning an error otherwise. Intended for + processes that must not run DDL — e.g. application backends, where a + separately privileged worker is responsible for applying schema + changes. `VerifyOnly` does not take the migration advisory lock and does + not create or modify any database objects. + +- **Initialization regression tests.** Added integration tests for the + provider initialization paths: `VerifyOnly` against a missing schema, a + bare schema with no tracking table, and a schema whose tracking table is + behind the bundled migrations; and a concurrency test that exercises the + migration advisory lock by running two `ApplyAll` initializations against + the same fresh schema in parallel. + ## [0.1.33] - 2026-05-13 ### Fixed diff --git a/docs/migration-policy.md b/docs/migration-policy.md new file mode 100644 index 0000000..e5716d0 --- /dev/null +++ b/docs/migration-policy.md @@ -0,0 +1,62 @@ +# Migration Policy + +`duroxide-pg` supports separating schema migration from normal runtime database +operations. This matters in production because DDL usually belongs to a +controlled deploy or upgrade step, while application backends and workers often +run with lower-privilege DML-only roles. + +## Policies + +`MigrationPolicy::ApplyAll` is the default. It preserves the original provider +behavior: provider construction creates the target schema when needed, creates +the `_duroxide_migrations` tracking table, rejects schemas that contain unknown +future migration versions, and applies any bundled migrations that have not yet +been recorded. + +`MigrationPolicy::VerifyOnly` executes no DDL. It verifies that the migration +tracking table exists, that the schema has no unknown future migration versions, +that every bundled migration has already been applied, and that core provider +tables still exist. This is intended for client/backend/worker processes where a +separate privileged process has already run migrations. + +## Behavior Matrix + +| Scenario | `ApplyAll` | `VerifyOnly` | +|---|---|---| +| Schema does not exist | Creates it | Error | +| Schema exists but tracking table is missing | Creates tracking table and runs migrations | Error | +| Schema is behind bundled migrations | Applies pending migrations | Error | +| Schema has unknown future migrations | Error before DDL | Error | +| Migrations are recorded but core tables are missing | Re-applies migrations | Error | +| Schema is fully migrated | No-op | No-op | + +`ApplyAll` takes a PostgreSQL session advisory lock scoped to the target schema +before checking or applying migrations. That serializes concurrent provider +startup so multiple nodes do not race to apply the same migration. + +## Error Messages + +`VerifyOnly` fails with messages in these shapes: + +- Missing tracking table: `duroxide migrations not initialized: schema "..." does not contain _duroxide_migrations...` +- Missing bundled migration versions: `duroxide migrations not up to date in schema "...": missing versions [...]...` +- Unknown future migration versions: `schema "..." has migrations not recognized by this version of the code: [...]...` +- Missing core tables after complete migration records: `duroxide migrations recorded as complete in schema "...", but core tables are missing...` + +`ApplyAll` also rejects unknown future migration versions before it runs DDL, so +an older binary will not mutate a schema that appears ahead of its code. + +## Example + +```rust +use duroxide_pg::{MigrationPolicy, PostgresProvider, ProviderConfig}; + +# async fn example(database_url: &str) -> anyhow::Result<()> { +let mut config = ProviderConfig::url(database_url); +config.schema_name = Some("duroxide".to_string()); +config.migration_policy = MigrationPolicy::VerifyOnly; + +let provider = PostgresProvider::new_with_config(config).await?; +# Ok(()) +# } +``` \ No newline at end of file diff --git a/src/entra.rs b/src/entra.rs index ad3fac9..fc680ff 100644 --- a/src/entra.rs +++ b/src/entra.rs @@ -2,9 +2,8 @@ //! for [`PostgresProvider`](crate::PostgresProvider). //! //! This module exposes [`EntraAuthOptions`] — the configuration type passed to -//! `PostgresProvider::new_with_entra` and `PostgresProvider::new_with_schema_and_entra` -//! (added in Phase 2) — plus the internal credential abstractions used to -//! fetch and rotate Entra access tokens. +//! [`ProviderConfig::entra`](crate::ProviderConfig::entra) — plus the internal +//! credential abstractions used to fetch and rotate Entra access tokens. //! //! Azure SDK types (`azure_core::credentials::TokenCredential`, //! `azure_identity::ManagedIdentityCredential`, etc.) are intentionally **not diff --git a/src/lib.rs b/src/lib.rs index fd21d04..42a0642 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,17 +43,17 @@ //! [`EntraAuthOptions`] for tunables. //! //! ```rust,no_run -//! use duroxide_pg::{EntraAuthOptions, PostgresProvider}; +//! use duroxide_pg::{EntraAuthOptions, PostgresProvider, ProviderConfig}; //! //! # async fn example() -> anyhow::Result<()> { -//! let provider = PostgresProvider::new_with_entra( +//! let config = ProviderConfig::entra( //! "myserver.postgres.database.azure.com", //! 5432, //! "mydb", //! "my-entra-principal@contoso.onmicrosoft.com", //! EntraAuthOptions::new(), -//! ) -//! .await?; +//! ); +//! let provider = PostgresProvider::new_with_config(config).await?; //! # Ok(()) //! # } //! ``` @@ -82,4 +82,4 @@ pub mod migrations; pub mod provider; pub use entra::EntraAuthOptions; -pub use provider::PostgresProvider; +pub use provider::{ConnectionConfig, MigrationPolicy, PostgresProvider, ProviderConfig}; diff --git a/src/migrations.rs b/src/migrations.rs index 1a2228c..eb3b6d3 100644 --- a/src/migrations.rs +++ b/src/migrations.rs @@ -71,12 +71,143 @@ impl MigrationRunner { let conn = &mut *conn; self.lock_for_migrations(conn).await?; - let result = self.migrate_inner(conn).await; + // Reject unknown migrations while holding the advisory lock so that an + // older binary cannot rewrite a schema that is ahead of its code. + // Short-circuit: do NOT run migrate_inner if unknown migrations are + // detected. + let result = match self.check_no_unknown_migrations(conn).await { + Ok(()) => self.migrate_inner(conn).await, + Err(e) => Err(e), + }; self.unlock_for_migrations(conn).await; result } + /// Verify that the migration tracking table exists and that every embedded + /// migration has already been applied. Does not take the migration + /// advisory lock and does not create or modify any database objects. + /// + /// Returns an error if the `_duroxide_migrations` table is missing in + /// `schema_name` or if any bundled migration version is absent from it. + /// + /// Intended for processes that must not perform DDL (e.g. application + /// backends, where a separately privileged worker is responsible for + /// applying schema changes). + pub async fn verify(&self) -> Result<()> { + let mut conn = self.pool.acquire().await?; + let conn = &mut *conn; + + // Check that the tracking table exists in the target schema. + let table_exists: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM information_schema.tables \ + WHERE table_schema = $1 AND table_name = '_duroxide_migrations')", + ) + .bind(&self.schema_name) + .fetch_one(&mut *conn) + .await?; + + if !table_exists { + anyhow::bail!( + "duroxide migrations not initialized: schema {:?} does not \ + contain _duroxide_migrations. Construct a provider with \ + MigrationPolicy::ApplyAll (the default) from a process with \ + DDL privileges before using MigrationPolicy::VerifyOnly.", + self.schema_name + ); + } + + // Reject schemas that have migrations the running binary does not + // recognize (schema is ahead of the code). + self.check_no_unknown_migrations(conn).await?; + + let migrations = self.load_migrations()?; + let applied: std::collections::HashSet = + self.get_applied_versions(conn).await?.into_iter().collect(); + + let mut missing: Vec = migrations + .iter() + .map(|m| m.version) + .filter(|v| !applied.contains(v)) + .collect(); + missing.sort_unstable(); + + if !missing.is_empty() { + anyhow::bail!( + "duroxide migrations not up to date in schema {:?}: missing \ + versions {:?}. Run migrations from a provider configured with \ + MigrationPolicy::ApplyAll before constructing VerifyOnly \ + providers.", + self.schema_name, + missing, + ); + } + + if !self.check_tables_exist(conn).await.unwrap_or(false) { + anyhow::bail!( + "duroxide migrations recorded as complete in schema {:?}, but \ + core tables are missing. The schema may be corrupted; run \ + migrations from a provider configured with \ + MigrationPolicy::ApplyAll before constructing VerifyOnly \ + providers.", + self.schema_name, + ); + } + + Ok(()) + } + + /// Check that the database has no migrations the running binary does not + /// recognize. Used by both `migrate()` (to refuse running DDL against a + /// schema ahead of the code) and `verify()` (to refuse claiming + /// successful verification of an unknown schema). + /// + /// Returns `Ok(())` if the tracking table does not yet exist: under + /// `ApplyAll` it will be created by `migrate_inner`, and under + /// `VerifyOnly` the missing table is reported separately before this is + /// called. + async fn check_no_unknown_migrations( + &self, + conn: &mut sqlx::postgres::PgConnection, + ) -> Result<()> { + let tracking_exists: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM information_schema.tables \ + WHERE table_schema = $1 AND table_name = '_duroxide_migrations')", + ) + .bind(&self.schema_name) + .fetch_one(&mut *conn) + .await?; + + if !tracking_exists { + return Ok(()); + } + + let applied = self.get_applied_versions(conn).await?; + let expected: std::collections::HashSet = self + .load_migrations()? + .into_iter() + .map(|m| m.version) + .collect(); + + let mut unknown: Vec = applied + .into_iter() + .filter(|v| !expected.contains(v)) + .collect(); + unknown.sort_unstable(); + + if !unknown.is_empty() { + anyhow::bail!( + "schema {:?} has migrations not recognized by this version of \ + the code: {:?}. The database schema is ahead of the code. \ + Update the code to a compatible version.", + self.schema_name, + unknown, + ); + } + + Ok(()) + } + async fn migrate_inner(&self, conn: &mut sqlx::postgres::PgConnection) -> Result<()> { // Ensure schema exists if self.schema_name != "public" { diff --git a/src/provider.rs b/src/provider.rs index ad8fc2d..2f83970 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -54,17 +54,17 @@ use crate::migrations::MigrationRunner; /// ## Azure Database for PostgreSQL with Microsoft Entra ID /// /// ```rust,no_run -/// use duroxide_pg::{EntraAuthOptions, PostgresProvider}; +/// use duroxide_pg::{EntraAuthOptions, PostgresProvider, ProviderConfig}; /// /// # async fn example() -> anyhow::Result<()> { -/// let provider = PostgresProvider::new_with_entra( +/// let config = ProviderConfig::entra( /// "myserver.postgres.database.azure.com", /// 5432, /// "mydb", /// "my-entra-principal@contoso.onmicrosoft.com", /// EntraAuthOptions::new(), -/// ) -/// .await?; +/// ); +/// let provider = PostgresProvider::new_with_config(config).await?; /// # Ok(()) /// # } /// ``` @@ -107,6 +107,143 @@ pub struct PostgresProvider { _refresh_task: Option, } +/// Migration policy applied at [`PostgresProvider`] construction. +/// +/// Defaults to [`MigrationPolicy::ApplyAll`], preserving pre-feature behavior: +/// all constructors that do not take a [`ProviderConfig`] apply pending +/// migrations on startup. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum MigrationPolicy { + /// Apply any pending embedded migrations at startup. Default. + /// + /// Requires the database role to have DDL privileges on `schema_name` + /// (or `public` if none was supplied). + #[default] + ApplyAll, + /// Skip migration application. Verify that the `_duroxide_migrations` + /// tracking table exists and every embedded migration has already been + /// applied; return an error otherwise. + /// + /// Intended for processes that must not run DDL — e.g. application + /// backends, when a separately privileged worker is responsible for + /// applying schema changes. + VerifyOnly, +} + +/// Configuration for [`PostgresProvider::new_with_config`]. +/// +/// Construct via [`ProviderConfig::url`] (standard `postgres://` URL) or +/// [`ProviderConfig::entra`] (Azure Database for PostgreSQL with Microsoft +/// Entra ID), then adjust fields as needed: +/// +/// ```rust,no_run +/// use duroxide_pg::{MigrationPolicy, PostgresProvider, ProviderConfig}; +/// +/// # async fn example() -> anyhow::Result<()> { +/// let mut config = ProviderConfig::url("postgres://localhost/mydb"); +/// config.schema_name = Some("my_app".into()); +/// config.migration_policy = MigrationPolicy::VerifyOnly; +/// let provider = PostgresProvider::new_with_config(config).await?; +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ProviderConfig { + /// How the provider should reach the database. + pub connection: ConnectionConfig, + /// PostgreSQL schema for tenant isolation. `None` resolves to `public`. + pub schema_name: Option, + /// Policy for handling embedded migrations at startup. + pub migration_policy: MigrationPolicy, +} + +impl ProviderConfig { + /// Build a config from a standard PostgreSQL connection URL + /// (`postgres://user:pass@host:port/db`). Schema defaults to `public` + /// and migration policy defaults to [`MigrationPolicy::ApplyAll`]. + pub fn url(database_url: impl Into) -> Self { + Self { + connection: ConnectionConfig::Url(database_url.into()), + schema_name: None, + migration_policy: MigrationPolicy::default(), + } + } + + /// Build a config for Azure Database for PostgreSQL with Microsoft + /// Entra ID. Schema defaults to `public` and migration policy defaults + /// to [`MigrationPolicy::ApplyAll`]. All Entra connections use + /// `PgSslMode::VerifyFull`. + pub fn entra( + host: impl Into, + port: u16, + database: impl Into, + user: impl Into, + options: EntraAuthOptions, + ) -> Self { + Self { + connection: ConnectionConfig::Entra { + host: host.into(), + port, + database: database.into(), + user: user.into(), + options, + }, + schema_name: None, + migration_policy: MigrationPolicy::default(), + } + } +} + +/// How [`PostgresProvider`] should reach the database. +/// +/// Constructed indirectly via [`ProviderConfig::url`] or +/// [`ProviderConfig::entra`]. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum ConnectionConfig { + /// Standard PostgreSQL connection URL. + Url(String), + /// Azure Database for PostgreSQL with Microsoft Entra ID. + Entra { + host: String, + port: u16, + database: String, + user: String, + options: EntraAuthOptions, + }, +} + +/// Validate that `schema_name` is a safe PostgreSQL identifier. +/// +/// PostgreSQL identifiers cannot be bound as SQL parameters, so the schema +/// name is interpolated directly into the SQL we issue (e.g. +/// `CREATE SCHEMA {schema}`, `SELECT … FROM {schema}.instances`). To prevent +/// SQL injection, we restrict schema names to a conservative subset: +/// +/// `^[A-Za-z_][A-Za-z0-9_]*$` +/// +/// PostgreSQL's full identifier grammar is broader (quoted identifiers can +/// contain almost anything), but accepting the broader grammar would require +/// quoting every interpolation site and validating that the input does not +/// contain a closing quote — strictly less safe than refusing surprising +/// names up front. +fn validate_schema_name(schema_name: &str) -> Result<()> { + let mut chars = schema_name.chars(); + let Some(first) = chars.next() else { + anyhow::bail!("Invalid schema_name '': must match [A-Za-z_][A-Za-z0-9_]*"); + }; + if !(first == '_' || first.is_ascii_alphabetic()) { + anyhow::bail!("Invalid schema_name '{schema_name}': must match [A-Za-z_][A-Za-z0-9_]*"); + } + for ch in chars { + if !(ch == '_' || ch.is_ascii_alphanumeric()) { + anyhow::bail!("Invalid schema_name '{schema_name}': must match [A-Za-z_][A-Za-z0-9_]*"); + } + } + Ok(()) +} + /// Newtype around `tokio::task::AbortHandle` that aborts the task on drop. /// Used to ensure the Entra token refresh task is cleaned up when the /// provider is dropped. @@ -119,11 +256,75 @@ impl Drop for AbortOnDropHandle { } impl PostgresProvider { + /// Create a provider from a PostgreSQL connection URL, using the + /// `public` schema and applying any pending migrations. + /// + /// Convenience wrapper around [`Self::new_with_config`]. pub async fn new(database_url: &str) -> Result { - Self::new_with_schema(database_url, None).await + Self::new_with_config(ProviderConfig::url(database_url)).await } + /// Create a provider from a PostgreSQL connection URL, using a custom + /// schema for tenant isolation, and applying any pending migrations. + /// + /// Convenience wrapper around [`Self::new_with_config`]. pub async fn new_with_schema(database_url: &str, schema_name: Option<&str>) -> Result { + let mut config = ProviderConfig::url(database_url); + config.schema_name = schema_name.map(str::to_string); + Self::new_with_config(config).await + } + + /// Create a provider from a [`ProviderConfig`]. This is the single + /// constructor that fully exposes the configuration surface: + /// connection variant (URL or Entra), schema, and migration policy. + /// + /// All other public constructors delegate to this one. + pub async fn new_with_config(config: ProviderConfig) -> Result { + let ProviderConfig { + connection, + schema_name, + migration_policy, + } = config; + + if let Some(ref s) = schema_name { + validate_schema_name(s)?; + } + + match connection { + ConnectionConfig::Url(database_url) => { + Self::new_from_url(&database_url, schema_name.as_deref(), migration_policy).await + } + ConnectionConfig::Entra { + host, + port, + database, + user, + options, + } => { + let token_source = options.default_token_source().context( + "Entra credential resolution failed: could not build the default credential chain", + )?; + Self::new_with_entra_with_token_source( + &host, + port, + &database, + &user, + schema_name.as_deref(), + options, + token_source, + PgSslMode::VerifyFull, + migration_policy, + ) + .await + } + } + } + + async fn new_from_url( + database_url: &str, + schema_name: Option<&str>, + migration_policy: MigrationPolicy, + ) -> Result { let max_connections = std::env::var("DUROXIDE_PG_POOL_MAX") .ok() .and_then(|s| s.parse::().ok()) @@ -145,9 +346,11 @@ impl PostgresProvider { _refresh_task: None, }; - // Run migrations to initialize schema - let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name.clone()); - migration_runner.migrate().await?; + let migration_runner = MigrationRunner::new(provider.pool.clone(), schema_name); + match migration_policy { + MigrationPolicy::ApplyAll => migration_runner.migrate().await?, + MigrationPolicy::VerifyOnly => migration_runner.verify().await?, + } Ok(provider) } @@ -178,6 +381,10 @@ impl PostgresProvider { /// # Errors /// Returns an error if credential resolution fails, the initial token /// cannot be acquired, the database connection fails, or migrations fail. + #[deprecated( + since = "0.1.34", + note = "use `PostgresProvider::new_with_config(ProviderConfig::entra(...))` instead" + )] pub async fn new_with_entra( host: &str, port: u16, @@ -185,11 +392,15 @@ impl PostgresProvider { user: &str, options: EntraAuthOptions, ) -> Result { - Self::new_with_schema_and_entra(host, port, database, user, None, options).await + Self::new_with_config(ProviderConfig::entra(host, port, database, user, options)).await } /// Same as [`Self::new_with_entra`] but uses a custom schema for tenant /// isolation. + #[deprecated( + since = "0.1.34", + note = "use `PostgresProvider::new_with_config(ProviderConfig::entra(...))` with `schema_name` set instead" + )] #[instrument( skip(options), fields(host = %host, port = %port, database = %database, user = %user, schema = ?schema_name), @@ -203,21 +414,9 @@ impl PostgresProvider { schema_name: Option<&str>, options: EntraAuthOptions, ) -> Result { - let token_source = options.default_token_source().context( - "Entra credential resolution failed: could not build the default credential chain", - )?; - - Self::new_with_entra_with_token_source( - host, - port, - database, - user, - schema_name, - options, - token_source, - PgSslMode::VerifyFull, - ) - .await + let mut config = ProviderConfig::entra(host, port, database, user, options); + config.schema_name = schema_name.map(str::to_string); + Self::new_with_config(config).await } /// Crate-internal Entra constructor. Accepts an explicit @@ -229,6 +428,7 @@ impl PostgresProvider { /// connect-options → pool → migrations → refresh task) against a local /// PostgreSQL without an Azure dependency, by injecting a fake /// [`TokenSource`] that returns the local password and disabling TLS. + #[allow(clippy::too_many_arguments)] pub(crate) async fn new_with_entra_with_token_source( host: &str, port: u16, @@ -238,6 +438,7 @@ impl PostgresProvider { options: EntraAuthOptions, token_source: Arc, ssl_mode: PgSslMode, + migration_policy: MigrationPolicy, ) -> Result { let audience = options.audience_str().to_string(); let token = token_source @@ -260,7 +461,10 @@ impl PostgresProvider { let schema_name = schema_name.unwrap_or("public").to_string(); let migration_runner = MigrationRunner::new(pool.clone(), schema_name.clone()); - migration_runner.migrate().await?; + match migration_policy { + MigrationPolicy::ApplyAll => migration_runner.migrate().await?, + MigrationPolicy::VerifyOnly => migration_runner.verify().await?, + } let refresh_handle = spawn_token_refresh_task( pool.clone(), @@ -279,10 +483,12 @@ impl PostgresProvider { }) } + #[deprecated( + since = "0.1.34", + note = "schema initialization is now run automatically by every constructor; this shim will be removed in a future release" + )] #[instrument(skip(self), target = "duroxide::providers::postgres")] pub async fn initialize_schema(&self) -> Result<()> { - // Schema initialization is now handled by migrations - // This method is kept for backward compatibility but delegates to migrations let migration_runner = MigrationRunner::new(self.pool.clone(), self.schema_name.clone()); migration_runner.migrate().await?; Ok(()) @@ -2977,6 +3183,7 @@ mod entra_pipeline_tests { EntraAuthOptions::new(), token_source, PgSslMode::Disable, + MigrationPolicy::ApplyAll, ) .await .expect("Entra pipeline must succeed against local PG with correct token"); @@ -3018,6 +3225,7 @@ mod entra_pipeline_tests { EntraAuthOptions::new(), token_source, PgSslMode::Disable, + MigrationPolicy::ApplyAll, ) .await; @@ -3066,6 +3274,7 @@ mod entra_pipeline_tests { EntraAuthOptions::new().refresh_interval(Duration::from_secs(60 * 60)), token_source, PgSslMode::Disable, + MigrationPolicy::ApplyAll, ) .await .expect("default-constructor variant must succeed"); diff --git a/tests/entra_live_test.rs b/tests/entra_live_test.rs index 15ad814..afd34a2 100644 --- a/tests/entra_live_test.rs +++ b/tests/entra_live_test.rs @@ -32,7 +32,7 @@ //! cargo test --test entra_live_test -- --ignored --nocapture //! ``` -use duroxide_pg::{EntraAuthOptions, PostgresProvider}; +use duroxide_pg::{EntraAuthOptions, PostgresProvider, ProviderConfig}; use sqlx::Row; const ENABLE_VAR: &str = "DUROXIDE_PG_ENTRA_LIVE_TEST"; @@ -77,16 +77,11 @@ async fn entra_live_smoke_test() { "Live Entra smoke test: host={host} port={port} db={database} user={user} schema={schema}" ); - let provider = PostgresProvider::new_with_schema_and_entra( - &host, - port, - &database, - &user, - Some(&schema), - EntraAuthOptions::new(), - ) - .await - .expect("provider construction with default Entra credential chain must succeed"); + let mut config = ProviderConfig::entra(&host, port, &database, &user, EntraAuthOptions::new()); + config.schema_name = Some(schema.clone()); + let provider = PostgresProvider::new_with_config(config) + .await + .expect("provider construction with default Entra credential chain must succeed"); // Use a basic query to prove the pool is functional and the schema/migrations // were applied. We look for one of the well-known tables migrations create. diff --git a/tests/migration_policy_tests.rs b/tests/migration_policy_tests.rs new file mode 100644 index 0000000..2005dee --- /dev/null +++ b/tests/migration_policy_tests.rs @@ -0,0 +1,464 @@ +//! Tests for [`MigrationPolicy`] / [`ProviderConfig`] / `new_with_config`. +//! +//! These tests verify that: +//! 1. `MigrationPolicy::ApplyAll` (the default) creates schema + tables, matching +//! pre-feature behavior. +//! 2. `MigrationPolicy::VerifyOnly` succeeds against a schema where migrations +//! have already been applied. +//! 3. `MigrationPolicy::VerifyOnly` returns an error against an uninitialized +//! schema, without creating any objects. + +use duroxide_pg::{MigrationPolicy, PostgresProvider, ProviderConfig}; +use sqlx::postgres::PgPoolOptions; +use sqlx::Row; + +fn get_database_url() -> String { + dotenvy::dotenv().ok(); + std::env::var("DATABASE_URL").expect("DATABASE_URL must be set in environment or .env file") +} + +fn get_test_schema() -> String { + let guid = uuid::Uuid::new_v4().to_string(); + let suffix = &guid[guid.len() - 8..]; + format!("test_migpolicy_{suffix}") +} + +async fn drop_schema(schema_name: &str) { + let pool = PgPoolOptions::new() + .max_connections(1) + .connect(&get_database_url()) + .await + .expect("connect for cleanup"); + sqlx::query(&format!("DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + .execute(&pool) + .await + .expect("drop schema"); +} + +async fn schema_exists(schema_name: &str) -> bool { + let pool = PgPoolOptions::new() + .max_connections(1) + .connect(&get_database_url()) + .await + .expect("connect for schema check"); + let row = sqlx::query( + "SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = $1) AS e", + ) + .bind(schema_name) + .fetch_one(&pool) + .await + .expect("query schema existence"); + row.get::("e") +} + +#[tokio::test] +async fn apply_all_creates_schema_and_tables() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // `ProviderConfig::url` defaults to `MigrationPolicy::ApplyAll`, + // so this also smoke-tests the default policy via the new API. + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some(schema.clone()); + let _provider = PostgresProvider::new_with_config(config) + .await + .expect("ApplyAll should succeed against a fresh schema"); + + assert!( + schema_exists(&schema).await, + "schema {schema} should exist after ApplyAll" + ); + + drop_schema(&schema).await; +} + +#[tokio::test] +async fn verify_only_succeeds_against_initialized_schema() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // First, apply migrations. + let _bootstrap = PostgresProvider::new_with_schema(&database_url, Some(&schema)) + .await + .expect("bootstrap apply"); + + // Then construct a VerifyOnly provider against the same schema. + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some(schema.clone()); + config.migration_policy = MigrationPolicy::VerifyOnly; + + let _verify = PostgresProvider::new_with_config(config) + .await + .expect("VerifyOnly should succeed when migrations are up to date"); + + drop_schema(&schema).await; +} + +#[tokio::test] +async fn verify_only_errors_against_uninitialized_schema() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some(schema.clone()); + config.migration_policy = MigrationPolicy::VerifyOnly; + + let result = PostgresProvider::new_with_config(config).await; + + let err = match result { + Ok(_) => panic!("VerifyOnly must fail when the target schema has no migrations applied"), + Err(e) => e.to_string(), + }; + + assert!( + err.contains("not initialized") || err.contains("_duroxide_migrations"), + "error should mention missing migration table; got: {err}" + ); + + // VerifyOnly must not create the schema. + assert!( + !schema_exists(&schema).await, + "VerifyOnly should not create schema {schema}" + ); +} + +#[tokio::test] +async fn verify_only_rejects_unknown_migrations() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // Bring the schema up to date via ApplyAll. + let bootstrap = PostgresProvider::new_with_schema(&database_url, Some(&schema)) + .await + .expect("bootstrap apply"); + + // Insert an "unknown" migration version directly into the tracking table. + sqlx::query(&format!( + "INSERT INTO {schema}._duroxide_migrations (version, name) VALUES ($1, $2)" + )) + .bind(9_999_i64) + .bind("9999_future.sql") + .execute(bootstrap.pool()) + .await + .expect("insert unknown migration row"); + drop(bootstrap); + + // VerifyOnly must refuse to claim a schema that is ahead of the code. + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some(schema.clone()); + config.migration_policy = MigrationPolicy::VerifyOnly; + + let result = PostgresProvider::new_with_config(config).await; + let msg = match result { + Ok(_) => panic!("VerifyOnly must reject unknown applied migrations"), + Err(e) => format!("{e:#}"), + }; + assert!( + msg.contains("not recognized") && msg.contains("9999"), + "expected ahead-of-code error mentioning the unknown version; got: {msg}" + ); + + drop_schema(&schema).await; +} + +#[tokio::test] +async fn apply_all_unknown_migrations_causes_no_mutations() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // Fully initialize the schema. + let bootstrap = PostgresProvider::new_with_schema(&database_url, Some(&schema)) + .await + .expect("bootstrap apply"); + + // Snapshot the applied migrations before the perturbation. + let before: Vec<(i64, String)> = sqlx::query_as(&format!( + "SELECT version, name FROM {schema}._duroxide_migrations ORDER BY version" + )) + .fetch_all(bootstrap.pool()) + .await + .expect("read migrations"); + let last_version = before.last().expect("at least one bundled migration").0; + + // Create a "pending migration" condition by removing the last real + // migration record, drop a core table to disable the re-apply-if-missing + // path, then insert an unknown future migration. + sqlx::query(&format!( + "DELETE FROM {schema}._duroxide_migrations WHERE version = $1" + )) + .bind(last_version) + .execute(bootstrap.pool()) + .await + .expect("delete last migration"); + + sqlx::query(&format!("DROP TABLE {schema}.instances")) + .execute(bootstrap.pool()) + .await + .expect("drop instances table"); + + sqlx::query(&format!( + "INSERT INTO {schema}._duroxide_migrations (version, name) VALUES ($1, $2)" + )) + .bind(9_999_i64) + .bind("9999_future.sql") + .execute(bootstrap.pool()) + .await + .expect("insert unknown migration row"); + + drop(bootstrap); + + // ApplyAll must short-circuit on the unknown migration and run no DDL. + let result = PostgresProvider::new_with_schema(&database_url, Some(&schema)).await; + let msg = match result { + Ok(_) => panic!("ApplyAll must reject unknown applied migrations"), + Err(e) => format!("{e:#}"), + }; + assert!( + msg.contains("not recognized") && msg.contains("9999"), + "expected ahead-of-code error; got: {msg}" + ); + + // Prove no DDL fired: the deleted real migration is still absent and the + // dropped core table is still missing. + let pool = PgPoolOptions::new() + .max_connections(1) + .connect(&database_url) + .await + .expect("connect for verification"); + + let after_versions: Vec = sqlx::query_scalar(&format!( + "SELECT version FROM {schema}._duroxide_migrations" + )) + .fetch_all(&pool) + .await + .expect("read migrations after rejection"); + assert!( + !after_versions.contains(&last_version), + "migrate_inner should not have re-applied migration {last_version}; \ + current set: {after_versions:?}" + ); + + let instances_exists: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM information_schema.tables \ + WHERE table_schema = $1 AND table_name = 'instances')", + ) + .bind(&schema) + .fetch_one(&pool) + .await + .expect("check instances table after rejection"); + assert!( + !instances_exists, + "migrate_inner should not have recreated the instances table" + ); + + drop_schema(&schema).await; +} + +#[tokio::test] +async fn schema_name_validation_rejects_unsafe_identifiers() { + let database_url = get_database_url(); + + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some("bad-name".to_string()); + + match PostgresProvider::new_with_config(config).await { + Ok(_) => panic!("Expected schema name validation to fail"), + Err(e) => { + let msg = format!("{e:#}"); + assert!( + msg.contains("Invalid schema_name"), + "expected validation error, got: {msg}" + ); + } + } +} + +#[tokio::test] +async fn verify_only_errors_when_schema_missing() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // Belt-and-braces: make sure the schema is absent. + drop_schema(&schema).await; + + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some(schema.clone()); + config.migration_policy = MigrationPolicy::VerifyOnly; + + let result = PostgresProvider::new_with_config(config).await; + let msg = match result { + Ok(_) => panic!("VerifyOnly should fail when the target schema does not exist"), + Err(e) => format!("{e:#}"), + }; + assert!( + msg.contains("not initialized") || msg.contains("_duroxide_migrations"), + "expected missing-schema/missing-tracking-table error, got: {msg}" + ); + + // VerifyOnly must not create the schema as a side effect. + assert!( + !schema_exists(&schema).await, + "VerifyOnly must not create schema {schema}" + ); +} + +#[tokio::test] +async fn verify_only_errors_when_tracking_table_missing() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // Create a bare schema with no tables (no migrations applied). + let pool = PgPoolOptions::new() + .max_connections(1) + .connect(&database_url) + .await + .expect("connect for setup"); + sqlx::query(&format!("CREATE SCHEMA {schema}")) + .execute(&pool) + .await + .expect("create bare schema"); + + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some(schema.clone()); + config.migration_policy = MigrationPolicy::VerifyOnly; + + let result = PostgresProvider::new_with_config(config).await; + let msg = match result { + Ok(_) => panic!("VerifyOnly should fail when the tracking table is missing"), + Err(e) => format!("{e:#}"), + }; + assert!( + msg.contains("not initialized") && msg.contains("_duroxide_migrations"), + "expected tracking-table-missing error, got: {msg}" + ); + + drop_schema(&schema).await; +} + +#[tokio::test] +async fn verify_only_errors_when_migrations_behind() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // Fully initialize, then synthesize a "behind" state by deleting the + // most recent migration record without dropping the tracking table. + let bootstrap = PostgresProvider::new_with_schema(&database_url, Some(&schema)) + .await + .expect("bootstrap apply"); + + let last_version: i64 = sqlx::query_scalar(&format!( + "SELECT MAX(version) FROM {schema}._duroxide_migrations" + )) + .fetch_one(bootstrap.pool()) + .await + .expect("read max version"); + + sqlx::query(&format!( + "DELETE FROM {schema}._duroxide_migrations WHERE version = $1" + )) + .bind(last_version) + .execute(bootstrap.pool()) + .await + .expect("delete most-recent migration row"); + + drop(bootstrap); + + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some(schema.clone()); + config.migration_policy = MigrationPolicy::VerifyOnly; + + let result = PostgresProvider::new_with_config(config).await; + let msg = match result { + Ok(_) => panic!("VerifyOnly should fail when migrations are behind"), + Err(e) => format!("{e:#}"), + }; + assert!( + msg.contains("not up to date") && msg.contains(&last_version.to_string()), + "expected behind-schema error mentioning version {last_version}; got: {msg}" + ); + + drop_schema(&schema).await; +} + +#[tokio::test] +async fn verify_only_errors_when_core_tables_are_missing() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // Fully initialize, then leave migration records intact while dropping a + // core table. VerifyOnly must not trust the tracking table alone. + let bootstrap = PostgresProvider::new_with_schema(&database_url, Some(&schema)) + .await + .expect("bootstrap apply"); + + sqlx::query(&format!("DROP TABLE {schema}.instances")) + .execute(bootstrap.pool()) + .await + .expect("drop instances table"); + + drop(bootstrap); + + let mut config = ProviderConfig::url(&database_url); + config.schema_name = Some(schema.clone()); + config.migration_policy = MigrationPolicy::VerifyOnly; + + let result = PostgresProvider::new_with_config(config).await; + let msg = match result { + Ok(_) => panic!("VerifyOnly should fail when core tables are missing"), + Err(e) => format!("{e:#}"), + }; + assert!( + msg.contains("core tables are missing") || msg.contains("corrupted"), + "expected missing-core-tables error, got: {msg}" + ); + + drop_schema(&schema).await; +} + +#[tokio::test] +async fn concurrent_apply_all_is_serialized() { + let database_url = get_database_url(); + let schema = get_test_schema(); + + // Ensure a clean slate. + drop_schema(&schema).await; + + let make_config = |schema: String| { + let mut cfg = ProviderConfig::url(&database_url); + cfg.schema_name = Some(schema); + cfg.migration_policy = MigrationPolicy::ApplyAll; + cfg + }; + + let h1 = { + let cfg = make_config(schema.clone()); + tokio::spawn(async move { PostgresProvider::new_with_config(cfg).await }) + }; + let h2 = { + let cfg = make_config(schema.clone()); + tokio::spawn(async move { PostgresProvider::new_with_config(cfg).await }) + }; + + let (r1, r2) = tokio::join!(h1, h2); + let p1 = r1 + .expect("task 1 panicked") + .expect("concurrent ApplyAll #1 should succeed under the advisory lock"); + let _p2 = r2 + .expect("task 2 panicked") + .expect("concurrent ApplyAll #2 should succeed under the advisory lock"); + + // Sanity check: the schema is fully migrated. + let row_count: (i64,) = sqlx::query_as(&format!( + "SELECT COUNT(*) FROM {schema}._duroxide_migrations" + )) + .fetch_one(p1.pool()) + .await + .expect("count applied migrations"); + assert!( + row_count.0 > 0, + "expected at least one applied migration after concurrent ApplyAll" + ); + + drop(p1); + drop_schema(&schema).await; +}