From a718335b87bd749bac8f72198a4fb6c5f0045c6e Mon Sep 17 00:00:00 2001 From: Jonathan Donaldson Date: Wed, 4 Jun 2025 00:33:49 +0100 Subject: [PATCH 1/6] Add transactional outbox trait and example --- Cargo.toml | 2 +- eventastic_outbox_postgres/Cargo.toml | 13 +++++++ eventastic_outbox_postgres/src/lib.rs | 51 ++++++++++++++++++++++++ eventastic_postgres/src/lib.rs | 20 ++++++++-- eventastic_postgres/src/repository.rs | 19 ++++++--- eventastic_postgres/src/transaction.rs | 54 +++++++++++--------------- examples/bank/Cargo.toml | 1 + examples/bank/src/main.rs | 6 +-- 8 files changed, 121 insertions(+), 45 deletions(-) create mode 100644 eventastic_outbox_postgres/Cargo.toml create mode 100644 eventastic_outbox_postgres/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index c5caa3d..718f72c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["eventastic", "eventastic_postgres", "examples/*"] +members = ["eventastic", "eventastic_outbox_postgres", "eventastic_postgres", "examples/*"] [workspace.package] license = "MIT" diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml new file mode 100644 index 0000000..b037a33 --- /dev/null +++ b/eventastic_outbox_postgres/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "eventastic_outbox_postgres" +version = "0.1.0" +edition = "2024" + +[dependencies] +eventastic_postgres = { path = "../eventastic_postgres" } +async-trait = { workspace = true } +serde_json = { workspace = true } +sqlx = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } +serde = { workspace = true } diff --git a/eventastic_outbox_postgres/src/lib.rs b/eventastic_outbox_postgres/src/lib.rs new file mode 100644 index 0000000..7ff803b --- /dev/null +++ b/eventastic_outbox_postgres/src/lib.rs @@ -0,0 +1,51 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use eventastic_postgres::{DbError, TransactionalOutbox}; +use sqlx::{Postgres, Transaction}; +use sqlx::types::Uuid; + +/// Default implementation of [`TransactionalOutbox`] that stores messages in an `outbox` table. +#[derive(Clone, Copy, Default)] +pub struct TableOutbox; + +#[async_trait] +impl TransactionalOutbox for TableOutbox { + async fn store_side_effects( + &self, + transaction: &mut Transaction<'_, Postgres>, + items: Vec<(Uuid, serde_json::Value)>, + ) -> Result<(), DbError> { + let mut ids: Vec = Vec::with_capacity(items.len()); + let mut messages: Vec = Vec::with_capacity(items.len()); + let mut retries: Vec = Vec::with_capacity(items.len()); + let mut requeues: Vec = Vec::with_capacity(items.len()); + let mut created_ats: Vec> = Vec::with_capacity(items.len()); + + for (id, msg) in items { + ids.push(id); + messages.push(msg); + retries.push(0); + requeues.push(true); + created_ats.push(Utc::now()); + } + + sqlx::query( + "INSERT INTO outbox(id, message, retries, requeue, created_at) + SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[]) + ON CONFLICT (id) DO UPDATE SET + message = excluded.message, + retries = excluded.retries, + requeue = excluded.requeue, + created_at = excluded.created_at", + ) + .bind(&ids) + .bind(&messages) + .bind(&retries) + .bind(&requeues) + .bind(&created_ats) + .execute(transaction.as_mut()) + .await?; + + Ok(()) + } +} diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 2491669..6783351 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -5,11 +5,12 @@ use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, repository::RepositoryError, }; +use sqlx::{Postgres, Transaction}; pub use repository::PostgresRepository; +pub use transaction::PostgresTransaction; use serde::{Serialize, de::DeserializeOwned}; use sqlx::types::Uuid; use thiserror::Error; -pub use transaction::PostgresTransaction; #[derive(Error, Debug)] pub enum DbError { @@ -39,7 +40,16 @@ impl From for DbError { } #[async_trait] -pub trait RootExt +pub trait TransactionalOutbox: Send + Sync { + async fn store_side_effects( + &self, + transaction: &mut Transaction<'_, Postgres>, + items: Vec<(Uuid, serde_json::Value)>, + ) -> Result<(), DbError>; +} + +#[async_trait] +pub trait RootExt where S: SideEffect + Serialize + Send + Sync + 'static, T: Aggregate @@ -49,16 +59,17 @@ where + Sync + 'static, ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, + O: TransactionalOutbox + Send + Sync, { async fn load( - transaction: &mut PostgresTransaction<'_>, + transaction: &mut PostgresTransaction<'_, O>, aggregate_id: Uuid, ) -> Result, RepositoryError> { Context::load(transaction, &aggregate_id).await } } -impl RootExt for T +impl RootExt for T where S: SideEffect + Serialize + Send + Sync + 'static, T: Aggregate @@ -68,5 +79,6 @@ where + Sync + 'static, ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, + O: TransactionalOutbox + Send + Sync, { } diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index b66b496..eade1f9 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -1,28 +1,37 @@ -use crate::PostgresTransaction; +use crate::{PostgresTransaction, TransactionalOutbox}; use sqlx::{ Pool, Postgres, postgres::{PgConnectOptions, PgPoolOptions}, }; #[derive(Clone)] -pub struct PostgresRepository { +pub struct PostgresRepository +where + O: TransactionalOutbox + Clone, +{ pub(crate) inner: Pool, + pub(crate) outbox: O, } -impl PostgresRepository { +impl PostgresRepository +where + O: TransactionalOutbox + Clone, +{ pub async fn new( connect_options: PgConnectOptions, pool_options: PgPoolOptions, + outbox: O, ) -> Result { let pool = pool_options.connect_with(connect_options).await?; - Ok(Self { inner: pool }) + Ok(Self { inner: pool, outbox }) } /// Start a new transaction using the default isolation level - pub async fn begin_transaction(&self) -> Result, sqlx::Error> { + pub async fn begin_transaction(&self) -> Result, sqlx::Error> { Ok(PostgresTransaction { inner: self.inner.begin().await?, + outbox: &self.outbox, }) } diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 64e1ce9..8a85b59 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use crate::DbError; +use crate::{DbError, TransactionalOutbox}; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; @@ -20,11 +20,18 @@ use sqlx::query_as; use sqlx::types::JsonValue; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; -pub struct PostgresTransaction<'a> { +pub struct PostgresTransaction<'a, O> +where + O: TransactionalOutbox, +{ pub(crate) inner: Transaction<'a, Postgres>, + pub(crate) outbox: &'a O, } -impl<'a> PostgresTransaction<'a> { +impl<'a, O> PostgresTransaction<'a, O> +where + O: TransactionalOutbox, +{ /// Commit the transaction to the db. pub async fn commit(self) -> Result<(), DbError> { Ok(self.inner.commit().await?) @@ -81,8 +88,9 @@ where } #[async_trait] -impl<'a, S, T> RepositoryTransaction for PostgresTransaction<'a> +impl<'a, O, S, T> RepositoryTransaction for PostgresTransaction<'a, O> where + O: TransactionalOutbox, S: SideEffect + 'a + Serialize + Send + Sync, T: Aggregate + 'a @@ -241,37 +249,19 @@ where &mut self, outbox_item: Vec, ) -> Result<(), Self::DbError> { - let mut ids: Vec = Vec::with_capacity(outbox_item.len()); - let mut messages: Vec = Vec::with_capacity(outbox_item.len()); - let mut retries: Vec = Vec::with_capacity(outbox_item.len()); - let mut requeues: Vec = Vec::with_capacity(outbox_item.len()); - let mut created_ats: Vec> = Vec::with_capacity(outbox_item.len()); + let mut items: Vec<(Uuid, serde_json::Value)> = Vec::with_capacity(outbox_item.len()); for item in outbox_item { - ids.push(*item.id()); - messages.push(serde_json::to_value(item).map_err(DbError::SerializationError)?); - retries.push(0); - requeues.push(true); - created_ats.push(Utc::now()); + items.push(( + *item.id(), + serde_json::to_value(item).map_err(DbError::SerializationError)?, + )); } - sqlx::query( - "INSERT INTO outbox(id, message, retries, requeue, created_at) - SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[]) - ON CONFLICT (id) DO UPDATE SET - message = excluded.message, - retries = excluded.retries, - requeue = excluded.requeue, - created_at = excluded.created_at", - ) - .bind(&ids) - .bind(&messages) - .bind(&retries) - .bind(&requeues) - .bind(&created_ats) - .execute(&mut *self.inner) - .await?; - - Ok(()) + self + .outbox + .store_side_effects(&mut self.inner, items) + .await + } } diff --git a/examples/bank/Cargo.toml b/examples/bank/Cargo.toml index 3c4f26b..e64ff49 100644 --- a/examples/bank/Cargo.toml +++ b/examples/bank/Cargo.toml @@ -8,6 +8,7 @@ edition = "2024" [dependencies] eventastic = { path = "../../eventastic" } eventastic_postgres = { path = "../../eventastic_postgres" } +eventastic_outbox_postgres = { path = "../../eventastic_outbox_postgres" } thiserror = { workspace = true } uuid = { workspace = true } tokio = { workspace = true } diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 6ca2f89..e62a637 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -8,8 +8,8 @@ use eventastic::aggregate::SaveError; use eventastic::aggregate::SideEffect; use eventastic::event::Event; use eventastic_postgres::PostgresRepository; - use eventastic_postgres::RootExt; +use eventastic_outbox_postgres::TableOutbox; use serde::Deserialize; use serde::Serialize; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; @@ -298,13 +298,13 @@ impl Aggregate for Account { } } -async fn get_repository() -> PostgresRepository { +async fn get_repository() -> PostgresRepository { let connection_options = PgConnectOptions::from_str("postgres://postgres:password@localhost/postgres").unwrap(); let pool_options = PoolOptions::default(); - PostgresRepository::new(connection_options, pool_options) + PostgresRepository::new(connection_options, pool_options, TableOutbox) .await .unwrap() } From 56c6cbad228a0fd41deb9bc7a0ec1638318ab154 Mon Sep 17 00:00:00 2001 From: Jonathan Donaldson Date: Wed, 4 Jun 2025 00:44:25 +0100 Subject: [PATCH 2/6] Rename outbox trait --- eventastic_outbox_postgres/src/lib.rs | 6 +++--- eventastic_postgres/Cargo.toml | 1 + eventastic_postgres/src/lib.rs | 6 +++--- eventastic_postgres/src/repository.rs | 6 +++--- eventastic_postgres/src/transaction.rs | 8 ++++---- eventastic_postgres/tests/common/helpers.rs | 5 +++-- 6 files changed, 17 insertions(+), 15 deletions(-) diff --git a/eventastic_outbox_postgres/src/lib.rs b/eventastic_outbox_postgres/src/lib.rs index 7ff803b..58888f2 100644 --- a/eventastic_outbox_postgres/src/lib.rs +++ b/eventastic_outbox_postgres/src/lib.rs @@ -1,15 +1,15 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use eventastic_postgres::{DbError, TransactionalOutbox}; +use eventastic_postgres::{DbError, SideEffectStorage}; use sqlx::{Postgres, Transaction}; use sqlx::types::Uuid; -/// Default implementation of [`TransactionalOutbox`] that stores messages in an `outbox` table. +/// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table. #[derive(Clone, Copy, Default)] pub struct TableOutbox; #[async_trait] -impl TransactionalOutbox for TableOutbox { +impl SideEffectStorage for TableOutbox { async fn store_side_effects( &self, transaction: &mut Transaction<'_, Postgres>, diff --git a/eventastic_postgres/Cargo.toml b/eventastic_postgres/Cargo.toml index 81b610f..8f9396f 100644 --- a/eventastic_postgres/Cargo.toml +++ b/eventastic_postgres/Cargo.toml @@ -24,3 +24,4 @@ thiserror = { workspace = true } [dev-dependencies] uuid = { workspace = true } +eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" } diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 6783351..15ef413 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -40,7 +40,7 @@ impl From for DbError { } #[async_trait] -pub trait TransactionalOutbox: Send + Sync { +pub trait SideEffectStorage: Send + Sync { async fn store_side_effects( &self, transaction: &mut Transaction<'_, Postgres>, @@ -59,7 +59,7 @@ where + Sync + 'static, ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, - O: TransactionalOutbox + Send + Sync, + O: SideEffectStorage + Send + Sync, { async fn load( transaction: &mut PostgresTransaction<'_, O>, @@ -79,6 +79,6 @@ where + Sync + 'static, ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, - O: TransactionalOutbox + Send + Sync, + O: SideEffectStorage + Send + Sync, { } diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index eade1f9..56d3521 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -1,4 +1,4 @@ -use crate::{PostgresTransaction, TransactionalOutbox}; +use crate::{PostgresTransaction, SideEffectStorage}; use sqlx::{ Pool, Postgres, postgres::{PgConnectOptions, PgPoolOptions}, @@ -7,7 +7,7 @@ use sqlx::{ #[derive(Clone)] pub struct PostgresRepository where - O: TransactionalOutbox + Clone, + O: SideEffectStorage + Clone, { pub(crate) inner: Pool, pub(crate) outbox: O, @@ -15,7 +15,7 @@ where impl PostgresRepository where - O: TransactionalOutbox + Clone, + O: SideEffectStorage + Clone, { pub async fn new( connect_options: PgConnectOptions, diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 8a85b59..8066dce 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use crate::{DbError, TransactionalOutbox}; +use crate::{DbError, SideEffectStorage}; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; @@ -22,7 +22,7 @@ use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; pub struct PostgresTransaction<'a, O> where - O: TransactionalOutbox, + O: SideEffectStorage, { pub(crate) inner: Transaction<'a, Postgres>, pub(crate) outbox: &'a O, @@ -30,7 +30,7 @@ where impl<'a, O> PostgresTransaction<'a, O> where - O: TransactionalOutbox, + O: SideEffectStorage, { /// Commit the transaction to the db. pub async fn commit(self) -> Result<(), DbError> { @@ -90,7 +90,7 @@ where #[async_trait] impl<'a, O, S, T> RepositoryTransaction for PostgresTransaction<'a, O> where - O: TransactionalOutbox, + O: SideEffectStorage, S: SideEffect + 'a + Serialize + Send + Sync, T: Aggregate + 'a diff --git a/eventastic_postgres/tests/common/helpers.rs b/eventastic_postgres/tests/common/helpers.rs index 5e156bc..2f39a65 100644 --- a/eventastic_postgres/tests/common/helpers.rs +++ b/eventastic_postgres/tests/common/helpers.rs @@ -2,12 +2,13 @@ use super::test_aggregate::{Account, AccountEvent}; use chrono::{DateTime, Utc}; use eventastic::aggregate::{Context, Root}; use eventastic_postgres::PostgresRepository; +use eventastic_outbox_postgres::TableOutbox; use sqlx::Row; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; use std::str::FromStr; use uuid::Uuid; -pub async fn get_repository() -> PostgresRepository { +pub async fn get_repository() -> PostgresRepository { let host = std::env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".to_string()); let connection_string = format!("postgres://postgres:password@{host}/postgres"); let connection_options = PgConnectOptions::from_str(connection_string.as_str()) @@ -15,7 +16,7 @@ pub async fn get_repository() -> PostgresRepository { let pool_options = PoolOptions::default(); - let repo = PostgresRepository::new(connection_options, pool_options) + let repo = PostgresRepository::new(connection_options, pool_options, TableOutbox) .await .expect("Failed to connect to postgres"); repo.run_migrations() From 05dd5d1548e20867d8777e606795e3a65fab3386 Mon Sep 17 00:00:00 2001 From: Jonathan Donaldson Date: Wed, 4 Jun 2025 01:06:17 +0100 Subject: [PATCH 3/6] Implement postgres outbox processing --- eventastic_outbox_postgres/Cargo.toml | 3 + eventastic_outbox_postgres/src/lib.rs | 131 ++++++++++++++++++++++++- eventastic_postgres/src/lib.rs | 2 + eventastic_postgres/src/outbox.rs | 32 ++++++ eventastic_postgres/src/transaction.rs | 68 +++++++++++++ examples/bank/src/main.rs | 31 +++++- 6 files changed, 264 insertions(+), 3 deletions(-) create mode 100644 eventastic_postgres/src/outbox.rs diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml index b037a33..6fbcbfe 100644 --- a/eventastic_outbox_postgres/Cargo.toml +++ b/eventastic_outbox_postgres/Cargo.toml @@ -11,3 +11,6 @@ sqlx = { workspace = true } chrono = { workspace = true } uuid = { workspace = true } serde = { workspace = true } +eventastic = { path = "../eventastic" } +futures-util = { workspace = true } +tokio = { workspace = true } diff --git a/eventastic_outbox_postgres/src/lib.rs b/eventastic_outbox_postgres/src/lib.rs index 58888f2..ebd5f78 100644 --- a/eventastic_outbox_postgres/src/lib.rs +++ b/eventastic_outbox_postgres/src/lib.rs @@ -1,8 +1,11 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use eventastic_postgres::{DbError, SideEffectStorage}; -use sqlx::{Postgres, Transaction}; +use eventastic::aggregate::SideEffect; +use eventastic_postgres::{DbError, PostgresRepository, SideEffectStorage}; +use serde::de::DeserializeOwned; use sqlx::types::Uuid; +use sqlx::{Postgres, Transaction}; +use std::sync::Arc; /// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table. #[derive(Clone, Copy, Default)] @@ -49,3 +52,127 @@ impl SideEffectStorage for TableOutbox { Ok(()) } } + +/// A message retrieved from the outbox table. +#[derive(Debug, Clone)] +pub struct OutboxMessage +where + T: SideEffect, +{ + /// The stored side effect. + pub message: T, + /// The amount of times this message has been retried. + pub(crate) retries: u16, + /// Whether the message should be requeued on failure. + pub requeue: bool, +} + +impl OutboxMessage +where + T: SideEffect, +{ + pub fn new(message: T, retries: u16, requeue: bool) -> Self { + Self { + message, + retries, + requeue, + } + } + + /// Returns the retry count for this message. + pub fn retries(&self) -> u16 { + self.retries + } +} + +/// Trait used to handle side effects pulled from the outbox. +#[async_trait] +pub trait SideEffectHandler { + type SideEffect: SideEffect; + type Error: Send; + + /// Handle a side effect message. + /// + /// Returning `Ok(())` deletes the message from the outbox. Returning + /// `Err((true, E))` requeues the message. Returning `Err((false, E))` + /// leaves the message without requeuing. + async fn handle( + &self, + msg: &Self::SideEffect, + retries: u16, + ) -> Result<(), (bool, Self::Error)>; +} + + +/// Extension trait for running the outbox worker using a [`TableOutbox`]. +#[async_trait] +pub trait RepositoryOutboxExt { + async fn start_outbox( + &self, + handler: H, + poll_interval: std::time::Duration, + ) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned + Send + Sync, + T::Id: Clone + Send, + H: SideEffectHandler + Send + Sync, + for<'sql> T::Id: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin; +} + +#[async_trait] +impl RepositoryOutboxExt for PostgresRepository { + async fn start_outbox( + &self, + handler: H, + poll_interval: std::time::Duration, + ) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned + Send + Sync, + T::Id: Clone + Send, + H: SideEffectHandler + Send + Sync, + for<'sql> T::Id: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, + { + let handler = Arc::new(handler); + loop { + let deadline = std::time::Instant::now() + poll_interval; + let _ = process_outbox_batch::(self, handler.clone()).await; + tokio::time::sleep_until(deadline.into()).await; + } + } +} + +async fn process_outbox_batch( + repo: &PostgresRepository, + handler: Arc, +) -> Result<(), DbError> +where + T: SideEffect + DeserializeOwned + Send + Sync, + T::Id: Clone + Send, + H: SideEffectHandler + Send + Sync, + for<'sql> T::Id: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + let mut tx = repo.begin_transaction().await?; + + let outbox_items = tx.get_outbox_batch::().await?; + + for mut item in outbox_items { + let id = item.message.id().clone(); + + match handler.handle(&item.message, item.retries).await { + Ok(()) => { + tx.delete_outbox_item(id).await?; + } + Err((requeue, _)) => { + item.retries += 1; + item.requeue = requeue; + tx.update_outbox_item(item).await?; + } + } + } + + tx.commit().await +} + diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 15ef413..1731e1d 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,5 +1,6 @@ mod repository; mod transaction; +mod outbox; use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, @@ -8,6 +9,7 @@ use eventastic::{ use sqlx::{Postgres, Transaction}; pub use repository::PostgresRepository; pub use transaction::PostgresTransaction; +pub use outbox::OutboxMessage; use serde::{Serialize, de::DeserializeOwned}; use sqlx::types::Uuid; use thiserror::Error; diff --git a/eventastic_postgres/src/outbox.rs b/eventastic_postgres/src/outbox.rs new file mode 100644 index 0000000..6fcc2e2 --- /dev/null +++ b/eventastic_postgres/src/outbox.rs @@ -0,0 +1,32 @@ +use eventastic::aggregate::SideEffect; + +/// Message stored in the transactional outbox. +#[derive(Debug, Clone)] +pub struct OutboxMessage +where + T: SideEffect, +{ + /// The side effect payload. + pub message: T, + pub retries: u16, + /// Whether the message should be requeued on failure. + pub requeue: bool, +} + +impl OutboxMessage +where + T: SideEffect, +{ + pub fn new(message: T, retries: u16, requeue: bool) -> Self { + Self { + message, + retries, + requeue, + } + } + + /// Returns the retry count for this message. + pub fn retries(&self) -> u16 { + self.retries + } +} diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 8066dce..928fc5d 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -20,6 +20,7 @@ use sqlx::query_as; use sqlx::types::JsonValue; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; +use crate::OutboxMessage; pub struct PostgresTransaction<'a, O> where O: SideEffectStorage, @@ -46,6 +47,73 @@ where pub fn into_inner(self) -> Transaction<'a, Postgres> { self.inner } + + /// Returns a batch of up to 10 side effects from the outbox table. + pub async fn get_outbox_batch(&mut self) -> Result>, DbError> + where + T: SideEffect + DeserializeOwned, + for<'sql> T::Id: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin, + { + #[derive(sqlx::FromRow)] + struct OutboxRow { + message: JsonValue, + retries: i32, + requeue: bool, + } + + let rows = query_as::<_, OutboxRow>( + "SELECT message, retries, requeue FROM outbox \ + WHERE requeue = true ORDER BY created_at \ + FOR UPDATE SKIP LOCKED LIMIT 10", + ) + .fetch_all(&mut *self.inner) + .await?; + + rows + .into_iter() + .map(|row| { + let msg = serde_json::from_value(row.message)?; + Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) + }) + .collect::, serde_json::Error>>() + .map_err(DbError::SerializationError) + } + + /// Delete a side effect from the outbox table. + pub async fn delete_outbox_item(&mut self, id: I) -> Result<(), DbError> + where + for<'sql> I: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin, + { + query("DELETE FROM outbox WHERE id = $1") + .bind(id) + .execute(&mut *self.inner) + .await?; + Ok(()) + } + + /// Update the retries and requeue flag for a side effect message. + pub async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned, + for<'sql> T::Id: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin, + { + query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") + .bind(item.message.id()) + .bind(i32::from(item.retries)) + .bind(item.requeue) + .execute(&mut *self.inner) + .await?; + Ok(()) + } } #[derive(sqlx::FromRow)] diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index e62a637..3327365 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -9,7 +9,9 @@ use eventastic::aggregate::SideEffect; use eventastic::event::Event; use eventastic_postgres::PostgresRepository; use eventastic_postgres::RootExt; -use eventastic_outbox_postgres::TableOutbox; +use eventastic_outbox_postgres::{ + RepositoryOutboxExt, SideEffectHandler, TableOutbox, +}; use serde::Deserialize; use serde::Serialize; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; @@ -25,6 +27,16 @@ async fn main() -> Result<(), anyhow::Error> { repository.run_migrations().await?; + // Run our side effect handler in the background + tokio::spawn({ + let repo = repository.clone(); + async move { + let _ = repo + .start_outbox(SideEffectContext {}, std::time::Duration::from_secs(5)) + .await; + } + }); + // Start transaction let mut transaction = repository.begin_transaction().await?; @@ -206,6 +218,23 @@ impl SideEffect for SideEffects { } } +pub struct SideEffectContext; + +#[async_trait::async_trait] +impl SideEffectHandler for SideEffectContext { + type SideEffect = SideEffects; + type Error = (); + + async fn handle( + &self, + msg: &SideEffects, + retries: u16, + ) -> Result<(), (bool, Self::Error)> { + println!("handling side effect {:?} retries {}", msg, retries); + Ok(()) + } +} + // Implement the aggregate trait for our aggregate struct impl Aggregate for Account { /// The current version of the snapshot to store. From 0e365fcbe5c57dcc8264fa7dae4e99d4e33bd29a Mon Sep 17 00:00:00 2001 From: Jonathan Donaldson Date: Sat, 7 Jun 2025 21:07:28 +0100 Subject: [PATCH 4/6] Fix lints --- eventastic_outbox_postgres/src/lib.rs | 21 ++++++++++----------- eventastic_postgres/src/lib.rs | 8 ++++---- eventastic_postgres/src/repository.rs | 5 ++++- eventastic_postgres/src/transaction.rs | 11 +++-------- eventastic_postgres/tests/common/helpers.rs | 2 +- examples/bank/src/main.rs | 10 ++-------- 6 files changed, 24 insertions(+), 33 deletions(-) diff --git a/eventastic_outbox_postgres/src/lib.rs b/eventastic_outbox_postgres/src/lib.rs index ebd5f78..e2bcd51 100644 --- a/eventastic_outbox_postgres/src/lib.rs +++ b/eventastic_outbox_postgres/src/lib.rs @@ -96,14 +96,10 @@ pub trait SideEffectHandler { /// Returning `Ok(())` deletes the message from the outbox. Returning /// `Err((true, E))` requeues the message. Returning `Err((false, E))` /// leaves the message without requeuing. - async fn handle( - &self, - msg: &Self::SideEffect, - retries: u16, - ) -> Result<(), (bool, Self::Error)>; + async fn handle(&self, msg: &Self::SideEffect, retries: u16) + -> Result<(), (bool, Self::Error)>; } - /// Extension trait for running the outbox worker using a [`TableOutbox`]. #[async_trait] pub trait RepositoryOutboxExt { @@ -116,8 +112,10 @@ pub trait RepositoryOutboxExt { T: SideEffect + DeserializeOwned + Send + Sync, T::Id: Clone + Send, H: SideEffectHandler + Send + Sync, - for<'sql> T::Id: - sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin; + for<'sql> T::Id: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin; } #[async_trait] @@ -131,8 +129,10 @@ impl RepositoryOutboxExt for PostgresRepository { T: SideEffect + DeserializeOwned + Send + Sync, T::Id: Clone + Send, H: SideEffectHandler + Send + Sync, - for<'sql> T::Id: - sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, + for<'sql> T::Id: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin, { let handler = Arc::new(handler); loop { @@ -175,4 +175,3 @@ where tx.commit().await } - diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 1731e1d..94a6a72 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,18 +1,18 @@ +mod outbox; mod repository; mod transaction; -mod outbox; use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, repository::RepositoryError, }; -use sqlx::{Postgres, Transaction}; -pub use repository::PostgresRepository; -pub use transaction::PostgresTransaction; pub use outbox::OutboxMessage; +pub use repository::PostgresRepository; use serde::{Serialize, de::DeserializeOwned}; use sqlx::types::Uuid; +use sqlx::{Postgres, Transaction}; use thiserror::Error; +pub use transaction::PostgresTransaction; #[derive(Error, Debug)] pub enum DbError { diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index 56d3521..2104963 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -24,7 +24,10 @@ where ) -> Result { let pool = pool_options.connect_with(connect_options).await?; - Ok(Self { inner: pool, outbox }) + Ok(Self { + inner: pool, + outbox, + }) } /// Start a new transaction using the default isolation level diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 928fc5d..d59c37e 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use crate::OutboxMessage; use crate::{DbError, SideEffectStorage}; use async_trait::async_trait; use chrono::DateTime; @@ -20,7 +21,6 @@ use sqlx::query_as; use sqlx::types::JsonValue; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; -use crate::OutboxMessage; pub struct PostgresTransaction<'a, O> where O: SideEffectStorage, @@ -72,8 +72,7 @@ where .fetch_all(&mut *self.inner) .await?; - rows - .into_iter() + rows.into_iter() .map(|row| { let msg = serde_json::from_value(row.message)?; Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) @@ -326,10 +325,6 @@ where )); } - self - .outbox - .store_side_effects(&mut self.inner, items) - .await - + self.outbox.store_side_effects(&mut self.inner, items).await } } diff --git a/eventastic_postgres/tests/common/helpers.rs b/eventastic_postgres/tests/common/helpers.rs index 2f39a65..9a694d7 100644 --- a/eventastic_postgres/tests/common/helpers.rs +++ b/eventastic_postgres/tests/common/helpers.rs @@ -1,8 +1,8 @@ use super::test_aggregate::{Account, AccountEvent}; use chrono::{DateTime, Utc}; use eventastic::aggregate::{Context, Root}; -use eventastic_postgres::PostgresRepository; use eventastic_outbox_postgres::TableOutbox; +use eventastic_postgres::PostgresRepository; use sqlx::Row; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; use std::str::FromStr; diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 3327365..f6be328 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -7,11 +7,9 @@ use eventastic::aggregate::Root; use eventastic::aggregate::SaveError; use eventastic::aggregate::SideEffect; use eventastic::event::Event; +use eventastic_outbox_postgres::{RepositoryOutboxExt, SideEffectHandler, TableOutbox}; use eventastic_postgres::PostgresRepository; use eventastic_postgres::RootExt; -use eventastic_outbox_postgres::{ - RepositoryOutboxExt, SideEffectHandler, TableOutbox, -}; use serde::Deserialize; use serde::Serialize; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; @@ -225,11 +223,7 @@ impl SideEffectHandler for SideEffectContext { type SideEffect = SideEffects; type Error = (); - async fn handle( - &self, - msg: &SideEffects, - retries: u16, - ) -> Result<(), (bool, Self::Error)> { + async fn handle(&self, msg: &SideEffects, retries: u16) -> Result<(), (bool, Self::Error)> { println!("handling side effect {:?} retries {}", msg, retries); Ok(()) } From f801265248fddc4c45525f82b18647ec5550215e Mon Sep 17 00:00:00 2001 From: Jonathan Donaldson Date: Sat, 7 Jun 2025 22:06:51 +0100 Subject: [PATCH 5/6] Refactor outbox queries --- eventastic_outbox_postgres/src/lib.rs | 129 ++++++++++++++++++++----- eventastic_postgres/src/transaction.rs | 68 +------------ 2 files changed, 107 insertions(+), 90 deletions(-) diff --git a/eventastic_outbox_postgres/src/lib.rs b/eventastic_outbox_postgres/src/lib.rs index e2bcd51..9d99757 100644 --- a/eventastic_outbox_postgres/src/lib.rs +++ b/eventastic_outbox_postgres/src/lib.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use eventastic::aggregate::SideEffect; -use eventastic_postgres::{DbError, PostgresRepository, SideEffectStorage}; +use eventastic_postgres::{DbError, PostgresRepository, PostgresTransaction, SideEffectStorage}; use serde::de::DeserializeOwned; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; @@ -51,37 +51,116 @@ impl SideEffectStorage for TableOutbox { Ok(()) } + } -/// A message retrieved from the outbox table. -#[derive(Debug, Clone)] -pub struct OutboxMessage -where - T: SideEffect, -{ - /// The stored side effect. - pub message: T, - /// The amount of times this message has been retried. - pub(crate) retries: u16, - /// Whether the message should be requeued on failure. - pub requeue: bool, +#[async_trait] +pub trait TransactionOutboxExt { + async fn get_outbox_batch( + &mut self, + ) -> Result>, DbError> + where + T: SideEffect + DeserializeOwned + Send, + for<'sql> T::Id: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin; + + async fn delete_outbox_item(&mut self, id: I) -> Result<(), DbError> + where + for<'sql> I: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin + + Send; + + async fn update_outbox_item( + &mut self, + item: eventastic_postgres::OutboxMessage, + ) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned + Send, + for<'sql> T::Id: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin; } -impl OutboxMessage -where - T: SideEffect, -{ - pub fn new(message: T, retries: u16, requeue: bool) -> Self { - Self { - message, - retries, - requeue, +#[async_trait] +impl<'a> TransactionOutboxExt for PostgresTransaction<'a, TableOutbox> { + async fn get_outbox_batch( + &mut self, + ) -> Result>, DbError> + where + T: SideEffect + DeserializeOwned + Send, + for<'sql> T::Id: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin, + { + #[derive(sqlx::FromRow)] + struct OutboxRow { + message: serde_json::Value, + retries: i32, + requeue: bool, } + + let rows = sqlx::query_as::<_, OutboxRow>( + "SELECT message, retries, requeue FROM outbox \ + WHERE requeue = true ORDER BY created_at \ + FOR UPDATE SKIP LOCKED LIMIT 10", + ) + .fetch_all(self.inner_mut().as_mut()) + .await?; + + rows + .into_iter() + .map(|row| { + let msg = serde_json::from_value(row.message)?; + Ok(eventastic_postgres::OutboxMessage::new( + msg, + row.retries as u16, + row.requeue, + )) + }) + .collect::, serde_json::Error>>() + .map_err(DbError::SerializationError) + } + + async fn delete_outbox_item(&mut self, id: I) -> Result<(), DbError> + where + for<'sql> I: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin + + Send, + { + sqlx::query("DELETE FROM outbox WHERE id = $1") + .bind(id) + .execute(self.inner_mut().as_mut()) + .await?; + Ok(()) } - /// Returns the retry count for this message. - pub fn retries(&self) -> u16 { - self.retries + async fn update_outbox_item( + &mut self, + item: eventastic_postgres::OutboxMessage, + ) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned + Send, + for<'sql> T::Id: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin, + { + sqlx::query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") + .bind(item.message.id()) + .bind(i32::from(item.retries)) + .bind(item.requeue) + .execute(self.inner_mut().as_mut()) + .await?; + + Ok(()) } } diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index d59c37e..9a8a40f 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,6 +1,5 @@ use std::fmt::Debug; -use crate::OutboxMessage; use crate::{DbError, SideEffectStorage}; use async_trait::async_trait; use chrono::DateTime; @@ -48,70 +47,9 @@ where self.inner } - /// Returns a batch of up to 10 side effects from the outbox table. - pub async fn get_outbox_batch(&mut self) -> Result>, DbError> - where - T: SideEffect + DeserializeOwned, - for<'sql> T::Id: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin, - { - #[derive(sqlx::FromRow)] - struct OutboxRow { - message: JsonValue, - retries: i32, - requeue: bool, - } - - let rows = query_as::<_, OutboxRow>( - "SELECT message, retries, requeue FROM outbox \ - WHERE requeue = true ORDER BY created_at \ - FOR UPDATE SKIP LOCKED LIMIT 10", - ) - .fetch_all(&mut *self.inner) - .await?; - - rows.into_iter() - .map(|row| { - let msg = serde_json::from_value(row.message)?; - Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) - }) - .collect::, serde_json::Error>>() - .map_err(DbError::SerializationError) - } - - /// Delete a side effect from the outbox table. - pub async fn delete_outbox_item(&mut self, id: I) -> Result<(), DbError> - where - for<'sql> I: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin, - { - query("DELETE FROM outbox WHERE id = $1") - .bind(id) - .execute(&mut *self.inner) - .await?; - Ok(()) - } - - /// Update the retries and requeue flag for a side effect message. - pub async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError> - where - T: SideEffect + DeserializeOwned, - for<'sql> T::Id: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin, - { - query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") - .bind(item.message.id()) - .bind(i32::from(item.retries)) - .bind(item.requeue) - .execute(&mut *self.inner) - .await?; - Ok(()) + /// Returns a mutable reference to the underlying [`sqlx::Transaction`]. + pub fn inner_mut(&mut self) -> &mut Transaction<'a, Postgres> { + &mut self.inner } } From 3397fa12c4b3c055dc6506dcbb7909e89faad56c Mon Sep 17 00:00:00 2001 From: Jonathan Donaldson Date: Sun, 8 Jun 2025 01:35:13 +0100 Subject: [PATCH 6/6] Refactor outbox + remove domain event type --- Cargo.toml | 7 +- eventastic/src/aggregate.rs | 11 +- eventastic/src/aggregate/root.rs | 21 +- eventastic/src/event.rs | 39 +-- eventastic/src/repository.rs | 23 +- eventastic_outbox_postgres/Cargo.toml | 4 +- eventastic_outbox_postgres/src/lib.rs | 259 +----------------- eventastic_outbox_postgres/src/outbox.rs | 219 +++++++++++++++ .../src/outbox_message.rs | 0 eventastic_postgres/src/lib.rs | 53 ++-- eventastic_postgres/src/side_effect.rs | 15 + eventastic_postgres/src/transaction.rs | 65 ++--- .../tests/common/test_aggregate.rs | 12 +- examples/bank/src/main.rs | 12 +- 14 files changed, 349 insertions(+), 391 deletions(-) create mode 100644 eventastic_outbox_postgres/src/outbox.rs rename eventastic_postgres/src/outbox.rs => eventastic_outbox_postgres/src/outbox_message.rs (100%) create mode 100644 eventastic_postgres/src/side_effect.rs diff --git a/Cargo.toml b/Cargo.toml index 718f72c..76767e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,11 @@ [workspace] resolver = "2" -members = ["eventastic", "eventastic_outbox_postgres", "eventastic_postgres", "examples/*"] +members = [ + "eventastic", + "eventastic_outbox_postgres", + "eventastic_postgres", + "examples/*", +] [workspace.package] license = "MIT" diff --git a/eventastic/src/aggregate.rs b/eventastic/src/aggregate.rs index 56a561a..64f54ae 100644 --- a/eventastic/src/aggregate.rs +++ b/eventastic/src/aggregate.rs @@ -25,7 +25,7 @@ //! Aggregates should provide a way to **fold** Domain Events on the //! current value of the state, to produce the next state. -use crate::event::Event; +use crate::event::DomainEvent; use std::fmt::Debug; mod root; @@ -54,10 +54,7 @@ pub trait Aggregate: Sized + Clone { /// The type of Domain Events that interest this Aggregate. /// Usually, this type should be an `enum`. - type DomainEvent: Clone + Debug + Eq + PartialEq + Event; - - /// The type used to uniquely identify the given domain event. - type DomainEventId: Clone + Debug + Eq + PartialEq; + type DomainEvent: Clone + Debug + Eq + PartialEq + DomainEvent; /// The error type that can be returned by [`Aggregate::apply`] when /// mutating the Aggregate state. @@ -93,8 +90,8 @@ pub trait Aggregate: Sized + Clone { pub trait SideEffect { /// The type used to uniquely identify this side effect. - type Id; + type SideEffectId; /// Returns read access to the [`SideEffect::Id`] - fn id(&self) -> &Self::Id; + fn id(&self) -> &Self::SideEffectId; } diff --git a/eventastic/src/aggregate/root.rs b/eventastic/src/aggregate/root.rs index 803b4ee..b1c5a9f 100644 --- a/eventastic/src/aggregate/root.rs +++ b/eventastic/src/aggregate/root.rs @@ -3,7 +3,7 @@ use futures::TryStreamExt; use crate::repository::{RepositoryError, RepositoryTransaction, Snapshot}; use crate::{ aggregate::Aggregate, - event::{Event, EventStoreEvent}, + event::{DomainEvent, EventStoreEvent}, }; use std::fmt::Debug; @@ -17,7 +17,7 @@ where { aggregate: T, version: u64, - uncommitted_events: Vec>, + uncommitted_events: Vec>, uncommitted_side_effects: Vec, } @@ -43,9 +43,7 @@ where /// Returns the list of uncommitted, recorded Domain [Events] from the [Context] /// and resets the internal list to its default value. #[doc(hidden)] - pub fn take_uncommitted_events( - &mut self, - ) -> Vec> { + pub fn take_uncommitted_events(&mut self) -> Vec> { std::mem::take(&mut self.uncommitted_events) } @@ -65,7 +63,7 @@ where /// given the current state of the Aggregate. #[doc(hidden)] pub fn rehydrate_from( - event: &EventStoreEvent, + event: &EventStoreEvent, ) -> Result, T::ApplyError> { Ok(Context { version: event.version, @@ -85,7 +83,7 @@ where #[doc(hidden)] pub fn apply_rehydrated_event( mut self, - event: &EventStoreEvent, + event: &EventStoreEvent, ) -> Result, T::ApplyError> { self.version += 1; debug_assert!(self.version == event.version); @@ -220,7 +218,14 @@ where pub async fn load( transaction: &mut R, aggregate_id: &T::AggregateId, - ) -> Result, RepositoryError> + ) -> Result< + Context, + RepositoryError< + T::ApplyError, + <::DomainEvent as DomainEvent>::EventId, + R::DbError, + >, + > where R: RepositoryTransaction, { diff --git a/eventastic/src/event.rs b/eventastic/src/event.rs index 1f0448e..a1a0c95 100644 --- a/eventastic/src/event.rs +++ b/eventastic/src/event.rs @@ -3,15 +3,14 @@ use std::fmt::Debug; -/// An [`Event`] that will be / has been persisted to the Event Store. +/// A [`DomainEvent`] that will be / has been persisted to the Event Store. #[derive(Debug, Clone, Eq, PartialEq)] -pub struct EventStoreEvent +pub struct EventStoreEvent where - Id: Debug, - Evt: Clone + Eq + PartialEq, + Evt: DomainEvent, { /// The id of the event - pub id: Id, + pub id: Evt::EventId, // The version of the event pub version: u64, @@ -20,20 +19,28 @@ where pub event: Evt, } -/// A domain event. -pub trait Event +impl EventStoreEvent where - Id: Debug, + Evt: DomainEvent, { - fn id(&self) -> &Id; -} + /// Creates a new `EventStoreEvent`. + pub fn new(id: Evt::EventId, version: u64, event: Evt) -> Self { + Self { id, version, event } + } -impl Event for EventStoreEvent -where - Id: Debug, - Evt: Clone + Eq + PartialEq, -{ - fn id(&self) -> &Id { + /// Returns the id of the event. + pub fn id(&self) -> &Evt::EventId { &self.id } + + /// Returns the version of the event. + pub fn version(&self) -> u64 { + self.version + } +} + +/// A domain event. +pub trait DomainEvent: Clone + Eq + PartialEq { + type EventId: Debug + Clone + Eq + PartialEq; + fn id(&self) -> &Self::EventId; } diff --git a/eventastic/src/repository.rs b/eventastic/src/repository.rs index 0619bf1..138f270 100644 --- a/eventastic/src/repository.rs +++ b/eventastic/src/repository.rs @@ -2,7 +2,10 @@ use async_trait::async_trait; use futures::Stream; use std::fmt::Debug; -use crate::{aggregate::Aggregate, event::EventStoreEvent}; +use crate::{ + aggregate::Aggregate, + event::{DomainEvent, EventStoreEvent}, +}; /// List of possible errors that can be returned by the [`RepositoryTransaction`] trait. #[derive(Debug, thiserror::Error)] @@ -50,23 +53,15 @@ pub trait RepositoryTransaction { &mut self, id: &T::AggregateId, version: u64, - ) -> impl Stream< - Item = Result< - EventStoreEvent::DomainEvent>, - Self::DbError, - >, - >; + ) -> impl Stream, Self::DbError>>; // Get a specific event from the event store. #[doc(hidden)] async fn get_event( &mut self, aggregate_id: &T::AggregateId, - event_id: &T::DomainEventId, - ) -> Result< - Option::DomainEvent>>, - Self::DbError, - >; + event_id: &<::DomainEvent as DomainEvent>::EventId, + ) -> Result>, Self::DbError>; /// Retrieves the latest version of the Aggregate from the Event Store. /// This method must check that the snapshot version is correct @@ -83,8 +78,8 @@ pub trait RepositoryTransaction { async fn store_events( &mut self, id: &T::AggregateId, - events: Vec>, - ) -> Result, Self::DbError>; + events: Vec>, + ) -> Result::DomainEvent as DomainEvent>::EventId>, Self::DbError>; #[doc(hidden)] async fn store_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Self::DbError>; diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml index 6fbcbfe..889cc8b 100644 --- a/eventastic_outbox_postgres/Cargo.toml +++ b/eventastic_outbox_postgres/Cargo.toml @@ -4,13 +4,13 @@ version = "0.1.0" edition = "2024" [dependencies] -eventastic_postgres = { path = "../eventastic_postgres" } +eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" } async-trait = { workspace = true } serde_json = { workspace = true } sqlx = { workspace = true } chrono = { workspace = true } uuid = { workspace = true } serde = { workspace = true } -eventastic = { path = "../eventastic" } +eventastic = { path = "../eventastic", version = "0.5" } futures-util = { workspace = true } tokio = { workspace = true } diff --git a/eventastic_outbox_postgres/src/lib.rs b/eventastic_outbox_postgres/src/lib.rs index 9d99757..b26033f 100644 --- a/eventastic_outbox_postgres/src/lib.rs +++ b/eventastic_outbox_postgres/src/lib.rs @@ -1,256 +1,5 @@ -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use eventastic::aggregate::SideEffect; -use eventastic_postgres::{DbError, PostgresRepository, PostgresTransaction, SideEffectStorage}; -use serde::de::DeserializeOwned; -use sqlx::types::Uuid; -use sqlx::{Postgres, Transaction}; -use std::sync::Arc; +mod outbox; +mod outbox_message; -/// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table. -#[derive(Clone, Copy, Default)] -pub struct TableOutbox; - -#[async_trait] -impl SideEffectStorage for TableOutbox { - async fn store_side_effects( - &self, - transaction: &mut Transaction<'_, Postgres>, - items: Vec<(Uuid, serde_json::Value)>, - ) -> Result<(), DbError> { - let mut ids: Vec = Vec::with_capacity(items.len()); - let mut messages: Vec = Vec::with_capacity(items.len()); - let mut retries: Vec = Vec::with_capacity(items.len()); - let mut requeues: Vec = Vec::with_capacity(items.len()); - let mut created_ats: Vec> = Vec::with_capacity(items.len()); - - for (id, msg) in items { - ids.push(id); - messages.push(msg); - retries.push(0); - requeues.push(true); - created_ats.push(Utc::now()); - } - - sqlx::query( - "INSERT INTO outbox(id, message, retries, requeue, created_at) - SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[]) - ON CONFLICT (id) DO UPDATE SET - message = excluded.message, - retries = excluded.retries, - requeue = excluded.requeue, - created_at = excluded.created_at", - ) - .bind(&ids) - .bind(&messages) - .bind(&retries) - .bind(&requeues) - .bind(&created_ats) - .execute(transaction.as_mut()) - .await?; - - Ok(()) - } - -} - -#[async_trait] -pub trait TransactionOutboxExt { - async fn get_outbox_batch( - &mut self, - ) -> Result>, DbError> - where - T: SideEffect + DeserializeOwned + Send, - for<'sql> T::Id: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin; - - async fn delete_outbox_item(&mut self, id: I) -> Result<(), DbError> - where - for<'sql> I: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin - + Send; - - async fn update_outbox_item( - &mut self, - item: eventastic_postgres::OutboxMessage, - ) -> Result<(), DbError> - where - T: SideEffect + DeserializeOwned + Send, - for<'sql> T::Id: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin; -} - -#[async_trait] -impl<'a> TransactionOutboxExt for PostgresTransaction<'a, TableOutbox> { - async fn get_outbox_batch( - &mut self, - ) -> Result>, DbError> - where - T: SideEffect + DeserializeOwned + Send, - for<'sql> T::Id: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin, - { - #[derive(sqlx::FromRow)] - struct OutboxRow { - message: serde_json::Value, - retries: i32, - requeue: bool, - } - - let rows = sqlx::query_as::<_, OutboxRow>( - "SELECT message, retries, requeue FROM outbox \ - WHERE requeue = true ORDER BY created_at \ - FOR UPDATE SKIP LOCKED LIMIT 10", - ) - .fetch_all(self.inner_mut().as_mut()) - .await?; - - rows - .into_iter() - .map(|row| { - let msg = serde_json::from_value(row.message)?; - Ok(eventastic_postgres::OutboxMessage::new( - msg, - row.retries as u16, - row.requeue, - )) - }) - .collect::, serde_json::Error>>() - .map_err(DbError::SerializationError) - } - - async fn delete_outbox_item(&mut self, id: I) -> Result<(), DbError> - where - for<'sql> I: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin - + Send, - { - sqlx::query("DELETE FROM outbox WHERE id = $1") - .bind(id) - .execute(self.inner_mut().as_mut()) - .await?; - Ok(()) - } - - async fn update_outbox_item( - &mut self, - item: eventastic_postgres::OutboxMessage, - ) -> Result<(), DbError> - where - T: SideEffect + DeserializeOwned + Send, - for<'sql> T::Id: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin, - { - sqlx::query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") - .bind(item.message.id()) - .bind(i32::from(item.retries)) - .bind(item.requeue) - .execute(self.inner_mut().as_mut()) - .await?; - - Ok(()) - } -} - -/// Trait used to handle side effects pulled from the outbox. -#[async_trait] -pub trait SideEffectHandler { - type SideEffect: SideEffect; - type Error: Send; - - /// Handle a side effect message. - /// - /// Returning `Ok(())` deletes the message from the outbox. Returning - /// `Err((true, E))` requeues the message. Returning `Err((false, E))` - /// leaves the message without requeuing. - async fn handle(&self, msg: &Self::SideEffect, retries: u16) - -> Result<(), (bool, Self::Error)>; -} - -/// Extension trait for running the outbox worker using a [`TableOutbox`]. -#[async_trait] -pub trait RepositoryOutboxExt { - async fn start_outbox( - &self, - handler: H, - poll_interval: std::time::Duration, - ) -> Result<(), DbError> - where - T: SideEffect + DeserializeOwned + Send + Sync, - T::Id: Clone + Send, - H: SideEffectHandler + Send + Sync, - for<'sql> T::Id: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin; -} - -#[async_trait] -impl RepositoryOutboxExt for PostgresRepository { - async fn start_outbox( - &self, - handler: H, - poll_interval: std::time::Duration, - ) -> Result<(), DbError> - where - T: SideEffect + DeserializeOwned + Send + Sync, - T::Id: Clone + Send, - H: SideEffectHandler + Send + Sync, - for<'sql> T::Id: sqlx::Decode<'sql, Postgres> - + sqlx::Type - + sqlx::Encode<'sql, Postgres> - + Unpin, - { - let handler = Arc::new(handler); - loop { - let deadline = std::time::Instant::now() + poll_interval; - let _ = process_outbox_batch::(self, handler.clone()).await; - tokio::time::sleep_until(deadline.into()).await; - } - } -} - -async fn process_outbox_batch( - repo: &PostgresRepository, - handler: Arc, -) -> Result<(), DbError> -where - T: SideEffect + DeserializeOwned + Send + Sync, - T::Id: Clone + Send, - H: SideEffectHandler + Send + Sync, - for<'sql> T::Id: - sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, -{ - let mut tx = repo.begin_transaction().await?; - - let outbox_items = tx.get_outbox_batch::().await?; - - for mut item in outbox_items { - let id = item.message.id().clone(); - - match handler.handle(&item.message, item.retries).await { - Ok(()) => { - tx.delete_outbox_item(id).await?; - } - Err((requeue, _)) => { - item.retries += 1; - item.requeue = requeue; - tx.update_outbox_item(item).await?; - } - } - } - - tx.commit().await -} +pub use outbox::*; +pub use outbox_message::OutboxMessage; diff --git a/eventastic_outbox_postgres/src/outbox.rs b/eventastic_outbox_postgres/src/outbox.rs new file mode 100644 index 0000000..31a0557 --- /dev/null +++ b/eventastic_outbox_postgres/src/outbox.rs @@ -0,0 +1,219 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use eventastic::aggregate::SideEffect; +use eventastic_postgres::{DbError, PostgresRepository, PostgresTransaction, SideEffectStorage}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use sqlx::types::Uuid; +use sqlx::{Postgres, Transaction}; +use std::sync::Arc; + +use crate::OutboxMessage; + +/// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table. +#[derive(Clone, Copy, Default)] +pub struct TableOutbox; + +#[async_trait] +impl SideEffectStorage for TableOutbox { + async fn store_side_effects + Serialize + Send + Sync>( + &self, + transaction: &mut Transaction<'_, Postgres>, + items: Vec, + ) -> Result<(), DbError> { + let mut ids: Vec = Vec::with_capacity(items.len()); + let mut messages: Vec = Vec::with_capacity(items.len()); + let mut retries: Vec = Vec::with_capacity(items.len()); + let mut requeues: Vec = Vec::with_capacity(items.len()); + let mut created_ats: Vec> = Vec::with_capacity(items.len()); + + for side_effect in items { + let id = side_effect.id().clone(); + let msg = serde_json::to_value(side_effect).map_err(DbError::SerializationError)?; + ids.push(id); + messages.push(msg); + retries.push(0); + requeues.push(true); + created_ats.push(Utc::now()); + } + + sqlx::query( + "INSERT INTO outbox(id, message, retries, requeue, created_at) + SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[]) + ON CONFLICT (id) DO UPDATE SET + message = excluded.message, + retries = excluded.retries, + requeue = excluded.requeue, + created_at = excluded.created_at", + ) + .bind(&ids) + .bind(&messages) + .bind(&retries) + .bind(&requeues) + .bind(&created_ats) + .execute(transaction.as_mut()) + .await?; + + Ok(()) + } +} + +#[async_trait] +pub trait TransactionOutboxExt +where + T: SideEffect + DeserializeOwned + Send + 'static, + T::SideEffectId: Clone + Send + 'static, + for<'sql> T::SideEffectId: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + async fn get_outbox_batch(&mut self) -> Result>, DbError>; + + async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError>; + + async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError>; +} + +#[async_trait] +impl<'a, T> TransactionOutboxExt for PostgresTransaction<'a, TableOutbox> +where + T: SideEffect + DeserializeOwned + Send + 'static, + T::SideEffectId: Clone + Send + 'static, + for<'sql> T::SideEffectId: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + async fn get_outbox_batch(&mut self) -> Result>, DbError> { + #[derive(sqlx::FromRow)] + struct OutboxRow { + message: serde_json::Value, + retries: i32, + requeue: bool, + } + + let rows = sqlx::query_as::<_, OutboxRow>( + "SELECT message, retries, requeue FROM outbox \ + WHERE requeue = true ORDER BY created_at \ + FOR UPDATE SKIP LOCKED LIMIT 10", + ) + .fetch_all(self.inner_mut().as_mut()) + .await?; + + rows.into_iter() + .map(|row| { + let msg = serde_json::from_value(row.message)?; + Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) + }) + .collect::, serde_json::Error>>() + .map_err(DbError::SerializationError) + } + + async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError> { + sqlx::query("DELETE FROM outbox WHERE id = $1") + .bind(id) + .execute(self.inner_mut().as_mut()) + .await?; + Ok(()) + } + + async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError> { + sqlx::query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") + .bind(item.message.id()) + .bind(i32::from(item.retries)) + .bind(item.requeue) + .execute(self.inner_mut().as_mut()) + .await?; + + Ok(()) + } +} + +/// Trait used to handle side effects pulled from the outbox. +#[async_trait] +pub trait SideEffectHandler { + type SideEffect: SideEffect; + type Error: Send; + + /// Handle a side effect message. + /// + /// Returning `Ok(())` deletes the message from the outbox. Returning + /// `Err((true, E))` requeues the message. Returning `Err((false, E))` + /// leaves the message without requeuing. + async fn handle(&self, msg: &Self::SideEffect, retries: u16) + -> Result<(), (bool, Self::Error)>; +} + +/// Extension trait for running the outbox worker using a [`TableOutbox`]. +#[async_trait] +pub trait RepositoryOutboxExt { + async fn start_outbox( + &self, + handler: H, + poll_interval: std::time::Duration, + ) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T::SideEffectId: Clone + Send + 'static, + H: SideEffectHandler + Send + Sync, + for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin; +} + +#[async_trait] +impl RepositoryOutboxExt for PostgresRepository { + async fn start_outbox( + &self, + handler: H, + poll_interval: std::time::Duration, + ) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T::SideEffectId: Clone + Send + 'static, + H: SideEffectHandler + Send + Sync, + for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin, + { + let handler = Arc::new(handler); + loop { + let deadline = std::time::Instant::now() + poll_interval; + let _ = process_outbox_batch::(self, handler.clone()).await; + tokio::time::sleep_until(deadline.into()).await; + } + } +} + +async fn process_outbox_batch( + repo: &PostgresRepository, + handler: Arc, +) -> Result<(), DbError> +where + T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T::SideEffectId: Clone + Send + 'static, + H: SideEffectHandler + Send + Sync, + for<'a> PostgresTransaction<'a, TableOutbox>: TransactionOutboxExt, + for<'sql> T::SideEffectId: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + let mut tx = repo.begin_transaction().await?; + + let outbox_items: Vec> = tx.get_outbox_batch().await?; + + for mut item in outbox_items { + let id: T::SideEffectId = item.message.id().clone(); + + match handler.handle(&item.message, item.retries).await { + Ok(()) => { + tx.delete_outbox_item(id).await?; + } + Err((requeue, _)) => { + item.retries += 1; + item.requeue = requeue; + tx.update_outbox_item(item).await?; + } + } + } + + tx.commit().await +} diff --git a/eventastic_postgres/src/outbox.rs b/eventastic_outbox_postgres/src/outbox_message.rs similarity index 100% rename from eventastic_postgres/src/outbox.rs rename to eventastic_outbox_postgres/src/outbox_message.rs diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 94a6a72..7133d60 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,16 +1,17 @@ -mod outbox; mod repository; +mod side_effect; mod transaction; use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, + event::DomainEvent, repository::RepositoryError, }; -pub use outbox::OutboxMessage; pub use repository::PostgresRepository; use serde::{Serialize, de::DeserializeOwned}; +pub use side_effect::SideEffectStorage; use sqlx::types::Uuid; -use sqlx::{Postgres, Transaction}; + use thiserror::Error; pub use transaction::PostgresTransaction; @@ -42,45 +43,35 @@ impl From for DbError { } #[async_trait] -pub trait SideEffectStorage: Send + Sync { - async fn store_side_effects( - &self, - transaction: &mut Transaction<'_, Postgres>, - items: Vec<(Uuid, serde_json::Value)>, - ) -> Result<(), DbError>; -} - -#[async_trait] -pub trait RootExt +pub trait RootExt where - S: SideEffect + Serialize + Send + Sync + 'static, - T: Aggregate - + Serialize - + DeserializeOwned - + Send - + Sync - + 'static, - ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + Serialize + DeserializeOwned + Send + Sync + 'static, + ::DomainEvent: + DomainEvent + Serialize + DeserializeOwned + Send + Sync, + ::SideEffect: SideEffect + Serialize + Send + Sync, O: SideEffectStorage + Send + Sync, { async fn load( transaction: &mut PostgresTransaction<'_, O>, aggregate_id: Uuid, - ) -> Result, RepositoryError> { + ) -> Result< + Context, + RepositoryError< + T::ApplyError, + <::DomainEvent as DomainEvent>::EventId, + DbError, + >, + > { Context::load(transaction, &aggregate_id).await } } -impl RootExt for T +impl RootExt for T where - S: SideEffect + Serialize + Send + Sync + 'static, - T: Aggregate - + Serialize - + DeserializeOwned - + Send - + Sync - + 'static, - ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + Serialize + DeserializeOwned + Send + Sync + 'static, + ::DomainEvent: + DomainEvent + Serialize + DeserializeOwned + Send + Sync, + ::SideEffect: SideEffect + Serialize + Send + Sync, O: SideEffectStorage + Send + Sync, { } diff --git a/eventastic_postgres/src/side_effect.rs b/eventastic_postgres/src/side_effect.rs new file mode 100644 index 0000000..270a544 --- /dev/null +++ b/eventastic_postgres/src/side_effect.rs @@ -0,0 +1,15 @@ +use crate::DbError; +use async_trait::async_trait; +use eventastic::aggregate::SideEffect; +use serde::Serialize; +use sqlx::types::Uuid; +use sqlx::{Postgres, Transaction}; + +#[async_trait] +pub trait SideEffectStorage: Send + Sync { + async fn store_side_effects + Serialize + Send + Sync>( + &self, + transaction: &mut Transaction<'_, Postgres>, + items: Vec, + ) -> Result<(), DbError>; +} diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 9a8a40f..97db8f0 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -6,7 +6,7 @@ use chrono::DateTime; use chrono::Utc; use eventastic::aggregate::Aggregate; use eventastic::aggregate::SideEffect; -use eventastic::event::Event; +use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; use eventastic::repository::RepositoryTransaction; use eventastic::repository::Snapshot; @@ -61,24 +61,18 @@ struct PartialSnapShotRow { } #[derive(Debug, sqlx::FromRow)] -struct PartialEventRow -where - EId: Unpin, -{ - event_id: EId, +struct PartialEventRow { + event_id: Uuid, version: i64, event: JsonValue, } -impl PartialEventRow -where - EId: Debug + Send + Unpin, -{ +impl PartialEventRow { fn to_event( - row: PartialEventRow, - ) -> Result, DbError> + row: PartialEventRow, + ) -> Result, DbError> where - Evt: Send + Clone + Eq + DeserializeOwned, + Evt: DomainEvent + DeserializeOwned, { let row_version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; @@ -93,17 +87,12 @@ where } #[async_trait] -impl<'a, O, S, T> RepositoryTransaction for PostgresTransaction<'a, O> +impl<'a, O, T> RepositoryTransaction for PostgresTransaction<'a, O> where + T: Aggregate + 'a + DeserializeOwned + Serialize + Send + Sync, + T::SideEffect: SideEffect + Serialize + Send + Sync, + T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, O: SideEffectStorage, - S: SideEffect + 'a + Serialize + Send + Sync, - T: Aggregate - + 'a - + DeserializeOwned - + Serialize - + Send - + Sync, - ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, { /// The type of error that is returned from the database. type DbError = DbError; @@ -116,7 +105,6 @@ where ) -> impl futures::Stream< Item = std::result::Result< eventastic::event::EventStoreEvent< - ::DomainEventId, ::DomainEvent, >, >::DbError, @@ -126,7 +114,7 @@ where return stream::iter(vec![Err(DbError::InvalidVersionNumber)]).boxed(); }; - let res = query_as::<_, PartialEventRow>( + let res = query_as::<_, PartialEventRow>( " SELECT event, event_id, version FROM events @@ -147,12 +135,9 @@ where async fn get_event( &mut self, aggregate_id: &T::AggregateId, - event_id: &T::DomainEventId, - ) -> Result< - Option::DomainEventId, ::DomainEvent>>, - Self::DbError, - > { - query_as::<_, PartialEventRow>( + event_id: &<::DomainEvent as DomainEvent>::EventId, + ) -> Result::DomainEvent>>, Self::DbError> { + query_as::<_, PartialEventRow>( "SELECT event, event_id, version FROM events where aggregate_id = $1 AND event_id = $2", ) .bind(aggregate_id) @@ -167,9 +152,10 @@ where async fn store_events( &mut self, id: &T::AggregateId, - events: Vec>, - ) -> Result, Self::DbError> { - let mut event_ids_to_insert: Vec = Vec::with_capacity(events.len()); + events: Vec>, + ) -> Result::DomainEvent as DomainEvent>::EventId>, Self::DbError> { + let mut event_ids_to_insert: Vec<<::DomainEvent as DomainEvent>::EventId> = + Vec::with_capacity(events.len()); let mut versions_to_insert: Vec = Vec::with_capacity(events.len()); let mut aggregate_ids_to_insert: Vec = Vec::with_capacity(events.len()); let mut events_to_insert: Vec = Vec::with_capacity(events.len()); @@ -254,15 +240,8 @@ where &mut self, outbox_item: Vec, ) -> Result<(), Self::DbError> { - let mut items: Vec<(Uuid, serde_json::Value)> = Vec::with_capacity(outbox_item.len()); - - for item in outbox_item { - items.push(( - *item.id(), - serde_json::to_value(item).map_err(DbError::SerializationError)?, - )); - } - - self.outbox.store_side_effects(&mut self.inner, items).await + self.outbox + .store_side_effects(&mut self.inner, outbox_item) + .await } } diff --git a/eventastic_postgres/tests/common/test_aggregate.rs b/eventastic_postgres/tests/common/test_aggregate.rs index 606ed9f..da79c7f 100644 --- a/eventastic_postgres/tests/common/test_aggregate.rs +++ b/eventastic_postgres/tests/common/test_aggregate.rs @@ -1,6 +1,6 @@ use eventastic::aggregate::Aggregate; use eventastic::aggregate::SideEffect; -use eventastic::event::Event; +use eventastic::event::DomainEvent; use serde::Deserialize; use serde::Serialize; use thiserror::Error; @@ -32,7 +32,8 @@ pub enum AccountEvent { }, } -impl Event for AccountEvent { +impl DomainEvent for AccountEvent { + type EventId = Uuid; fn id(&self) -> &Uuid { match self { AccountEvent::Open { event_id, .. } @@ -67,9 +68,9 @@ pub enum SideEffects { impl SideEffect for SideEffects { /// The type used to uniquely identify this side effect. - type Id = Uuid; + type SideEffectId = Uuid; - fn id(&self) -> &Self::Id { + fn id(&self) -> &Self::SideEffectId { match self { SideEffects::PublishMessage { id, .. } | SideEffects::SendEmail { id, .. } => id, } @@ -89,9 +90,6 @@ impl Aggregate for Account { /// Usually, this type should be an `enum`. type DomainEvent = AccountEvent; - /// The type used to uniquely identify the a given domain event. - type DomainEventId = Uuid; - /// The error type that can be returned by [`Aggregate::apply`] when /// mutating the Aggregate state. type ApplyError = DomainError; diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index f6be328..a5b9325 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -6,7 +6,7 @@ use eventastic::aggregate::Context; use eventastic::aggregate::Root; use eventastic::aggregate::SaveError; use eventastic::aggregate::SideEffect; -use eventastic::event::Event; +use eventastic::event::DomainEvent; use eventastic_outbox_postgres::{RepositoryOutboxExt, SideEffectHandler, TableOutbox}; use eventastic_postgres::PostgresRepository; use eventastic_postgres::RootExt; @@ -172,7 +172,8 @@ pub enum AccountEvent { }, } -impl Event for AccountEvent { +impl DomainEvent for AccountEvent { + type EventId = Uuid; fn id(&self) -> &Uuid { match self { AccountEvent::Open { event_id, .. } @@ -207,9 +208,9 @@ pub enum SideEffects { impl SideEffect for SideEffects { /// The type used to uniquely identify this side effect. - type Id = Uuid; + type SideEffectId = Uuid; - fn id(&self) -> &Self::Id { + fn id(&self) -> &Self::SideEffectId { match self { SideEffects::PublishMessage { id, .. } | SideEffects::SendEmail { id, .. } => id, } @@ -242,9 +243,6 @@ impl Aggregate for Account { /// Usually, this type should be an `enum`. type DomainEvent = AccountEvent; - /// The type used to uniquely identify the a given domain event. - type DomainEventId = Uuid; - /// The error type that can be returned by [`Aggregate::apply`] when /// mutating the Aggregate state. type ApplyError = DomainError;