diff --git a/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Events/AccountRegisteredEvent.cs b/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Events/AccountRegisteredEvent.cs index de2c900..41ccce0 100644 --- a/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Events/AccountRegisteredEvent.cs +++ b/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Events/AccountRegisteredEvent.cs @@ -7,13 +7,13 @@ namespace Softeq.NetKit.Components.EventBus.Service.Tests.Samples.Events { public class AccountRegisteredEvent : IntegrationEvent { - public string UserId { get; } - public string Email { get; } - public AccountRegisteredEvent(string userId, string email) { UserId = userId; Email = email; } + + public string UserId { get; } + public string Email { get; } } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Handlers/AccountRegisteredEventHandler.cs b/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Handlers/AccountRegisteredEventHandler.cs index 5a235f9..85a49ec 100644 --- a/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Handlers/AccountRegisteredEventHandler.cs +++ b/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Handlers/AccountRegisteredEventHandler.cs @@ -3,18 +3,19 @@ using System.Threading.Tasks; using Softeq.NetKit.Components.EventBus.Abstract; +using Softeq.NetKit.Components.EventBus.Events; using Softeq.NetKit.Components.EventBus.Service.Tests.Samples.Events; namespace Softeq.NetKit.Components.EventBus.Service.Tests.Samples.Handlers { - public class AccountRegisteredEventHandler : IEventHandler + public class AccountRegisteredEventHandler : IEventEnvelopeHandler { // Inject your app service - // TODO: Need to change this action according to requirments - public async Task Handle(AccountRegisteredEvent @event) + // TODO: Need to change this action according to requirements + public Task HandleAsync(IntegrationEventEnvelope eventEnvelope) { - await Task.CompletedTask; + throw new System.NotImplementedException(); } } } diff --git a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs index 798c74c..fbcbf61 100644 --- a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs +++ b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs @@ -6,7 +6,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; -using Newtonsoft.Json.Linq; using Softeq.NetKit.Components.EventBus.Abstract; using Softeq.NetKit.Components.EventBus.Events; using Softeq.NetKit.Components.EventBus.Managers; @@ -19,9 +18,8 @@ namespace Softeq.NetKit.Components.EventBus.Service { - public class EventBusService : IEventBusPublisher, IEventBusSubscriber + public class EventBusService : IEventBusSubscriber, IEventBusPublisher { - private readonly MessageQueueConfiguration _messageQueueConfiguration; private readonly IEventBusSubscriptionsManager _subscriptionsManager; private readonly IServiceProvider _serviceProvider; private readonly ServiceBusTopicConnection _topicConnection; @@ -29,7 +27,7 @@ public class EventBusService : IEventBusPublisher, IEventBusSubscriber private readonly ILogger _logger; private readonly EventPublishConfiguration _eventPublishConfiguration; - private bool IsSubscriptionAvailable => _topicConnection?.SubscriptionClient != null; + private bool IsTopicSubscriptionAvailable => _topicConnection?.SubscriptionClient != null; private bool IsQueueAvailable => _queueConnection != null; public EventBusService( @@ -37,71 +35,17 @@ public EventBusService( IEventBusSubscriptionsManager subscriptionsManager, IServiceProvider serviceProvider, ILoggerFactory loggerFactory, - MessageQueueConfiguration messageQueueConfiguration, EventPublishConfiguration eventPublishConfiguration) { _topicConnection = serviceBusPersisterConnection.TopicConnection; _queueConnection = serviceBusPersisterConnection.QueueConnection; _subscriptionsManager = subscriptionsManager; _serviceProvider = serviceProvider; - _messageQueueConfiguration = messageQueueConfiguration; _eventPublishConfiguration = eventPublishConfiguration; _logger = loggerFactory.CreateLogger(GetType()); } - public Task PublishToTopicAsync(IntegrationEvent @event, int? delayInSeconds = null) - { - ValidateTopic(); - return PublishEventAsync(@event, _topicConnection.TopicClient, delayInSeconds); - } - - public Task PublishToQueueAsync(IntegrationEvent @event, int? delayInSeconds = null) - { - ValidateQueue(); - return PublishEventAsync(@event, _queueConnection.QueueClient, delayInSeconds); - } - - public async Task SubscribeAsync() where TEvent : IntegrationEvent - where TEventHandler : IEventHandler - { - var eventName = typeof(TEvent).Name; - - var containsKey = _subscriptionsManager.HasSubscriptionsForEvent(); - if (!containsKey) - { - if (IsSubscriptionAvailable) - { - await AddSubscriptionRuleIfNotExists(eventName); - } - - _subscriptionsManager.AddSubscription(); - } - } - - public async Task UnsubscribeAsync() where TEvent : IntegrationEvent - where TEventHandler : IEventHandler - { - var eventName = typeof(TEvent).Name; - - if (IsSubscriptionAvailable) - { - await RemoveSubscriptionRule(eventName); - } - - _subscriptionsManager.RemoveSubscription(); - } - - public void SubscribeDynamic(string eventName) where TEventHandler : IDynamicEventHandler - { - _subscriptionsManager.AddDynamicSubscription(eventName); - } - - public void UnsubscribeDynamic(string eventName) where TEventHandler : IDynamicEventHandler - { - _subscriptionsManager.RemoveDynamicSubscription(eventName); - } - - public async Task RegisterSubscriptionListenerAsync() + public async Task RegisterTopicEventListenerAsync() { ValidateSubscription(); @@ -132,7 +76,7 @@ async Task RemoveDefaultRuleIfExists() } } - public void RegisterQueueListener(QueueListenerConfiguration configuration = null) + public void RegisterQueueEventListener(QueueListenerConfiguration configuration = null) { ValidateQueue(); @@ -160,80 +104,100 @@ await HandleReceivedMessage( MaxConcurrentCalls = configuration?.MaxConcurrent ?? 1, AutoComplete = false }; - _queueConnection.QueueClient.RegisterMessageHandler( async (message, token) => await HandleReceivedMessage( _queueConnection.QueueClient, - _queueConnection.QueueClient, + _topicConnection.TopicClient, message, token), handlerOptions); } } - private static Task PublishMessageAsync(Message message, ISenderClient client, int? delayInSeconds = null) - { - return delayInSeconds.HasValue - ? client.ScheduleMessageAsync(message, DateTime.UtcNow.AddSeconds(delayInSeconds.Value)) - : client.SendAsync(message); - } - - private Task PublishEventAsync(IntegrationEvent @event, ISenderClient client, int? delayInSeconds = null) - { - var message = GetMessageForPublish(@event); - - return PublishMessageAsync(message, client, delayInSeconds); - } - - private async Task CheckIfRuleExists(string ruleName) + public async Task RegisterEventAsync() where TEvent : IntegrationEvent { - try + var eventName = typeof(TEvent).Name; + if (IsTopicSubscriptionAvailable) { - var rules = await _topicConnection.SubscriptionClient.GetRulesAsync(); - - return rules != null - && rules.Any(rule => - string.Equals(rule.Name, ruleName, StringComparison.InvariantCultureIgnoreCase)); + await AddTopicSubscriptionRuleIfNotExists(); } - catch (ServiceBusException ex) + _subscriptionsManager.RegisterEventType(); + + async Task AddTopicSubscriptionRuleIfNotExists() { - throw new Exceptions.ServiceBusException( - $"Checking rule {ruleName} existence failed.", ex); + try + { + if (!await CheckIfRuleExists(eventName)) + { + await _topicConnection.SubscriptionClient.AddRuleAsync(new RuleDescription + { + Filter = new CorrelationFilter { Label = eventName }, + Name = eventName + }); + } + } + catch (ServiceBusException ex) + { + throw new Exceptions.ServiceBusException( + $"Adding subscription rule for the entity {eventName} failed.", ex); + } } } - private async Task AddSubscriptionRuleIfNotExists(string eventName) + public async Task RemoveEventRegistrationAsync() where TEvent : IntegrationEvent { - try + if (IsTopicSubscriptionAvailable) { - if (!await CheckIfRuleExists(eventName)) - { - await _topicConnection.SubscriptionClient.AddRuleAsync(new RuleDescription - { - Filter = new CorrelationFilter { Label = eventName }, - Name = eventName - }); - } + var eventName = typeof(TEvent).Name; + await RemoveTopicSubscriptionRule(eventName); } - catch (ServiceBusException ex) + _subscriptionsManager.RemoveEventType(); + + async Task RemoveTopicSubscriptionRule(string eventName) { - throw new Exceptions.ServiceBusException( - $"Adding subscription rule for the entity {eventName} failed.", ex); + try + { + await _topicConnection.SubscriptionClient.RemoveRuleAsync(eventName); + } + catch (MessagingEntityNotFoundException ex) + { + throw new Exceptions.ServiceBusException( + $"The messaging entity {eventName} could not be found.", ex); + } } } - private async Task RemoveSubscriptionRule(string eventName) + public Task PublishToTopicAsync(IntegrationEventEnvelope eventEnvelope) { - try + ValidateTopic(); + return PublishEventAsync(eventEnvelope, _topicConnection.TopicClient); + } + + public Task PublishToQueueAsync(IntegrationEventEnvelope eventEnvelope) + { + ValidateQueue(); + return PublishEventAsync(eventEnvelope, _queueConnection.QueueClient); + } + + private Task PublishEventAsync(IntegrationEventEnvelope eventEnvelope, ISenderClient client) + { + var eventType = eventEnvelope.Event.GetType().Name; + var eventEnvelopeJson = JsonConvert.SerializeObject(eventEnvelope); + var eventEnvelopeBytes = Encoding.UTF8.GetBytes(eventEnvelopeJson); + var message = new Message { - await _topicConnection.SubscriptionClient.RemoveRuleAsync(eventName); - } - catch (MessagingEntityNotFoundException ex) + MessageId = Guid.NewGuid().ToString(), + Body = eventEnvelopeBytes, + Label = eventType, + CorrelationId = eventEnvelope.CorrelationId, + SessionId = eventEnvelope.SequenceId + }; + if (_eventPublishConfiguration.EventTimeToLive.HasValue) { - throw new Exceptions.ServiceBusException( - $"The messaging entity {eventName} could not be found.", ex); + message.TimeToLive = _eventPublishConfiguration.EventTimeToLive.Value; } + return client.SendAsync(message); } private async Task HandleReceivedMessage( @@ -243,59 +207,41 @@ private async Task HandleReceivedMessage( CancellationToken token) { var eventName = message.Label; - var messageData = Encoding.UTF8.GetString(message.Body); - await ProcessEvent(eventName, messageData); - - // Complete the message so that it is not received again. - await receiverClient.CompleteAsync(message.SystemProperties.LockToken); - - if (!_eventPublishConfiguration.SendCompletionEvent) + if (!_subscriptionsManager.IsEventRegistered(eventName)) { + // Skip processing if event type is not registered return; } var eventType = _subscriptionsManager.GetEventTypeByName(eventName); - if (eventType == null || eventType == typeof(CompletedEvent)) + var envelopeBody = Encoding.UTF8.GetString(message.Body); + var eventEnvelopeType = typeof(IntegrationEventEnvelope<>).MakeGenericType(eventType); + var eventEnvelope = (dynamic)JsonConvert.DeserializeObject(envelopeBody, eventEnvelopeType); + if (eventEnvelope == null) { - return; + throw new InvalidOperationException( + $"Failed to parse received message '{eventName}'. Raw body: '{envelopeBody}'."); } - var eventData = JObject.Parse(messageData); - if (Guid.TryParse((string)eventData["Id"], out var eventId)) + await ProcessEventEnvelopeAsync(eventEnvelope); + await receiverClient.CompleteAsync(message.SystemProperties.LockToken); + if (_eventPublishConfiguration.SendCompletionEvent && eventType != typeof(CompletedEvent)) { - var publisherId = (string)eventData["PublisherId"]; - var completedEvent = new CompletedEvent(eventId, publisherId); - await PublishEventAsync(completedEvent, senderClient); + var completedEvent = new CompletedEvent(eventEnvelope.Id, eventEnvelope.PublisherId); + var completedEventEnvelope = new IntegrationEventEnvelope( + completedEvent, _eventPublishConfiguration.EventPublisherId); + await PublishEventAsync(completedEventEnvelope, senderClient); } } - private async Task ProcessEvent(string eventName, string message) + private async Task ProcessEventEnvelopeAsync(IntegrationEventEnvelope eventEnvelope) + where TEvent : IntegrationEvent { - if (!_subscriptionsManager.HasSubscriptionsForEvent(eventName)) - { - return; - } - + var handlerType = typeof(IEventEnvelopeHandler<>).MakeGenericType(typeof(TEvent)); using (var scope = _serviceProvider.CreateScope()) { - var subscriptions = _subscriptionsManager.GetEventHandlers(eventName); - foreach (var subscription in subscriptions) - { - var handler = scope.ServiceProvider.GetService(subscription.HandlerType); - - if (subscription.IsDynamic && handler is IDynamicEventHandler eventHandler) - { - dynamic eventData = JObject.Parse(message); - await eventHandler.Handle(eventData); - } - else if (handler != null) - { - var eventType = _subscriptionsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonConvert.DeserializeObject(message, eventType); - var concreteType = typeof(IEventHandler<>).MakeGenericType(eventType); - await (Task)concreteType.GetMethod(nameof(IEventHandler.Handle)).Invoke(handler, new[] { integrationEvent }); - } - } + var handler = scope.ServiceProvider.GetRequiredService(handlerType); + await ((dynamic)handler).HandleAsync(eventEnvelope); } } @@ -311,30 +257,6 @@ private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceiv return Task.CompletedTask; } - private Message GetMessageForPublish(IntegrationEvent @event) - { - @event.PublisherId = _eventPublishConfiguration.EventPublisherId; - var eventName = @event.GetType().Name; - var jsonMessage = JsonConvert.SerializeObject(@event); - var body = Encoding.UTF8.GetBytes(jsonMessage); - - var message = new Message - { - MessageId = Guid.NewGuid().ToString(), - Body = body, - Label = eventName, - CorrelationId = @event.CorrelationId, - SessionId = @event.SessionId - }; - - if (_messageQueueConfiguration.TimeToLiveInMinutes.HasValue) - { - message.TimeToLive = TimeSpan.FromMinutes(_messageQueueConfiguration.TimeToLiveInMinutes.Value); - } - - return message; - } - private void ValidateTopic() { if (_topicConnection?.TopicClient == null) @@ -345,7 +267,7 @@ private void ValidateTopic() private void ValidateSubscription() { - if (!IsSubscriptionAvailable) + if (!IsTopicSubscriptionAvailable) { throw new InvalidOperationException("Topic Subscription connection is not configured"); } @@ -358,5 +280,22 @@ private void ValidateQueue() throw new InvalidOperationException("Queue connection is not configured"); } } + + private async Task CheckIfRuleExists(string ruleName) + { + try + { + var rules = await _topicConnection.SubscriptionClient.GetRulesAsync(); + + return rules != null + && rules.Any(rule => + string.Equals(rule.Name, ruleName, StringComparison.InvariantCultureIgnoreCase)); + } + catch (ServiceBusException ex) + { + throw new Exceptions.ServiceBusException( + $"Checking rule {ruleName} existence failed.", ex); + } + } } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IDynamicEventHandler.cs b/Softeq.NetKit.Components.EventBus/Abstract/IDynamicEventHandler.cs deleted file mode 100644 index 98b84fc..0000000 --- a/Softeq.NetKit.Components.EventBus/Abstract/IDynamicEventHandler.cs +++ /dev/null @@ -1,12 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -using System.Threading.Tasks; - -namespace Softeq.NetKit.Components.EventBus.Abstract -{ - public interface IDynamicEventHandler - { - Task Handle(dynamic eventData); - } -} diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs index 81bb6fc..edd03eb 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs @@ -8,7 +8,7 @@ namespace Softeq.NetKit.Components.EventBus.Abstract { public interface IEventBusPublisher { - Task PublishToTopicAsync(IntegrationEvent @event, int? delayInSeconds = null); - Task PublishToQueueAsync(IntegrationEvent @event, int? delayInSeconds = null); + Task PublishToTopicAsync(IntegrationEventEnvelope eventEnvelope); + Task PublishToQueueAsync(IntegrationEventEnvelope eventEnvelope); } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs index 07836a1..754ed22 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs @@ -8,22 +8,9 @@ namespace Softeq.NetKit.Components.EventBus.Abstract { public interface IEventBusSubscriber { - void RegisterQueueListener(QueueListenerConfiguration configuration = null); - - Task RegisterSubscriptionListenerAsync(); - - Task SubscribeAsync() - where TEvent : IntegrationEvent - where TEventHandler : IEventHandler; - - Task UnsubscribeAsync() - where TEvent : IntegrationEvent - where TEventHandler : IEventHandler; - - void SubscribeDynamic(string eventName) - where TEventHandler : IDynamicEventHandler; - - void UnsubscribeDynamic(string eventName) - where TEventHandler : IDynamicEventHandler; + Task RegisterTopicEventListenerAsync(); + void RegisterQueueEventListener(QueueListenerConfiguration configuration = null); + Task RegisterEventAsync() where TEvent : IntegrationEvent; + Task RemoveEventRegistrationAsync() where TEvent : IntegrationEvent; } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventHandler.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventEnvelopeHandler.cs similarity index 58% rename from Softeq.NetKit.Components.EventBus/Abstract/IEventHandler.cs rename to Softeq.NetKit.Components.EventBus/Abstract/IEventEnvelopeHandler.cs index a41ee8d..9904adf 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventHandler.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventEnvelopeHandler.cs @@ -6,8 +6,8 @@ namespace Softeq.NetKit.Components.EventBus.Abstract { - public interface IEventHandler where TIntegrationEvent : IntegrationEvent + public interface IEventEnvelopeHandler where TEvent : IntegrationEvent { - Task Handle(TIntegrationEvent @event); + Task HandleAsync(IntegrationEventEnvelope eventEnvelope); } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs b/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs index ec813b2..0591ab8 100644 --- a/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs +++ b/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs @@ -1,17 +1,24 @@ // Developed by Softeq Development Corporation // http://www.softeq.com +using System; + namespace Softeq.NetKit.Components.EventBus { public class EventPublishConfiguration { - public EventPublishConfiguration(string eventPublisherId) + public EventPublishConfiguration( + string eventPublisherId, + bool sendCompletionEvent = true, + TimeSpan? eventTimeToLive = null) { EventPublisherId = eventPublisherId; - SendCompletionEvent = true; + SendCompletionEvent = sendCompletionEvent; + EventTimeToLive = eventTimeToLive; } public string EventPublisherId { get; } - public bool SendCompletionEvent { get; set; } + public bool SendCompletionEvent { get; } + public TimeSpan? EventTimeToLive { get; } } } diff --git a/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs b/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs deleted file mode 100644 index 1508169..0000000 --- a/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs +++ /dev/null @@ -1,29 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -using System; - -namespace Softeq.NetKit.Components.EventBus -{ - public class EventSubscriptionInfo - { - public bool IsDynamic { get; } - public Type HandlerType { get; } - - private EventSubscriptionInfo(bool isDynamic, Type handlerType) - { - IsDynamic = isDynamic; - HandlerType = handlerType; - } - - public static EventSubscriptionInfo Dynamic(Type handlerType) - { - return new EventSubscriptionInfo(true, handlerType); - } - - public static EventSubscriptionInfo Typed(Type handlerType) - { - return new EventSubscriptionInfo(false, handlerType); - } - } -} diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs index 6e16add..541c867 100644 --- a/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs @@ -1,25 +1,9 @@ // Developed by Softeq Development Corporation // http://www.softeq.com -using System; -using Newtonsoft.Json; - namespace Softeq.NetKit.Components.EventBus.Events { public abstract class IntegrationEvent { - protected IntegrationEvent() - { - Id = Guid.NewGuid(); - CreationDate = DateTimeOffset.UtcNow; - } - - [JsonProperty] - public Guid Id { get; private set; } //Do not remove 'private set' so compiler won't drop backing field setter - [JsonProperty] - public DateTimeOffset CreationDate { get; private set; } //Do not remove 'private set' so compiler won't drop backing field setter - public string PublisherId { get; set; } - public string CorrelationId { get; set; } - public string SessionId { get; set; } } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs new file mode 100644 index 0000000..03d20e7 --- /dev/null +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs @@ -0,0 +1,42 @@ +// Developed by Softeq Development Corporation +// http://www.softeq.com + +using System; + +namespace Softeq.NetKit.Components.EventBus.Events +{ + public sealed class IntegrationEventEnvelope + { + public IntegrationEventEnvelope( + IntegrationEvent @event, + string publisherId, + string sequenceId = null, + string correlationId = null) + : this(Guid.NewGuid(), @event, publisherId, sequenceId, correlationId) + { + } + + private IntegrationEventEnvelope( + Guid id, + IntegrationEvent @event, + string publisherId, + string sequenceId = null, + string correlationId = null) + { + Id = id; + Event = @event ?? throw new ArgumentNullException(nameof(@event)); + PublisherId = publisherId ?? throw new ArgumentNullException(nameof(publisherId)); + Created = DateTimeOffset.UtcNow; + SequenceId = sequenceId; + CorrelationId = correlationId; + } + + public Guid Id { get; private set; } + public DateTimeOffset Created { get; private set; } + public string PublisherId { get; private set; } + public string SequenceId { get; private set; } + public string CorrelationId { get; private set; } + public IntegrationEvent Event { get; private set; } + public bool IsSequential => !string.IsNullOrEmpty(SequenceId); + } +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope`1.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope`1.cs new file mode 100644 index 0000000..67654b7 --- /dev/null +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope`1.cs @@ -0,0 +1,50 @@ +// Developed by Softeq Development Corporation +// http://www.softeq.com + +using Newtonsoft.Json; +using System; + +namespace Softeq.NetKit.Components.EventBus.Events +{ + public class IntegrationEventEnvelope where TEvent : IntegrationEvent + { + [JsonConstructor] + private IntegrationEventEnvelope( + Guid id, + DateTimeOffset created, + string publisherId, + string sequenceId, + string correlationId, + TEvent @event) + { + Id = id; + Created = created; + PublisherId = publisherId; + SequenceId = sequenceId; + CorrelationId = correlationId; + Event = @event; + } + + public IntegrationEventEnvelope(IntegrationEventEnvelope eventEnvelope) + { + if (eventEnvelope == null) + { + throw new ArgumentNullException(nameof(eventEnvelope)); + } + + Event = eventEnvelope.Event as TEvent ?? throw new ArgumentException(nameof(eventEnvelope.Event)); + Id = eventEnvelope.Id; + Created = eventEnvelope.Created; + PublisherId = eventEnvelope.PublisherId; + SequenceId = eventEnvelope.SequenceId; + CorrelationId = eventEnvelope.CorrelationId; + } + + public Guid Id { get; private set; } + public DateTimeOffset Created { get; private set; } + public string PublisherId { get; private set; } + public string SequenceId { get; private set; } + public string CorrelationId { get; private set; } + public TEvent Event { get; } + } +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs b/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs index 8d05233..2aa6f56 100644 --- a/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs +++ b/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs @@ -4,140 +4,50 @@ using System; using System.Collections.Generic; using System.Linq; -using Softeq.NetKit.Components.EventBus.Abstract; using Softeq.NetKit.Components.EventBus.Events; namespace Softeq.NetKit.Components.EventBus.Managers { public class EventBusSubscriptionsManager : IEventBusSubscriptionsManager { - private readonly Dictionary> _handlers; - private readonly List _eventTypes; + private readonly HashSet _eventTypes = new HashSet(); - public event EventHandler OnEventRemoved; - - public EventBusSubscriptionsManager() - { - _handlers = new Dictionary>(); - _eventTypes = new List(); - } - - public bool IsEmpty => !_handlers.Keys.Any(); - public void Clear() => _handlers.Clear(); - - public void AddDynamicSubscription(string eventName) where TEventHandler : IDynamicEventHandler - { - DoAddSubscription(typeof(TEventHandler), eventName, true); - } - - public void AddSubscription() where TEvent : IntegrationEvent where TEventHandler : IEventHandler - { - var eventName = GetEventKey(); - DoAddSubscription(typeof(TEventHandler), eventName, false); - _eventTypes.Add(typeof(TEvent)); - } - - public void RemoveSubscription() where TEvent : IntegrationEvent where TEventHandler : IEventHandler - { - var handlerToRemove = FindSubscriptionToRemove(); - var eventName = GetEventKey(); - DoRemoveSubscription(eventName, handlerToRemove); - } - - public void RemoveDynamicSubscription(string eventName) where TEventHandler : IDynamicEventHandler - { - var handlerToRemove = FindDynamicSubscriptionToRemove(eventName); - DoRemoveSubscription(eventName, handlerToRemove); - } - - private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic) - { - if (!HasSubscriptionsForEvent(eventName)) - { - _handlers.Add(eventName, new List()); - } - - if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) - { - throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType)); - } - - _handlers[eventName].Add(isDynamic ? EventSubscriptionInfo.Dynamic(handlerType) : EventSubscriptionInfo.Typed(handlerType)); - } - - private void DoRemoveSubscription(string eventName, EventSubscriptionInfo subsToRemove) + public void RegisterEventType() where TEvent : IntegrationEvent { - if (subsToRemove != null) + var eventType = typeof(TEvent); + if (_eventTypes.Contains(eventType)) { - _handlers[eventName].Remove(subsToRemove); - if (!_handlers[eventName].Any()) - { - _handlers.Remove(eventName); - var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); - if (eventType != null) - { - _eventTypes.Remove(eventType); - } - RaiseOnEventRemoved(eventName); - } + throw new ArgumentException($"Event {GetEventName()} is already registered."); } + _eventTypes.Add(typeof(TEvent)); } - private void RaiseOnEventRemoved(string eventName) + public void RemoveEventType() where TEvent : IntegrationEvent { - var handler = OnEventRemoved; - if (handler != null) + var eventType = typeof(TEvent); + if (!_eventTypes.Contains(eventType)) { - OnEventRemoved?.Invoke(this, eventName); + throw new ArgumentException($"Event {GetEventName()} is not registered."); } + _eventTypes.Remove(typeof(TEvent)); } - private EventSubscriptionInfo FindDynamicSubscriptionToRemove(string eventName) - where TEventHandler : IDynamicEventHandler - { - return DoFindSubscriptionToRemove(eventName, typeof(TEventHandler)); - } - - private EventSubscriptionInfo FindSubscriptionToRemove() - where TEvent : IntegrationEvent - where TEventHandler : IEventHandler - { - var eventName = GetEventKey(); - return DoFindSubscriptionToRemove(eventName, typeof(TEventHandler)); - } - - private EventSubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType) - { - return HasSubscriptionsForEvent(eventName) - ? _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType) - : null; - } - public bool HasSubscriptionsForEvent() where TEvent : IntegrationEvent + public bool IsEventRegistered(Type eventType) { - var key = GetEventKey(); - return HasSubscriptionsForEvent(key); + return _eventTypes.Any(x => x == eventType); } - public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName); - - public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(x => x.Name == eventName); - - public IEnumerable GetEventHandlers() where TEvent : IntegrationEvent + public bool IsEventRegistered(string eventName) { - var key = GetEventKey(); - return GetEventHandlers(key); + return _eventTypes.Any(x => x.Name == eventName); } - public IEnumerable GetEventHandlers(string eventName) + public Type GetEventTypeByName(string eventName) { - return _handlers.TryGetValue(eventName, out var subscriptions) - ? (IEnumerable) subscriptions - : Array.Empty(); + var eventType = _eventTypes.SingleOrDefault(x => x.Name == eventName); + return eventType ?? throw new ArgumentException($"Event '{eventName}' is not registered."); } - public string GetEventKey() - { - return typeof(TEvent).Name; - } + private static string GetEventName() => typeof(TEvent).Name; } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs b/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs index 06ad365..de6f8f4 100644 --- a/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs +++ b/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs @@ -2,44 +2,16 @@ // http://www.softeq.com using System; -using System.Collections.Generic; -using Softeq.NetKit.Components.EventBus.Abstract; using Softeq.NetKit.Components.EventBus.Events; namespace Softeq.NetKit.Components.EventBus.Managers { public interface IEventBusSubscriptionsManager { - bool IsEmpty { get; } - - event EventHandler OnEventRemoved; - - void AddDynamicSubscription(string eventName) - where TEventHandler : IDynamicEventHandler; - - void AddSubscription() - where TEvent : IntegrationEvent - where TEventHandler : IEventHandler; - - void RemoveSubscription() - where TEventHandler : IEventHandler - where TEvent : IntegrationEvent; - - void RemoveDynamicSubscription(string eventName) - where TEventHandler : IDynamicEventHandler; - - bool HasSubscriptionsForEvent() where T : IntegrationEvent; - - bool HasSubscriptionsForEvent(string eventName); - + void RegisterEventType() where TEvent : IntegrationEvent; + void RemoveEventType() where TEvent : IntegrationEvent; + bool IsEventRegistered(Type eventType); + bool IsEventRegistered(string eventName); Type GetEventTypeByName(string eventName); - - void Clear(); - - IEnumerable GetEventHandlers() where T : IntegrationEvent; - - IEnumerable GetEventHandlers(string eventName); - - string GetEventKey(); } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/MessageQueueConfiguration.cs b/Softeq.NetKit.Components.EventBus/MessageQueueConfiguration.cs deleted file mode 100644 index a4835a1..0000000 --- a/Softeq.NetKit.Components.EventBus/MessageQueueConfiguration.cs +++ /dev/null @@ -1,10 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -namespace Softeq.NetKit.Components.EventBus -{ - public class MessageQueueConfiguration - { - public int? TimeToLiveInMinutes { get; set; } - } -} diff --git a/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs index 40b92f6..2fbd8f8 100644 --- a/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs @@ -1,7 +1,6 @@ // Developed by Softeq Development Corporation // http://www.softeq.com -using Softeq.NetKit.Components.EventBus.Events; using System; using System.Collections.Generic; using System.Linq.Expressions; @@ -11,11 +10,12 @@ namespace Softeq.NetKit.Integrations.EventLog.Abstract { public interface IIntegrationEventLogService { - Task GetAsync(Guid eventId); + Task GetAsync(Guid eventEnvelopeId); Task> GetAsync(Expression> condition); - Task CreateAsync(IntegrationEvent @event); - Task MarkAsPublishedAsync(IntegrationEvent @event); - Task MarkAsPublishedFailedAsync(IntegrationEvent @event); - Task MarkAsCompletedAsync(Guid eventId); + Task AnyAsync(Expression> condition); + Task CreateAsync(IntegrationEventLog eventLog); + Task MarkAsPublishedAsync(IntegrationEventLog eventLog); + Task MarkAsPublishFailedAsync(IntegrationEventLog eventLog); + Task MarkAsCompletedAsync(IntegrationEventLog eventLog); } } \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/EventState.cs b/Softeq.NetKit.Integrations.EventLog/EventState.cs index 9ffd1f0..b06600a 100644 --- a/Softeq.NetKit.Integrations.EventLog/EventState.cs +++ b/Softeq.NetKit.Integrations.EventLog/EventState.cs @@ -7,7 +7,7 @@ public enum EventState { NotPublished = 0, Published = 1, - PublishedFailed = 2, + PublishFailed = 2, Completed = 3 } } diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs index 4d3a624..805ffdf 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs @@ -9,47 +9,35 @@ namespace Softeq.NetKit.Integrations.EventLog { public class IntegrationEventLog { - private IntegrationEventLog() - { - } + private IntegrationEventLog() { } - public IntegrationEventLog(IntegrationEvent @event) + public IntegrationEventLog(IntegrationEventEnvelope eventEnvelope) { - if (@event == null) - { - throw new ArgumentNullException(nameof(@event)); - } - - EventId = @event.Id; - Created = @event.CreationDate; - EventTypeName = @event.GetType().FullName; + EventLogId = Guid.NewGuid(); + EventEnvelope = eventEnvelope ?? throw new ArgumentNullException(nameof(eventEnvelope)); EventState = EventState.NotPublished; - SessionId = @event.SessionId; - Content = @event; + TimesSent = 0; } - public Guid EventId { get; private set; } - public string EventTypeName { get; private set; } + public Guid EventLogId { get; private set; } public EventState EventState { get; private set; } public int TimesSent { get; private set; } - public DateTimeOffset Created { get; private set; } public DateTimeOffset? Updated { get; private set; } - public string SessionId { get; private set; } - public IntegrationEvent Content { get; private set; } + public IntegrationEventEnvelope EventEnvelope { get; private set; } public void ChangeEventState(EventState newEventState) { switch (newEventState) { case EventState.Published: - EnsureStateTransitionAllowed(EventState.NotPublished, EventState.PublishedFailed); + EnsureStateTransitionAllowed(EventState.NotPublished, EventState.Published, EventState.PublishFailed); TimesSent++; break; - case EventState.PublishedFailed: - EnsureStateTransitionAllowed(EventState.Published); + case EventState.PublishFailed: + EnsureStateTransitionAllowed(EventState.Published, EventState.PublishFailed); break; case EventState.Completed: - EnsureStateTransitionAllowed(EventState.Published, EventState.PublishedFailed); + EnsureStateTransitionAllowed(EventState.Published, EventState.PublishFailed); break; case EventState.NotPublished: default: @@ -77,4 +65,4 @@ void EnsureStateTransitionAllowed(params EventState[] allowedFromStates) } } } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogContextFactory.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogContextFactory.cs index c54f1f1..f68e2a9 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogContextFactory.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogContextFactory.cs @@ -10,10 +10,11 @@ namespace Softeq.NetKit.Integrations.EventLog /// This factory is needed to create db migrations in class library via Package Manager Console /// https://docs.microsoft.com/en-us/ef/core/cli/dbcontext-creation?tabs=dotnet-core-cli#from-a-design-time-factory /// To use this factory - /// 1. Set current project as startup project - /// 2. Edit project file: change the project TargetFramework to, for example, netcoreapp3.1 - /// 3. Open Package Manager Console. Set current project as the default project - /// 4. Create migration by running command: add-migration migration_name -context IntegrationEventLogContext + /// 1. Build solution + /// 2. Set the current project as startup project + /// 3. Edit the project file: change the project TargetFramework to, for example, netcoreapp3.1 + /// 4. Open Package Manager Console. Set current project as the default project + /// 5. Create migration by running command: add-migration migration-name -context IntegrationEventLogContext /// public class IntegrationEventLogContextFactory : IDesignTimeDbContextFactory { diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs index 2f74319..04422f9 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs @@ -2,8 +2,6 @@ // http://www.softeq.com using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging; -using Softeq.NetKit.Components.EventBus.Events; using Softeq.NetKit.Integrations.EventLog.Abstract; using Softeq.NetKit.Integrations.EventLog.Exceptions; using System; @@ -20,94 +18,72 @@ public class IntegrationEventLogService : IIntegrationEventLogService public IntegrationEventLogService(IntegrationEventLogContext eventLogContext) { - EventLogContext = eventLogContext; + EventLogContext = eventLogContext ?? throw new ArgumentNullException(nameof(eventLogContext)); } - public async Task GetAsync(Guid eventId) + public Task CreateAsync(IntegrationEventLog eventLog) { - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.EventId == eventId); + EventLogContext.IntegrationEventLogs.Add(eventLog); + return EventLogContext.SaveChangesAsync(); + } + + public async Task GetAsync(Guid eventEnvelopeId) + { + var eventLog = await EventLogContext.IntegrationEventLogs + .FirstOrDefaultAsync(log => log.EventEnvelope.Id == eventEnvelopeId); if (eventLog == null) { - throw new EventLogNotFoundException(eventId); + throw new EventLogNotFoundException(eventEnvelopeId); } return eventLog; } - public async Task> GetAsync(Expression> condition) + public Task> GetAsync(Expression> condition) { if (condition == null) { throw new ArgumentNullException(nameof(condition)); } - return await EventLogContext.IntegrationEventLogs.Where(condition).ToListAsync(); + return EventLogContext.IntegrationEventLogs.Where(condition).ToListAsync(); } - public Task CreateAsync(IntegrationEvent @event) + public Task AnyAsync(Expression> condition) { - var eventLog = new IntegrationEventLog(@event); - EventLogContext.IntegrationEventLogs.Add(eventLog); - return EventLogContext.SaveChangesAsync(); - } - - public async Task MarkAsPublishedAsync(IntegrationEvent @event) - { - if (@event == null) - { - throw new ArgumentNullException(nameof(@event)); - } - - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.EventId == @event.Id); - if (eventLog == null) + if (condition == null) { - throw new EventLogNotFoundException(@event.Id); + throw new ArgumentNullException(nameof(condition)); } - // Published event has PublisherId so need to update it also - eventLog.Content.PublisherId = @event.PublisherId; - eventLog.ChangeEventState(EventState.Published); - await UpdateAsync(eventLog); + return EventLogContext.IntegrationEventLogs.AnyAsync(condition); } - public async Task MarkAsPublishedFailedAsync(IntegrationEvent @event) + public Task MarkAsPublishedAsync(IntegrationEventLog eventLog) { - if (@event == null) - { - throw new ArgumentNullException(nameof(@event)); - } - - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.EventId == @event.Id); - if (eventLog == null) - { - throw new EventLogNotFoundException(@event.Id); - } - - eventLog.ChangeEventState(EventState.PublishedFailed); - await UpdateAsync(eventLog); + return UpdateEventStateAsync(eventLog, EventState.Published); } - public async Task MarkAsCompletedAsync(Guid eventId) + public Task MarkAsPublishFailedAsync(IntegrationEventLog eventLog) { - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.EventId == eventId); - if (eventLog == null) - { - throw new EventLogNotFoundException(eventId); - } + return UpdateEventStateAsync(eventLog, EventState.PublishFailed); + } - eventLog.ChangeEventState(EventState.Completed); - await UpdateAsync(eventLog); + public Task MarkAsCompletedAsync(IntegrationEventLog eventLog) + { + return UpdateEventStateAsync(eventLog, EventState.Completed); } - private async Task UpdateAsync(IntegrationEventLog @event) + private Task UpdateEventStateAsync(IntegrationEventLog eventLog, EventState newState) { - if (@event == null) + if (eventLog == null) { - throw new ArgumentNullException(nameof(@event)); + throw new ArgumentNullException(nameof(eventLog)); } - EventLogContext.IntegrationEventLogs.Update(@event); - await EventLogContext.SaveChangesAsync(); + eventLog.ChangeEventState(newState); + EventLogContext.IntegrationEventLogs.Update(eventLog); + return EventLogContext.SaveChangesAsync(); } } } \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMapping.cs b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMapping.cs deleted file mode 100644 index c606634..0000000 --- a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMapping.cs +++ /dev/null @@ -1,45 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -using Microsoft.EntityFrameworkCore.Metadata.Builders; -using Newtonsoft.Json; -using Softeq.NetKit.Components.EventBus.Events; -using Softeq.NetKit.Integrations.EventLog.Abstract; -using Softeq.NetKit.Integrations.EventLog.Utility; - -namespace Softeq.NetKit.Integrations.EventLog.Mappings -{ - internal class IntegrationEventLogMapping : DomainModelBuilder, IEntityMappingConfiguration - { - public override void Build(EntityTypeBuilder builder) - { - builder.HasKey(eventLog => eventLog.EventId); - builder.Property(eventLog => eventLog.EventId).IsRequired(); - - builder.Property(eventLog => eventLog.Created).IsRequired(); - builder.HasIndex(eventLog => eventLog.Created).IsUnique(false); - - var serializerSettings = new JsonSerializerSettings - { - TypeNameHandling = TypeNameHandling.All, - ContractResolver = new PrivateFieldContractResolver() - }; - // TODO: Get rid of EventLog properties serialization within Content object - builder.Property(eventLog => eventLog.Content) - .HasConversion( - integrationEvent => JsonConvert.SerializeObject(integrationEvent, serializerSettings), - json => (IntegrationEvent)JsonConvert.DeserializeObject(json, serializerSettings)) - .IsRequired(); - - builder.Property(eventLog => eventLog.EventState).HasConversion().IsRequired(); - builder.HasIndex(eventLog => eventLog.EventState).IsUnique(false); - - builder.Property(eventLog => eventLog.EventTypeName).IsRequired(); - - builder.Property(eventLog => eventLog.TimesSent).IsRequired(); - - builder.Property(eventLog => eventLog.SessionId).IsRequired(false); - builder.HasIndex(eventLog => eventLog.SessionId).IsUnique(false); - } - } -} \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs new file mode 100644 index 0000000..ca868f2 --- /dev/null +++ b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs @@ -0,0 +1,44 @@ +// Developed by Softeq Development Corporation +// http://www.softeq.com + +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using Newtonsoft.Json; +using Softeq.NetKit.Components.EventBus.Events; +using Softeq.NetKit.Integrations.EventLog.Abstract; +using Softeq.NetKit.Integrations.EventLog.Utility; + +namespace Softeq.NetKit.Integrations.EventLog.Mappings +{ + internal class IntegrationEventLogMappingConfiguration : DomainModelBuilder, IEntityMappingConfiguration + { + private static readonly JsonSerializerSettings EventSerializerSettings = new JsonSerializerSettings + { + TypeNameHandling = TypeNameHandling.Objects, + ContractResolver = new PrivateFieldContractResolver() + }; + + public override void Build(EntityTypeBuilder builder) + { + builder.HasKey(eventLog => eventLog.EventLogId); + builder.Property(eventLog => eventLog.EventState).HasConversion().IsRequired(); + builder.HasIndex(eventLog => eventLog.EventState).IsUnique(false); + builder.Property(eventLog => eventLog.TimesSent).IsRequired(); + builder.OwnsOne(entity => entity.EventEnvelope, ownershipBuilder => + { + ownershipBuilder.HasIndex(x => x.Id).IsUnique(); + ownershipBuilder.Property(x => x.Created).IsRequired(); + ownershipBuilder.HasIndex(x => x.Created).IsUnique(false); + ownershipBuilder.Property(x => x.PublisherId).IsRequired(); + ownershipBuilder.HasIndex(x => x.PublisherId).IsUnique(false); + ownershipBuilder.Property(x => x.SequenceId).IsRequired(false); + ownershipBuilder.HasIndex(x => x.SequenceId).IsUnique(false); + ownershipBuilder.Property(x => x.CorrelationId).IsRequired(false); + ownershipBuilder.Property(x => x.Event) + .HasConversion( + @event => JsonConvert.SerializeObject(@event, EventSerializerSettings), + json => (IntegrationEvent)JsonConvert.DeserializeObject(json, EventSerializerSettings)) + .IsRequired(); + }); + } + } +} \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.Designer.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.Designer.cs new file mode 100644 index 0000000..3c10ec4 --- /dev/null +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.Designer.cs @@ -0,0 +1,85 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Softeq.NetKit.Integrations.EventLog; + +namespace Softeq.NetKit.Integrations.EventLog.Migrations +{ + [DbContext(typeof(IntegrationEventLogContext))] + [Migration("20231205132729_UpdateIntegrationEventLogStructureV2")] + partial class UpdateIntegrationEventLogStructureV2 + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("dbo") + .HasAnnotation("ProductVersion", "2.2.6-servicing-10079") + .HasAnnotation("Relational:MaxIdentifierLength", 128) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => + { + b.Property("EventLogId") + .ValueGeneratedOnAdd(); + + b.Property("EventState"); + + b.Property("TimesSent"); + + b.Property("Updated"); + + b.HasKey("EventLogId"); + + b.HasIndex("EventState"); + + b.ToTable("IntegrationEventLogs"); + }); + + modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => + { + b.OwnsOne("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "EventEnvelope", b1 => + { + b1.Property("IntegrationEventLogEventLogId"); + + b1.Property("CorrelationId"); + + b1.Property("Created"); + + b1.Property("Event") + .IsRequired(); + + b1.Property("Id"); + + b1.Property("PublisherId") + .IsRequired(); + + b1.Property("SequenceId"); + + b1.HasKey("IntegrationEventLogEventLogId"); + + b1.HasIndex("Created"); + + b1.HasIndex("Id") + .IsUnique(); + + b1.HasIndex("PublisherId"); + + b1.HasIndex("SequenceId"); + + b1.ToTable("IntegrationEventLogs","dbo"); + + b1.HasOne("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog") + .WithOne("EventEnvelope") + .HasForeignKey("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "IntegrationEventLogEventLogId") + .OnDelete(DeleteBehavior.Cascade); + }); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs new file mode 100644 index 0000000..17924c0 --- /dev/null +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs @@ -0,0 +1,204 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +namespace Softeq.NetKit.Integrations.EventLog.Migrations +{ + public partial class UpdateIntegrationEventLogStructureV2 : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs"); + migrationBuilder.DropColumn( + name: "EventTypeName", + schema: "dbo", + table: "IntegrationEventLogs"); + + + migrationBuilder.RenameColumn( + name: "EventId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_Id"); + migrationBuilder.RenameColumn( + name: "Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_Created"); + migrationBuilder.RenameColumn( + name: "SessionId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_SequenceId"); + + + migrationBuilder.AddColumn( + name: "EventEnvelope_CorrelationId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: true); + + + migrationBuilder.RenameColumn( + name: "Content", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_Event"); + migrationBuilder.AddColumn( + name: "EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + defaultValue: ""); + migrationBuilder.Sql(@" +DECLARE @PublisherId UNIQUEIDENTIFIER; +SET @PublisherId = ( + SELECT TOP 1 JsonData.PublisherId + FROM IntegrationEventLogs + CROSS APPLY OPENJSON(IntegrationEventLogs.EventEnvelope_Event, N'$') WITH (PublisherId UNIQUEIDENTIFIER N'$.PublisherId') AS JsonData); +UPDATE IntegrationEventLogs SET EventEnvelope_PublisherId = @PublisherId;"); + migrationBuilder.AlterColumn( + name: "EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + oldClrType: typeof(string)); + + + migrationBuilder.AddColumn( + name: "EventLogId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); + migrationBuilder.Sql("UPDATE IntegrationEventLogs SET EventLogId = NEWID();"); + migrationBuilder.AddPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventLogId"); + + + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_SessionId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_EventEnvelope_SequenceId"); + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_EventEnvelope_Created"); + migrationBuilder.CreateIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_Id", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventEnvelope_Id", + unique: true); + migrationBuilder.CreateIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventEnvelope_PublisherId"); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs"); + migrationBuilder.DropIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_Id", + schema: "dbo", + table: "IntegrationEventLogs"); + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_Created"); + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_SequenceId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_SessionId"); + + + migrationBuilder.DropPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs"); + migrationBuilder.DropColumn( + name: "EventLogId", + schema: "dbo", + table: "IntegrationEventLogs"); + + + migrationBuilder.RenameColumn( + name: "EventEnvelope_Event", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "Content"); + migrationBuilder.Sql(@" +UPDATE IntegrationEventLogs +SET Content = + JSON_MODIFY( + JSON_MODIFY( + JSON_MODIFY( + JSON_MODIFY( + JSON_MODIFY( + Content, + '$.SessionId', IntegrationEventLogs.EventEnvelope_SequenceId), + '$.CorrelationId', IntegrationEventLogs.EventEnvelope_CorrelationId), + '$.PublisherId', IntegrationEventLogs.EventEnvelope_PublisherId), + '$.CreationDate', FORMAT(IntegrationEventLogs.EventEnvelope_Created, 'yyyy-MM-ddTHH:mm:ss.fffffff zzz')), + '$.Id', CONVERT(nvarchar(36), IntegrationEventLogs.EventEnvelope_Id)); +"); + migrationBuilder.DropColumn( + name: "EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs"); + migrationBuilder.DropColumn( + name: "EventEnvelope_CorrelationId", + schema: "dbo", + table: "IntegrationEventLogs"); + + + migrationBuilder.AddColumn( + name: "EventTypeName", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + defaultValue: ""); + migrationBuilder.Sql(@" +UPDATE IntegrationEventLogs +SET EventTypeName = SUBSTRING(JSON_VALUE(Content, '$.""$type""'), 0, CHARINDEX(',', JSON_VALUE(Content, '$.""$type""'), 0))"); + + + migrationBuilder.RenameColumn( + name: "EventEnvelope_SequenceId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "SessionId"); + migrationBuilder.RenameColumn( + name: "EventEnvelope_Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "Created"); + migrationBuilder.RenameColumn( + name: "EventEnvelope_Id", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventId"); + + + migrationBuilder.AddPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventId"); + } + } +} diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs index c58b743..5e90aba 100644 --- a/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs @@ -22,34 +22,60 @@ protected override void BuildModel(ModelBuilder modelBuilder) modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => { - b.Property("EventId") + b.Property("EventLogId") .ValueGeneratedOnAdd(); - b.Property("Content") - .IsRequired(); + b.Property("EventState"); - b.Property("Created"); + b.Property("TimesSent"); - b.Property("EventState"); + b.Property("Updated"); - b.Property("EventTypeName") - .IsRequired(); + b.HasKey("EventLogId"); - b.Property("SessionId"); + b.HasIndex("EventState"); - b.Property("TimesSent"); + b.ToTable("IntegrationEventLogs"); + }); - b.Property("Updated"); + modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => + { + b.OwnsOne("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "EventEnvelope", b1 => + { + b1.Property("IntegrationEventLogEventLogId"); - b.HasKey("EventId"); + b1.Property("CorrelationId"); - b.HasIndex("Created"); + b1.Property("Created"); - b.HasIndex("EventState"); + b1.Property("Event") + .IsRequired(); - b.HasIndex("SessionId"); + b1.Property("Id"); - b.ToTable("IntegrationEventLogs"); + b1.Property("PublisherId") + .IsRequired(); + + b1.Property("SequenceId"); + + b1.HasKey("IntegrationEventLogEventLogId"); + + b1.HasIndex("Created"); + + b1.HasIndex("Id") + .IsUnique(); + + b1.HasIndex("PublisherId"); + + b1.HasIndex("SequenceId"); + + b1.ToTable("IntegrationEventLogs","dbo"); + + b1.HasOne("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog") + .WithOne("EventEnvelope") + .HasForeignKey("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "IntegrationEventLogEventLogId") + .OnDelete(DeleteBehavior.Cascade); + }); }); #pragma warning restore 612, 618 } diff --git a/Softeq.NetKit.Integrations.EventLog/Utility/PrivateFieldContractResolver.cs b/Softeq.NetKit.Integrations.EventLog/Utility/PrivateFieldContractResolver.cs index d345bd2..4416929 100644 --- a/Softeq.NetKit.Integrations.EventLog/Utility/PrivateFieldContractResolver.cs +++ b/Softeq.NetKit.Integrations.EventLog/Utility/PrivateFieldContractResolver.cs @@ -9,11 +9,11 @@ internal class PrivateFieldContractResolver : DefaultContractResolver protected override JsonProperty CreateProperty(MemberInfo member, MemberSerialization memberSerialization) { var property = base.CreateProperty(member, memberSerialization); - MakeWriteable(property, member); + MakeWritable(property, member); return property; } - internal static JsonProperty MakeWriteable(JsonProperty jProperty, MemberInfo member) + internal static JsonProperty MakeWritable(JsonProperty jProperty, MemberInfo member) { var property = member as PropertyInfo; if (property == null)