Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
[workspace]
resolver = "2"
members = ["eventastic", "eventastic_postgres", "examples/*"]
members = [
"eventastic",
"eventastic_outbox_postgres",
"eventastic_postgres",
"examples/*",
]

[workspace.package]
license = "MIT"
Expand Down
11 changes: 4 additions & 7 deletions eventastic/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! Aggregates should provide a way to **fold** Domain Events on the
//! current value of the state, to produce the next state.

use crate::event::Event;
use crate::event::DomainEvent;
use std::fmt::Debug;

mod root;
Expand Down Expand Up @@ -54,10 +54,7 @@ pub trait Aggregate: Sized + Clone {

/// The type of Domain Events that interest this Aggregate.
/// Usually, this type should be an `enum`.
type DomainEvent: Clone + Debug + Eq + PartialEq + Event<Self::DomainEventId>;

/// The type used to uniquely identify the given domain event.
type DomainEventId: Clone + Debug + Eq + PartialEq;
type DomainEvent: Clone + Debug + Eq + PartialEq + DomainEvent;

/// The error type that can be returned by [`Aggregate::apply`] when
/// mutating the Aggregate state.
Expand Down Expand Up @@ -93,8 +90,8 @@ pub trait Aggregate: Sized + Clone {

pub trait SideEffect {
/// The type used to uniquely identify this side effect.
type Id;
type SideEffectId;

/// Returns read access to the [`SideEffect::Id`]
fn id(&self) -> &Self::Id;
fn id(&self) -> &Self::SideEffectId;
}
21 changes: 13 additions & 8 deletions eventastic/src/aggregate/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::TryStreamExt;
use crate::repository::{RepositoryError, RepositoryTransaction, Snapshot};
use crate::{
aggregate::Aggregate,
event::{Event, EventStoreEvent},
event::{DomainEvent, EventStoreEvent},
};
use std::fmt::Debug;

Expand All @@ -17,7 +17,7 @@ where
{
aggregate: T,
version: u64,
uncommitted_events: Vec<EventStoreEvent<T::DomainEventId, T::DomainEvent>>,
uncommitted_events: Vec<EventStoreEvent<T::DomainEvent>>,
uncommitted_side_effects: Vec<T::SideEffect>,
}

Expand All @@ -43,9 +43,7 @@ where
/// Returns the list of uncommitted, recorded Domain [Events] from the [Context]
/// and resets the internal list to its default value.
#[doc(hidden)]
pub fn take_uncommitted_events(
&mut self,
) -> Vec<EventStoreEvent<T::DomainEventId, T::DomainEvent>> {
pub fn take_uncommitted_events(&mut self) -> Vec<EventStoreEvent<T::DomainEvent>> {
std::mem::take(&mut self.uncommitted_events)
}

Expand All @@ -65,7 +63,7 @@ where
/// given the current state of the Aggregate.
#[doc(hidden)]
pub fn rehydrate_from(
event: &EventStoreEvent<T::DomainEventId, T::DomainEvent>,
event: &EventStoreEvent<T::DomainEvent>,
) -> Result<Context<T>, T::ApplyError> {
Ok(Context {
version: event.version,
Expand All @@ -85,7 +83,7 @@ where
#[doc(hidden)]
pub fn apply_rehydrated_event(
mut self,
event: &EventStoreEvent<T::DomainEventId, T::DomainEvent>,
event: &EventStoreEvent<T::DomainEvent>,
) -> Result<Context<T>, T::ApplyError> {
self.version += 1;
debug_assert!(self.version == event.version);
Expand Down Expand Up @@ -220,7 +218,14 @@ where
pub async fn load<R>(
transaction: &mut R,
aggregate_id: &T::AggregateId,
) -> Result<Context<T>, RepositoryError<T::ApplyError, T::DomainEventId, R::DbError>>
) -> Result<
Context<T>,
RepositoryError<
T::ApplyError,
<<T as Aggregate>::DomainEvent as DomainEvent>::EventId,
R::DbError,
>,
>
where
R: RepositoryTransaction<T>,
{
Expand Down
39 changes: 23 additions & 16 deletions eventastic/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@

use std::fmt::Debug;

/// An [`Event`] that will be / has been persisted to the Event Store.
/// A [`DomainEvent`] that will be / has been persisted to the Event Store.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EventStoreEvent<Id, Evt>
pub struct EventStoreEvent<Evt>
where
Id: Debug,
Evt: Clone + Eq + PartialEq,
Evt: DomainEvent,
{
/// The id of the event
pub id: Id,
pub id: Evt::EventId,

// The version of the event
pub version: u64,
Expand All @@ -20,20 +19,28 @@ where
pub event: Evt,
}

/// A domain event.
pub trait Event<Id>
impl<Evt> EventStoreEvent<Evt>
where
Id: Debug,
Evt: DomainEvent,
{
fn id(&self) -> &Id;
}
/// Creates a new `EventStoreEvent`.
pub fn new(id: Evt::EventId, version: u64, event: Evt) -> Self {
Self { id, version, event }
}

impl<Id, Evt> Event<Id> for EventStoreEvent<Id, Evt>
where
Id: Debug,
Evt: Clone + Eq + PartialEq,
{
fn id(&self) -> &Id {
/// Returns the id of the event.
pub fn id(&self) -> &Evt::EventId {
&self.id
}

/// Returns the version of the event.
pub fn version(&self) -> u64 {
self.version
}
}

/// A domain event.
pub trait DomainEvent: Clone + Eq + PartialEq {
type EventId: Debug + Clone + Eq + PartialEq;
fn id(&self) -> &Self::EventId;
}
23 changes: 9 additions & 14 deletions eventastic/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use async_trait::async_trait;
use futures::Stream;
use std::fmt::Debug;

use crate::{aggregate::Aggregate, event::EventStoreEvent};
use crate::{
aggregate::Aggregate,
event::{DomainEvent, EventStoreEvent},
};

/// List of possible errors that can be returned by the [`RepositoryTransaction`] trait.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -50,23 +53,15 @@ pub trait RepositoryTransaction<T: Aggregate> {
&mut self,
id: &T::AggregateId,
version: u64,
) -> impl Stream<
Item = Result<
EventStoreEvent<T::DomainEventId, <T as Aggregate>::DomainEvent>,
Self::DbError,
>,
>;
) -> impl Stream<Item = Result<EventStoreEvent<T::DomainEvent>, Self::DbError>>;

// Get a specific event from the event store.
#[doc(hidden)]
async fn get_event(
&mut self,
aggregate_id: &T::AggregateId,
event_id: &T::DomainEventId,
) -> Result<
Option<EventStoreEvent<T::DomainEventId, <T as Aggregate>::DomainEvent>>,
Self::DbError,
>;
event_id: &<<T as Aggregate>::DomainEvent as DomainEvent>::EventId,
) -> Result<Option<EventStoreEvent<T::DomainEvent>>, Self::DbError>;

/// Retrieves the latest version of the Aggregate from the Event Store.
/// This method must check that the snapshot version is correct
Expand All @@ -83,8 +78,8 @@ pub trait RepositoryTransaction<T: Aggregate> {
async fn store_events(
&mut self,
id: &T::AggregateId,
events: Vec<EventStoreEvent<T::DomainEventId, T::DomainEvent>>,
) -> Result<Vec<T::DomainEventId>, Self::DbError>;
events: Vec<EventStoreEvent<T::DomainEvent>>,
) -> Result<Vec<<<T as Aggregate>::DomainEvent as DomainEvent>::EventId>, Self::DbError>;

#[doc(hidden)]
async fn store_snapshot(&mut self, snapshot: Snapshot<T>) -> Result<(), Self::DbError>;
Expand Down
16 changes: 16 additions & 0 deletions eventastic_outbox_postgres/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "eventastic_outbox_postgres"
version = "0.1.0"
edition = "2024"

[dependencies]
eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" }
async-trait = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
serde = { workspace = true }
eventastic = { path = "../eventastic", version = "0.5" }
futures-util = { workspace = true }
tokio = { workspace = true }
5 changes: 5 additions & 0 deletions eventastic_outbox_postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod outbox;
mod outbox_message;

pub use outbox::*;
pub use outbox_message::OutboxMessage;
Loading
Loading