diff --git a/Cargo.toml b/Cargo.toml index 7420c0e..6112b80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ serde = { version = "1", features = ["derive"] } thiserror = "1" # Eventastic postgres dependencies +async-stream = "0.3.6" sqlx = { version = "0.8", features = [ "runtime-tokio-rustls", "postgres", @@ -32,5 +33,3 @@ chrono = "0.4" serde_json = "1" tokio = { version = "1", features = ["full"] } futures-util = "0.3" -anyhow = "1" -async-stream = "0.3" diff --git a/eventastic/src/aggregate/root.rs b/eventastic/src/aggregate/root.rs index eef3d29..90baf90 100644 --- a/eventastic/src/aggregate/root.rs +++ b/eventastic/src/aggregate/root.rs @@ -342,7 +342,7 @@ where /// This error is returned when the Repository fails to insert the event /// because the version already exists, indicating a concurrent modification. - #[error("Optimistic Concurrency Error Version {1} of aggregate {0:?} already exists")] + #[error("Optimistic Concurrency Error")] OptimisticConcurrency(T::AggregateId, u64), } @@ -426,7 +426,7 @@ mod tests { assert_eq!(context.version(), 0); for i in 1..=5 { - let add_event = create_add_event(&format!("add-{}", i), i); + let add_event = create_add_event(&format!("add-{i}"), i); context.record_that(add_event).unwrap(); assert_eq!(context.version(), i as u64); } diff --git a/eventastic/src/memory.rs b/eventastic/src/memory.rs index 4c7a3c3..0f67964 100644 --- a/eventastic/src/memory.rs +++ b/eventastic/src/memory.rs @@ -870,7 +870,7 @@ mod tests { // Verify side effects were stored // Reset: 2 side effects, Add: 1, Subtract: 1, Multiply: 0, Add: 1 = 5 total - let expected_side_effects = 2 + 1 + 1 + 0 + 1; + let expected_side_effects = 5; assert_eq!(repository.side_effects_count(), expected_side_effects); let side_effects = repository.get_all_side_effects(); diff --git a/eventastic/src/test_fixtures.rs b/eventastic/src/test_fixtures.rs index 0895f91..6d6d4ee 100644 --- a/eventastic/src/test_fixtures.rs +++ b/eventastic/src/test_fixtures.rs @@ -4,8 +4,6 @@ //! that can be used across different test modules to ensure consistency //! and reduce code duplication. -#![cfg(test)] - use crate::{ aggregate::{Aggregate, SideEffect}, event::DomainEvent, @@ -139,21 +137,21 @@ impl Aggregate for TestCounter { match event { TestEvent::Reset { event_id, .. } => Some(vec![ TestSideEffect::LogOperation { - id: format!("{}-log", event_id), + id: format!("{event_id}-log"), operation: "Reset".to_string(), }, TestSideEffect::NotifyUser { - id: format!("{}-notify", event_id), + id: format!("{event_id}-notify"), message: "Counter has been reset".to_string(), }, ]), TestEvent::Add { event_id, value } => Some(vec![TestSideEffect::LogOperation { - id: format!("{}-log", event_id), - operation: format!("Add {}", value), + id: format!("{event_id}-log"), + operation: format!("Add {value}"), }]), TestEvent::Subtract { event_id, value } => Some(vec![TestSideEffect::LogOperation { - id: format!("{}-log", event_id), - operation: format!("Subtract {}", value), + id: format!("{event_id}-log"), + operation: format!("Subtract {value}"), }]), TestEvent::Multiply { .. } => None, // No side effects for multiply } diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml index 95b7d1a..d0de095 100644 --- a/eventastic_outbox_postgres/Cargo.toml +++ b/eventastic_outbox_postgres/Cargo.toml @@ -4,7 +4,6 @@ version = "0.5.0" edition = "2024" [dependencies] -anyhow = { workspace = true } eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" } async-trait = { workspace = true } sqlx = { workspace = true } @@ -13,3 +12,4 @@ uuid = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } futures-util = { workspace = true } tokio = { workspace = true } +thiserror = { workspace = true } diff --git a/eventastic_outbox_postgres/src/outbox.rs b/eventastic_outbox_postgres/src/outbox.rs index b4e649f..3084d92 100644 --- a/eventastic_outbox_postgres/src/outbox.rs +++ b/eventastic_outbox_postgres/src/outbox.rs @@ -1,44 +1,108 @@ -use anyhow::Context; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use eventastic::aggregate::SideEffect; +use eventastic::aggregate::{Aggregate, SideEffect}; +use eventastic::event::DomainEvent; use eventastic_postgres::{ - DbError, Pickle, PostgresRepository, PostgresTransaction, SideEffectStorage, + EncryptionProvider, Pickle, PostgresRepository, PostgresTransaction, SideEffectDbError, + SideEffectStorage, }; -use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; use std::sync::Arc; +use thiserror::Error; +use uuid::Uuid; use crate::OutboxMessage; +/// Errors that can occur during outbox operations. +#[derive(Error, Debug)] +pub enum OutboxError { + /// A database operation failed. + #[error("Database error: {0}")] + Database(sqlx::Error), + /// Failed to encrypt or decrypt side effect data. + #[error("Encryption error: {0}")] + Encryption(EncryptionError), + /// Failed to pickle or unpickle side effect data. + #[error("Side effect pickling error: {0}")] + SideEffectPickling(SideEffectPicklingError), + /// Encryption provider returned wrong number of items. + #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +impl From for OutboxError { + fn from(e: sqlx::Error) -> Self { + OutboxError::Database(e) + } +} + +impl From> for SideEffectDbError { + fn from(e: OutboxError) -> Self { + match e { + OutboxError::Database(err) => SideEffectDbError::DbError(err), + OutboxError::Encryption(err) => SideEffectDbError::Encryption(err), + OutboxError::SideEffectPickling(err) => SideEffectDbError::SideEffectPicklingError(err), + OutboxError::EncryptionProviderReturnedWrongNumberOfItems => { + SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems + } + } + } +} + /// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table. #[derive(Clone, Copy, Default)] -pub struct TableOutbox; +pub struct TableOutbox { + encryption_provider: E, +} + +impl TableOutbox { + pub fn new(encryption_provider: E) -> Self { + Self { + encryption_provider, + } + } +} #[async_trait] -impl SideEffectStorage for TableOutbox { - async fn store_side_effects + Pickle + Send + Sync>( +impl SideEffectStorage + for TableOutbox +where + S: SideEffect + Pickle + Send + Sync + 'static, +{ + async fn store_side_effects( &self, transaction: &mut Transaction<'_, Postgres>, - items: Vec, - ) -> Result<(), DbError> { + items: Vec, + ) -> Result<(), SideEffectDbError::Error>> { let mut ids: Vec = Vec::with_capacity(items.len()); let mut messages: Vec> = Vec::with_capacity(items.len()); let mut retries: Vec = Vec::with_capacity(items.len()); let mut requeues: Vec = Vec::with_capacity(items.len()); let mut created_ats: Vec> = Vec::with_capacity(items.len()); - for side_effect in items { - let id = *side_effect.id(); - let msg = side_effect - .pickle() - .context("Failed to pickle side effect") - .map_err(DbError::PicklingError)?; - ids.push(id); - messages.push(msg); - retries.push(0); - requeues.push(true); - created_ats.push(Utc::now()); + for chunk in items.chunks(self.encryption_provider.max_batch_size()) { + let mut plain = Vec::with_capacity(chunk.len()); + for side_effect in chunk { + let id = *side_effect.id(); + let msg = side_effect + .pickle() + .map_err(SideEffectDbError::SideEffectPicklingError)?; + ids.push(id); + plain.push(msg); + retries.push(0); + requeues.push(true); + created_ats.push(Utc::now()); + } + let number_of_items = plain.len(); + let mut cipher = self + .encryption_provider + .encrypt(plain) + .await + .map_err(SideEffectDbError::Encryption)?; + if number_of_items != cipher.len() { + return Err(SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems); + } + messages.append(&mut cipher); } sqlx::query( @@ -63,29 +127,46 @@ impl SideEffectStorage for TableOutbox { } #[async_trait] -pub trait TransactionOutboxExt +pub trait TransactionOutboxExt where T: SideEffect + Pickle + Send + 'static, T::SideEffectId: Clone + Send + 'static, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { - async fn get_outbox_batch(&mut self) -> Result>, DbError>; + async fn get_outbox_batch( + &mut self, + ) -> Result>, OutboxError>; - async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError>; + async fn delete_outbox_item( + &mut self, + id: T::SideEffectId, + ) -> Result<(), OutboxError>; - async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError>; + async fn update_outbox_item( + &mut self, + item: OutboxMessage, + ) -> Result<(), OutboxError>; } #[async_trait] -impl TransactionOutboxExt for PostgresTransaction<'_, TableOutbox> +impl TransactionOutboxExt::Error> + for PostgresTransaction<'_, T, TableOutbox, E> where - T: SideEffect + Pickle + Send + 'static, - T::SideEffectId: Clone + Send + 'static, - for<'sql> T::SideEffectId: + T: Aggregate + Send + Sync + Pickle + 'static, + T::SideEffect: SideEffect + Pickle + Send + Sync, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::ApplyError: Send + Sync, + for<'sql> ::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, + E: EncryptionProvider + Send + Sync + 'static, { - async fn get_outbox_batch(&mut self) -> Result>, DbError> { + async fn get_outbox_batch( + &mut self, + ) -> Result< + Vec>, + OutboxError::Error>, + > { #[derive(sqlx::FromRow)] struct OutboxRow { message: Vec, @@ -101,16 +182,35 @@ where .fetch_all(self.inner_mut().as_mut()) .await?; + let mut messages = Vec::with_capacity(rows.len()); + for chunk in rows.chunks(self.encryption_provider().max_batch_size()) { + let cipher: Vec<_> = chunk.iter().map(|row| row.message.clone()).collect(); + let number_of_items = cipher.len(); + let mut plain = self + .encryption_provider() + .decrypt(cipher) + .await + .map_err(OutboxError::Encryption)?; + if plain.len() != number_of_items { + return Err(OutboxError::EncryptionProviderReturnedWrongNumberOfItems); + } + messages.append(&mut plain); + } + rows.into_iter() - .map(|row| { - let msg = T::unpickle(&row.message).context("Failed to unpickle side effect")?; + .zip(messages.into_iter()) + .map(|(row, message)| { + let msg = + T::SideEffect::unpickle(&message).map_err(OutboxError::SideEffectPickling)?; Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) }) - .collect::, anyhow::Error>>() - .map_err(DbError::PicklingError) + .collect::, OutboxError::Error>>>() } - async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError> { + async fn delete_outbox_item( + &mut self, + id: ::SideEffectId, + ) -> Result<(), OutboxError::Error>> { sqlx::query("DELETE FROM outbox WHERE id = $1") .bind(id) .execute(self.inner_mut().as_mut()) @@ -118,7 +218,10 @@ where Ok(()) } - async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError> { + async fn update_outbox_item( + &mut self, + item: OutboxMessage, + ) -> Result<(), OutboxError::Error>> { sqlx::query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") .bind(item.message.id()) .bind(i32::from(item.retries)) @@ -168,65 +271,77 @@ pub trait SideEffectHandler { /// Extension trait for running the outbox worker using a [`TableOutbox`]. #[async_trait] -pub trait RepositoryOutboxExt { - async fn start_outbox( +pub trait RepositoryOutboxExt +where + T: Aggregate + Send + Sync + Pickle + 'static, + T::DomainEvent: Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Clone + Send + Sync + 'static, + ::SideEffectId: Clone + Send + 'static, + H: SideEffectHandler + Send + Sync + 'static, + E: EncryptionProvider + Clone + Send + Sync + 'static, + for<'a> PostgresTransaction<'a, T, TableOutbox, E>: + TransactionOutboxExt::Error>, + for<'sql> ::SideEffectId: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + async fn start_outbox( &self, handler: H, poll_interval: std::time::Duration, - ) -> Result<(), DbError> - where - T: SideEffect + Pickle + Send + Sync + 'static, - T::SideEffectId: Clone + Send + 'static, - H: SideEffectHandler + Send + Sync, - for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin; + ) -> Result<(), OutboxError::Error>>; } #[async_trait] -impl RepositoryOutboxExt for PostgresRepository { - async fn start_outbox( +impl RepositoryOutboxExt for PostgresRepository, E> +where + T: Aggregate + Send + Sync + Pickle + 'static, + ::SideEffectId: Clone + Send + 'static, + T::SideEffect: SideEffect + Clone + Pickle + Send + Sync, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::ApplyError: Send + Sync, + H: SideEffectHandler + Send + Sync + 'static, + E: EncryptionProvider + Clone + Send + Sync + 'static, + for<'a> PostgresTransaction<'a, T, TableOutbox, E>: + TransactionOutboxExt::Error>, + for<'sql> ::SideEffectId: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + async fn start_outbox( &self, handler: H, poll_interval: std::time::Duration, - ) -> Result<(), DbError> - where - T: SideEffect + Pickle + Send + Sync + 'static, - T::SideEffectId: Clone + Send + 'static, - H: SideEffectHandler + Send + Sync, - for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin, - { + ) -> Result<(), OutboxError::Error>> { let handler = Arc::new(handler); loop { let deadline = std::time::Instant::now() + poll_interval; - let _ = process_outbox_batch::(self, handler.clone()).await; + let _ = process_outbox_batch::(self, handler.clone()).await; tokio::time::sleep_until(deadline.into()).await; } } } -async fn process_outbox_batch( - repo: &PostgresRepository, +async fn process_outbox_batch( + repo: &PostgresRepository, E>, handler: Arc, -) -> Result<(), DbError> +) -> Result<(), OutboxError::Error>> where - T: SideEffect + Pickle + Send + Sync + 'static, - T::SideEffectId: Clone + Send + 'static, - H: SideEffectHandler + Send + Sync, - for<'a> PostgresTransaction<'a, TableOutbox>: TransactionOutboxExt, - for<'sql> T::SideEffectId: + T: Aggregate + Send + Sync + Pickle + 'static, + T::SideEffect: SideEffect + Pickle + Send + Sync, + H: SideEffectHandler + Send + Sync, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::ApplyError: Send + Sync, + E: EncryptionProvider + Clone + Send + Sync + 'static, + for<'a> PostgresTransaction<'a, T, TableOutbox, E>: + TransactionOutboxExt::Error>, + for<'sql> ::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { let mut tx = repo.begin_transaction().await?; - let outbox_items: Vec> = tx.get_outbox_batch().await?; + let outbox_items: Vec> = tx.get_outbox_batch().await?; for mut item in outbox_items { - let id: T::SideEffectId = item.message.id().clone(); + let id: ::SideEffectId = *item.message.id(); match handler.handle(&item.message, item.retries).await { Ok(()) => { @@ -240,5 +355,8 @@ where } } - tx.commit().await + tx.into_inner() + .commit() + .await + .map_err(OutboxError::Database) } diff --git a/eventastic_postgres/Cargo.toml b/eventastic_postgres/Cargo.toml index e4ceae7..fbf287b 100644 --- a/eventastic_postgres/Cargo.toml +++ b/eventastic_postgres/Cargo.toml @@ -10,19 +10,22 @@ categories = ["web-programming", "asynchronous"] keywords = ["postgres", "postgresql", "database", "ddd", "event-sourcing"] [dependencies] -anyhow = { workspace = true } async-trait = { workspace = true } +async-stream = { workspace = true } chrono = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } futures = { workspace = true } futures-util = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } +serde = { workspace = true, optional = true } +serde_json = { workspace = true, optional = true } sqlx = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } -async-stream = { workspace = true } [dev-dependencies] eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" } uuid = { workspace = true } + +[features] +default = [] +serde = ["dep:serde", "dep:serde_json"] diff --git a/eventastic_postgres/src/common.rs b/eventastic_postgres/src/common.rs index 40726f5..abfad2d 100644 --- a/eventastic_postgres/src/common.rs +++ b/eventastic_postgres/src/common.rs @@ -5,12 +5,33 @@ use crate::DbError; use crate::pickle::Pickle; -use anyhow::Context; use eventastic::aggregate::Aggregate; use eventastic::event::{DomainEvent, EventStoreEvent}; use eventastic::repository::Snapshot; use sqlx::types::Uuid; +/// Type alias for the complex return type of event conversion operations. +type EventResult = Result< + EventStoreEvent<::DomainEvent>, + DbError< + E, + <::DomainEvent as Pickle>::Error, + ::Error, + <::SideEffect as Pickle>::Error, + >, +>; + +/// Type alias for the complex return type of snapshot conversion operations. +type SnapshotResult = Result< + Snapshot, + DbError< + E, + <::DomainEvent as Pickle>::Error, + ::Error, + <::SideEffect as Pickle>::Error, + >, +>; + /// Internal representation of a database row containing event data. /// /// This struct is used to deserialize event rows from the database @@ -36,21 +57,22 @@ impl PartialEventRow { /// # Errors /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. - /// Returns [`DbError::PicklingError`] if the event JSON cannot be deserialized. - pub fn to_event(row: PartialEventRow) -> Result, DbError> + /// Returns [`DbError::EventPicklingError`] if the event JSON cannot be deserialized. + pub fn to_event(row: PartialEventRow) -> EventResult where - Evt: DomainEvent + Pickle, + T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, { let row_version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; - Evt::unpickle(&row.event) + T::DomainEvent::unpickle(&row.event) .map(|e| EventStoreEvent { id: row.event_id, event: e, version: row_version, }) - .context("Failed to unpickle event") - .map_err(DbError::PicklingError) + .map_err(DbError::EventPicklingError) } } @@ -80,17 +102,17 @@ impl PartialSnapshotRow { /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. /// Returns [`DbError::InvalidSnapshotVersion`] if the snapshot version cannot be converted to u64. - /// Returns [`DbError::PicklingError`] if the aggregate JSON cannot be deserialized. - pub fn to_snapshot(row: PartialSnapshotRow) -> Result, DbError> + /// Returns [`DbError::SnapshotPicklingError`] if the aggregate JSON cannot be deserialized. + pub fn to_snapshot(row: PartialSnapshotRow) -> SnapshotResult where T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, { let version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; let snapshot_version = u64::try_from(row.snapshot_version).map_err(|_| DbError::InvalidSnapshotVersion)?; - let aggregate: T = T::unpickle(&row.aggregate) - .context("Failed to unpickle aggregate") - .map_err(DbError::PicklingError)?; + let aggregate: T = T::unpickle(&row.aggregate).map_err(DbError::SnapshotPicklingError)?; Ok(Snapshot { aggregate, @@ -109,7 +131,7 @@ pub(crate) mod utils { /// # Errors /// /// Returns [`DbError::InvalidVersionNumber`] if the conversion fails. - pub fn version_to_i64(version: u64) -> Result { + pub fn version_to_i64(version: u64) -> Result> { i64::try_from(version).map_err(|_| DbError::InvalidVersionNumber) } @@ -118,7 +140,9 @@ pub(crate) mod utils { /// # Errors /// /// Returns [`DbError::InvalidSnapshotVersion`] if the conversion fails. - pub fn snapshot_version_to_i64(version: u64) -> Result { + pub fn snapshot_version_to_i64( + version: u64, + ) -> Result> { i64::try_from(version).map_err(|_| DbError::InvalidSnapshotVersion) } } diff --git a/eventastic_postgres/src/encryption.rs b/eventastic_postgres/src/encryption.rs new file mode 100644 index 0000000..ab15688 --- /dev/null +++ b/eventastic_postgres/src/encryption.rs @@ -0,0 +1,54 @@ +use async_trait::async_trait; + +/// Encrypt data before storing it in the database. +#[async_trait] +pub trait EncryptionProvider { + type Error: std::error::Error + Send + Sync + 'static; + + /// Encrypt a batch of items. The batch size won't exceed the value returned + /// by [`max_batch_size`]. + async fn encrypt(&self, plain: Vec>) -> Result>, Self::Error>; + + /// Decrypt a batch of items. The batch size won't exceed the value returned + /// by [`max_batch_size`]. + async fn decrypt(&self, cipher: Vec>) -> Result>, Self::Error>; + + /// The maximum batch size to use for [`encrypt`] and [`decrypt`] operations. + fn max_batch_size(&self) -> usize; +} + +/// An [`EncryptionProvider`] that does no encryption. Can be used where you +/// don't need any encryption. +#[derive(Clone)] +pub struct NoEncryption; + +#[async_trait] +impl EncryptionProvider for NoEncryption { + type Error = NoEncryptionError; + + async fn encrypt(&self, plain: Vec>) -> Result>, Self::Error> { + Ok(plain) + } + + async fn decrypt(&self, cipher: Vec>) -> Result>, Self::Error> { + Ok(cipher) + } + + fn max_batch_size(&self) -> usize { + 100 + } +} + +/// The error type for [`NoEncryption`]. +/// +/// This can't actually be returned by `encrypt` or `decrypt` but is required by the trait. +#[derive(Debug)] +pub struct NoEncryptionError; + +impl std::fmt::Display for NoEncryptionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "NoEncryptionError") + } +} + +impl std::error::Error for NoEncryptionError {} diff --git a/eventastic_postgres/src/error.rs b/eventastic_postgres/src/error.rs new file mode 100644 index 0000000..ab120c3 --- /dev/null +++ b/eventastic_postgres/src/error.rs @@ -0,0 +1,102 @@ +use eventastic::aggregate::Aggregate; +use thiserror::Error; + +use crate::{EncryptionProvider, Pickle}; + +#[allow(type_alias_bounds)] +pub type EventSourcingDbError = DbError< + E::Error, + ::Error, + ::Error, + ::Error, +>; + +#[derive(Error, Debug)] +pub enum DbError< + EncryptionError, + EventPicklingError, + SnapshotPicklingError, + SideEffectPicklingError, +> { + /// A database operation failed. + #[error("DB Error {0}")] + DbError(sqlx::Error), + /// Failed to pickle data. + #[error("Pickling Error {0}")] + EventPicklingError(EventPicklingError), + /// Failed to pickle snapshot data. + #[error("Snapshot Pickling Error {0}")] + SnapshotPicklingError(SnapshotPicklingError), + /// Failed to pickle side effect data. + #[error("Side Effect Pickling Error {0}")] + SideEffectPicklingError(SideEffectPicklingError), + /// An invalid version number was encountered (e.g., negative value where positive expected). + #[error("Invalid Version Number")] + InvalidVersionNumber, + /// An invalid snapshot version number was encountered. + #[error("Invalid Snapshot Version Number")] + InvalidSnapshotVersion, + /// A concurrent modification was detected (optimistic locking failure). + #[error("Optimistic Concurrency Error")] + OptimisticConcurrencyError, + /// Failed to encrypt or decrypt data. + #[error("Encryption Error {0}")] + Encryption(EncryptionError), + /// Failed to encrypt or decrypt data. + #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +/// Errors that can occur during side effect storage operations. +/// +/// This is a specialized error type for side effect operations that only +/// includes the errors relevant to storing and retrieving side effects. +#[derive(Error, Debug)] +pub enum SideEffectDbError { + /// A database operation failed. + #[error("DB Error {0}")] + DbError(sqlx::Error), + /// Failed to pickle side effect data. + #[error("Side Effect Pickling Error {0}")] + SideEffectPicklingError(SideEffectPicklingError), + /// Failed to encrypt or decrypt data. + #[error("Encryption Error {0}")] + Encryption(EncryptionError), + /// Encryption provider returned wrong number of items. + #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +impl From for DbError { + fn from(e: sqlx::Error) -> Self { + if let Some(db_error) = e.as_database_error() { + if let Some(code) = db_error.code() { + if code == "23505" && db_error.message().contains("aggregate_version") { + return DbError::OptimisticConcurrencyError; + } + } + } + DbError::DbError(e) + } +} + +impl From for SideEffectDbError { + fn from(e: sqlx::Error) -> Self { + SideEffectDbError::DbError(e) + } +} + +impl From> for DbError { + fn from(e: SideEffectDbError) -> Self { + match e { + SideEffectDbError::DbError(err) => DbError::DbError(err), + SideEffectDbError::SideEffectPicklingError(err) => { + DbError::SideEffectPicklingError(err) + } + SideEffectDbError::Encryption(err) => DbError::Encryption(err), + SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems => { + DbError::EncryptionProviderReturnedWrongNumberOfItems + } + } + } +} diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index d3464b9..08085a7 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,17 +1,23 @@ mod common; +mod encryption; +mod error; mod pickle; mod reader_impl; mod repository; mod side_effect; -mod table_registry; +mod table_config; mod transaction; +pub use encryption::{EncryptionProvider, NoEncryption, NoEncryptionError}; +pub use error::{DbError, SideEffectDbError}; pub use pickle::Pickle; pub use repository::PostgresRepository; pub use side_effect::SideEffectStorage; -pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder}; +pub use table_config::TableConfig; pub use transaction::PostgresTransaction; +use crate::error::EventSourcingDbError; + use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, @@ -19,70 +25,34 @@ use eventastic::{ repository::{Repository, RepositoryError}, }; use sqlx::types::Uuid; -use thiserror::Error; - -/// Errors that can occur during PostgreSQL operations. -#[derive(Error, Debug)] -pub enum DbError { - /// A database operation failed. - #[error("DB Error {0}")] - DbError(sqlx::Error), - /// Failed to pickle data. - #[error("Pickling Error {0}")] - PicklingError(anyhow::Error), - /// An invalid version number was encountered (e.g., negative value where positive expected). - #[error("Invalid Version Number")] - InvalidVersionNumber, - /// An invalid snapshot version number was encountered. - #[error("Invalid Snapshot Version Number")] - InvalidSnapshotVersion, - /// A concurrent modification was detected (optimistic locking failure). - #[error("Optimistic Concurrency Error")] - OptimisticConcurrencyError, - /// An aggregate type was not registered in the table registry. - #[error("Aggregate type not registered in table registry")] - UnregisteredAggregate, -} - -impl From for DbError { - fn from(e: sqlx::Error) -> Self { - if let Some(db_error) = e.as_database_error() { - if let Some(code) = db_error.code() { - if code == "23505" && db_error.message().contains("aggregate_version") { - return DbError::OptimisticConcurrencyError; - } - } - } - DbError::DbError(e) - } -} /// Extension trait for loading aggregates from PostgreSQL storage. /// /// This trait provides PostgreSQL-specific methods for working with aggregates /// that have UUID-based identifiers and can be serialized to JSON. #[async_trait] -pub trait RootExt +pub trait RootExt where T: Aggregate + Pickle + Send + Sync + 'static, ::DomainEvent: DomainEvent + Pickle + Send + Sync, ::SideEffect: SideEffect + Pickle + Send + Sync, ::ApplyError: Send + Sync, - O: SideEffectStorage + Send + Sync, + O: SideEffectStorage + Send + Sync, + E: EncryptionProvider + Clone + Send + Sync, { /// Loads an aggregate from PostgreSQL storage by its UUID using an existing transaction. /// /// This method replays the event stream for the given aggregate ID, /// starting from any available snapshot and applying subsequent events. async fn load_with_transaction( - transaction: &mut PostgresTransaction<'_, O>, + transaction: &mut PostgresTransaction<'_, T, O, E>, aggregate_id: Uuid, ) -> Result< Context, RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >, > { Context::load(transaction, &aggregate_id).await @@ -93,14 +63,14 @@ where /// This method is more efficient for read-only operations as it uses a /// connection directly from the pool without starting a transaction. async fn load( - repository: &PostgresRepository, + repository: &PostgresRepository, aggregate_id: Uuid, ) -> Result< Context, RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >, > where @@ -110,12 +80,13 @@ where } } -impl RootExt for T +impl RootExt for T where T: Aggregate + Pickle + Send + Sync + 'static, ::DomainEvent: DomainEvent + Pickle + Send + Sync, ::SideEffect: SideEffect + Pickle + Send + Sync, ::ApplyError: Send + Sync, - O: SideEffectStorage + Send + Sync, + O: SideEffectStorage + Send + Sync, + E: EncryptionProvider + Clone + Send + Sync, { } diff --git a/eventastic_postgres/src/pickle.rs b/eventastic_postgres/src/pickle.rs index e81f267..0281e9d 100644 --- a/eventastic_postgres/src/pickle.rs +++ b/eventastic_postgres/src/pickle.rs @@ -13,6 +13,7 @@ pub trait Pickle: Sized { fn unpickle(bytes: &[u8]) -> Result; } +#[cfg(feature = "serde")] impl Pickle for T where T: serde::Serialize + serde::de::DeserializeOwned, diff --git a/eventastic_postgres/src/reader_impl.rs b/eventastic_postgres/src/reader_impl.rs index 24895c4..8dcde6c 100644 --- a/eventastic_postgres/src/reader_impl.rs +++ b/eventastic_postgres/src/reader_impl.rs @@ -4,76 +4,118 @@ //! that can be used by both [`PostgresTransaction`] and [`PostgresConnection`]. //! All operations use dynamic table names provided via the [`TableConfig`]. -use crate::DbError; +use std::sync::Arc; + use crate::common::{PartialEventRow, PartialSnapshotRow, utils}; use crate::pickle::Pickle; +use crate::{DbError, EncryptionProvider, EventSourcingDbError}; use eventastic::aggregate::Aggregate; use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; use eventastic::repository::Snapshot; +use futures_util::stream::StreamExt; use sqlx::types::Uuid; use sqlx::{Executor, query_as}; /// Generic implementation for streaming events from configured table. -pub fn stream_from<'e, 'c: 'e, E, T>( +pub fn stream_from<'e, 'c: 'e, E, T, EP>( executor: E, id: &T::AggregateId, version: u64, - query: String, -) -> impl futures::Stream, DbError>> + 'e + query: Arc, + encryption_provider: &'e EP, +) -> impl futures::Stream< + Item = std::result::Result, EventSourcingDbError>, +> + 'e where E: Executor<'c, Database = sqlx::Postgres> + 'e, - T: Aggregate, + T: Aggregate + Pickle, T::DomainEvent: DomainEvent + Pickle + Send + 'e, + T::SideEffect: Pickle, + EP: EncryptionProvider + Sync + Send + 'e, { let id = *id; - Box::pin(async_stream::stream! { + async_stream::stream! { let version = utils::version_to_i64(version)?; - let stream = query_as::<_, PartialEventRow>(&query) + let chunks = query_as::<_, PartialEventRow>(&query) .bind(id) .bind(version) - .fetch(executor); + .fetch(executor) + .chunks(encryption_provider.max_batch_size()); - for await result in stream { - yield match result { - Ok(row) => PartialEventRow::to_event::(row), - Err(e) => Err(DbError::from(e)), - }; + for await chunk in chunks { + let chunk = chunk.into_iter().collect::, _>>()?; + // TODO: We could have the query return a vector of events rather than doing this here. + let cipher: Vec<_> = chunk.iter().map(|row| row.event.clone()).collect(); + let number_of_items = cipher.len(); + let plain = encryption_provider + .decrypt(cipher) + .await + .map_err(DbError::Encryption)?; + if plain.len() != number_of_items { + Err(DbError::EncryptionProviderReturnedWrongNumberOfItems)?; + } + for (mut row, plain) in chunk.into_iter().zip(plain.into_iter()) { + row.event = plain; + yield PartialEventRow::to_event::(row); + } } - }) + } } + /// Generic implementation for getting an event by ID from configured table. -pub async fn get_event<'c, E, T>( +pub async fn get_event<'c, E, T, EP>( executor: E, aggregate_id: &T::AggregateId, event_id: &<::DomainEvent as DomainEvent>::EventId, query: &str, -) -> Result::DomainEvent>>, DbError> + encryption_provider: &EP, +) -> Result::DomainEvent>>, EventSourcingDbError> where E: Executor<'c, Database = sqlx::Postgres>, - T: Aggregate, + T: Aggregate + Pickle, T::DomainEvent: DomainEvent + Pickle + Send, + T::SideEffect: Pickle, + EP: EncryptionProvider, { - query_as::<_, PartialEventRow>(query) + let Some(mut row) = query_as::<_, PartialEventRow>(query) .bind(aggregate_id) .bind(event_id) .fetch_optional(executor) .await? - .map(PartialEventRow::to_event) - .transpose() + else { + return Ok(None); + }; + let mut plain = encryption_provider + .decrypt(vec![row.event]) + .await + .map_err(DbError::Encryption)? + .into_iter(); + let Some(event) = plain.next() else { + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); + }; + if plain.next().is_some() { + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); + } + row.event = event; + Ok(Some(PartialEventRow::to_event::(row)?)) } /// Generic implementation for getting a snapshot from configured table. -pub async fn get_snapshot<'c, E, T>( +pub async fn get_snapshot<'c, E, T, EP>( executor: E, id: &T::AggregateId, query: &str, -) -> Result>, DbError> + encryption_provider: &EP, +) -> Result>, EventSourcingDbError> where E: Executor<'c, Database = sqlx::Postgres>, T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, + EP: EncryptionProvider, { let row = query_as::<_, PartialSnapshotRow>(query) .bind(id) @@ -81,9 +123,21 @@ where .fetch_optional(executor) .await?; - let Some(row) = row else { + let Some(mut row) = row else { return Ok(None); }; - Ok(Some(PartialSnapshotRow::to_snapshot(row)?)) + let plain = encryption_provider + .decrypt(vec![row.aggregate.clone()]) + .await + .map_err(DbError::Encryption)?; + if plain.len() != 1 { + Err(DbError::EncryptionProviderReturnedWrongNumberOfItems)?; + } + row.aggregate = plain + .into_iter() + .next() + .expect("Decrypt must return 1 item for snapshot"); + + Ok(Some(PartialSnapshotRow::to_snapshot::(row)?)) } diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index f374f54..8fb7149 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -1,17 +1,19 @@ -use crate::pickle::Pickle; -use crate::{DbError, PostgresTransaction, SideEffectStorage, TableRegistry, reader_impl}; +use crate::{ + EventSourcingDbError, PostgresTransaction, SideEffectStorage, encryption::EncryptionProvider, + pickle::Pickle, reader_impl, table_config::TableConfig, +}; use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, event::{DomainEvent, EventStoreEvent}, repository::{Repository, RepositoryError, RepositoryReader, Snapshot}, }; -use futures::StreamExt; use sqlx::{ Pool, Postgres, postgres::{PgConnectOptions, PgPoolOptions}, types::Uuid, }; +use std::marker::PhantomData; /// PostgreSQL-based repository implementation for event sourcing. /// @@ -19,19 +21,15 @@ use sqlx::{ /// using PostgreSQL as the backing store. It integrates with a configurable side effect /// storage mechanism for handling the outbox pattern. #[derive(Clone)] -pub struct PostgresRepository -where - O: SideEffectStorage + Clone, -{ - pub(crate) inner: Pool, - pub(crate) outbox: O, - pub(crate) tables: TableRegistry, +pub struct PostgresRepository { + inner: Pool, + outbox: O, + table_config: TableConfig, + encryption_provider: E, + phantom_aggregate: std::marker::PhantomData, } -impl PostgresRepository -where - O: SideEffectStorage + Clone, -{ +impl PostgresRepository { /// Creates a new PostgreSQL repository with the specified connection and pool options. /// /// # Parameters @@ -39,19 +37,21 @@ where /// - `connect_options` - PostgreSQL connection configuration /// - `pool_options` - Connection pool configuration /// - `outbox` - Side effect storage implementation for the outbox pattern - /// - `tables` - Registry of table configurations for different aggregates pub async fn new( connect_options: PgConnectOptions, pool_options: PgPoolOptions, + table_config: TableConfig, outbox: O, - tables: TableRegistry, + encryption_provider: E, ) -> Result { let pool = pool_options.connect_with(connect_options).await?; Ok(Self { inner: pool, outbox, - tables, + table_config, + encryption_provider, + phantom_aggregate: PhantomData, }) } @@ -59,14 +59,33 @@ where /// /// The returned transaction can be used to perform multiple operations /// atomically and provides access to the repository methods. - pub async fn begin_transaction(&self) -> Result, sqlx::Error> { + pub async fn begin_transaction(&self) -> Result, sqlx::Error> { Ok(PostgresTransaction { inner: self.inner.begin().await?, outbox: &self.outbox, - tables: &self.tables, + table_config: &self.table_config, + encryption_provider: &self.encryption_provider, + phantom_aggregate: PhantomData, }) } + /// Create a transaction from an existing raw sqlx transaction. + /// + /// This is useful for multi-aggregate scenarios where you want to use + /// the same database transaction across multiple repository types. + pub fn transaction_from<'a>( + &'a self, + transaction: sqlx::Transaction<'a, Postgres>, + ) -> PostgresTransaction<'a, T, O, E> { + PostgresTransaction { + inner: transaction, + outbox: &self.outbox, + table_config: &self.table_config, + encryption_provider: &self.encryption_provider, + phantom_aggregate: PhantomData, + } + } + /// Run database migrations to set up the required tables and schema. /// /// This method should be called once during application startup to ensure @@ -80,15 +99,16 @@ where } #[async_trait] -impl RepositoryReader for PostgresRepository +impl RepositoryReader for PostgresRepository where T: Aggregate + Pickle + Send + Sync + 'static, T::DomainEvent: DomainEvent + Pickle + Send + Sync, T::SideEffect: SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, - O: SideEffectStorage + Clone + Send + Sync, + O: SideEffectStorage + Clone + Send + Sync, + E: EncryptionProvider + Clone + Send + Sync, { - type DbError = DbError; + type DbError = EventSourcingDbError; /// Returns a stream of domain events. fn stream_from( @@ -103,17 +123,13 @@ where Self::DbError, >, > { - let query = match self.tables.stream_events_query::() { - Some(query) => query.to_string(), - None => { - return futures::stream::iter(vec![Err(DbError::UnregisteredAggregate)]).boxed(); - } - }; - Box::pin(reader_impl::stream_from::<_, T>( + let query = &self.table_config.stream_events_query; + Box::pin(reader_impl::stream_from::<_, T, E>( &self.inner, id, version, - query, + query.clone(), + &self.encryption_provider, )) } @@ -123,11 +139,15 @@ where aggregate_id: &T::AggregateId, event_id: &<::DomainEvent as DomainEvent>::EventId, ) -> Result::DomainEvent>>, Self::DbError> { - let query = self - .tables - .get_event_query::() - .ok_or(DbError::UnregisteredAggregate)?; - reader_impl::get_event::<_, T>(&self.inner, aggregate_id, event_id, query).await + let query = &self.table_config.get_event_query; + reader_impl::get_event::<_, T, E>( + &self.inner, + aggregate_id, + event_id, + query, + &self.encryption_provider, + ) + .await } /// Returns a snapshot of the aggregate in the database @@ -135,27 +155,26 @@ where &mut self, id: &T::AggregateId, ) -> Result>, Self::DbError> { - let query = self - .tables - .get_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; - reader_impl::get_snapshot::<_, T>(&self.inner, id, query).await + let query = &self.table_config.get_snapshot_query; + reader_impl::get_snapshot::<_, T, E>(&self.inner, id, query, &self.encryption_provider) + .await } } #[async_trait] -impl Repository for PostgresRepository +impl Repository for PostgresRepository where T: Aggregate + Pickle + Send + Sync + 'static, T::DomainEvent: DomainEvent + Pickle + Send + Sync, T::SideEffect: eventastic::aggregate::SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, - O: SideEffectStorage + Clone + Send + Sync, + O: SideEffectStorage + Clone + Send + Sync, + E: EncryptionProvider + Clone + Send + Sync, { type Error = RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >; /// Loads an aggregate from the repository by its ID. diff --git a/eventastic_postgres/src/side_effect.rs b/eventastic_postgres/src/side_effect.rs index cd18fa4..2452bf3 100644 --- a/eventastic_postgres/src/side_effect.rs +++ b/eventastic_postgres/src/side_effect.rs @@ -1,4 +1,4 @@ -use crate::DbError; +use crate::SideEffectDbError; use crate::pickle::Pickle; use async_trait::async_trait; use eventastic::aggregate::SideEffect; @@ -11,7 +11,10 @@ use sqlx::{Postgres, Transaction}; /// different implementations such as direct table storage or outbox patterns. /// Implementors define how side effects are persisted within a database transaction. #[async_trait] -pub trait SideEffectStorage: Send + Sync { +pub trait SideEffectStorage: Send + Sync +where + T: SideEffect + Pickle + Send + Sync, +{ /// Store a collection of side effects within the given database transaction. /// /// This method is called as part of the aggregate save process to ensure @@ -24,10 +27,10 @@ pub trait SideEffectStorage: Send + Sync { /// /// # Errors /// - /// Returns [`DbError`] if the storage operation fails. - async fn store_side_effects + Pickle + Send + Sync>( + /// Returns [`SideEffectDbError`] if the storage operation fails. + async fn store_side_effects( &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, - ) -> Result<(), DbError>; + ) -> Result<(), SideEffectDbError::Error>>; } diff --git a/eventastic_postgres/src/table_config.rs b/eventastic_postgres/src/table_config.rs new file mode 100644 index 0000000..999fe4b --- /dev/null +++ b/eventastic_postgres/src/table_config.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +/// Configuration for database tables used by an aggregate type. +/// +/// This struct contains pre-computed SQL queries to avoid string allocation +/// during query execution. +#[derive(Debug, Clone)] +pub struct TableConfig { + pub(crate) stream_events_query: Arc, + pub(crate) get_event_query: String, + pub(crate) get_snapshot_query: String, + pub(crate) insert_events_query: String, + pub(crate) upsert_snapshot_query: String, +} + +impl TableConfig { + /// Create a new TableConfig with pre-computed queries. + pub fn new(events: impl Into, snapshots: impl Into) -> Self { + let events = events.into(); + let snapshots = snapshots.into(); + + Self { + stream_events_query: format!( + "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND version >= $2 ORDER BY version ASC", + &events + ).into(), + get_event_query: format!( + "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND event_id = $2", + &events + ), + get_snapshot_query: format!( + "SELECT aggregate, version, snapshot_version FROM {} WHERE aggregate_id = $1 AND snapshot_version = $2", + &snapshots + ), + insert_events_query: format!( + "INSERT INTO {} (event_id, version, aggregate_id, event, created_at) \ + SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::bytea[], $5::timestamptz[]) \ + ON CONFLICT DO NOTHING returning event_id", + &events + ), + upsert_snapshot_query: format!( + "INSERT INTO {} (aggregate_id, aggregate, version, snapshot_version, created_at) \ + VALUES ($1, $2, $3, $4, $5) \ + ON CONFLICT (aggregate_id, snapshot_version) DO UPDATE SET aggregate = $2, version = $3, created_at = $5", + &snapshots + ), + } + } +} diff --git a/eventastic_postgres/src/table_registry.rs b/eventastic_postgres/src/table_registry.rs deleted file mode 100644 index 85d084c..0000000 --- a/eventastic_postgres/src/table_registry.rs +++ /dev/null @@ -1,152 +0,0 @@ -use eventastic::aggregate::Aggregate; -use std::any::TypeId; -use std::collections::HashMap; -use std::sync::Arc; - -/// Configuration for database tables used by an aggregate type. -/// -/// This struct contains pre-computed SQL queries to avoid string allocation -/// during query execution. -#[derive(Debug, Clone)] -pub struct TableConfig { - pub(crate) stream_events_query: String, - pub(crate) get_event_query: String, - pub(crate) get_snapshot_query: String, - pub(crate) insert_events_query: String, - pub(crate) upsert_snapshot_query: String, -} - -impl TableConfig { - /// Create a new TableConfig with pre-computed queries. - pub fn new(events: impl Into, snapshots: impl Into) -> Self { - let events = events.into(); - let snapshots = snapshots.into(); - - Self { - stream_events_query: format!( - "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND version >= $2 ORDER BY version ASC", - &events - ), - get_event_query: format!( - "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND event_id = $2", - &events - ), - get_snapshot_query: format!( - "SELECT aggregate, version, snapshot_version FROM {} WHERE aggregate_id = $1 AND snapshot_version = $2", - &snapshots - ), - insert_events_query: format!( - "INSERT INTO {} (event_id, version, aggregate_id, event, created_at) \ - SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::bytea[], $5::timestamptz[]) \ - ON CONFLICT DO NOTHING returning event_id", - &events - ), - upsert_snapshot_query: format!( - "INSERT INTO {} (aggregate_id, aggregate, version, snapshot_version, created_at) \ - VALUES ($1, $2, $3, $4, $5) \ - ON CONFLICT (aggregate_id, snapshot_version) DO UPDATE SET aggregate = $2, version = $3, created_at = $5", - &snapshots - ), - } - } -} - -/// Registry that maps aggregate types to their table configurations. -/// -/// This allows different aggregate types to use different tables while -/// supporting runtime configuration. -#[derive(Debug, Clone, Default)] -pub struct TableRegistry { - tables: HashMap>, -} - -impl TableRegistry { - /// Create a new empty table registry. - pub fn new() -> Self { - Self { - tables: HashMap::new(), - } - } - - /// Register table configuration for an aggregate type. - pub fn register(&mut self, config: TableConfig) { - self.tables.insert(TypeId::of::(), Arc::new(config)); - } - - /// Get the stream events query for an aggregate type. - pub fn stream_events_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.stream_events_query.as_str()) - } - - /// Get the get event query for an aggregate type. - pub fn get_event_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.get_event_query.as_str()) - } - - /// Get the get snapshot query for an aggregate type. - pub fn get_snapshot_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.get_snapshot_query.as_str()) - } - - /// Get the insert events query for an aggregate type. - pub fn insert_events_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.insert_events_query.as_str()) - } - - /// Get the upsert snapshot query for an aggregate type. - pub fn upsert_snapshot_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.upsert_snapshot_query.as_str()) - } -} - -/// Builder for creating a TableRegistry with a fluent API. -pub struct TableRegistryBuilder { - registry: TableRegistry, -} - -impl TableRegistryBuilder { - /// Create a new builder. - pub fn new() -> Self { - Self { - registry: TableRegistry::new(), - } - } - - /// Register table configuration for an aggregate type. - pub fn register(mut self, config: TableConfig) -> Self { - self.registry.register::(config); - self - } - - /// Register table configuration for an aggregate type with explicit table names. - pub fn register_with_tables( - mut self, - events: impl Into, - snapshots: impl Into, - ) -> Self { - self.registry - .register::(TableConfig::new(events, snapshots)); - self - } - - /// Build the TableRegistry. - pub fn build(self) -> TableRegistry { - self.registry - } -} - -impl Default for TableRegistryBuilder { - fn default() -> Self { - Self::new() - } -} diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 9252cf5..b76392e 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,7 +1,7 @@ use crate::common::utils; use crate::pickle::Pickle; -use crate::{DbError, SideEffectStorage, TableRegistry, reader_impl}; -use anyhow::Context as _; +use crate::table_config::TableConfig; +use crate::{DbError, EncryptionProvider, EventSourcingDbError, SideEffectStorage, reader_impl}; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; @@ -12,7 +12,6 @@ use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; use eventastic::repository::Snapshot; use eventastic::repository::{RepositoryError, RepositoryReader, RepositoryWriter}; -use futures::StreamExt; use sqlx::Row; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; @@ -21,24 +20,26 @@ use sqlx::{Postgres, Transaction}; /// /// This struct provides transactional access to PostgreSQL storage for event sourcing /// operations. It manages database transactions and integrates with side effect storage. -pub struct PostgresTransaction<'a, O> -where - O: SideEffectStorage, -{ +pub struct PostgresTransaction<'a, T, O, E> { pub(crate) inner: Transaction<'a, Postgres>, pub(crate) outbox: &'a O, - pub(crate) tables: &'a TableRegistry, + pub(crate) table_config: &'a TableConfig, + pub(crate) encryption_provider: &'a E, + pub(crate) phantom_aggregate: std::marker::PhantomData, } -impl<'a, O> PostgresTransaction<'a, O> +impl<'a, T, O, E> PostgresTransaction<'a, T, O, E> where - O: SideEffectStorage, + T: eventastic::aggregate::Aggregate + Pickle, + T::DomainEvent: Pickle, + T::SideEffect: Pickle, + E: EncryptionProvider, { /// Commit the transaction to the database. /// /// This finalizes all operations performed within this transaction, /// making them permanently visible to other database connections. - pub async fn commit(self) -> Result<(), DbError> { + pub async fn commit(self) -> Result<(), EventSourcingDbError> { Ok(self.inner.commit().await?) } @@ -46,7 +47,7 @@ where /// /// This undoes all operations performed within this transaction, /// returning the database to its state before the transaction began. - pub async fn rollback(self) -> Result<(), DbError> { + pub async fn rollback(self) -> Result<(), EventSourcingDbError> { Ok(self.inner.rollback().await?) } @@ -60,45 +61,49 @@ where &mut self.inner } - /// Get an aggregate by ID using the table registry. - pub async fn get( + /// Get the encryption provider reference + pub fn encryption_provider(&self) -> &E { + self.encryption_provider + } +} + +impl<'a, T, O, E> PostgresTransaction<'a, T, O, E> +where + O: SideEffectStorage, + E: EncryptionProvider + Send + Sync + 'static, + T: Aggregate + 'static + Send + Sync + Pickle, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, + T::ApplyError: Send + Sync, +{ + /// Get an aggregate by ID. + pub async fn get( &mut self, id: &Uuid, - ) -> Result, RepositoryError> - where - T: Aggregate + 'static + Send + Sync + Pickle, - T::DomainEvent: DomainEvent + Pickle + Send + Sync, - T::SideEffect: SideEffect + Pickle + Send + Sync, - T::ApplyError: Send + Sync, - { + ) -> Result, RepositoryError>> { Context::load(self, id).await } - /// Store an aggregate using the table registry. - pub async fn store( + /// Store an aggregate. + pub async fn store( &mut self, aggregate: &mut Context, - ) -> Result<(), SaveError> - where - T: Aggregate + 'static + Send + Sync + Pickle, - T::DomainEvent: DomainEvent + Pickle + Send + Sync, - T::SideEffect: SideEffect + Pickle + Send + Sync, - T::ApplyError: Send + Sync, - { + ) -> Result<(), SaveError>> { aggregate.save(self).await } } #[async_trait] -impl RepositoryReader for PostgresTransaction<'_, O> +impl RepositoryReader for PostgresTransaction<'_, T, O, E> where T: Aggregate + 'static + Pickle + Send + Sync, T::SideEffect: SideEffect + Pickle + Send + Sync, T::DomainEvent: DomainEvent + Pickle + Send + Sync, T::ApplyError: Send + Sync, - O: SideEffectStorage, + O: SideEffectStorage, + E: EncryptionProvider + Send + Sync, { - type DbError = DbError; + type DbError = EventSourcingDbError; /// Returns a stream of domain events. fn stream_from( @@ -113,17 +118,13 @@ where Self::DbError, >, > { - let query = match self.tables.stream_events_query::() { - Some(query) => query.to_string(), - None => { - return futures::stream::iter(vec![Err(DbError::UnregisteredAggregate)]).boxed(); - } - }; - Box::pin(reader_impl::stream_from::<_, T>( + let query = &self.table_config.stream_events_query; + Box::pin(reader_impl::stream_from::<_, T, E>( &mut *self.inner, id, version, - query, + query.clone(), + self.encryption_provider, )) } @@ -133,11 +134,15 @@ where aggregate_id: &T::AggregateId, event_id: &<::DomainEvent as DomainEvent>::EventId, ) -> Result::DomainEvent>>, Self::DbError> { - let query = self - .tables - .get_event_query::() - .ok_or(DbError::UnregisteredAggregate)?; - reader_impl::get_event::<_, T>(&mut *self.inner, aggregate_id, event_id, query).await + let query = &self.table_config.get_event_query; + reader_impl::get_event::<_, T, E>( + &mut *self.inner, + aggregate_id, + event_id, + query, + self.encryption_provider, + ) + .await } /// Returns a snapshot of the aggregate in the database @@ -145,22 +150,21 @@ where &mut self, id: &T::AggregateId, ) -> Result>, Self::DbError> { - let query = self - .tables - .get_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; - reader_impl::get_snapshot::<_, T>(&mut *self.inner, id, query).await + let query = &self.table_config.get_snapshot_query; + reader_impl::get_snapshot::<_, T, E>(&mut *self.inner, id, query, self.encryption_provider) + .await } } #[async_trait] -impl RepositoryWriter for PostgresTransaction<'_, O> +impl RepositoryWriter for PostgresTransaction<'_, T, O, E> where T: Aggregate + 'static + Pickle + Send + Sync, T::SideEffect: SideEffect + Pickle + Send + Sync, T::DomainEvent: DomainEvent + Pickle + Send + Sync, T::ApplyError: Send + Sync, - O: SideEffectStorage, + O: SideEffectStorage, + E: EncryptionProvider + Send + Sync, { /// Stores new domain events to the database async fn store_events( @@ -175,29 +179,35 @@ where let mut events_to_insert: Vec> = Vec::with_capacity(events.len()); let mut created_ats_to_insert: Vec> = Vec::with_capacity(events.len()); - for event in events { - let event_id = *event.id(); - let version = event.version; + for events in events.chunks(self.encryption_provider.max_batch_size()) { + let mut plain = Vec::with_capacity(events.len()); + for event in events { + let event_id = *event.id(); + let version = event.version; - let version = utils::version_to_i64(version)?; + let version = utils::version_to_i64(version)?; - let serialised_event = event - .event - .pickle() - .context("Failed to pickle event") - .map_err(DbError::PicklingError)?; + let serialised_event = event.event.pickle().map_err(DbError::EventPicklingError)?; - event_ids_to_insert.push(event_id); - versions_to_insert.push(version); - aggregate_ids_to_insert.push(*id); - events_to_insert.push(serialised_event); - created_ats_to_insert.push(Utc::now()); + event_ids_to_insert.push(event_id); + versions_to_insert.push(version); + aggregate_ids_to_insert.push(*id); + plain.push(serialised_event); + created_ats_to_insert.push(Utc::now()); + } + let number_of_items = plain.len(); + let mut cipher = self + .encryption_provider + .encrypt(plain) + .await + .map_err(DbError::Encryption)?; + if cipher.len() != number_of_items { + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); + } + events_to_insert.append(&mut cipher); } - let insert_query = self - .tables - .insert_events_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let insert_query = &self.table_config.insert_events_query; let inserted_ids: Result, sqlx::Error> = sqlx::query(insert_query) .bind(&event_ids_to_insert[..]) @@ -220,13 +230,21 @@ where let aggregate = snapshot .aggregate .pickle() - .context("Failed to pickle aggregate") - .map_err(DbError::PicklingError)?; + .map_err(DbError::SnapshotPicklingError)?; + let mut cipher = self + .encryption_provider + .encrypt(vec![aggregate]) + .await + .map_err(DbError::Encryption)? + .into_iter(); + let Some(aggregate) = cipher.next() else { + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); + }; + if cipher.next().is_some() { + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); + } - let upsert_query = self - .tables - .upsert_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let upsert_query = &self.table_config.upsert_snapshot_query; sqlx::query(upsert_query) .bind(aggregated_id) @@ -249,5 +267,6 @@ where self.outbox .store_side_effects(&mut self.inner, outbox_item) .await + .map_err(|e| e.into()) } } diff --git a/eventastic_postgres/tests/common/encryption.rs b/eventastic_postgres/tests/common/encryption.rs new file mode 100644 index 0000000..9caba67 --- /dev/null +++ b/eventastic_postgres/tests/common/encryption.rs @@ -0,0 +1,89 @@ +use async_trait::async_trait; +use eventastic_postgres::EncryptionProvider; + +/// Doesn't actually encrypt just does one XOR with the series 0, 1, 2..255, 0.. +/// to encrypt it then decrypts it by doing the same operation again. (A XOR B) +/// XOR B = A. +#[derive(Clone)] +pub struct TestEncryptionProvider; + +#[async_trait] +impl EncryptionProvider for TestEncryptionProvider { + type Error = TestEncryptionError; + + async fn encrypt(&self, plain: Vec>) -> Result>, Self::Error> { + Ok(plain + .into_iter() + .map(|plain| { + plain + .into_iter() + .enumerate() + .map(|(key, plain)| plain ^ (key as u8)) + .collect() + }) + .collect()) + } + + async fn decrypt(&self, cipher: Vec>) -> Result>, Self::Error> { + Ok(cipher + .into_iter() + .map(|cipher| { + cipher + .into_iter() + .enumerate() + .map(|(key, cipher)| cipher ^ (key as u8)) + .collect() + }) + .collect()) + } + + fn max_batch_size(&self) -> usize { + 42 + } +} + +#[derive(Debug)] +pub struct TestEncryptionError; + +impl std::fmt::Display for TestEncryptionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "TestEncryptionError") + } +} + +impl std::error::Error for TestEncryptionError {} + +#[tokio::test] +async fn encryption_changes_the_data() { + // Arrange + let encryption_provider = TestEncryptionProvider; + let plain = b"Hello, World!"; + + // Act + let cipher = encryption_provider + .encrypt(vec![plain.into()]) + .await + .unwrap(); + + // Assert + assert_eq!(cipher.len(), 1); + assert_ne!(&cipher[0], plain); +} + +#[tokio::test] +async fn encrypt_then_decrypt_returns_original_data() { + // Arrange + let encryption_provider = TestEncryptionProvider; + let plain = b"Hello, World!"; + + // Act + let cipher = encryption_provider + .encrypt(vec![plain.into()]) + .await + .unwrap(); + let decrypted = encryption_provider.decrypt(cipher).await.unwrap(); + + // Assert + assert_eq!(decrypted.len(), 1); + assert_eq!(&decrypted[0], plain); +} diff --git a/eventastic_postgres/tests/common/helpers.rs b/eventastic_postgres/tests/common/helpers.rs index df2087b..f2146cb 100644 --- a/eventastic_postgres/tests/common/helpers.rs +++ b/eventastic_postgres/tests/common/helpers.rs @@ -1,14 +1,18 @@ +use super::encryption::TestEncryptionProvider; use super::test_aggregate::{Account, AccountEvent}; use chrono::{DateTime, Utc}; use eventastic::aggregate::{Context, Root}; use eventastic_outbox_postgres::TableOutbox; -use eventastic_postgres::{Pickle, PostgresRepository, TableRegistryBuilder}; +use eventastic_postgres::{ + EncryptionProvider, NoEncryption, Pickle, PostgresRepository, TableConfig, +}; use sqlx::Row; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; use std::str::FromStr; use uuid::Uuid; -pub async fn get_repository() -> PostgresRepository { +pub async fn get_repository() -> PostgresRepository, NoEncryption> +{ let host = std::env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".to_string()); let connection_string = format!("postgres://postgres:password@{host}/postgres"); let connection_options = PgConnectOptions::from_str(connection_string.as_str()) @@ -16,20 +20,46 @@ pub async fn get_repository() -> PostgresRepository { let pool_options = PoolOptions::default(); - let tables = TableRegistryBuilder::new() - .register_with_tables::("events", "snapshots") - .build(); - - let repo = PostgresRepository::new(connection_options, pool_options, TableOutbox, tables) + let repo = PostgresRepository::new( + connection_options, + pool_options, + TableConfig::new("events", "snapshots"), + TableOutbox::new(NoEncryption), + NoEncryption, + ) + .await + .expect("Failed to connect to postgres"); + repo.run_migrations() .await - .expect("Failed to connect to postgres"); + .expect("Failed to run migrations"); + repo +} + +pub async fn get_encrypted_repository() +-> PostgresRepository, TestEncryptionProvider> { + let host = std::env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".to_string()); + let connection_string = format!("postgres://postgres:password@{host}/postgres"); + let connection_options = PgConnectOptions::from_str(connection_string.as_str()) + .expect("Failed to parse connection options"); + + let pool_options = PoolOptions::default(); + + let repo = PostgresRepository::new( + connection_options, + pool_options, + TableConfig::new("events", "snapshots"), + TableOutbox::new(TestEncryptionProvider), + TestEncryptionProvider, + ) + .await + .expect("Failed to connect to postgres"); repo.run_migrations() .await .expect("Failed to run migrations"); repo } -#[derive(serde::Deserialize, Debug, Clone, serde::Serialize)] +#[derive(Debug, Clone)] pub struct SavedSnapshot { pub version: i64, pub aggregate: Account, @@ -355,6 +385,7 @@ impl AccountBuilder { pub async fn get_side_effect( id: uuid::Uuid, + encryption_provider: impl EncryptionProvider, ) -> Option<(super::test_aggregate::SideEffects, i32, bool)> { let repository = get_repository().await; let transaction = repository @@ -379,8 +410,14 @@ pub async fn get_side_effect( .try_get("requeue") .expect("Failed to get requeue from row"); + let plain = encryption_provider + .decrypt(vec![message_bytes]) + .await + .unwrap(); + assert!(plain.len() == 1); + let plain = &plain[0]; let side_effect: super::test_aggregate::SideEffects = - super::test_aggregate::SideEffects::unpickle(&message_bytes) + super::test_aggregate::SideEffects::unpickle(plain) .expect("Failed to deserialize side effect"); Some((side_effect, retries, requeue)) diff --git a/eventastic_postgres/tests/common/mod.rs b/eventastic_postgres/tests/common/mod.rs index 9ccc4ce..7bf30b2 100644 --- a/eventastic_postgres/tests/common/mod.rs +++ b/eventastic_postgres/tests/common/mod.rs @@ -1,3 +1,6 @@ #![allow(dead_code)] + +pub mod encryption; pub mod helpers; pub mod test_aggregate; +pub mod test_order_aggregate; diff --git a/eventastic_postgres/tests/common/test_order_aggregate.rs b/eventastic_postgres/tests/common/test_order_aggregate.rs new file mode 100644 index 0000000..cf6b6d1 --- /dev/null +++ b/eventastic_postgres/tests/common/test_order_aggregate.rs @@ -0,0 +1,185 @@ +use eventastic::aggregate::Aggregate; +use eventastic::aggregate::SideEffect; +use eventastic::event::DomainEvent; +use serde::Deserialize; +use serde::Serialize; +use thiserror::Error; +use uuid::Uuid; + +// Define our Order aggregate - different from Account +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct Order { + pub order_id: Uuid, + pub customer_id: Uuid, + pub total_amount: i64, + pub status: OrderStatus, +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +pub enum OrderStatus { + Pending, + Confirmed, + Shipped, + Delivered, + Cancelled, +} + +// Define our domain events for Order +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] +pub enum OrderEvent { + Created { + order_id: Uuid, + event_id: Uuid, + customer_id: Uuid, + total_amount: i64, + }, + Confirmed { + event_id: Uuid, + }, + Shipped { + event_id: Uuid, + tracking_number: String, + }, + Delivered { + event_id: Uuid, + }, + Cancelled { + event_id: Uuid, + reason: String, + }, +} + +impl DomainEvent for OrderEvent { + type EventId = Uuid; + fn id(&self) -> &Uuid { + match self { + OrderEvent::Created { event_id, .. } + | OrderEvent::Confirmed { event_id, .. } + | OrderEvent::Shipped { event_id, .. } + | OrderEvent::Delivered { event_id, .. } + | OrderEvent::Cancelled { event_id, .. } => event_id, + } + } +} + +// Define our domain error for Order +#[derive(Error, Debug)] +pub enum OrderDomainError { + #[error("This event can't be applied given the current state of the order")] + InvalidState, + #[error("Order is already in final state")] + AlreadyFinalized, +} + +// Define our side effects for Order - different from Account side effects +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] +pub enum OrderSideEffects { + SendConfirmationEmail { + id: Uuid, + customer_email: String, + order_id: Uuid, + }, + NotifyWarehouse { + id: Uuid, + order_id: Uuid, + items: Vec, + }, + UpdateInventory { + id: Uuid, + product_ids: Vec, + quantities: Vec, + }, +} + +impl SideEffect for OrderSideEffects { + type SideEffectId = Uuid; + + fn id(&self) -> &Self::SideEffectId { + match self { + OrderSideEffects::SendConfirmationEmail { id, .. } + | OrderSideEffects::NotifyWarehouse { id, .. } + | OrderSideEffects::UpdateInventory { id, .. } => id, + } + } +} + +// Implement the aggregate trait for our Order struct +impl Aggregate for Order { + const SNAPSHOT_VERSION: u64 = 1; + + type AggregateId = Uuid; + type DomainEvent = OrderEvent; + type ApplyError = OrderDomainError; + type SideEffect = OrderSideEffects; + + fn aggregate_id(&self) -> &Self::AggregateId { + &self.order_id + } + + fn apply(&mut self, event: &Self::DomainEvent) -> Result<(), Self::ApplyError> { + match event { + OrderEvent::Confirmed { .. } => { + if self.status != OrderStatus::Pending { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Confirmed; + } + OrderEvent::Shipped { .. } => { + if self.status != OrderStatus::Confirmed { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Shipped; + } + OrderEvent::Delivered { .. } => { + if self.status != OrderStatus::Shipped { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Delivered; + } + OrderEvent::Cancelled { .. } => { + if matches!(self.status, OrderStatus::Delivered | OrderStatus::Cancelled) { + return Err(Self::ApplyError::AlreadyFinalized); + } + self.status = OrderStatus::Cancelled; + } + OrderEvent::Created { .. } => return Err(Self::ApplyError::InvalidState), + } + Ok(()) + } + + fn apply_new(event: &Self::DomainEvent) -> Result { + match event { + OrderEvent::Created { + order_id, + customer_id, + total_amount, + .. + } => Ok(Self { + order_id: *order_id, + customer_id: *customer_id, + total_amount: *total_amount, + status: OrderStatus::Pending, + }), + _ => Err(Self::ApplyError::InvalidState), + } + } + + fn side_effects(&self, event: &Self::DomainEvent) -> Option> { + let side_effect = match event { + OrderEvent::Created { event_id, .. } => Some(OrderSideEffects::SendConfirmationEmail { + id: *event_id, + customer_email: "customer@example.com".to_string(), + order_id: self.order_id, + }), + OrderEvent::Confirmed { event_id } => Some(OrderSideEffects::NotifyWarehouse { + id: *event_id, + order_id: self.order_id, + items: vec!["item1".to_string(), "item2".to_string()], + }), + OrderEvent::Shipped { .. } + | OrderEvent::Delivered { .. } + | OrderEvent::Cancelled { .. } => None, + }; + side_effect.map(|s| vec![s]) + } +} diff --git a/eventastic_postgres/tests/encryption.rs b/eventastic_postgres/tests/encryption.rs new file mode 100644 index 0000000..7e2a9c5 --- /dev/null +++ b/eventastic_postgres/tests/encryption.rs @@ -0,0 +1,304 @@ +use common::{ + encryption::TestEncryptionProvider, + helpers::{AccountBuilder, get_encrypted_repository, get_repository, get_side_effect}, + test_aggregate::{Account, AccountEvent, SideEffects}, +}; +use eventastic::aggregate::{Context, Root}; +use eventastic::repository::RepositoryReader; +use eventastic_outbox_postgres::TableOutbox; +use eventastic_postgres::{DbError, NoEncryption, PostgresRepository}; +use futures::StreamExt; +use uuid::Uuid; + +mod common; + +#[tokio::test] +async fn when_encryption_is_enabled_aggregate_can_be_saved_and_loaded() { + // Arrange + let repository = get_encrypted_repository().await; + let mut account = AccountBuilder::new().build(); + let account_id = account.state().account_id; + + let mut transaction = repository + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Act + account + .save(&mut transaction) + .await + .expect("Failed to commit transaction"); + let created_account = account.state(); + transaction + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert + let loaded_account = load_encrypted_account(account_id).await; + let loaded_account = loaded_account.state(); + + assert_eq!(created_account, loaded_account); +} + +#[tokio::test] +async fn when_encryption_is_enabled_events_can_be_saved_and_loaded_by_id() { + // Arrange + let repository = get_encrypted_repository().await; + + let account_id = Uuid::new_v4(); + let event_id = Uuid::new_v4(); + let email = "test@example.com".to_string(); + let starting_balance = 100; + let open_event = AccountEvent::Open { + account_id, + event_id, + email: email.clone(), + starting_balance, + }; + + let mut account = Account::record_new(open_event.clone()).unwrap(); + + let mut transaction = repository + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Act + account + .save(&mut transaction) + .await + .expect("Failed to commit transaction"); + transaction + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert + let mut repository = get_encrypted_repository().await; + let result = , + TestEncryptionProvider, + > as RepositoryReader>::get_event( + &mut repository, &account_id, &event_id + ) + .await; + assert!(matches!(result, Ok(Some(e)) if e.event == open_event)); +} + +#[tokio::test] +async fn when_encryption_is_enabled_events_cannot_be_loaded_by_id_without_encryption() { + // Arrange + let repository = get_encrypted_repository().await; + + let account_id = Uuid::new_v4(); + let event_id = Uuid::new_v4(); + let email = "test@example.com".to_string(); + let starting_balance = 100; + let open_event = AccountEvent::Open { + account_id, + event_id, + email: email.clone(), + starting_balance, + }; + + let mut account = Account::record_new(open_event).unwrap(); + + let mut transaction = repository + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Act + account + .save(&mut transaction) + .await + .expect("Failed to commit transaction"); + transaction + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert + let mut repository = get_repository().await; + let result = + , NoEncryption> as RepositoryReader< + Account, + >>::get_event(&mut repository, &account_id, &event_id) + .await; + assert!(matches!(result, Err(DbError::EventPicklingError(_)))); +} + +#[tokio::test] +async fn when_encryption_is_enabled_events_can_be_saved_and_loaded() { + // Arrange + let repository = get_encrypted_repository().await; + let mut account = AccountBuilder::new().build(); + let account_id = account.state().account_id; + + let mut transaction = repository + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Act + account + .save(&mut transaction) + .await + .expect("Failed to commit transaction"); + transaction + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert + let mut repository = get_encrypted_repository().await; + let mut events = , + TestEncryptionProvider, + > as RepositoryReader>::stream_from( + &mut repository, &account_id, 0 + ); + while let Some(event) = events.next().await { + assert!(event.is_ok()); + } +} + +#[tokio::test] +async fn when_encryption_is_enabled_events_cannot_be_loaded_without_encryption() { + // Arrange + let repository = get_encrypted_repository().await; + let mut account = AccountBuilder::new().build(); + let account_id = account.state().account_id; + + let mut transaction = repository + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Act + account + .save(&mut transaction) + .await + .expect("Failed to commit transaction"); + transaction + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert + let mut repository = get_repository().await; + let mut events = + , NoEncryption> as RepositoryReader< + Account, + >>::stream_from(&mut repository, &account_id, 0); + while let Some(event) = events.next().await { + assert!(matches!( + event, + Err(eventastic_postgres::DbError::EventPicklingError(_)) + )); + } +} + +#[tokio::test] +async fn when_encryption_is_enabled_aggregate_cannot_be_loaded_without_encryption() { + // Arrange + let repository = get_encrypted_repository().await; + let mut account = AccountBuilder::new().build(); + let account_id = account.state().account_id; + + let mut transaction = repository + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Act + account + .save(&mut transaction) + .await + .expect("Failed to commit transaction"); + transaction + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert + let repository = get_repository().await; + let mut transaction = repository.begin_transaction().await.unwrap(); + + assert!(matches!( + transaction.get(&account_id).await, + Err(eventastic::repository::RepositoryError::Repository( + eventastic_postgres::DbError::SnapshotPicklingError(_) + )), + )); +} + +#[tokio::test] +async fn when_encryption_is_enabled_side_effect_can_be_saved_and_loaded() { + // Arrange + let repository = get_encrypted_repository().await; + + let account_id = Uuid::new_v4(); + let event_id = Uuid::new_v4(); + let email = "test@example.com".to_string(); + let starting_balance = 100; + let open_event = AccountEvent::Open { + account_id, + event_id, + email: email.clone(), + starting_balance, + }; + + let mut account = Account::record_new(open_event).unwrap(); + + let mut transaction = repository + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Act + account + .save(&mut transaction) + .await + .expect("Failed to commit transaction"); + transaction + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert + let (side_effect, _retries, _requeue) = get_side_effect(event_id, TestEncryptionProvider) + .await + .expect("Side effect should be stored in outbox table"); + match side_effect { + SideEffects::SendEmail { + id: side_effect_id, + address, + content, + } => { + assert_eq!(side_effect_id, event_id); + assert_eq!(address, email); + assert!(content.contains(&account_id.to_string())); + assert!(content.contains(&starting_balance.to_string())); + } + _ => panic!("Expected SendEmail side effect"), + } +} + +async fn load_encrypted_account(account_id: Uuid) -> Context { + let repository = get_encrypted_repository().await; + + let mut transaction = repository + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + transaction + .get(&account_id) + .await + .expect("Failed to encrypted load account") +} diff --git a/eventastic_postgres/tests/multi_aggregate.rs b/eventastic_postgres/tests/multi_aggregate.rs new file mode 100644 index 0000000..b26f206 --- /dev/null +++ b/eventastic_postgres/tests/multi_aggregate.rs @@ -0,0 +1,300 @@ +mod common; + +use common::helpers::get_repository; +use common::test_aggregate::{Account, AccountEvent}; +use common::test_order_aggregate::{Order, OrderEvent, OrderStatus}; +use eventastic::aggregate::Root; +use eventastic_outbox_postgres::TableOutbox; +use eventastic_postgres::PostgresRepository; +use eventastic_postgres::{NoEncryption, TableConfig}; +use sqlx::pool::PoolOptions; +use sqlx::postgres::PgConnectOptions; +use std::str::FromStr; +use uuid::Uuid; + +// Helper function to get an order repository using the same pool +async fn get_order_repository() -> PostgresRepository, NoEncryption> +{ + let host = std::env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".to_string()); + let connection_string = format!("postgres://postgres:password@{host}/postgres"); + let connection_options = + PgConnectOptions::from_str(&connection_string).expect("Failed to parse connection options"); + + let pool_options = PoolOptions::default(); + + PostgresRepository::new( + connection_options, + pool_options, + TableConfig::new("events", "snapshots"), + TableOutbox::new(NoEncryption), + NoEncryption, + ) + .await + .expect("Failed to connect to postgres") +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_commit_test() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create and store account + let account_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_event).expect("Failed to create account"); + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and store order + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + order_tx + .store(&mut order) + .await + .expect("Failed to store order"); + + // Commit the transaction + order_tx + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert - verify both aggregates were saved + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let loaded_account = account_load_tx + .get(&account_id) + .await + .expect("Failed to load account"); + assert_eq!(loaded_account.state().account_id, account_id); + assert_eq!(loaded_account.state().balance, 1000); + account_load_tx + .commit() + .await + .expect("Failed to commit load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let loaded_order = order_load_tx + .get(&order_id) + .await + .expect("Failed to load order"); + assert_eq!(loaded_order.state().order_id, order_id); + assert_eq!(loaded_order.state().total_amount, 500); + assert_eq!(loaded_order.state().status, OrderStatus::Pending); + order_load_tx + .commit() + .await + .expect("Failed to commit order load transaction"); +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_rollback_test() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create and store account + let account_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_event).expect("Failed to create account"); + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and store order + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + order_tx + .store(&mut order) + .await + .expect("Failed to store order"); + + // Rollback the transaction instead of committing + order_tx + .rollback() + .await + .expect("Failed to rollback transaction"); + + // Assert - verify neither aggregate was saved + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let account_result = account_load_tx.get(&account_id).await; + assert!( + account_result.is_err(), + "Account should not exist after rollback" + ); + account_load_tx + .rollback() + .await + .expect("Failed to rollback load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let order_result = order_load_tx.get(&order_id).await; + assert!( + order_result.is_err(), + "Order should not exist after rollback" + ); + order_load_tx + .rollback() + .await + .expect("Failed to rollback order load transaction"); +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_with_mixed_side_effects() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create account and add money (generates side effects) + let account_open_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_open_event).expect("Failed to create account"); + + let add_event = AccountEvent::Add { + event_id: Uuid::new_v4(), + amount: 500, + }; + account + .record_that(add_event) + .expect("Failed to add money to account"); + + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and confirm order (generates different side effects) + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + + let confirm_event = OrderEvent::Confirmed { + event_id: Uuid::new_v4(), + }; + order + .record_that(confirm_event) + .expect("Failed to confirm order"); + + order_tx + .store(&mut order) + .await + .expect("Failed to store order"); + + // Commit the transaction + order_tx + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert - verify both aggregates were saved with correct states + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let loaded_account = account_load_tx + .get(&account_id) + .await + .expect("Failed to load account"); + assert_eq!(loaded_account.state().balance, 1500); // 1000 + 500 + account_load_tx + .commit() + .await + .expect("Failed to commit load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let loaded_order = order_load_tx + .get(&order_id) + .await + .expect("Failed to load order"); + assert_eq!(loaded_order.state().status, OrderStatus::Confirmed); + order_load_tx + .commit() + .await + .expect("Failed to commit order load transaction"); +} diff --git a/eventastic_postgres/tests/side_effect.rs b/eventastic_postgres/tests/side_effect.rs index 907f479..f73ef61 100644 --- a/eventastic_postgres/tests/side_effect.rs +++ b/eventastic_postgres/tests/side_effect.rs @@ -3,6 +3,7 @@ mod common; use common::helpers::{get_repository, get_side_effect}; use common::test_aggregate::{Account, AccountEvent, SideEffects}; use eventastic::aggregate::Root; +use eventastic_postgres::NoEncryption; use uuid::Uuid; #[tokio::test] @@ -43,7 +44,7 @@ async fn side_effect_is_correctly_stored() { .expect("Failed to commit transaction"); // Assert - Verify the side effect was stored in the outbox table - let (side_effect, retries, requeue) = get_side_effect(event_id) + let (side_effect, retries, requeue) = get_side_effect(event_id, NoEncryption) .await .expect("Side effect should be stored in outbox table"); @@ -118,12 +119,12 @@ async fn multiple_side_effects_are_stored_correctly() { // Assert - Verify both side effects were stored // Check for the Open event's side effect - get_side_effect(open_event_id) + get_side_effect(open_event_id, NoEncryption) .await .expect("Open event side effect should be stored"); // Check for the Add event's side effect - let (side_effect, _, _) = get_side_effect(add_event_id) + let (side_effect, _, _) = get_side_effect(add_event_id, NoEncryption) .await .expect("Add event side effect should be stored"); diff --git a/examples/bank/Cargo.toml b/examples/bank/Cargo.toml index e64ff49..8a2f0bc 100644 --- a/examples/bank/Cargo.toml +++ b/examples/bank/Cargo.toml @@ -7,12 +7,11 @@ edition = "2024" [dependencies] eventastic = { path = "../../eventastic" } -eventastic_postgres = { path = "../../eventastic_postgres" } +eventastic_postgres = { path = "../../eventastic_postgres", features = ["serde"] } eventastic_outbox_postgres = { path = "../../eventastic_outbox_postgres" } thiserror = { workspace = true } uuid = { workspace = true } tokio = { workspace = true } sqlx = { workspace = true } -anyhow = { workspace = true } serde = { workspace = true } async-trait = { workspace = true } diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 5b653e7..b1b7d31 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -9,7 +9,7 @@ use eventastic::aggregate::SideEffect; use eventastic::event::DomainEvent; use eventastic::repository::Repository; use eventastic_outbox_postgres::{RepositoryOutboxExt, SideEffectHandler, TableOutbox}; -use eventastic_postgres::{PostgresRepository, RootExt}; +use eventastic_postgres::{NoEncryption, PostgresRepository, RootExt, TableConfig}; use serde::Deserialize; use serde::Serialize; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; @@ -17,7 +17,7 @@ use thiserror::Error; use uuid::Uuid; #[tokio::main] -async fn main() -> Result<(), anyhow::Error> { +async fn main() -> Result<(), Box> { // Setup postgres repo let repository = get_repository().await; @@ -386,17 +386,19 @@ impl Aggregate for Account { // Using the default outbox implementation // You can also implement your own outbox handler by implementing the `SideEffectStorage` trait -async fn get_repository() -> PostgresRepository { +async fn get_repository() -> PostgresRepository, NoEncryption> { let connection_options = PgConnectOptions::from_str("postgres://postgres:password@localhost/postgres").unwrap(); let pool_options = PoolOptions::default(); - let tables = eventastic_postgres::TableRegistryBuilder::new() - .register_with_tables::("events", "snapshots") - .build(); - - PostgresRepository::new(connection_options, pool_options, TableOutbox, tables) - .await - .unwrap() + PostgresRepository::new( + connection_options, + pool_options, + TableConfig::new("events", "snapshots"), + TableOutbox::new(NoEncryption), + NoEncryption, + ) + .await + .unwrap() }