diff --git a/Cargo.toml b/Cargo.toml index c5caa3d..76767e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,11 @@ [workspace] resolver = "2" -members = ["eventastic", "eventastic_postgres", "examples/*"] +members = [ + "eventastic", + "eventastic_outbox_postgres", + "eventastic_postgres", + "examples/*", +] [workspace.package] license = "MIT" diff --git a/eventastic/src/aggregate.rs b/eventastic/src/aggregate.rs index 56a561a..64f54ae 100644 --- a/eventastic/src/aggregate.rs +++ b/eventastic/src/aggregate.rs @@ -25,7 +25,7 @@ //! Aggregates should provide a way to **fold** Domain Events on the //! current value of the state, to produce the next state. -use crate::event::Event; +use crate::event::DomainEvent; use std::fmt::Debug; mod root; @@ -54,10 +54,7 @@ pub trait Aggregate: Sized + Clone { /// The type of Domain Events that interest this Aggregate. /// Usually, this type should be an `enum`. - type DomainEvent: Clone + Debug + Eq + PartialEq + Event; - - /// The type used to uniquely identify the given domain event. - type DomainEventId: Clone + Debug + Eq + PartialEq; + type DomainEvent: Clone + Debug + Eq + PartialEq + DomainEvent; /// The error type that can be returned by [`Aggregate::apply`] when /// mutating the Aggregate state. @@ -93,8 +90,8 @@ pub trait Aggregate: Sized + Clone { pub trait SideEffect { /// The type used to uniquely identify this side effect. - type Id; + type SideEffectId; /// Returns read access to the [`SideEffect::Id`] - fn id(&self) -> &Self::Id; + fn id(&self) -> &Self::SideEffectId; } diff --git a/eventastic/src/aggregate/root.rs b/eventastic/src/aggregate/root.rs index 803b4ee..b1c5a9f 100644 --- a/eventastic/src/aggregate/root.rs +++ b/eventastic/src/aggregate/root.rs @@ -3,7 +3,7 @@ use futures::TryStreamExt; use crate::repository::{RepositoryError, RepositoryTransaction, Snapshot}; use crate::{ aggregate::Aggregate, - event::{Event, EventStoreEvent}, + event::{DomainEvent, EventStoreEvent}, }; use std::fmt::Debug; @@ -17,7 +17,7 @@ where { aggregate: T, version: u64, - uncommitted_events: Vec>, + uncommitted_events: Vec>, uncommitted_side_effects: Vec, } @@ -43,9 +43,7 @@ where /// Returns the list of uncommitted, recorded Domain [Events] from the [Context] /// and resets the internal list to its default value. #[doc(hidden)] - pub fn take_uncommitted_events( - &mut self, - ) -> Vec> { + pub fn take_uncommitted_events(&mut self) -> Vec> { std::mem::take(&mut self.uncommitted_events) } @@ -65,7 +63,7 @@ where /// given the current state of the Aggregate. #[doc(hidden)] pub fn rehydrate_from( - event: &EventStoreEvent, + event: &EventStoreEvent, ) -> Result, T::ApplyError> { Ok(Context { version: event.version, @@ -85,7 +83,7 @@ where #[doc(hidden)] pub fn apply_rehydrated_event( mut self, - event: &EventStoreEvent, + event: &EventStoreEvent, ) -> Result, T::ApplyError> { self.version += 1; debug_assert!(self.version == event.version); @@ -220,7 +218,14 @@ where pub async fn load( transaction: &mut R, aggregate_id: &T::AggregateId, - ) -> Result, RepositoryError> + ) -> Result< + Context, + RepositoryError< + T::ApplyError, + <::DomainEvent as DomainEvent>::EventId, + R::DbError, + >, + > where R: RepositoryTransaction, { diff --git a/eventastic/src/event.rs b/eventastic/src/event.rs index 1f0448e..a1a0c95 100644 --- a/eventastic/src/event.rs +++ b/eventastic/src/event.rs @@ -3,15 +3,14 @@ use std::fmt::Debug; -/// An [`Event`] that will be / has been persisted to the Event Store. +/// A [`DomainEvent`] that will be / has been persisted to the Event Store. #[derive(Debug, Clone, Eq, PartialEq)] -pub struct EventStoreEvent +pub struct EventStoreEvent where - Id: Debug, - Evt: Clone + Eq + PartialEq, + Evt: DomainEvent, { /// The id of the event - pub id: Id, + pub id: Evt::EventId, // The version of the event pub version: u64, @@ -20,20 +19,28 @@ where pub event: Evt, } -/// A domain event. -pub trait Event +impl EventStoreEvent where - Id: Debug, + Evt: DomainEvent, { - fn id(&self) -> &Id; -} + /// Creates a new `EventStoreEvent`. + pub fn new(id: Evt::EventId, version: u64, event: Evt) -> Self { + Self { id, version, event } + } -impl Event for EventStoreEvent -where - Id: Debug, - Evt: Clone + Eq + PartialEq, -{ - fn id(&self) -> &Id { + /// Returns the id of the event. + pub fn id(&self) -> &Evt::EventId { &self.id } + + /// Returns the version of the event. + pub fn version(&self) -> u64 { + self.version + } +} + +/// A domain event. +pub trait DomainEvent: Clone + Eq + PartialEq { + type EventId: Debug + Clone + Eq + PartialEq; + fn id(&self) -> &Self::EventId; } diff --git a/eventastic/src/repository.rs b/eventastic/src/repository.rs index 0619bf1..138f270 100644 --- a/eventastic/src/repository.rs +++ b/eventastic/src/repository.rs @@ -2,7 +2,10 @@ use async_trait::async_trait; use futures::Stream; use std::fmt::Debug; -use crate::{aggregate::Aggregate, event::EventStoreEvent}; +use crate::{ + aggregate::Aggregate, + event::{DomainEvent, EventStoreEvent}, +}; /// List of possible errors that can be returned by the [`RepositoryTransaction`] trait. #[derive(Debug, thiserror::Error)] @@ -50,23 +53,15 @@ pub trait RepositoryTransaction { &mut self, id: &T::AggregateId, version: u64, - ) -> impl Stream< - Item = Result< - EventStoreEvent::DomainEvent>, - Self::DbError, - >, - >; + ) -> impl Stream, Self::DbError>>; // Get a specific event from the event store. #[doc(hidden)] async fn get_event( &mut self, aggregate_id: &T::AggregateId, - event_id: &T::DomainEventId, - ) -> Result< - Option::DomainEvent>>, - Self::DbError, - >; + event_id: &<::DomainEvent as DomainEvent>::EventId, + ) -> Result>, Self::DbError>; /// Retrieves the latest version of the Aggregate from the Event Store. /// This method must check that the snapshot version is correct @@ -83,8 +78,8 @@ pub trait RepositoryTransaction { async fn store_events( &mut self, id: &T::AggregateId, - events: Vec>, - ) -> Result, Self::DbError>; + events: Vec>, + ) -> Result::DomainEvent as DomainEvent>::EventId>, Self::DbError>; #[doc(hidden)] async fn store_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Self::DbError>; diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml new file mode 100644 index 0000000..889cc8b --- /dev/null +++ b/eventastic_outbox_postgres/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "eventastic_outbox_postgres" +version = "0.1.0" +edition = "2024" + +[dependencies] +eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" } +async-trait = { workspace = true } +serde_json = { workspace = true } +sqlx = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } +serde = { workspace = true } +eventastic = { path = "../eventastic", version = "0.5" } +futures-util = { workspace = true } +tokio = { workspace = true } diff --git a/eventastic_outbox_postgres/src/lib.rs b/eventastic_outbox_postgres/src/lib.rs new file mode 100644 index 0000000..b26033f --- /dev/null +++ b/eventastic_outbox_postgres/src/lib.rs @@ -0,0 +1,5 @@ +mod outbox; +mod outbox_message; + +pub use outbox::*; +pub use outbox_message::OutboxMessage; diff --git a/eventastic_outbox_postgres/src/outbox.rs b/eventastic_outbox_postgres/src/outbox.rs new file mode 100644 index 0000000..31a0557 --- /dev/null +++ b/eventastic_outbox_postgres/src/outbox.rs @@ -0,0 +1,219 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use eventastic::aggregate::SideEffect; +use eventastic_postgres::{DbError, PostgresRepository, PostgresTransaction, SideEffectStorage}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use sqlx::types::Uuid; +use sqlx::{Postgres, Transaction}; +use std::sync::Arc; + +use crate::OutboxMessage; + +/// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table. +#[derive(Clone, Copy, Default)] +pub struct TableOutbox; + +#[async_trait] +impl SideEffectStorage for TableOutbox { + async fn store_side_effects + Serialize + Send + Sync>( + &self, + transaction: &mut Transaction<'_, Postgres>, + items: Vec, + ) -> Result<(), DbError> { + let mut ids: Vec = Vec::with_capacity(items.len()); + let mut messages: Vec = Vec::with_capacity(items.len()); + let mut retries: Vec = Vec::with_capacity(items.len()); + let mut requeues: Vec = Vec::with_capacity(items.len()); + let mut created_ats: Vec> = Vec::with_capacity(items.len()); + + for side_effect in items { + let id = side_effect.id().clone(); + let msg = serde_json::to_value(side_effect).map_err(DbError::SerializationError)?; + ids.push(id); + messages.push(msg); + retries.push(0); + requeues.push(true); + created_ats.push(Utc::now()); + } + + sqlx::query( + "INSERT INTO outbox(id, message, retries, requeue, created_at) + SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[]) + ON CONFLICT (id) DO UPDATE SET + message = excluded.message, + retries = excluded.retries, + requeue = excluded.requeue, + created_at = excluded.created_at", + ) + .bind(&ids) + .bind(&messages) + .bind(&retries) + .bind(&requeues) + .bind(&created_ats) + .execute(transaction.as_mut()) + .await?; + + Ok(()) + } +} + +#[async_trait] +pub trait TransactionOutboxExt +where + T: SideEffect + DeserializeOwned + Send + 'static, + T::SideEffectId: Clone + Send + 'static, + for<'sql> T::SideEffectId: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + async fn get_outbox_batch(&mut self) -> Result>, DbError>; + + async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError>; + + async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError>; +} + +#[async_trait] +impl<'a, T> TransactionOutboxExt for PostgresTransaction<'a, TableOutbox> +where + T: SideEffect + DeserializeOwned + Send + 'static, + T::SideEffectId: Clone + Send + 'static, + for<'sql> T::SideEffectId: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + async fn get_outbox_batch(&mut self) -> Result>, DbError> { + #[derive(sqlx::FromRow)] + struct OutboxRow { + message: serde_json::Value, + retries: i32, + requeue: bool, + } + + let rows = sqlx::query_as::<_, OutboxRow>( + "SELECT message, retries, requeue FROM outbox \ + WHERE requeue = true ORDER BY created_at \ + FOR UPDATE SKIP LOCKED LIMIT 10", + ) + .fetch_all(self.inner_mut().as_mut()) + .await?; + + rows.into_iter() + .map(|row| { + let msg = serde_json::from_value(row.message)?; + Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) + }) + .collect::, serde_json::Error>>() + .map_err(DbError::SerializationError) + } + + async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError> { + sqlx::query("DELETE FROM outbox WHERE id = $1") + .bind(id) + .execute(self.inner_mut().as_mut()) + .await?; + Ok(()) + } + + async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError> { + sqlx::query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") + .bind(item.message.id()) + .bind(i32::from(item.retries)) + .bind(item.requeue) + .execute(self.inner_mut().as_mut()) + .await?; + + Ok(()) + } +} + +/// Trait used to handle side effects pulled from the outbox. +#[async_trait] +pub trait SideEffectHandler { + type SideEffect: SideEffect; + type Error: Send; + + /// Handle a side effect message. + /// + /// Returning `Ok(())` deletes the message from the outbox. Returning + /// `Err((true, E))` requeues the message. Returning `Err((false, E))` + /// leaves the message without requeuing. + async fn handle(&self, msg: &Self::SideEffect, retries: u16) + -> Result<(), (bool, Self::Error)>; +} + +/// Extension trait for running the outbox worker using a [`TableOutbox`]. +#[async_trait] +pub trait RepositoryOutboxExt { + async fn start_outbox( + &self, + handler: H, + poll_interval: std::time::Duration, + ) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T::SideEffectId: Clone + Send + 'static, + H: SideEffectHandler + Send + Sync, + for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin; +} + +#[async_trait] +impl RepositoryOutboxExt for PostgresRepository { + async fn start_outbox( + &self, + handler: H, + poll_interval: std::time::Duration, + ) -> Result<(), DbError> + where + T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T::SideEffectId: Clone + Send + 'static, + H: SideEffectHandler + Send + Sync, + for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + + sqlx::Type + + sqlx::Encode<'sql, Postgres> + + Unpin, + { + let handler = Arc::new(handler); + loop { + let deadline = std::time::Instant::now() + poll_interval; + let _ = process_outbox_batch::(self, handler.clone()).await; + tokio::time::sleep_until(deadline.into()).await; + } + } +} + +async fn process_outbox_batch( + repo: &PostgresRepository, + handler: Arc, +) -> Result<(), DbError> +where + T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T::SideEffectId: Clone + Send + 'static, + H: SideEffectHandler + Send + Sync, + for<'a> PostgresTransaction<'a, TableOutbox>: TransactionOutboxExt, + for<'sql> T::SideEffectId: + sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, +{ + let mut tx = repo.begin_transaction().await?; + + let outbox_items: Vec> = tx.get_outbox_batch().await?; + + for mut item in outbox_items { + let id: T::SideEffectId = item.message.id().clone(); + + match handler.handle(&item.message, item.retries).await { + Ok(()) => { + tx.delete_outbox_item(id).await?; + } + Err((requeue, _)) => { + item.retries += 1; + item.requeue = requeue; + tx.update_outbox_item(item).await?; + } + } + } + + tx.commit().await +} diff --git a/eventastic_outbox_postgres/src/outbox_message.rs b/eventastic_outbox_postgres/src/outbox_message.rs new file mode 100644 index 0000000..6fcc2e2 --- /dev/null +++ b/eventastic_outbox_postgres/src/outbox_message.rs @@ -0,0 +1,32 @@ +use eventastic::aggregate::SideEffect; + +/// Message stored in the transactional outbox. +#[derive(Debug, Clone)] +pub struct OutboxMessage +where + T: SideEffect, +{ + /// The side effect payload. + pub message: T, + pub retries: u16, + /// Whether the message should be requeued on failure. + pub requeue: bool, +} + +impl OutboxMessage +where + T: SideEffect, +{ + pub fn new(message: T, retries: u16, requeue: bool) -> Self { + Self { + message, + retries, + requeue, + } + } + + /// Returns the retry count for this message. + pub fn retries(&self) -> u16 { + self.retries + } +} diff --git a/eventastic_postgres/Cargo.toml b/eventastic_postgres/Cargo.toml index 81b610f..8f9396f 100644 --- a/eventastic_postgres/Cargo.toml +++ b/eventastic_postgres/Cargo.toml @@ -24,3 +24,4 @@ thiserror = { workspace = true } [dev-dependencies] uuid = { workspace = true } +eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" } diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 2491669..7133d60 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,13 +1,17 @@ mod repository; +mod side_effect; mod transaction; use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, + event::DomainEvent, repository::RepositoryError, }; pub use repository::PostgresRepository; use serde::{Serialize, de::DeserializeOwned}; +pub use side_effect::SideEffectStorage; use sqlx::types::Uuid; + use thiserror::Error; pub use transaction::PostgresTransaction; @@ -39,34 +43,35 @@ impl From for DbError { } #[async_trait] -pub trait RootExt +pub trait RootExt where - S: SideEffect + Serialize + Send + Sync + 'static, - T: Aggregate - + Serialize - + DeserializeOwned - + Send - + Sync - + 'static, - ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + Serialize + DeserializeOwned + Send + Sync + 'static, + ::DomainEvent: + DomainEvent + Serialize + DeserializeOwned + Send + Sync, + ::SideEffect: SideEffect + Serialize + Send + Sync, + O: SideEffectStorage + Send + Sync, { async fn load( - transaction: &mut PostgresTransaction<'_>, + transaction: &mut PostgresTransaction<'_, O>, aggregate_id: Uuid, - ) -> Result, RepositoryError> { + ) -> Result< + Context, + RepositoryError< + T::ApplyError, + <::DomainEvent as DomainEvent>::EventId, + DbError, + >, + > { Context::load(transaction, &aggregate_id).await } } -impl RootExt for T +impl RootExt for T where - S: SideEffect + Serialize + Send + Sync + 'static, - T: Aggregate - + Serialize - + DeserializeOwned - + Send - + Sync - + 'static, - ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + Serialize + DeserializeOwned + Send + Sync + 'static, + ::DomainEvent: + DomainEvent + Serialize + DeserializeOwned + Send + Sync, + ::SideEffect: SideEffect + Serialize + Send + Sync, + O: SideEffectStorage + Send + Sync, { } diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index b66b496..2104963 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -1,28 +1,40 @@ -use crate::PostgresTransaction; +use crate::{PostgresTransaction, SideEffectStorage}; use sqlx::{ Pool, Postgres, postgres::{PgConnectOptions, PgPoolOptions}, }; #[derive(Clone)] -pub struct PostgresRepository { +pub struct PostgresRepository +where + O: SideEffectStorage + Clone, +{ pub(crate) inner: Pool, + pub(crate) outbox: O, } -impl PostgresRepository { +impl PostgresRepository +where + O: SideEffectStorage + Clone, +{ pub async fn new( connect_options: PgConnectOptions, pool_options: PgPoolOptions, + outbox: O, ) -> Result { let pool = pool_options.connect_with(connect_options).await?; - Ok(Self { inner: pool }) + Ok(Self { + inner: pool, + outbox, + }) } /// Start a new transaction using the default isolation level - pub async fn begin_transaction(&self) -> Result, sqlx::Error> { + pub async fn begin_transaction(&self) -> Result, sqlx::Error> { Ok(PostgresTransaction { inner: self.inner.begin().await?, + outbox: &self.outbox, }) } diff --git a/eventastic_postgres/src/side_effect.rs b/eventastic_postgres/src/side_effect.rs new file mode 100644 index 0000000..270a544 --- /dev/null +++ b/eventastic_postgres/src/side_effect.rs @@ -0,0 +1,15 @@ +use crate::DbError; +use async_trait::async_trait; +use eventastic::aggregate::SideEffect; +use serde::Serialize; +use sqlx::types::Uuid; +use sqlx::{Postgres, Transaction}; + +#[async_trait] +pub trait SideEffectStorage: Send + Sync { + async fn store_side_effects + Serialize + Send + Sync>( + &self, + transaction: &mut Transaction<'_, Postgres>, + items: Vec, + ) -> Result<(), DbError>; +} diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 64e1ce9..97db8f0 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,12 +1,12 @@ use std::fmt::Debug; -use crate::DbError; +use crate::{DbError, SideEffectStorage}; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; use eventastic::aggregate::Aggregate; use eventastic::aggregate::SideEffect; -use eventastic::event::Event; +use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; use eventastic::repository::RepositoryTransaction; use eventastic::repository::Snapshot; @@ -20,11 +20,18 @@ use sqlx::query_as; use sqlx::types::JsonValue; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; -pub struct PostgresTransaction<'a> { +pub struct PostgresTransaction<'a, O> +where + O: SideEffectStorage, +{ pub(crate) inner: Transaction<'a, Postgres>, + pub(crate) outbox: &'a O, } -impl<'a> PostgresTransaction<'a> { +impl<'a, O> PostgresTransaction<'a, O> +where + O: SideEffectStorage, +{ /// Commit the transaction to the db. pub async fn commit(self) -> Result<(), DbError> { Ok(self.inner.commit().await?) @@ -39,6 +46,11 @@ impl<'a> PostgresTransaction<'a> { pub fn into_inner(self) -> Transaction<'a, Postgres> { self.inner } + + /// Returns a mutable reference to the underlying [`sqlx::Transaction`]. + pub fn inner_mut(&mut self) -> &mut Transaction<'a, Postgres> { + &mut self.inner + } } #[derive(sqlx::FromRow)] @@ -49,24 +61,18 @@ struct PartialSnapShotRow { } #[derive(Debug, sqlx::FromRow)] -struct PartialEventRow -where - EId: Unpin, -{ - event_id: EId, +struct PartialEventRow { + event_id: Uuid, version: i64, event: JsonValue, } -impl PartialEventRow -where - EId: Debug + Send + Unpin, -{ +impl PartialEventRow { fn to_event( - row: PartialEventRow, - ) -> Result, DbError> + row: PartialEventRow, + ) -> Result, DbError> where - Evt: Send + Clone + Eq + DeserializeOwned, + Evt: DomainEvent + DeserializeOwned, { let row_version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; @@ -81,16 +87,12 @@ where } #[async_trait] -impl<'a, S, T> RepositoryTransaction for PostgresTransaction<'a> +impl<'a, O, T> RepositoryTransaction for PostgresTransaction<'a, O> where - S: SideEffect + 'a + Serialize + Send + Sync, - T: Aggregate - + 'a - + DeserializeOwned - + Serialize - + Send - + Sync, - ::DomainEvent: Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + 'a + DeserializeOwned + Serialize + Send + Sync, + T::SideEffect: SideEffect + Serialize + Send + Sync, + T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, + O: SideEffectStorage, { /// The type of error that is returned from the database. type DbError = DbError; @@ -103,7 +105,6 @@ where ) -> impl futures::Stream< Item = std::result::Result< eventastic::event::EventStoreEvent< - ::DomainEventId, ::DomainEvent, >, >::DbError, @@ -113,7 +114,7 @@ where return stream::iter(vec![Err(DbError::InvalidVersionNumber)]).boxed(); }; - let res = query_as::<_, PartialEventRow>( + let res = query_as::<_, PartialEventRow>( " SELECT event, event_id, version FROM events @@ -134,12 +135,9 @@ where async fn get_event( &mut self, aggregate_id: &T::AggregateId, - event_id: &T::DomainEventId, - ) -> Result< - Option::DomainEventId, ::DomainEvent>>, - Self::DbError, - > { - query_as::<_, PartialEventRow>( + event_id: &<::DomainEvent as DomainEvent>::EventId, + ) -> Result::DomainEvent>>, Self::DbError> { + query_as::<_, PartialEventRow>( "SELECT event, event_id, version FROM events where aggregate_id = $1 AND event_id = $2", ) .bind(aggregate_id) @@ -154,9 +152,10 @@ where async fn store_events( &mut self, id: &T::AggregateId, - events: Vec>, - ) -> Result, Self::DbError> { - let mut event_ids_to_insert: Vec = Vec::with_capacity(events.len()); + events: Vec>, + ) -> Result::DomainEvent as DomainEvent>::EventId>, Self::DbError> { + let mut event_ids_to_insert: Vec<<::DomainEvent as DomainEvent>::EventId> = + Vec::with_capacity(events.len()); let mut versions_to_insert: Vec = Vec::with_capacity(events.len()); let mut aggregate_ids_to_insert: Vec = Vec::with_capacity(events.len()); let mut events_to_insert: Vec = Vec::with_capacity(events.len()); @@ -241,37 +240,8 @@ where &mut self, outbox_item: Vec, ) -> Result<(), Self::DbError> { - let mut ids: Vec = Vec::with_capacity(outbox_item.len()); - let mut messages: Vec = Vec::with_capacity(outbox_item.len()); - let mut retries: Vec = Vec::with_capacity(outbox_item.len()); - let mut requeues: Vec = Vec::with_capacity(outbox_item.len()); - let mut created_ats: Vec> = Vec::with_capacity(outbox_item.len()); - - for item in outbox_item { - ids.push(*item.id()); - messages.push(serde_json::to_value(item).map_err(DbError::SerializationError)?); - retries.push(0); - requeues.push(true); - created_ats.push(Utc::now()); - } - - sqlx::query( - "INSERT INTO outbox(id, message, retries, requeue, created_at) - SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[]) - ON CONFLICT (id) DO UPDATE SET - message = excluded.message, - retries = excluded.retries, - requeue = excluded.requeue, - created_at = excluded.created_at", - ) - .bind(&ids) - .bind(&messages) - .bind(&retries) - .bind(&requeues) - .bind(&created_ats) - .execute(&mut *self.inner) - .await?; - - Ok(()) + self.outbox + .store_side_effects(&mut self.inner, outbox_item) + .await } } diff --git a/eventastic_postgres/tests/common/helpers.rs b/eventastic_postgres/tests/common/helpers.rs index 5e156bc..9a694d7 100644 --- a/eventastic_postgres/tests/common/helpers.rs +++ b/eventastic_postgres/tests/common/helpers.rs @@ -1,13 +1,14 @@ use super::test_aggregate::{Account, AccountEvent}; use chrono::{DateTime, Utc}; use eventastic::aggregate::{Context, Root}; +use eventastic_outbox_postgres::TableOutbox; use eventastic_postgres::PostgresRepository; use sqlx::Row; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; use std::str::FromStr; use uuid::Uuid; -pub async fn get_repository() -> PostgresRepository { +pub async fn get_repository() -> PostgresRepository { let host = std::env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".to_string()); let connection_string = format!("postgres://postgres:password@{host}/postgres"); let connection_options = PgConnectOptions::from_str(connection_string.as_str()) @@ -15,7 +16,7 @@ pub async fn get_repository() -> PostgresRepository { let pool_options = PoolOptions::default(); - let repo = PostgresRepository::new(connection_options, pool_options) + let repo = PostgresRepository::new(connection_options, pool_options, TableOutbox) .await .expect("Failed to connect to postgres"); repo.run_migrations() diff --git a/eventastic_postgres/tests/common/test_aggregate.rs b/eventastic_postgres/tests/common/test_aggregate.rs index 606ed9f..da79c7f 100644 --- a/eventastic_postgres/tests/common/test_aggregate.rs +++ b/eventastic_postgres/tests/common/test_aggregate.rs @@ -1,6 +1,6 @@ use eventastic::aggregate::Aggregate; use eventastic::aggregate::SideEffect; -use eventastic::event::Event; +use eventastic::event::DomainEvent; use serde::Deserialize; use serde::Serialize; use thiserror::Error; @@ -32,7 +32,8 @@ pub enum AccountEvent { }, } -impl Event for AccountEvent { +impl DomainEvent for AccountEvent { + type EventId = Uuid; fn id(&self) -> &Uuid { match self { AccountEvent::Open { event_id, .. } @@ -67,9 +68,9 @@ pub enum SideEffects { impl SideEffect for SideEffects { /// The type used to uniquely identify this side effect. - type Id = Uuid; + type SideEffectId = Uuid; - fn id(&self) -> &Self::Id { + fn id(&self) -> &Self::SideEffectId { match self { SideEffects::PublishMessage { id, .. } | SideEffects::SendEmail { id, .. } => id, } @@ -89,9 +90,6 @@ impl Aggregate for Account { /// Usually, this type should be an `enum`. type DomainEvent = AccountEvent; - /// The type used to uniquely identify the a given domain event. - type DomainEventId = Uuid; - /// The error type that can be returned by [`Aggregate::apply`] when /// mutating the Aggregate state. type ApplyError = DomainError; diff --git a/examples/bank/Cargo.toml b/examples/bank/Cargo.toml index 3c4f26b..e64ff49 100644 --- a/examples/bank/Cargo.toml +++ b/examples/bank/Cargo.toml @@ -8,6 +8,7 @@ edition = "2024" [dependencies] eventastic = { path = "../../eventastic" } eventastic_postgres = { path = "../../eventastic_postgres" } +eventastic_outbox_postgres = { path = "../../eventastic_outbox_postgres" } thiserror = { workspace = true } uuid = { workspace = true } tokio = { workspace = true } diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 6ca2f89..a5b9325 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -6,9 +6,9 @@ use eventastic::aggregate::Context; use eventastic::aggregate::Root; use eventastic::aggregate::SaveError; use eventastic::aggregate::SideEffect; -use eventastic::event::Event; +use eventastic::event::DomainEvent; +use eventastic_outbox_postgres::{RepositoryOutboxExt, SideEffectHandler, TableOutbox}; use eventastic_postgres::PostgresRepository; - use eventastic_postgres::RootExt; use serde::Deserialize; use serde::Serialize; @@ -25,6 +25,16 @@ async fn main() -> Result<(), anyhow::Error> { repository.run_migrations().await?; + // Run our side effect handler in the background + tokio::spawn({ + let repo = repository.clone(); + async move { + let _ = repo + .start_outbox(SideEffectContext {}, std::time::Duration::from_secs(5)) + .await; + } + }); + // Start transaction let mut transaction = repository.begin_transaction().await?; @@ -162,7 +172,8 @@ pub enum AccountEvent { }, } -impl Event for AccountEvent { +impl DomainEvent for AccountEvent { + type EventId = Uuid; fn id(&self) -> &Uuid { match self { AccountEvent::Open { event_id, .. } @@ -197,15 +208,28 @@ pub enum SideEffects { impl SideEffect for SideEffects { /// The type used to uniquely identify this side effect. - type Id = Uuid; + type SideEffectId = Uuid; - fn id(&self) -> &Self::Id { + fn id(&self) -> &Self::SideEffectId { match self { SideEffects::PublishMessage { id, .. } | SideEffects::SendEmail { id, .. } => id, } } } +pub struct SideEffectContext; + +#[async_trait::async_trait] +impl SideEffectHandler for SideEffectContext { + type SideEffect = SideEffects; + type Error = (); + + async fn handle(&self, msg: &SideEffects, retries: u16) -> Result<(), (bool, Self::Error)> { + println!("handling side effect {:?} retries {}", msg, retries); + Ok(()) + } +} + // Implement the aggregate trait for our aggregate struct impl Aggregate for Account { /// The current version of the snapshot to store. @@ -219,9 +243,6 @@ impl Aggregate for Account { /// Usually, this type should be an `enum`. type DomainEvent = AccountEvent; - /// The type used to uniquely identify the a given domain event. - type DomainEventId = Uuid; - /// The error type that can be returned by [`Aggregate::apply`] when /// mutating the Aggregate state. type ApplyError = DomainError; @@ -298,13 +319,13 @@ impl Aggregate for Account { } } -async fn get_repository() -> PostgresRepository { +async fn get_repository() -> PostgresRepository { let connection_options = PgConnectOptions::from_str("postgres://postgres:password@localhost/postgres").unwrap(); let pool_options = PoolOptions::default(); - PostgresRepository::new(connection_options, pool_options) + PostgresRepository::new(connection_options, pool_options, TableOutbox) .await .unwrap() }