From 5ab5c3a052379913ff78973d7a0fd9ae0c33db43 Mon Sep 17 00:00:00 2001 From: Aleksey Zhukov Date: Wed, 21 Jun 2023 10:27:52 +0200 Subject: [PATCH 1/8] [HALO-9922] use existing message publisher ID if possible --- .../EventBusService.cs | 58 +++++++++---------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs index 798c74c..102e6db 100644 --- a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs +++ b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs @@ -172,18 +172,38 @@ await HandleReceivedMessage( } } - private static Task PublishMessageAsync(Message message, ISenderClient client, int? delayInSeconds = null) + private Task PublishEventAsync(IntegrationEvent @event, ISenderClient client, int? delayInSeconds = null) { + if (string.IsNullOrEmpty(@event.PublisherId)) + { + @event.PublisherId = _eventPublishConfiguration.EventPublisherId; + } + + var message = CreateMessageForPublish(@event); + return delayInSeconds.HasValue ? client.ScheduleMessageAsync(message, DateTime.UtcNow.AddSeconds(delayInSeconds.Value)) : client.SendAsync(message); - } - private Task PublishEventAsync(IntegrationEvent @event, ISenderClient client, int? delayInSeconds = null) - { - var message = GetMessageForPublish(@event); - - return PublishMessageAsync(message, client, delayInSeconds); + Message CreateMessageForPublish(IntegrationEvent integrationEvent) + { + var eventName = integrationEvent.GetType().Name; + var jsonMessage = JsonConvert.SerializeObject(integrationEvent); + var body = Encoding.UTF8.GetBytes(jsonMessage); + var result = new Message + { + MessageId = Guid.NewGuid().ToString(), + Body = body, + Label = eventName, + CorrelationId = integrationEvent.CorrelationId, + SessionId = integrationEvent.SessionId + }; + if (_messageQueueConfiguration.TimeToLiveInMinutes.HasValue) + { + result.TimeToLive = TimeSpan.FromMinutes(_messageQueueConfiguration.TimeToLiveInMinutes.Value); + } + return result; + } } private async Task CheckIfRuleExists(string ruleName) @@ -311,30 +331,6 @@ private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceiv return Task.CompletedTask; } - private Message GetMessageForPublish(IntegrationEvent @event) - { - @event.PublisherId = _eventPublishConfiguration.EventPublisherId; - var eventName = @event.GetType().Name; - var jsonMessage = JsonConvert.SerializeObject(@event); - var body = Encoding.UTF8.GetBytes(jsonMessage); - - var message = new Message - { - MessageId = Guid.NewGuid().ToString(), - Body = body, - Label = eventName, - CorrelationId = @event.CorrelationId, - SessionId = @event.SessionId - }; - - if (_messageQueueConfiguration.TimeToLiveInMinutes.HasValue) - { - message.TimeToLive = TimeSpan.FromMinutes(_messageQueueConfiguration.TimeToLiveInMinutes.Value); - } - - return message; - } - private void ValidateTopic() { if (_topicConnection?.TopicClient == null) From 0d5d414b79c071ffad6d4d36c874511ef1873545 Mon Sep 17 00:00:00 2001 From: Aleksey Zhukov Date: Tue, 11 Jul 2023 13:29:28 +0300 Subject: [PATCH 2/8] [HALO-16735] WIP remove field duplicates --- .../Samples/Events/AccountRegisteredEvent.cs | 8 +- .../EventBusService.cs | 266 ++++++++---------- .../IIntegrationEventHandler.cs | 13 + .../Abstract/IDynamicEventHandler.cs | 12 - .../Abstract/IEventBusPublisher.cs | 4 +- .../Abstract/IEventBusSubscriber.cs | 21 +- .../EventPublishConfiguration.cs | 15 +- .../EventSubscriptionInfo.cs | 19 +- .../Events/IIntegrationEvent.cs | 11 + .../Events/IntegrationEvent.cs | 21 +- .../Events/IntegrationEventEnvelope.cs | 36 +++ .../Managers/EventBusSubscriptionsManager.cs | 127 ++------- .../Managers/IEventBusSubscriptionsManager.cs | 38 +-- .../MessageQueueConfiguration.cs | 10 - .../Abstract/IIntegrationEventLogService.cs | 9 +- .../IntegrationEventLog.cs | 13 +- .../IntegrationEventLogService.cs | 34 +-- .../Mappings/IntegrationEventLogMapping.cs | 45 --- ...IntegrationEventLogMappingConfiguration.cs | 55 ++++ .../Utility/PrivateFieldContractResolver.cs | 4 +- 20 files changed, 307 insertions(+), 454 deletions(-) create mode 100644 Softeq.NetKit.Components.EventBus.Service/IIntegrationEventHandler.cs delete mode 100644 Softeq.NetKit.Components.EventBus/Abstract/IDynamicEventHandler.cs create mode 100644 Softeq.NetKit.Components.EventBus/Events/IIntegrationEvent.cs create mode 100644 Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs delete mode 100644 Softeq.NetKit.Components.EventBus/MessageQueueConfiguration.cs delete mode 100644 Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMapping.cs create mode 100644 Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs diff --git a/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Events/AccountRegisteredEvent.cs b/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Events/AccountRegisteredEvent.cs index de2c900..41ccce0 100644 --- a/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Events/AccountRegisteredEvent.cs +++ b/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Events/AccountRegisteredEvent.cs @@ -7,13 +7,13 @@ namespace Softeq.NetKit.Components.EventBus.Service.Tests.Samples.Events { public class AccountRegisteredEvent : IntegrationEvent { - public string UserId { get; } - public string Email { get; } - public AccountRegisteredEvent(string userId, string email) { UserId = userId; Email = email; } + + public string UserId { get; } + public string Email { get; } } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs index 102e6db..1230d37 100644 --- a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs +++ b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs @@ -6,7 +6,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; -using Newtonsoft.Json.Linq; using Softeq.NetKit.Components.EventBus.Abstract; using Softeq.NetKit.Components.EventBus.Events; using Softeq.NetKit.Components.EventBus.Managers; @@ -21,7 +20,6 @@ namespace Softeq.NetKit.Components.EventBus.Service { public class EventBusService : IEventBusPublisher, IEventBusSubscriber { - private readonly MessageQueueConfiguration _messageQueueConfiguration; private readonly IEventBusSubscriptionsManager _subscriptionsManager; private readonly IServiceProvider _serviceProvider; private readonly ServiceBusTopicConnection _topicConnection; @@ -29,7 +27,7 @@ public class EventBusService : IEventBusPublisher, IEventBusSubscriber private readonly ILogger _logger; private readonly EventPublishConfiguration _eventPublishConfiguration; - private bool IsSubscriptionAvailable => _topicConnection?.SubscriptionClient != null; + private bool IsTopicSubscriptionAvailable => _topicConnection?.SubscriptionClient != null; private bool IsQueueAvailable => _queueConnection != null; public EventBusService( @@ -37,71 +35,17 @@ public EventBusService( IEventBusSubscriptionsManager subscriptionsManager, IServiceProvider serviceProvider, ILoggerFactory loggerFactory, - MessageQueueConfiguration messageQueueConfiguration, EventPublishConfiguration eventPublishConfiguration) { _topicConnection = serviceBusPersisterConnection.TopicConnection; _queueConnection = serviceBusPersisterConnection.QueueConnection; _subscriptionsManager = subscriptionsManager; _serviceProvider = serviceProvider; - _messageQueueConfiguration = messageQueueConfiguration; _eventPublishConfiguration = eventPublishConfiguration; _logger = loggerFactory.CreateLogger(GetType()); } - public Task PublishToTopicAsync(IntegrationEvent @event, int? delayInSeconds = null) - { - ValidateTopic(); - return PublishEventAsync(@event, _topicConnection.TopicClient, delayInSeconds); - } - - public Task PublishToQueueAsync(IntegrationEvent @event, int? delayInSeconds = null) - { - ValidateQueue(); - return PublishEventAsync(@event, _queueConnection.QueueClient, delayInSeconds); - } - - public async Task SubscribeAsync() where TEvent : IntegrationEvent - where TEventHandler : IEventHandler - { - var eventName = typeof(TEvent).Name; - - var containsKey = _subscriptionsManager.HasSubscriptionsForEvent(); - if (!containsKey) - { - if (IsSubscriptionAvailable) - { - await AddSubscriptionRuleIfNotExists(eventName); - } - - _subscriptionsManager.AddSubscription(); - } - } - - public async Task UnsubscribeAsync() where TEvent : IntegrationEvent - where TEventHandler : IEventHandler - { - var eventName = typeof(TEvent).Name; - - if (IsSubscriptionAvailable) - { - await RemoveSubscriptionRule(eventName); - } - - _subscriptionsManager.RemoveSubscription(); - } - - public void SubscribeDynamic(string eventName) where TEventHandler : IDynamicEventHandler - { - _subscriptionsManager.AddDynamicSubscription(eventName); - } - - public void UnsubscribeDynamic(string eventName) where TEventHandler : IDynamicEventHandler - { - _subscriptionsManager.RemoveDynamicSubscription(eventName); - } - - public async Task RegisterSubscriptionListenerAsync() + public async Task RegisterTopicListenerAsync() { ValidateSubscription(); @@ -160,99 +104,131 @@ await HandleReceivedMessage( MaxConcurrentCalls = configuration?.MaxConcurrent ?? 1, AutoComplete = false }; - _queueConnection.QueueClient.RegisterMessageHandler( async (message, token) => await HandleReceivedMessage( _queueConnection.QueueClient, - _queueConnection.QueueClient, + _topicConnection.TopicClient, message, token), handlerOptions); } } - private Task PublishEventAsync(IntegrationEvent @event, ISenderClient client, int? delayInSeconds = null) + public async Task RegisterTopicEventAsync() where TEvent : IntegrationEvent { - if (string.IsNullOrEmpty(@event.PublisherId)) + var eventName = typeof(TEvent).Name; + if (IsTopicSubscriptionAvailable) { - @event.PublisherId = _eventPublishConfiguration.EventPublisherId; + await AddTopicSubscriptionRuleIfNotExists(); } + _subscriptionsManager.RegisterEventType(); - var message = CreateMessageForPublish(@event); - - return delayInSeconds.HasValue - ? client.ScheduleMessageAsync(message, DateTime.UtcNow.AddSeconds(delayInSeconds.Value)) - : client.SendAsync(message); - - Message CreateMessageForPublish(IntegrationEvent integrationEvent) + async Task AddTopicSubscriptionRuleIfNotExists() { - var eventName = integrationEvent.GetType().Name; - var jsonMessage = JsonConvert.SerializeObject(integrationEvent); - var body = Encoding.UTF8.GetBytes(jsonMessage); - var result = new Message + try { - MessageId = Guid.NewGuid().ToString(), - Body = body, - Label = eventName, - CorrelationId = integrationEvent.CorrelationId, - SessionId = integrationEvent.SessionId - }; - if (_messageQueueConfiguration.TimeToLiveInMinutes.HasValue) + if (!await CheckIfRuleExists(eventName)) + { + await _topicConnection.SubscriptionClient.AddRuleAsync(new RuleDescription + { + Filter = new CorrelationFilter { Label = eventName }, + Name = eventName + }); + } + } + catch (ServiceBusException ex) { - result.TimeToLive = TimeSpan.FromMinutes(_messageQueueConfiguration.TimeToLiveInMinutes.Value); + throw new Exceptions.ServiceBusException( + $"Adding subscription rule for the entity {eventName} failed.", ex); } - return result; } } - private async Task CheckIfRuleExists(string ruleName) + public async Task RemoveTopicEventRegistrationAsync() where TEvent : IntegrationEvent { - try + if (IsTopicSubscriptionAvailable) { - var rules = await _topicConnection.SubscriptionClient.GetRulesAsync(); - - return rules != null - && rules.Any(rule => - string.Equals(rule.Name, ruleName, StringComparison.InvariantCultureIgnoreCase)); + var eventName = typeof(TEvent).Name; + await RemoveTopicSubscriptionRule(eventName); } - catch (ServiceBusException ex) + _subscriptionsManager.RemoveEventType(); + + async Task RemoveTopicSubscriptionRule(string eventName) { - throw new Exceptions.ServiceBusException( - $"Checking rule {ruleName} existence failed.", ex); + try + { + await _topicConnection.SubscriptionClient.RemoveRuleAsync(eventName); + } + catch (MessagingEntityNotFoundException ex) + { + throw new Exceptions.ServiceBusException( + $"The messaging entity {eventName} could not be found.", ex); + } } } - private async Task AddSubscriptionRuleIfNotExists(string eventName) + public void RegisterQueueEventAsync() where TEvent : IntegrationEvent { - try + _subscriptionsManager.RegisterEventType(); + } + + public void RemoveQueueEventRegistrationAsync() where TEvent : IntegrationEvent + { + _subscriptionsManager.RemoveEventType(); + } + + public Task PublishToTopicAsync(IntegrationEventEnvelope @event) + { + ValidateTopic(); + return PublishEventAsync(@event, _topicConnection.TopicClient); + } + + public Task PublishToQueueAsync(IntegrationEventEnvelope @event) + { + ValidateQueue(); + return PublishEventAsync(@event, _queueConnection.QueueClient); + } + + private Task PublishEventAsync(IntegrationEventEnvelope integrationEventEnvelope, ISenderClient client) + { + //if (string.IsNullOrEmpty(integrationEventV2.PublisherId)) + //{ + // integrationEventV2.PublisherId = _eventPublishConfiguration.EventPublisherId; + //} + + var eventName = integrationEventEnvelope.Event.GetType().Name; + var jsonMessage = JsonConvert.SerializeObject(integrationEventEnvelope); + var body = Encoding.UTF8.GetBytes(jsonMessage); + var message = new Message { - if (!await CheckIfRuleExists(eventName)) - { - await _topicConnection.SubscriptionClient.AddRuleAsync(new RuleDescription - { - Filter = new CorrelationFilter { Label = eventName }, - Name = eventName - }); - } - } - catch (ServiceBusException ex) + MessageId = Guid.NewGuid().ToString(), + Body = body, + Label = eventName, + CorrelationId = integrationEventEnvelope.CorrelationId, + SessionId = integrationEventEnvelope.SessionId + }; + if (_eventPublishConfiguration.EventTimeToLive.HasValue) { - throw new Exceptions.ServiceBusException( - $"Adding subscription rule for the entity {eventName} failed.", ex); + message.TimeToLive = _eventPublishConfiguration.EventTimeToLive.Value; } + return client.SendAsync(message); } - private async Task RemoveSubscriptionRule(string eventName) + private async Task CheckIfRuleExists(string ruleName) { try { - await _topicConnection.SubscriptionClient.RemoveRuleAsync(eventName); + var rules = await _topicConnection.SubscriptionClient.GetRulesAsync(); + + return rules != null + && rules.Any(rule => + string.Equals(rule.Name, ruleName, StringComparison.InvariantCultureIgnoreCase)); } - catch (MessagingEntityNotFoundException ex) + catch (ServiceBusException ex) { throw new Exceptions.ServiceBusException( - $"The messaging entity {eventName} could not be found.", ex); + $"Checking rule {ruleName} existence failed.", ex); } } @@ -263,62 +239,54 @@ private async Task HandleReceivedMessage( CancellationToken token) { var eventName = message.Label; - var messageData = Encoding.UTF8.GetString(message.Body); - await ProcessEvent(eventName, messageData); - - // Complete the message so that it is not received again. + var eventType = _subscriptionsManager.GetEventTypeByName(eventName); + var envelope = ParseEventEnvelopeAsync(message); + await ProcessEvent(eventName, envelope); await receiverClient.CompleteAsync(message.SystemProperties.LockToken); - - if (!_eventPublishConfiguration.SendCompletionEvent) + if (_eventPublishConfiguration.SendCompletionEvent && eventType != typeof(CompletedEvent)) { - return; + var completedEvent = new CompletedEvent(envelope.Id, envelope.PublisherId); + var completedEventEnvelope = new IntegrationEventEnvelope(_eventPublishConfiguration.EventPublisherId, completedEvent); + await PublishEventAsync(completedEventEnvelope, senderClient); } + } + private IntegrationEventEnvelope ParseEventEnvelopeAsync(Message message) + { + var eventName = message.Label; var eventType = _subscriptionsManager.GetEventTypeByName(eventName); - if (eventType == null || eventType == typeof(CompletedEvent)) + var envelopeBody = Encoding.UTF8.GetString(message.Body); + var envelope = (IntegrationEventEnvelope)JsonConvert.DeserializeObject(envelopeBody, eventType); + if (envelope == null) { - return; - } - - var eventData = JObject.Parse(messageData); - if (Guid.TryParse((string)eventData["Id"], out var eventId)) - { - var publisherId = (string)eventData["PublisherId"]; - var completedEvent = new CompletedEvent(eventId, publisherId); - await PublishEventAsync(completedEvent, senderClient); + throw new InvalidOperationException($"Failed to parse received message '{eventName}'. Raw body: '{envelopeBody}'."); } + envelope.Event.Id = envelope.Id; + envelope.Event.Created = envelope.Created; + return envelope; } - private async Task ProcessEvent(string eventName, string message) + private async Task ProcessEvent(string eventName, IntegrationEventEnvelope integrationEventEnvelope) { - if (!_subscriptionsManager.HasSubscriptionsForEvent(eventName)) + if (!_subscriptionsManager.IsEventRegistered(eventName)) { return; } using (var scope = _serviceProvider.CreateScope()) { - var subscriptions = _subscriptionsManager.GetEventHandlers(eventName); - foreach (var subscription in subscriptions) - { - var handler = scope.ServiceProvider.GetService(subscription.HandlerType); - - if (subscription.IsDynamic && handler is IDynamicEventHandler eventHandler) - { - dynamic eventData = JObject.Parse(message); - await eventHandler.Handle(eventData); - } - else if (handler != null) - { - var eventType = _subscriptionsManager.GetEventTypeByName(eventName); - var integrationEvent = JsonConvert.DeserializeObject(message, eventType); - var concreteType = typeof(IEventHandler<>).MakeGenericType(eventType); - await (Task)concreteType.GetMethod(nameof(IEventHandler.Handle)).Invoke(handler, new[] { integrationEvent }); - } - } + var handlerType = typeof(IIntegrationEventHandler<>).MakeGenericType(integrationEventEnvelope.Event.GetType()); + var handler = scope.ServiceProvider.GetRequiredService(handlerType); + await HandleParsedEventAsync((dynamic)handler, (dynamic)integrationEventEnvelope.Event); } } + private static async Task HandleParsedEventAsync(IIntegrationEventHandler handler, TEventContent content) + where TEventContent : IntegrationEvent + { + await handler.HandleAsync(content); + } + private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { var context = exceptionReceivedEventArgs.ExceptionReceivedContext; @@ -341,7 +309,7 @@ private void ValidateTopic() private void ValidateSubscription() { - if (!IsSubscriptionAvailable) + if (!IsTopicSubscriptionAvailable) { throw new InvalidOperationException("Topic Subscription connection is not configured"); } diff --git a/Softeq.NetKit.Components.EventBus.Service/IIntegrationEventHandler.cs b/Softeq.NetKit.Components.EventBus.Service/IIntegrationEventHandler.cs new file mode 100644 index 0000000..ea877ac --- /dev/null +++ b/Softeq.NetKit.Components.EventBus.Service/IIntegrationEventHandler.cs @@ -0,0 +1,13 @@ +// Developed by Softeq Development Corporation +// http://www.softeq.com + +using System.Threading.Tasks; +using Softeq.NetKit.Components.EventBus.Events; + +namespace Softeq.NetKit.Components.EventBus.Service +{ + public interface IIntegrationEventHandler where TEvent : IIntegrationEvent + { + Task HandleAsync(TEvent @event); + } +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IDynamicEventHandler.cs b/Softeq.NetKit.Components.EventBus/Abstract/IDynamicEventHandler.cs deleted file mode 100644 index 98b84fc..0000000 --- a/Softeq.NetKit.Components.EventBus/Abstract/IDynamicEventHandler.cs +++ /dev/null @@ -1,12 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -using System.Threading.Tasks; - -namespace Softeq.NetKit.Components.EventBus.Abstract -{ - public interface IDynamicEventHandler - { - Task Handle(dynamic eventData); - } -} diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs index 81bb6fc..0d9d367 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs @@ -8,7 +8,7 @@ namespace Softeq.NetKit.Components.EventBus.Abstract { public interface IEventBusPublisher { - Task PublishToTopicAsync(IntegrationEvent @event, int? delayInSeconds = null); - Task PublishToQueueAsync(IntegrationEvent @event, int? delayInSeconds = null); + Task PublishToTopicAsync(IntegrationEventEnvelope @event); + Task PublishToQueueAsync(IntegrationEventEnvelope @event); } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs index 07836a1..695df5f 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs @@ -8,22 +8,11 @@ namespace Softeq.NetKit.Components.EventBus.Abstract { public interface IEventBusSubscriber { + Task RegisterTopicListenerAsync(); void RegisterQueueListener(QueueListenerConfiguration configuration = null); - - Task RegisterSubscriptionListenerAsync(); - - Task SubscribeAsync() - where TEvent : IntegrationEvent - where TEventHandler : IEventHandler; - - Task UnsubscribeAsync() - where TEvent : IntegrationEvent - where TEventHandler : IEventHandler; - - void SubscribeDynamic(string eventName) - where TEventHandler : IDynamicEventHandler; - - void UnsubscribeDynamic(string eventName) - where TEventHandler : IDynamicEventHandler; + Task RegisterTopicEventAsync() where TEvent : IntegrationEvent; + Task RemoveTopicEventRegistrationAsync() where TEvent : IntegrationEvent; + void RegisterQueueEventAsync() where TEvent : IntegrationEvent; + void RemoveQueueEventRegistrationAsync() where TEvent : IntegrationEvent; } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs b/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs index ec813b2..70555fd 100644 --- a/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs +++ b/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs @@ -1,17 +1,26 @@ // Developed by Softeq Development Corporation // http://www.softeq.com +using System; + namespace Softeq.NetKit.Components.EventBus { public class EventPublishConfiguration { - public EventPublishConfiguration(string eventPublisherId) + public EventPublishConfiguration( + string eventPublisherId, + bool sendCompletionEvent = true, + TimeSpan? eventTimeToLive = null) { EventPublisherId = eventPublisherId; - SendCompletionEvent = true; + SendCompletionEvent = sendCompletionEvent; + EventTimeToLive = eventTimeToLive; } public string EventPublisherId { get; } - public bool SendCompletionEvent { get; set; } + + public bool SendCompletionEvent { get; } + + public TimeSpan? EventTimeToLive { get; } } } diff --git a/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs b/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs index 1508169..445a60d 100644 --- a/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs +++ b/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs @@ -5,25 +5,14 @@ namespace Softeq.NetKit.Components.EventBus { + // TODO: Is that needed? public class EventSubscriptionInfo { - public bool IsDynamic { get; } - public Type HandlerType { get; } - - private EventSubscriptionInfo(bool isDynamic, Type handlerType) + public EventSubscriptionInfo(Type handlerType) { - IsDynamic = isDynamic; HandlerType = handlerType; } - public static EventSubscriptionInfo Dynamic(Type handlerType) - { - return new EventSubscriptionInfo(true, handlerType); - } - - public static EventSubscriptionInfo Typed(Type handlerType) - { - return new EventSubscriptionInfo(false, handlerType); - } + public Type HandlerType { get; } } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IIntegrationEvent.cs b/Softeq.NetKit.Components.EventBus/Events/IIntegrationEvent.cs new file mode 100644 index 0000000..3235178 --- /dev/null +++ b/Softeq.NetKit.Components.EventBus/Events/IIntegrationEvent.cs @@ -0,0 +1,11 @@ +using System; + +namespace Softeq.NetKit.Components.EventBus.Events +{ + public interface IIntegrationEvent + { + Guid Id { get; } + + DateTimeOffset Created { get; } + } +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs index 6e16add..92eb8b2 100644 --- a/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs @@ -1,25 +1,20 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -using System; +using System; using Newtonsoft.Json; namespace Softeq.NetKit.Components.EventBus.Events { - public abstract class IntegrationEvent + public abstract class IntegrationEvent : IIntegrationEvent { protected IntegrationEvent() { Id = Guid.NewGuid(); - CreationDate = DateTimeOffset.UtcNow; + Created = DateTimeOffset.UtcNow; } - [JsonProperty] - public Guid Id { get; private set; } //Do not remove 'private set' so compiler won't drop backing field setter - [JsonProperty] - public DateTimeOffset CreationDate { get; private set; } //Do not remove 'private set' so compiler won't drop backing field setter - public string PublisherId { get; set; } - public string CorrelationId { get; set; } - public string SessionId { get; set; } + [JsonIgnore] + public Guid Id { get; set; } + + [JsonIgnore] + public DateTimeOffset Created { get; set; } } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs new file mode 100644 index 0000000..dc263a7 --- /dev/null +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs @@ -0,0 +1,36 @@ +// Developed by Softeq Development Corporation +// http://www.softeq.com + +using System; + +namespace Softeq.NetKit.Components.EventBus.Events +{ + public sealed class IntegrationEventEnvelope + { + public IntegrationEventEnvelope( + string publisherId, + IntegrationEvent @event, + string sessionId = null, + string correlationId = null) + { + PublisherId = publisherId ?? throw new ArgumentNullException(nameof(publisherId)); + Event = @event ?? throw new ArgumentNullException(nameof(@event)); + Id = @event.Id; + Created = @event.Created; + SessionId = sessionId; + CorrelationId = correlationId; + } + + public Guid Id { get; private set; } + + public DateTimeOffset Created { get; private set; } + + public string PublisherId { get; private set; } + + public string SessionId { get; private set; } + + public string CorrelationId { get; private set; } + + public IntegrationEvent Event { get; private set; } + } +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs b/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs index 8d05233..11b0764 100644 --- a/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs +++ b/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs @@ -4,140 +4,47 @@ using System; using System.Collections.Generic; using System.Linq; -using Softeq.NetKit.Components.EventBus.Abstract; using Softeq.NetKit.Components.EventBus.Events; namespace Softeq.NetKit.Components.EventBus.Managers { public class EventBusSubscriptionsManager : IEventBusSubscriptionsManager { - private readonly Dictionary> _handlers; - private readonly List _eventTypes; + private readonly List _eventTypes = new List(); - public event EventHandler OnEventRemoved; - - public EventBusSubscriptionsManager() - { - _handlers = new Dictionary>(); - _eventTypes = new List(); - } - - public bool IsEmpty => !_handlers.Keys.Any(); - public void Clear() => _handlers.Clear(); - - public void AddDynamicSubscription(string eventName) where TEventHandler : IDynamicEventHandler - { - DoAddSubscription(typeof(TEventHandler), eventName, true); - } - - public void AddSubscription() where TEvent : IntegrationEvent where TEventHandler : IEventHandler - { - var eventName = GetEventKey(); - DoAddSubscription(typeof(TEventHandler), eventName, false); - _eventTypes.Add(typeof(TEvent)); - } - - public void RemoveSubscription() where TEvent : IntegrationEvent where TEventHandler : IEventHandler - { - var handlerToRemove = FindSubscriptionToRemove(); - var eventName = GetEventKey(); - DoRemoveSubscription(eventName, handlerToRemove); - } - - public void RemoveDynamicSubscription(string eventName) where TEventHandler : IDynamicEventHandler - { - var handlerToRemove = FindDynamicSubscriptionToRemove(eventName); - DoRemoveSubscription(eventName, handlerToRemove); - } - - private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic) - { - if (!HasSubscriptionsForEvent(eventName)) - { - _handlers.Add(eventName, new List()); - } - - if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) - { - throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType)); - } - - _handlers[eventName].Add(isDynamic ? EventSubscriptionInfo.Dynamic(handlerType) : EventSubscriptionInfo.Typed(handlerType)); - } - - private void DoRemoveSubscription(string eventName, EventSubscriptionInfo subsToRemove) + public void RegisterEventType() where TEvent : IntegrationEvent { - if (subsToRemove != null) + var eventType = typeof(TEvent); + if (_eventTypes.Contains(eventType)) { - _handlers[eventName].Remove(subsToRemove); - if (!_handlers[eventName].Any()) - { - _handlers.Remove(eventName); - var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); - if (eventType != null) - { - _eventTypes.Remove(eventType); - } - RaiseOnEventRemoved(eventName); - } + throw new ArgumentException($"Event {GetEventName()} is already registered."); } + _eventTypes.Add(typeof(TEvent)); } - private void RaiseOnEventRemoved(string eventName) + public void RemoveEventType() where TEvent : IntegrationEvent { - var handler = OnEventRemoved; - if (handler != null) + var eventType = typeof(TEvent); + if (!_eventTypes.Contains(eventType)) { - OnEventRemoved?.Invoke(this, eventName); + throw new ArgumentException($"Event {GetEventName()} is not registered."); } + _eventTypes.Remove(typeof(TEvent)); } - private EventSubscriptionInfo FindDynamicSubscriptionToRemove(string eventName) - where TEventHandler : IDynamicEventHandler + public bool IsEventRegistered() where TEvent : IntegrationEvent { - return DoFindSubscriptionToRemove(eventName, typeof(TEventHandler)); + var eventName = GetEventName(); + return IsEventRegistered(eventName); } - private EventSubscriptionInfo FindSubscriptionToRemove() - where TEvent : IntegrationEvent - where TEventHandler : IEventHandler - { - var eventName = GetEventKey(); - return DoFindSubscriptionToRemove(eventName, typeof(TEventHandler)); - } - - private EventSubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType) - { - return HasSubscriptionsForEvent(eventName) - ? _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType) - : null; - } - public bool HasSubscriptionsForEvent() where TEvent : IntegrationEvent - { - var key = GetEventKey(); - return HasSubscriptionsForEvent(key); - } - - public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName); + public bool IsEventRegistered(string eventName) => _eventTypes.Any(x => x.Name == eventName); public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(x => x.Name == eventName); - public IEnumerable GetEventHandlers() where TEvent : IntegrationEvent - { - var key = GetEventKey(); - return GetEventHandlers(key); - } - - public IEnumerable GetEventHandlers(string eventName) - { - return _handlers.TryGetValue(eventName, out var subscriptions) - ? (IEnumerable) subscriptions - : Array.Empty(); - } - - public string GetEventKey() + private static string GetEventName() { return typeof(TEvent).Name; } } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs b/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs index 06ad365..6204c90 100644 --- a/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs +++ b/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs @@ -2,44 +2,16 @@ // http://www.softeq.com using System; -using System.Collections.Generic; -using Softeq.NetKit.Components.EventBus.Abstract; using Softeq.NetKit.Components.EventBus.Events; namespace Softeq.NetKit.Components.EventBus.Managers { public interface IEventBusSubscriptionsManager { - bool IsEmpty { get; } - - event EventHandler OnEventRemoved; - - void AddDynamicSubscription(string eventName) - where TEventHandler : IDynamicEventHandler; - - void AddSubscription() - where TEvent : IntegrationEvent - where TEventHandler : IEventHandler; - - void RemoveSubscription() - where TEventHandler : IEventHandler - where TEvent : IntegrationEvent; - - void RemoveDynamicSubscription(string eventName) - where TEventHandler : IDynamicEventHandler; - - bool HasSubscriptionsForEvent() where T : IntegrationEvent; - - bool HasSubscriptionsForEvent(string eventName); - + void RegisterEventType() where TEvent : IntegrationEvent; + void RemoveEventType() where TEvent : IntegrationEvent; + bool IsEventRegistered() where TEvent : IntegrationEvent; + bool IsEventRegistered(string eventName); Type GetEventTypeByName(string eventName); - - void Clear(); - - IEnumerable GetEventHandlers() where T : IntegrationEvent; - - IEnumerable GetEventHandlers(string eventName); - - string GetEventKey(); } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/MessageQueueConfiguration.cs b/Softeq.NetKit.Components.EventBus/MessageQueueConfiguration.cs deleted file mode 100644 index a4835a1..0000000 --- a/Softeq.NetKit.Components.EventBus/MessageQueueConfiguration.cs +++ /dev/null @@ -1,10 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -namespace Softeq.NetKit.Components.EventBus -{ - public class MessageQueueConfiguration - { - public int? TimeToLiveInMinutes { get; set; } - } -} diff --git a/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs index 40b92f6..d8e66cc 100644 --- a/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs @@ -1,7 +1,6 @@ // Developed by Softeq Development Corporation // http://www.softeq.com -using Softeq.NetKit.Components.EventBus.Events; using System; using System.Collections.Generic; using System.Linq.Expressions; @@ -13,9 +12,9 @@ public interface IIntegrationEventLogService { Task GetAsync(Guid eventId); Task> GetAsync(Expression> condition); - Task CreateAsync(IntegrationEvent @event); - Task MarkAsPublishedAsync(IntegrationEvent @event); - Task MarkAsPublishedFailedAsync(IntegrationEvent @event); - Task MarkAsCompletedAsync(Guid eventId); + Task CreateAsync(IntegrationEventLog eventLog); + Task MarkAsPublishedAsync(IntegrationEventLog eventLog); + Task MarkAsPublishedFailedAsync(IntegrationEventLog eventLog); + Task MarkAsCompletedAsync(IntegrationEventLog eventLog); } } \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs index 4d3a624..9c3f450 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs @@ -13,29 +13,24 @@ private IntegrationEventLog() { } - public IntegrationEventLog(IntegrationEvent @event) + public IntegrationEventLog(IntegrationEventEnvelope @event) { if (@event == null) { throw new ArgumentNullException(nameof(@event)); } - EventId = @event.Id; - Created = @event.CreationDate; EventTypeName = @event.GetType().FullName; EventState = EventState.NotPublished; - SessionId = @event.SessionId; + TimesSent = 0; Content = @event; } - public Guid EventId { get; private set; } public string EventTypeName { get; private set; } public EventState EventState { get; private set; } public int TimesSent { get; private set; } - public DateTimeOffset Created { get; private set; } public DateTimeOffset? Updated { get; private set; } - public string SessionId { get; private set; } - public IntegrationEvent Content { get; private set; } + public IntegrationEventEnvelope Content { get; private set; } public void ChangeEventState(EventState newEventState) { @@ -77,4 +72,4 @@ void EnsureStateTransitionAllowed(params EventState[] allowedFromStates) } } } -} +} \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs index 2f74319..a8a83fb 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs @@ -2,8 +2,6 @@ // http://www.softeq.com using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging; -using Softeq.NetKit.Components.EventBus.Events; using Softeq.NetKit.Integrations.EventLog.Abstract; using Softeq.NetKit.Integrations.EventLog.Exceptions; using System; @@ -25,7 +23,7 @@ public IntegrationEventLogService(IntegrationEventLogContext eventLogContext) public async Task GetAsync(Guid eventId) { - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.EventId == eventId); + var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.Content.Id == eventId); if (eventLog == null) { throw new EventLogNotFoundException(eventId); @@ -44,55 +42,39 @@ public async Task> GetAsync(Expression log.EventId == @event.Id); if (eventLog == null) { - throw new EventLogNotFoundException(@event.Id); + throw new ArgumentNullException(nameof(eventLog)); } - // Published event has PublisherId so need to update it also - eventLog.Content.PublisherId = @event.PublisherId; eventLog.ChangeEventState(EventState.Published); await UpdateAsync(eventLog); } - public async Task MarkAsPublishedFailedAsync(IntegrationEvent @event) + public async Task MarkAsPublishedFailedAsync(IntegrationEventLog eventLog) { - if (@event == null) - { - throw new ArgumentNullException(nameof(@event)); - } - - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.EventId == @event.Id); if (eventLog == null) { - throw new EventLogNotFoundException(@event.Id); + throw new ArgumentNullException(nameof(eventLog)); } eventLog.ChangeEventState(EventState.PublishedFailed); await UpdateAsync(eventLog); } - public async Task MarkAsCompletedAsync(Guid eventId) + public async Task MarkAsCompletedAsync(IntegrationEventLog eventLog) { - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.EventId == eventId); if (eventLog == null) { - throw new EventLogNotFoundException(eventId); + throw new ArgumentNullException(nameof(eventLog)); } eventLog.ChangeEventState(EventState.Completed); diff --git a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMapping.cs b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMapping.cs deleted file mode 100644 index c606634..0000000 --- a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMapping.cs +++ /dev/null @@ -1,45 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -using Microsoft.EntityFrameworkCore.Metadata.Builders; -using Newtonsoft.Json; -using Softeq.NetKit.Components.EventBus.Events; -using Softeq.NetKit.Integrations.EventLog.Abstract; -using Softeq.NetKit.Integrations.EventLog.Utility; - -namespace Softeq.NetKit.Integrations.EventLog.Mappings -{ - internal class IntegrationEventLogMapping : DomainModelBuilder, IEntityMappingConfiguration - { - public override void Build(EntityTypeBuilder builder) - { - builder.HasKey(eventLog => eventLog.EventId); - builder.Property(eventLog => eventLog.EventId).IsRequired(); - - builder.Property(eventLog => eventLog.Created).IsRequired(); - builder.HasIndex(eventLog => eventLog.Created).IsUnique(false); - - var serializerSettings = new JsonSerializerSettings - { - TypeNameHandling = TypeNameHandling.All, - ContractResolver = new PrivateFieldContractResolver() - }; - // TODO: Get rid of EventLog properties serialization within Content object - builder.Property(eventLog => eventLog.Content) - .HasConversion( - integrationEvent => JsonConvert.SerializeObject(integrationEvent, serializerSettings), - json => (IntegrationEvent)JsonConvert.DeserializeObject(json, serializerSettings)) - .IsRequired(); - - builder.Property(eventLog => eventLog.EventState).HasConversion().IsRequired(); - builder.HasIndex(eventLog => eventLog.EventState).IsUnique(false); - - builder.Property(eventLog => eventLog.EventTypeName).IsRequired(); - - builder.Property(eventLog => eventLog.TimesSent).IsRequired(); - - builder.Property(eventLog => eventLog.SessionId).IsRequired(false); - builder.HasIndex(eventLog => eventLog.SessionId).IsUnique(false); - } - } -} \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs new file mode 100644 index 0000000..91cf1f8 --- /dev/null +++ b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs @@ -0,0 +1,55 @@ +// Developed by Softeq Development Corporation +// http://www.softeq.com + +using System; +using System.Linq; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using Newtonsoft.Json; +using Softeq.NetKit.Components.EventBus.Events; +using Softeq.NetKit.Integrations.EventLog.Abstract; +using Softeq.NetKit.Integrations.EventLog.Utility; + +namespace Softeq.NetKit.Integrations.EventLog.Mappings +{ + internal class IntegrationEventLogMappingConfiguration : DomainModelBuilder, IEntityMappingConfiguration + { + public override void Build(EntityTypeBuilder builder) + { + builder.Property(eventLog => eventLog.EventTypeName).IsRequired(); + builder.Property(eventLog => eventLog.EventState).HasConversion().IsRequired(); + builder.HasIndex(eventLog => eventLog.EventState).IsUnique(false); + builder.Property(eventLog => eventLog.TimesSent).IsRequired(); + builder.OwnsOne(entity => entity.Content, ownershipBuilder => + { + ownershipBuilder.Property(x => x.Created).IsRequired(); + ownershipBuilder.HasIndex(x => x.Created).IsUnique(false); + ownershipBuilder.Property(x => x.PublisherId).IsRequired(); + ownershipBuilder.HasIndex(x => x.PublisherId).IsUnique(false); + ownershipBuilder.Property(x => x.SessionId).IsRequired(false); + ownershipBuilder.HasIndex(x => x.SessionId).IsUnique(false); + ownershipBuilder.Property(x => x.CorrelationId).IsRequired(false); + + var serializerSettings = new JsonSerializerSettings + { + TypeNameHandling = TypeNameHandling.Objects, + ContractResolver = new PrivateFieldContractResolver() + }; + ownershipBuilder.Property(x => x.Event) + .HasConversion( + @event => JsonConvert.SerializeObject(@event, serializerSettings), + json => (IntegrationEvent)JsonConvert.DeserializeObject(json, serializerSettings)) + .IsRequired(); + + // Use Content.Id as Primary Key for IntegrationEventLog table + var identityProperty = ownershipBuilder.OwnedEntityType.ClrType.GetProperties() + .Single(p => p.Name == nameof(IntegrationEventLog.Content.Id)); + var propertyType = identityProperty.PropertyType; + var identityPropertyName = $"{nameof(IntegrationEventLog.Content)}_{nameof(IntegrationEventLog.Content.Id)}"; + ownershipBuilder.Property(propertyType, identityProperty.Name).HasColumnName(identityPropertyName); + builder.Property(propertyType, identityPropertyName).ValueGeneratedNever(); + builder.HasKey(identityPropertyName); + }); + } + } +} \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/Utility/PrivateFieldContractResolver.cs b/Softeq.NetKit.Integrations.EventLog/Utility/PrivateFieldContractResolver.cs index d345bd2..4416929 100644 --- a/Softeq.NetKit.Integrations.EventLog/Utility/PrivateFieldContractResolver.cs +++ b/Softeq.NetKit.Integrations.EventLog/Utility/PrivateFieldContractResolver.cs @@ -9,11 +9,11 @@ internal class PrivateFieldContractResolver : DefaultContractResolver protected override JsonProperty CreateProperty(MemberInfo member, MemberSerialization memberSerialization) { var property = base.CreateProperty(member, memberSerialization); - MakeWriteable(property, member); + MakeWritable(property, member); return property; } - internal static JsonProperty MakeWriteable(JsonProperty jProperty, MemberInfo member) + internal static JsonProperty MakeWritable(JsonProperty jProperty, MemberInfo member) { var property = member as PropertyInfo; if (property == null) From 4146ae6977d0f18a62d781db8d3664209c8cc844 Mon Sep 17 00:00:00 2001 From: Aleksey Zhukov Date: Thu, 20 Jul 2023 16:51:09 +0300 Subject: [PATCH 3/8] WIP --- .../Handlers/AccountRegisteredEventHandler.cs | 9 +- .../EventBusService.cs | 115 ++++----- .../IIntegrationEventHandler.cs | 13 - .../Abstract/IEventBusPublisher.cs | 4 +- .../Abstract/IEventBusSubscriber.cs | 6 +- ...entHandler.cs => IEventEnvelopeHandler.cs} | 4 +- .../Events/IIntegrationEvent.cs | 11 - .../Events/IntegrationEvent.cs | 17 +- .../Events/IntegrationEventEnvelope.cs | 40 +++- .../Events/IntegrationEventEnvelope`1.cs | 50 ++++ .../Managers/EventBusSubscriptionsManager.cs | 2 +- .../Managers/IEventBusSubscriptionsManager.cs | 1 - .../Abstract/IIntegrationEventLogService.cs | 3 +- .../IntegrationEventLog.cs | 17 +- .../IntegrationEventLogContextFactory.cs | 9 +- .../IntegrationEventLogService.cs | 39 +-- ...IntegrationEventLogMappingConfiguration.cs | 14 +- ...IntegrationEventLogStructureV2.Designer.cs | 82 +++++++ ...14_UpdateIntegrationEventLogStructureV2.cs | 226 ++++++++++++++++++ ...IntegrationEventLogContextModelSnapshot.cs | 55 +++-- 20 files changed, 534 insertions(+), 183 deletions(-) delete mode 100644 Softeq.NetKit.Components.EventBus.Service/IIntegrationEventHandler.cs rename Softeq.NetKit.Components.EventBus/Abstract/{IEventHandler.cs => IEventEnvelopeHandler.cs} (58%) delete mode 100644 Softeq.NetKit.Components.EventBus/Events/IIntegrationEvent.cs create mode 100644 Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope`1.cs create mode 100644 Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.Designer.cs create mode 100644 Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.cs diff --git a/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Handlers/AccountRegisteredEventHandler.cs b/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Handlers/AccountRegisteredEventHandler.cs index 5a235f9..85a49ec 100644 --- a/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Handlers/AccountRegisteredEventHandler.cs +++ b/Softeq.NetKit.Components.EventBus.Service.Tests/Samples/Handlers/AccountRegisteredEventHandler.cs @@ -3,18 +3,19 @@ using System.Threading.Tasks; using Softeq.NetKit.Components.EventBus.Abstract; +using Softeq.NetKit.Components.EventBus.Events; using Softeq.NetKit.Components.EventBus.Service.Tests.Samples.Events; namespace Softeq.NetKit.Components.EventBus.Service.Tests.Samples.Handlers { - public class AccountRegisteredEventHandler : IEventHandler + public class AccountRegisteredEventHandler : IEventEnvelopeHandler { // Inject your app service - // TODO: Need to change this action according to requirments - public async Task Handle(AccountRegisteredEvent @event) + // TODO: Need to change this action according to requirements + public Task HandleAsync(IntegrationEventEnvelope eventEnvelope) { - await Task.CompletedTask; + throw new System.NotImplementedException(); } } } diff --git a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs index 1230d37..ee67202 100644 --- a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs +++ b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs @@ -18,7 +18,7 @@ namespace Softeq.NetKit.Components.EventBus.Service { - public class EventBusService : IEventBusPublisher, IEventBusSubscriber + public class EventBusService : IEventBusSubscriber, IEventBusPublisher { private readonly IEventBusSubscriptionsManager _subscriptionsManager; private readonly IServiceProvider _serviceProvider; @@ -115,7 +115,7 @@ await HandleReceivedMessage( } } - public async Task RegisterTopicEventAsync() where TEvent : IntegrationEvent + public async Task RegisterEventAsync() where TEvent : IntegrationEvent { var eventName = typeof(TEvent).Name; if (IsTopicSubscriptionAvailable) @@ -145,7 +145,7 @@ await _topicConnection.SubscriptionClient.AddRuleAsync(new RuleDescription } } - public async Task RemoveTopicEventRegistrationAsync() where TEvent : IntegrationEvent + public async Task RemoveEventRegistrationAsync() where TEvent : IntegrationEvent { if (IsTopicSubscriptionAvailable) { @@ -168,45 +168,30 @@ async Task RemoveTopicSubscriptionRule(string eventName) } } - public void RegisterQueueEventAsync() where TEvent : IntegrationEvent - { - _subscriptionsManager.RegisterEventType(); - } - - public void RemoveQueueEventRegistrationAsync() where TEvent : IntegrationEvent - { - _subscriptionsManager.RemoveEventType(); - } - - public Task PublishToTopicAsync(IntegrationEventEnvelope @event) + public Task PublishToTopicAsync(IntegrationEventEnvelope eventEnvelope) { ValidateTopic(); - return PublishEventAsync(@event, _topicConnection.TopicClient); + return PublishEventAsync(eventEnvelope, _topicConnection.TopicClient); } - public Task PublishToQueueAsync(IntegrationEventEnvelope @event) + public Task PublishToQueueAsync(IntegrationEventEnvelope eventEnvelope) { ValidateQueue(); - return PublishEventAsync(@event, _queueConnection.QueueClient); + return PublishEventAsync(eventEnvelope, _queueConnection.QueueClient); } - private Task PublishEventAsync(IntegrationEventEnvelope integrationEventEnvelope, ISenderClient client) + private Task PublishEventAsync(IntegrationEventEnvelope eventEnvelope, ISenderClient client) { - //if (string.IsNullOrEmpty(integrationEventV2.PublisherId)) - //{ - // integrationEventV2.PublisherId = _eventPublishConfiguration.EventPublisherId; - //} - - var eventName = integrationEventEnvelope.Event.GetType().Name; - var jsonMessage = JsonConvert.SerializeObject(integrationEventEnvelope); - var body = Encoding.UTF8.GetBytes(jsonMessage); + var eventType = eventEnvelope.Event.GetType().Name; + var eventEnvelopeJson = JsonConvert.SerializeObject(eventEnvelope); + var eventEnvelopeBytes = Encoding.UTF8.GetBytes(eventEnvelopeJson); var message = new Message { MessageId = Guid.NewGuid().ToString(), - Body = body, - Label = eventName, - CorrelationId = integrationEventEnvelope.CorrelationId, - SessionId = integrationEventEnvelope.SessionId + Body = eventEnvelopeBytes, + Label = eventType, + CorrelationId = eventEnvelope.CorrelationId, + SessionId = eventEnvelope.SequenceId }; if (_eventPublishConfiguration.EventTimeToLive.HasValue) { @@ -215,23 +200,6 @@ private Task PublishEventAsync(IntegrationEventEnvelope integrationEventEnvelope return client.SendAsync(message); } - private async Task CheckIfRuleExists(string ruleName) - { - try - { - var rules = await _topicConnection.SubscriptionClient.GetRulesAsync(); - - return rules != null - && rules.Any(rule => - string.Equals(rule.Name, ruleName, StringComparison.InvariantCultureIgnoreCase)); - } - catch (ServiceBusException ex) - { - throw new Exceptions.ServiceBusException( - $"Checking rule {ruleName} existence failed.", ex); - } - } - private async Task HandleReceivedMessage( IReceiverClient receiverClient, ISenderClient senderClient, @@ -240,51 +208,55 @@ private async Task HandleReceivedMessage( { var eventName = message.Label; var eventType = _subscriptionsManager.GetEventTypeByName(eventName); - var envelope = ParseEventEnvelopeAsync(message); + var envelope = (dynamic)ParseEventEnvelopeAsync(message); await ProcessEvent(eventName, envelope); await receiverClient.CompleteAsync(message.SystemProperties.LockToken); if (_eventPublishConfiguration.SendCompletionEvent && eventType != typeof(CompletedEvent)) { var completedEvent = new CompletedEvent(envelope.Id, envelope.PublisherId); - var completedEventEnvelope = new IntegrationEventEnvelope(_eventPublishConfiguration.EventPublisherId, completedEvent); + var completedEventEnvelope = new IntegrationEventEnvelope( + completedEvent, _eventPublishConfiguration.EventPublisherId); await PublishEventAsync(completedEventEnvelope, senderClient); } } - private IntegrationEventEnvelope ParseEventEnvelopeAsync(Message message) + private object ParseEventEnvelopeAsync(Message message) { + var envelopeBody = Encoding.UTF8.GetString(message.Body); var eventName = message.Label; var eventType = _subscriptionsManager.GetEventTypeByName(eventName); - var envelopeBody = Encoding.UTF8.GetString(message.Body); - var envelope = (IntegrationEventEnvelope)JsonConvert.DeserializeObject(envelopeBody, eventType); - if (envelope == null) + var eventEnvelopeType = typeof(IntegrationEventEnvelope<>).MakeGenericType(eventType); + var genericEventEnvelope = (dynamic)JsonConvert.DeserializeObject(envelopeBody, eventEnvelopeType); + if (genericEventEnvelope == null) { - throw new InvalidOperationException($"Failed to parse received message '{eventName}'. Raw body: '{envelopeBody}'."); + throw new InvalidOperationException( + $"Failed to parse received message '{eventName}'. Raw body: '{envelopeBody}'."); } - envelope.Event.Id = envelope.Id; - envelope.Event.Created = envelope.Created; - return envelope; + return genericEventEnvelope; } - private async Task ProcessEvent(string eventName, IntegrationEventEnvelope integrationEventEnvelope) + private async Task ProcessEvent(string eventName, IntegrationEventEnvelope eventEnvelope) + where TEvent : IntegrationEvent { if (!_subscriptionsManager.IsEventRegistered(eventName)) { return; } + var handlerType = typeof(IEventEnvelopeHandler<>).MakeGenericType(typeof(TEvent)); using (var scope = _serviceProvider.CreateScope()) { - var handlerType = typeof(IIntegrationEventHandler<>).MakeGenericType(integrationEventEnvelope.Event.GetType()); var handler = scope.ServiceProvider.GetRequiredService(handlerType); - await HandleParsedEventAsync((dynamic)handler, (dynamic)integrationEventEnvelope.Event); + await HandleParsedEventAsync((dynamic)handler, eventEnvelope); } } - private static async Task HandleParsedEventAsync(IIntegrationEventHandler handler, TEventContent content) - where TEventContent : IntegrationEvent + private static async Task HandleParsedEventAsync( + IEventEnvelopeHandler envelopeHandler, + IntegrationEventEnvelope eventEnvelope) + where TEvent : IntegrationEvent { - await handler.HandleAsync(content); + await envelopeHandler.HandleAsync(eventEnvelope); } private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) @@ -322,5 +294,22 @@ private void ValidateQueue() throw new InvalidOperationException("Queue connection is not configured"); } } + + private async Task CheckIfRuleExists(string ruleName) + { + try + { + var rules = await _topicConnection.SubscriptionClient.GetRulesAsync(); + + return rules != null + && rules.Any(rule => + string.Equals(rule.Name, ruleName, StringComparison.InvariantCultureIgnoreCase)); + } + catch (ServiceBusException ex) + { + throw new Exceptions.ServiceBusException( + $"Checking rule {ruleName} existence failed.", ex); + } + } } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus.Service/IIntegrationEventHandler.cs b/Softeq.NetKit.Components.EventBus.Service/IIntegrationEventHandler.cs deleted file mode 100644 index ea877ac..0000000 --- a/Softeq.NetKit.Components.EventBus.Service/IIntegrationEventHandler.cs +++ /dev/null @@ -1,13 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -using System.Threading.Tasks; -using Softeq.NetKit.Components.EventBus.Events; - -namespace Softeq.NetKit.Components.EventBus.Service -{ - public interface IIntegrationEventHandler where TEvent : IIntegrationEvent - { - Task HandleAsync(TEvent @event); - } -} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs index 0d9d367..edd03eb 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusPublisher.cs @@ -8,7 +8,7 @@ namespace Softeq.NetKit.Components.EventBus.Abstract { public interface IEventBusPublisher { - Task PublishToTopicAsync(IntegrationEventEnvelope @event); - Task PublishToQueueAsync(IntegrationEventEnvelope @event); + Task PublishToTopicAsync(IntegrationEventEnvelope eventEnvelope); + Task PublishToQueueAsync(IntegrationEventEnvelope eventEnvelope); } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs index 695df5f..98eecd6 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs @@ -10,9 +10,7 @@ public interface IEventBusSubscriber { Task RegisterTopicListenerAsync(); void RegisterQueueListener(QueueListenerConfiguration configuration = null); - Task RegisterTopicEventAsync() where TEvent : IntegrationEvent; - Task RemoveTopicEventRegistrationAsync() where TEvent : IntegrationEvent; - void RegisterQueueEventAsync() where TEvent : IntegrationEvent; - void RemoveQueueEventRegistrationAsync() where TEvent : IntegrationEvent; + Task RegisterEventAsync() where TEvent : IntegrationEvent; + Task RemoveEventRegistrationAsync() where TEvent : IntegrationEvent; } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventHandler.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventEnvelopeHandler.cs similarity index 58% rename from Softeq.NetKit.Components.EventBus/Abstract/IEventHandler.cs rename to Softeq.NetKit.Components.EventBus/Abstract/IEventEnvelopeHandler.cs index a41ee8d..9904adf 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventHandler.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventEnvelopeHandler.cs @@ -6,8 +6,8 @@ namespace Softeq.NetKit.Components.EventBus.Abstract { - public interface IEventHandler where TIntegrationEvent : IntegrationEvent + public interface IEventEnvelopeHandler where TEvent : IntegrationEvent { - Task Handle(TIntegrationEvent @event); + Task HandleAsync(IntegrationEventEnvelope eventEnvelope); } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IIntegrationEvent.cs b/Softeq.NetKit.Components.EventBus/Events/IIntegrationEvent.cs deleted file mode 100644 index 3235178..0000000 --- a/Softeq.NetKit.Components.EventBus/Events/IIntegrationEvent.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; - -namespace Softeq.NetKit.Components.EventBus.Events -{ - public interface IIntegrationEvent - { - Guid Id { get; } - - DateTimeOffset Created { get; } - } -} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs index 92eb8b2..541c867 100644 --- a/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEvent.cs @@ -1,20 +1,9 @@ -using System; -using Newtonsoft.Json; +// Developed by Softeq Development Corporation +// http://www.softeq.com namespace Softeq.NetKit.Components.EventBus.Events { - public abstract class IntegrationEvent : IIntegrationEvent + public abstract class IntegrationEvent { - protected IntegrationEvent() - { - Id = Guid.NewGuid(); - Created = DateTimeOffset.UtcNow; - } - - [JsonIgnore] - public Guid Id { get; set; } - - [JsonIgnore] - public DateTimeOffset Created { get; set; } } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs index dc263a7..3d87691 100644 --- a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs @@ -8,29 +8,45 @@ namespace Softeq.NetKit.Components.EventBus.Events public sealed class IntegrationEventEnvelope { public IntegrationEventEnvelope( + IntegrationEvent @event, string publisherId, + string sequenceId = null, + string correlationId = null) + : this(Guid.NewGuid(), @event, publisherId, sequenceId, correlationId) + { + } + + private IntegrationEventEnvelope( + Guid id, IntegrationEvent @event, - string sessionId = null, + string publisherId, + string sequenceId = null, string correlationId = null) { - PublisherId = publisherId ?? throw new ArgumentNullException(nameof(publisherId)); + Id = id; Event = @event ?? throw new ArgumentNullException(nameof(@event)); - Id = @event.Id; - Created = @event.Created; - SessionId = sessionId; - CorrelationId = correlationId; + PublisherId = publisherId ?? throw new ArgumentNullException(nameof(publisherId)); + Created = DateTimeOffset.UtcNow; + SequenceId = sequenceId; + CorrelationId = correlationId ?? sequenceId ?? Id.ToString(); } public Guid Id { get; private set; } - public DateTimeOffset Created { get; private set; } - public string PublisherId { get; private set; } - - public string SessionId { get; private set; } - + public string SequenceId { get; private set; } public string CorrelationId { get; private set; } - public IntegrationEvent Event { get; private set; } + + public static IntegrationEventEnvelope FromEnvelope(IntegrationEventEnvelope eventEnvelope) + where TEvent : IntegrationEvent + { + return new IntegrationEventEnvelope( + eventEnvelope.Id, + eventEnvelope.Event, + eventEnvelope.PublisherId, + eventEnvelope.SequenceId, + eventEnvelope.CorrelationId); + } } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope`1.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope`1.cs new file mode 100644 index 0000000..67654b7 --- /dev/null +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope`1.cs @@ -0,0 +1,50 @@ +// Developed by Softeq Development Corporation +// http://www.softeq.com + +using Newtonsoft.Json; +using System; + +namespace Softeq.NetKit.Components.EventBus.Events +{ + public class IntegrationEventEnvelope where TEvent : IntegrationEvent + { + [JsonConstructor] + private IntegrationEventEnvelope( + Guid id, + DateTimeOffset created, + string publisherId, + string sequenceId, + string correlationId, + TEvent @event) + { + Id = id; + Created = created; + PublisherId = publisherId; + SequenceId = sequenceId; + CorrelationId = correlationId; + Event = @event; + } + + public IntegrationEventEnvelope(IntegrationEventEnvelope eventEnvelope) + { + if (eventEnvelope == null) + { + throw new ArgumentNullException(nameof(eventEnvelope)); + } + + Event = eventEnvelope.Event as TEvent ?? throw new ArgumentException(nameof(eventEnvelope.Event)); + Id = eventEnvelope.Id; + Created = eventEnvelope.Created; + PublisherId = eventEnvelope.PublisherId; + SequenceId = eventEnvelope.SequenceId; + CorrelationId = eventEnvelope.CorrelationId; + } + + public Guid Id { get; private set; } + public DateTimeOffset Created { get; private set; } + public string PublisherId { get; private set; } + public string SequenceId { get; private set; } + public string CorrelationId { get; private set; } + public TEvent Event { get; } + } +} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs b/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs index 11b0764..18d8189 100644 --- a/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs +++ b/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs @@ -32,7 +32,7 @@ public void RemoveEventType() where TEvent : IntegrationEvent _eventTypes.Remove(typeof(TEvent)); } - public bool IsEventRegistered() where TEvent : IntegrationEvent + private bool IsEventRegistered() where TEvent : IntegrationEvent { var eventName = GetEventName(); return IsEventRegistered(eventName); diff --git a/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs b/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs index 6204c90..31869ab 100644 --- a/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs +++ b/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs @@ -10,7 +10,6 @@ public interface IEventBusSubscriptionsManager { void RegisterEventType() where TEvent : IntegrationEvent; void RemoveEventType() where TEvent : IntegrationEvent; - bool IsEventRegistered() where TEvent : IntegrationEvent; bool IsEventRegistered(string eventName); Type GetEventTypeByName(string eventName); } diff --git a/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs index d8e66cc..bddd833 100644 --- a/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs @@ -10,8 +10,9 @@ namespace Softeq.NetKit.Integrations.EventLog.Abstract { public interface IIntegrationEventLogService { - Task GetAsync(Guid eventId); + Task GetAsync(Guid eventEnvelopeId); Task> GetAsync(Expression> condition); + Task AnyAsync(Expression> condition); Task CreateAsync(IntegrationEventLog eventLog); Task MarkAsPublishedAsync(IntegrationEventLog eventLog); Task MarkAsPublishedFailedAsync(IntegrationEventLog eventLog); diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs index 9c3f450..e3db7d1 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs @@ -9,28 +9,19 @@ namespace Softeq.NetKit.Integrations.EventLog { public class IntegrationEventLog { - private IntegrationEventLog() - { - } + private IntegrationEventLog() { } - public IntegrationEventLog(IntegrationEventEnvelope @event) + public IntegrationEventLog(IntegrationEventEnvelope eventEnvelope) { - if (@event == null) - { - throw new ArgumentNullException(nameof(@event)); - } - - EventTypeName = @event.GetType().FullName; + EventEnvelope = eventEnvelope ?? throw new ArgumentNullException(nameof(eventEnvelope)); EventState = EventState.NotPublished; TimesSent = 0; - Content = @event; } - public string EventTypeName { get; private set; } public EventState EventState { get; private set; } public int TimesSent { get; private set; } public DateTimeOffset? Updated { get; private set; } - public IntegrationEventEnvelope Content { get; private set; } + public IntegrationEventEnvelope EventEnvelope { get; private set; } public void ChangeEventState(EventState newEventState) { diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogContextFactory.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogContextFactory.cs index c54f1f1..f68e2a9 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogContextFactory.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogContextFactory.cs @@ -10,10 +10,11 @@ namespace Softeq.NetKit.Integrations.EventLog /// This factory is needed to create db migrations in class library via Package Manager Console /// https://docs.microsoft.com/en-us/ef/core/cli/dbcontext-creation?tabs=dotnet-core-cli#from-a-design-time-factory /// To use this factory - /// 1. Set current project as startup project - /// 2. Edit project file: change the project TargetFramework to, for example, netcoreapp3.1 - /// 3. Open Package Manager Console. Set current project as the default project - /// 4. Create migration by running command: add-migration migration_name -context IntegrationEventLogContext + /// 1. Build solution + /// 2. Set the current project as startup project + /// 3. Edit the project file: change the project TargetFramework to, for example, netcoreapp3.1 + /// 4. Open Package Manager Console. Set current project as the default project + /// 5. Create migration by running command: add-migration migration-name -context IntegrationEventLogContext /// public class IntegrationEventLogContextFactory : IDesignTimeDbContextFactory { diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs index a8a83fb..626ff1c 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs @@ -18,28 +18,39 @@ public class IntegrationEventLogService : IIntegrationEventLogService public IntegrationEventLogService(IntegrationEventLogContext eventLogContext) { - EventLogContext = eventLogContext; + EventLogContext = eventLogContext ?? throw new ArgumentNullException(nameof(eventLogContext)); } - public async Task GetAsync(Guid eventId) + public async Task GetAsync(Guid eventEnvelopeId) { - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync(log => log.Content.Id == eventId); + var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync( + log => log.EventEnvelope.Id == eventEnvelopeId); if (eventLog == null) { - throw new EventLogNotFoundException(eventId); + throw new EventLogNotFoundException(eventEnvelopeId); } return eventLog; } - public async Task> GetAsync(Expression> condition) + public Task> GetAsync(Expression> condition) { if (condition == null) { throw new ArgumentNullException(nameof(condition)); } - return await EventLogContext.IntegrationEventLogs.Where(condition).ToListAsync(); + return EventLogContext.IntegrationEventLogs.Where(condition).ToListAsync(); + } + + public Task AnyAsync(Expression> condition) + { + if (condition == null) + { + throw new ArgumentNullException(nameof(condition)); + } + + return EventLogContext.IntegrationEventLogs.AnyAsync(condition); } public Task CreateAsync(IntegrationEventLog eventLog) @@ -48,7 +59,7 @@ public Task CreateAsync(IntegrationEventLog eventLog) return EventLogContext.SaveChangesAsync(); } - public async Task MarkAsPublishedAsync(IntegrationEventLog eventLog) + public Task MarkAsPublishedAsync(IntegrationEventLog eventLog) { if (eventLog == null) { @@ -56,10 +67,10 @@ public async Task MarkAsPublishedAsync(IntegrationEventLog eventLog) } eventLog.ChangeEventState(EventState.Published); - await UpdateAsync(eventLog); + return UpdateAsync(eventLog); } - public async Task MarkAsPublishedFailedAsync(IntegrationEventLog eventLog) + public Task MarkAsPublishedFailedAsync(IntegrationEventLog eventLog) { if (eventLog == null) { @@ -67,10 +78,10 @@ public async Task MarkAsPublishedFailedAsync(IntegrationEventLog eventLog) } eventLog.ChangeEventState(EventState.PublishedFailed); - await UpdateAsync(eventLog); + return UpdateAsync(eventLog); } - public async Task MarkAsCompletedAsync(IntegrationEventLog eventLog) + public Task MarkAsCompletedAsync(IntegrationEventLog eventLog) { if (eventLog == null) { @@ -78,10 +89,10 @@ public async Task MarkAsCompletedAsync(IntegrationEventLog eventLog) } eventLog.ChangeEventState(EventState.Completed); - await UpdateAsync(eventLog); + return UpdateAsync(eventLog); } - private async Task UpdateAsync(IntegrationEventLog @event) + private Task UpdateAsync(IntegrationEventLog @event) { if (@event == null) { @@ -89,7 +100,7 @@ private async Task UpdateAsync(IntegrationEventLog @event) } EventLogContext.IntegrationEventLogs.Update(@event); - await EventLogContext.SaveChangesAsync(); + return EventLogContext.SaveChangesAsync(); } } } \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs index 91cf1f8..ec3d867 100644 --- a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs +++ b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs @@ -1,7 +1,6 @@ // Developed by Softeq Development Corporation // http://www.softeq.com -using System; using System.Linq; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Metadata.Builders; @@ -16,18 +15,17 @@ internal class IntegrationEventLogMappingConfiguration : DomainModelBuilder builder) { - builder.Property(eventLog => eventLog.EventTypeName).IsRequired(); builder.Property(eventLog => eventLog.EventState).HasConversion().IsRequired(); builder.HasIndex(eventLog => eventLog.EventState).IsUnique(false); builder.Property(eventLog => eventLog.TimesSent).IsRequired(); - builder.OwnsOne(entity => entity.Content, ownershipBuilder => + builder.OwnsOne(entity => entity.EventEnvelope, ownershipBuilder => { ownershipBuilder.Property(x => x.Created).IsRequired(); ownershipBuilder.HasIndex(x => x.Created).IsUnique(false); ownershipBuilder.Property(x => x.PublisherId).IsRequired(); ownershipBuilder.HasIndex(x => x.PublisherId).IsUnique(false); - ownershipBuilder.Property(x => x.SessionId).IsRequired(false); - ownershipBuilder.HasIndex(x => x.SessionId).IsUnique(false); + ownershipBuilder.Property(x => x.SequenceId).IsRequired(false); + ownershipBuilder.HasIndex(x => x.SequenceId).IsUnique(false); ownershipBuilder.Property(x => x.CorrelationId).IsRequired(false); var serializerSettings = new JsonSerializerSettings @@ -41,11 +39,11 @@ public override void Build(EntityTypeBuilder builder) json => (IntegrationEvent)JsonConvert.DeserializeObject(json, serializerSettings)) .IsRequired(); - // Use Content.Id as Primary Key for IntegrationEventLog table + // Use EventEnvelope.Id as Primary Key for IntegrationEventLog table var identityProperty = ownershipBuilder.OwnedEntityType.ClrType.GetProperties() - .Single(p => p.Name == nameof(IntegrationEventLog.Content.Id)); + .Single(p => p.Name == nameof(IntegrationEventLog.EventEnvelope.Id)); var propertyType = identityProperty.PropertyType; - var identityPropertyName = $"{nameof(IntegrationEventLog.Content)}_{nameof(IntegrationEventLog.Content.Id)}"; + var identityPropertyName = $"{nameof(IntegrationEventLog.EventEnvelope)}_{nameof(IntegrationEventLog.EventEnvelope.Id)}"; ownershipBuilder.Property(propertyType, identityProperty.Name).HasColumnName(identityPropertyName); builder.Property(propertyType, identityPropertyName).ValueGeneratedNever(); builder.HasKey(identityPropertyName); diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.Designer.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.Designer.cs new file mode 100644 index 0000000..b4e020e --- /dev/null +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.Designer.cs @@ -0,0 +1,82 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Softeq.NetKit.Integrations.EventLog; + +namespace Softeq.NetKit.Integrations.EventLog.Migrations +{ + [DbContext(typeof(IntegrationEventLogContext))] + [Migration("20230720082914_UpdateIntegrationEventLogStructureV2")] + partial class UpdateIntegrationEventLogStructureV2 + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("dbo") + .HasAnnotation("ProductVersion", "2.2.6-servicing-10079") + .HasAnnotation("Relational:MaxIdentifierLength", 128) + .HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn); + + modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => + { + b.Property("EventEnvelope_Id"); + + b.Property("EventState"); + + b.Property("TimesSent"); + + b.Property("Updated"); + + b.HasKey("EventEnvelope_Id"); + + b.HasIndex("EventState"); + + b.ToTable("IntegrationEventLogs"); + }); + + modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => + { + b.OwnsOne("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "EventEnvelope", b1 => + { + b1.Property("IntegrationEventLogEventEnvelope_Id"); + + b1.Property("CorrelationId"); + + b1.Property("Created"); + + b1.Property("Event") + .IsRequired(); + + b1.Property("Id") + .HasColumnName("EventEnvelope_Id"); + + b1.Property("PublisherId") + .IsRequired(); + + b1.Property("SequenceId"); + + b1.HasKey("IntegrationEventLogEventEnvelope_Id"); + + b1.HasIndex("Created"); + + b1.HasIndex("PublisherId"); + + b1.HasIndex("SequenceId"); + + b1.ToTable("IntegrationEventLogs","dbo"); + + b1.HasOne("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog") + .WithOne("EventEnvelope") + .HasForeignKey("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "IntegrationEventLogEventEnvelope_Id") + .OnDelete(DeleteBehavior.Cascade); + }); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.cs new file mode 100644 index 0000000..3bb3ab5 --- /dev/null +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.cs @@ -0,0 +1,226 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +namespace Softeq.NetKit.Integrations.EventLog.Migrations +{ + public partial class UpdateIntegrationEventLogStructureV2 : Migration + { + // TODO: 1. Cleanup code 2. Actualize 'Down' method + protected override void Up(MigrationBuilder migrationBuilder) + { + + migrationBuilder.DropPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs"); + + //// - + //migrationBuilder.DropColumn( + // name: "EventId", + // schema: "dbo", + // table: "IntegrationEventLogs"); + //// - + //migrationBuilder.AddColumn( + // name: "EventEnvelope_Id", + // schema: "dbo", + // table: "IntegrationEventLogs", + // nullable: false, + // defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); + migrationBuilder.RenameColumn( + name: "EventId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_Id"); + migrationBuilder.AddPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventEnvelope_Id"); + + + //// - + //migrationBuilder.DropColumn( + // name: "Content", + // schema: "dbo", + // table: "IntegrationEventLogs"); + //// - + //migrationBuilder.AddColumn( + // name: "EventEnvelope_Event", + // schema: "dbo", + // table: "IntegrationEventLogs", + // nullable: false, + // defaultValue: ""); + migrationBuilder.RenameColumn( + name: "Content", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_Event"); + + + + migrationBuilder.RenameColumn( + name: "Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_Created"); + + + migrationBuilder.RenameColumn( + name: "SessionId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_SequenceId"); + + //// - + //migrationBuilder.RenameColumn( + // name: "EventTypeName", + // schema: "dbo", + // table: "IntegrationEventLogs", + // newName: "EventEnvelope_PublisherId"); + migrationBuilder.DropColumn( + name: "EventTypeName", + schema: "dbo", + table: "IntegrationEventLogs"); + migrationBuilder.AddColumn( + name: "EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + defaultValue: ""); + migrationBuilder.Sql(@" +DECLARE @PublisherId UNIQUEIDENTIFIER; + +SET @PublisherId = ( + SELECT TOP 1 JsonData.PublisherId + FROM IntegrationEventLogs + CROSS APPLY OPENJSON(IntegrationEventLogs.EventEnvelope_Event, N'$') WITH (PublisherId UNIQUEIDENTIFIER N'$.PublisherId') AS JsonData); + +UPDATE IntegrationEventLogs SET EventEnvelope_PublisherId = @PublisherId;"); + + + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_SessionId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_EventEnvelope_SequenceId"); + + + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_EventEnvelope_Created"); + + + migrationBuilder.AlterColumn( + name: "EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + oldClrType: typeof(string)); + + + + + migrationBuilder.AddColumn( + name: "EventEnvelope_CorrelationId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: true); + + + + + migrationBuilder.CreateIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventEnvelope_PublisherId"); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs"); + + migrationBuilder.DropIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs"); + + migrationBuilder.DropColumn( + name: "EventEnvelope_Id", + schema: "dbo", + table: "IntegrationEventLogs"); + + migrationBuilder.DropColumn( + name: "EventEnvelope_CorrelationId", + schema: "dbo", + table: "IntegrationEventLogs"); + + migrationBuilder.DropColumn( + name: "EventEnvelope_Event", + schema: "dbo", + table: "IntegrationEventLogs"); + + migrationBuilder.RenameColumn( + name: "EventEnvelope_Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "Created"); + + migrationBuilder.RenameColumn( + name: "EventEnvelope_SequenceId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "SessionId"); + + migrationBuilder.RenameColumn( + name: "EventEnvelope_PublisherId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventTypeName"); + + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_SequenceId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_SessionId"); + + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_Created"); + + migrationBuilder.AlterColumn( + name: "EventTypeName", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + oldClrType: typeof(string)); + + migrationBuilder.AddColumn( + name: "EventId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); + + migrationBuilder.AddColumn( + name: "Content", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + defaultValue: ""); + + migrationBuilder.AddPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventId"); + } + } +} diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs index c58b743..a61b499 100644 --- a/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs @@ -22,34 +22,57 @@ protected override void BuildModel(ModelBuilder modelBuilder) modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => { - b.Property("EventId") - .ValueGeneratedOnAdd(); + b.Property("EventEnvelope_Id"); - b.Property("Content") - .IsRequired(); + b.Property("EventState"); - b.Property("Created"); + b.Property("TimesSent"); - b.Property("EventState"); + b.Property("Updated"); - b.Property("EventTypeName") - .IsRequired(); + b.HasKey("EventEnvelope_Id"); - b.Property("SessionId"); + b.HasIndex("EventState"); - b.Property("TimesSent"); + b.ToTable("IntegrationEventLogs"); + }); - b.Property("Updated"); + modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => + { + b.OwnsOne("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "EventEnvelope", b1 => + { + b1.Property("IntegrationEventLogEventEnvelope_Id"); - b.HasKey("EventId"); + b1.Property("CorrelationId"); - b.HasIndex("Created"); + b1.Property("Created"); - b.HasIndex("EventState"); + b1.Property("Event") + .IsRequired(); - b.HasIndex("SessionId"); + b1.Property("Id") + .HasColumnName("EventEnvelope_Id"); - b.ToTable("IntegrationEventLogs"); + b1.Property("PublisherId") + .IsRequired(); + + b1.Property("SequenceId"); + + b1.HasKey("IntegrationEventLogEventEnvelope_Id"); + + b1.HasIndex("Created"); + + b1.HasIndex("PublisherId"); + + b1.HasIndex("SequenceId"); + + b1.ToTable("IntegrationEventLogs","dbo"); + + b1.HasOne("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog") + .WithOne("EventEnvelope") + .HasForeignKey("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "IntegrationEventLogEventEnvelope_Id") + .OnDelete(DeleteBehavior.Cascade); + }); }); #pragma warning restore 612, 618 } From 2b85a8e765a54fa3d4b5aa62077e36629ce288db Mon Sep 17 00:00:00 2001 From: Aleksey Zhukov Date: Tue, 25 Jul 2023 21:03:48 +0300 Subject: [PATCH 4/8] WIP --- .../Events/IntegrationEventEnvelope.cs | 2 + .../IntegrationEventLog.cs | 2 + .../IntegrationEventLogService.cs | 8 +-- ...IntegrationEventLogMappingConfiguration.cs | 53 +++++++--------- ...ntegrationEventLogStructureV2.Designer.cs} | 19 +++--- ...1_UpdateIntegrationEventLogStructureV2.cs} | 63 ++++++++++++------- ...IntegrationEventLogContextModelSnapshot.cs | 17 ++--- 7 files changed, 90 insertions(+), 74 deletions(-) rename Softeq.NetKit.Integrations.EventLog/Migrations/{20230720082914_UpdateIntegrationEventLogStructureV2.Designer.cs => 20230725151721_UpdateIntegrationEventLogStructureV2.Designer.cs} (85%) rename Softeq.NetKit.Integrations.EventLog/Migrations/{20230720082914_UpdateIntegrationEventLogStructureV2.cs => 20230725151721_UpdateIntegrationEventLogStructureV2.cs} (85%) diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs index 3d87691..7448470 100644 --- a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs @@ -16,6 +16,7 @@ public IntegrationEventEnvelope( { } + // TODO: Remove private IntegrationEventEnvelope( Guid id, IntegrationEvent @event, @@ -38,6 +39,7 @@ private IntegrationEventEnvelope( public string CorrelationId { get; private set; } public IntegrationEvent Event { get; private set; } + // TODO: Remove public static IntegrationEventEnvelope FromEnvelope(IntegrationEventEnvelope eventEnvelope) where TEvent : IntegrationEvent { diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs index e3db7d1..c6949b6 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs @@ -13,11 +13,13 @@ private IntegrationEventLog() { } public IntegrationEventLog(IntegrationEventEnvelope eventEnvelope) { + EventLogId = Guid.NewGuid(); EventEnvelope = eventEnvelope ?? throw new ArgumentNullException(nameof(eventEnvelope)); EventState = EventState.NotPublished; TimesSent = 0; } + public Guid EventLogId { get; private set; } public EventState EventState { get; private set; } public int TimesSent { get; private set; } public DateTimeOffset? Updated { get; private set; } diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs index 626ff1c..1b9b662 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs @@ -92,14 +92,14 @@ public Task MarkAsCompletedAsync(IntegrationEventLog eventLog) return UpdateAsync(eventLog); } - private Task UpdateAsync(IntegrationEventLog @event) + private Task UpdateAsync(IntegrationEventLog integrationEventLog) { - if (@event == null) + if (integrationEventLog == null) { - throw new ArgumentNullException(nameof(@event)); + throw new ArgumentNullException(nameof(integrationEventLog)); } - EventLogContext.IntegrationEventLogs.Update(@event); + EventLogContext.IntegrationEventLogs.Update(integrationEventLog); return EventLogContext.SaveChangesAsync(); } } diff --git a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs index ec3d867..ca868f2 100644 --- a/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs +++ b/Softeq.NetKit.Integrations.EventLog/Mappings/IntegrationEventLogMappingConfiguration.cs @@ -1,8 +1,6 @@ // Developed by Softeq Development Corporation // http://www.softeq.com -using System.Linq; -using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Metadata.Builders; using Newtonsoft.Json; using Softeq.NetKit.Components.EventBus.Events; @@ -13,41 +11,34 @@ namespace Softeq.NetKit.Integrations.EventLog.Mappings { internal class IntegrationEventLogMappingConfiguration : DomainModelBuilder, IEntityMappingConfiguration { + private static readonly JsonSerializerSettings EventSerializerSettings = new JsonSerializerSettings + { + TypeNameHandling = TypeNameHandling.Objects, + ContractResolver = new PrivateFieldContractResolver() + }; + public override void Build(EntityTypeBuilder builder) { + builder.HasKey(eventLog => eventLog.EventLogId); builder.Property(eventLog => eventLog.EventState).HasConversion().IsRequired(); builder.HasIndex(eventLog => eventLog.EventState).IsUnique(false); builder.Property(eventLog => eventLog.TimesSent).IsRequired(); builder.OwnsOne(entity => entity.EventEnvelope, ownershipBuilder => - { - ownershipBuilder.Property(x => x.Created).IsRequired(); - ownershipBuilder.HasIndex(x => x.Created).IsUnique(false); - ownershipBuilder.Property(x => x.PublisherId).IsRequired(); - ownershipBuilder.HasIndex(x => x.PublisherId).IsUnique(false); - ownershipBuilder.Property(x => x.SequenceId).IsRequired(false); - ownershipBuilder.HasIndex(x => x.SequenceId).IsUnique(false); - ownershipBuilder.Property(x => x.CorrelationId).IsRequired(false); - - var serializerSettings = new JsonSerializerSettings - { - TypeNameHandling = TypeNameHandling.Objects, - ContractResolver = new PrivateFieldContractResolver() - }; - ownershipBuilder.Property(x => x.Event) - .HasConversion( - @event => JsonConvert.SerializeObject(@event, serializerSettings), - json => (IntegrationEvent)JsonConvert.DeserializeObject(json, serializerSettings)) - .IsRequired(); - - // Use EventEnvelope.Id as Primary Key for IntegrationEventLog table - var identityProperty = ownershipBuilder.OwnedEntityType.ClrType.GetProperties() - .Single(p => p.Name == nameof(IntegrationEventLog.EventEnvelope.Id)); - var propertyType = identityProperty.PropertyType; - var identityPropertyName = $"{nameof(IntegrationEventLog.EventEnvelope)}_{nameof(IntegrationEventLog.EventEnvelope.Id)}"; - ownershipBuilder.Property(propertyType, identityProperty.Name).HasColumnName(identityPropertyName); - builder.Property(propertyType, identityPropertyName).ValueGeneratedNever(); - builder.HasKey(identityPropertyName); - }); + { + ownershipBuilder.HasIndex(x => x.Id).IsUnique(); + ownershipBuilder.Property(x => x.Created).IsRequired(); + ownershipBuilder.HasIndex(x => x.Created).IsUnique(false); + ownershipBuilder.Property(x => x.PublisherId).IsRequired(); + ownershipBuilder.HasIndex(x => x.PublisherId).IsUnique(false); + ownershipBuilder.Property(x => x.SequenceId).IsRequired(false); + ownershipBuilder.HasIndex(x => x.SequenceId).IsUnique(false); + ownershipBuilder.Property(x => x.CorrelationId).IsRequired(false); + ownershipBuilder.Property(x => x.Event) + .HasConversion( + @event => JsonConvert.SerializeObject(@event, EventSerializerSettings), + json => (IntegrationEvent)JsonConvert.DeserializeObject(json, EventSerializerSettings)) + .IsRequired(); + }); } } } \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.Designer.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.Designer.cs similarity index 85% rename from Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.Designer.cs rename to Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.Designer.cs index b4e020e..7b40247 100644 --- a/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.Designer.cs +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.Designer.cs @@ -10,7 +10,7 @@ namespace Softeq.NetKit.Integrations.EventLog.Migrations { [DbContext(typeof(IntegrationEventLogContext))] - [Migration("20230720082914_UpdateIntegrationEventLogStructureV2")] + [Migration("20230725151721_UpdateIntegrationEventLogStructureV2")] partial class UpdateIntegrationEventLogStructureV2 { protected override void BuildTargetModel(ModelBuilder modelBuilder) @@ -24,7 +24,8 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => { - b.Property("EventEnvelope_Id"); + b.Property("EventLogId") + .ValueGeneratedOnAdd(); b.Property("EventState"); @@ -32,7 +33,7 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) b.Property("Updated"); - b.HasKey("EventEnvelope_Id"); + b.HasKey("EventLogId"); b.HasIndex("EventState"); @@ -43,7 +44,7 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) { b.OwnsOne("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "EventEnvelope", b1 => { - b1.Property("IntegrationEventLogEventEnvelope_Id"); + b1.Property("IntegrationEventLogEventLogId"); b1.Property("CorrelationId"); @@ -52,18 +53,20 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) b1.Property("Event") .IsRequired(); - b1.Property("Id") - .HasColumnName("EventEnvelope_Id"); + b1.Property("Id"); b1.Property("PublisherId") .IsRequired(); b1.Property("SequenceId"); - b1.HasKey("IntegrationEventLogEventEnvelope_Id"); + b1.HasKey("IntegrationEventLogEventLogId"); b1.HasIndex("Created"); + b1.HasIndex("Id") + .IsUnique(); + b1.HasIndex("PublisherId"); b1.HasIndex("SequenceId"); @@ -72,7 +75,7 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) b1.HasOne("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog") .WithOne("EventEnvelope") - .HasForeignKey("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "IntegrationEventLogEventEnvelope_Id") + .HasForeignKey("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "IntegrationEventLogEventLogId") .OnDelete(DeleteBehavior.Cascade); }); }); diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.cs similarity index 85% rename from Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.cs rename to Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.cs index 3bb3ab5..b830f44 100644 --- a/Softeq.NetKit.Integrations.EventLog/Migrations/20230720082914_UpdateIntegrationEventLogStructureV2.cs +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.cs @@ -5,21 +5,18 @@ namespace Softeq.NetKit.Integrations.EventLog.Migrations { public partial class UpdateIntegrationEventLogStructureV2 : Migration { - // TODO: 1. Cleanup code 2. Actualize 'Down' method protected override void Up(MigrationBuilder migrationBuilder) { - migrationBuilder.DropPrimaryKey( name: "PK_IntegrationEventLogs", schema: "dbo", table: "IntegrationEventLogs"); - //// - + //migrationBuilder.DropColumn( // name: "EventId", // schema: "dbo", // table: "IntegrationEventLogs"); - //// - //migrationBuilder.AddColumn( // name: "EventEnvelope_Id", // schema: "dbo", @@ -31,19 +28,17 @@ protected override void Up(MigrationBuilder migrationBuilder) schema: "dbo", table: "IntegrationEventLogs", newName: "EventEnvelope_Id"); - migrationBuilder.AddPrimaryKey( - name: "PK_IntegrationEventLogs", - schema: "dbo", - table: "IntegrationEventLogs", - column: "EventEnvelope_Id"); + ////////migrationBuilder.AddPrimaryKey( + //////// name: "PK_IntegrationEventLogs", + //////// schema: "dbo", + //////// table: "IntegrationEventLogs", + //////// column: "EventEnvelope_Id"); - //// - //migrationBuilder.DropColumn( // name: "Content", // schema: "dbo", // table: "IntegrationEventLogs"); - //// - //migrationBuilder.AddColumn( // name: "EventEnvelope_Event", // schema: "dbo", @@ -57,21 +52,18 @@ protected override void Up(MigrationBuilder migrationBuilder) newName: "EventEnvelope_Event"); - migrationBuilder.RenameColumn( name: "Created", schema: "dbo", table: "IntegrationEventLogs", newName: "EventEnvelope_Created"); - migrationBuilder.RenameColumn( name: "SessionId", schema: "dbo", table: "IntegrationEventLogs", newName: "EventEnvelope_SequenceId"); - //// - //migrationBuilder.RenameColumn( // name: "EventTypeName", // schema: "dbo", @@ -89,29 +81,24 @@ protected override void Up(MigrationBuilder migrationBuilder) defaultValue: ""); migrationBuilder.Sql(@" DECLARE @PublisherId UNIQUEIDENTIFIER; - SET @PublisherId = ( SELECT TOP 1 JsonData.PublisherId FROM IntegrationEventLogs CROSS APPLY OPENJSON(IntegrationEventLogs.EventEnvelope_Event, N'$') WITH (PublisherId UNIQUEIDENTIFIER N'$.PublisherId') AS JsonData); - UPDATE IntegrationEventLogs SET EventEnvelope_PublisherId = @PublisherId;"); - migrationBuilder.RenameIndex( name: "IX_IntegrationEventLogs_SessionId", schema: "dbo", table: "IntegrationEventLogs", newName: "IX_IntegrationEventLogs_EventEnvelope_SequenceId"); - migrationBuilder.RenameIndex( name: "IX_IntegrationEventLogs_Created", schema: "dbo", table: "IntegrationEventLogs", newName: "IX_IntegrationEventLogs_EventEnvelope_Created"); - migrationBuilder.AlterColumn( name: "EventEnvelope_PublisherId", schema: "dbo", @@ -120,17 +107,35 @@ FROM IntegrationEventLogs oldClrType: typeof(string)); - - migrationBuilder.AddColumn( name: "EventEnvelope_CorrelationId", schema: "dbo", table: "IntegrationEventLogs", nullable: true); - - + + + migrationBuilder.AddColumn( + name: "EventLogId", + schema: "dbo", + table: "IntegrationEventLogs", + nullable: false, + defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); + migrationBuilder.Sql("UPDATE IntegrationEventLogs SET EventLogId = NEWID();"); + migrationBuilder.AddPrimaryKey( + name: "PK_IntegrationEventLogs", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventLogId"); + + migrationBuilder.CreateIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_Id", + schema: "dbo", + table: "IntegrationEventLogs", + column: "EventEnvelope_Id", + unique: true); + migrationBuilder.CreateIndex( name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", schema: "dbo", @@ -145,13 +150,18 @@ protected override void Down(MigrationBuilder migrationBuilder) schema: "dbo", table: "IntegrationEventLogs"); + migrationBuilder.DropIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_Id", + schema: "dbo", + table: "IntegrationEventLogs"); + migrationBuilder.DropIndex( name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", schema: "dbo", table: "IntegrationEventLogs"); migrationBuilder.DropColumn( - name: "EventEnvelope_Id", + name: "EventLogId", schema: "dbo", table: "IntegrationEventLogs"); @@ -165,6 +175,11 @@ protected override void Down(MigrationBuilder migrationBuilder) schema: "dbo", table: "IntegrationEventLogs"); + migrationBuilder.DropColumn( + name: "EventEnvelope_Id", + schema: "dbo", + table: "IntegrationEventLogs"); + migrationBuilder.RenameColumn( name: "EventEnvelope_Created", schema: "dbo", diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs index a61b499..5e90aba 100644 --- a/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/IntegrationEventLogContextModelSnapshot.cs @@ -22,7 +22,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) modelBuilder.Entity("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog", b => { - b.Property("EventEnvelope_Id"); + b.Property("EventLogId") + .ValueGeneratedOnAdd(); b.Property("EventState"); @@ -30,7 +31,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("Updated"); - b.HasKey("EventEnvelope_Id"); + b.HasKey("EventLogId"); b.HasIndex("EventState"); @@ -41,7 +42,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) { b.OwnsOne("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "EventEnvelope", b1 => { - b1.Property("IntegrationEventLogEventEnvelope_Id"); + b1.Property("IntegrationEventLogEventLogId"); b1.Property("CorrelationId"); @@ -50,18 +51,20 @@ protected override void BuildModel(ModelBuilder modelBuilder) b1.Property("Event") .IsRequired(); - b1.Property("Id") - .HasColumnName("EventEnvelope_Id"); + b1.Property("Id"); b1.Property("PublisherId") .IsRequired(); b1.Property("SequenceId"); - b1.HasKey("IntegrationEventLogEventEnvelope_Id"); + b1.HasKey("IntegrationEventLogEventLogId"); b1.HasIndex("Created"); + b1.HasIndex("Id") + .IsUnique(); + b1.HasIndex("PublisherId"); b1.HasIndex("SequenceId"); @@ -70,7 +73,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b1.HasOne("Softeq.NetKit.Integrations.EventLog.IntegrationEventLog") .WithOne("EventEnvelope") - .HasForeignKey("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "IntegrationEventLogEventEnvelope_Id") + .HasForeignKey("Softeq.NetKit.Components.EventBus.Events.IntegrationEventEnvelope", "IntegrationEventLogEventLogId") .OnDelete(DeleteBehavior.Cascade); }); }); From ed503e1f572ad496bbe2bbf1c196e95836bbbf5a Mon Sep 17 00:00:00 2001 From: Aleksey Zhukov Date: Wed, 26 Jul 2023 12:26:23 +0300 Subject: [PATCH 5/8] WIP --- .../EventBusService.cs | 50 +++++++------------ .../EventSubscriptionInfo.cs | 18 ------- .../Events/IntegrationEventEnvelope.cs | 2 - .../Managers/EventBusSubscriptionsManager.cs | 21 ++++---- .../Managers/IEventBusSubscriptionsManager.cs | 1 + 5 files changed, 31 insertions(+), 61 deletions(-) delete mode 100644 Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs diff --git a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs index ee67202..e4a1820 100644 --- a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs +++ b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs @@ -207,58 +207,44 @@ private async Task HandleReceivedMessage( CancellationToken token) { var eventName = message.Label; - var eventType = _subscriptionsManager.GetEventTypeByName(eventName); - var envelope = (dynamic)ParseEventEnvelopeAsync(message); - await ProcessEvent(eventName, envelope); - await receiverClient.CompleteAsync(message.SystemProperties.LockToken); - if (_eventPublishConfiguration.SendCompletionEvent && eventType != typeof(CompletedEvent)) + if (!_subscriptionsManager.IsEventRegistered(eventName)) { - var completedEvent = new CompletedEvent(envelope.Id, envelope.PublisherId); - var completedEventEnvelope = new IntegrationEventEnvelope( - completedEvent, _eventPublishConfiguration.EventPublisherId); - await PublishEventAsync(completedEventEnvelope, senderClient); + // Skip processing if event type is not registered + return; } - } - private object ParseEventEnvelopeAsync(Message message) - { - var envelopeBody = Encoding.UTF8.GetString(message.Body); - var eventName = message.Label; var eventType = _subscriptionsManager.GetEventTypeByName(eventName); + var envelopeBody = Encoding.UTF8.GetString(message.Body); var eventEnvelopeType = typeof(IntegrationEventEnvelope<>).MakeGenericType(eventType); - var genericEventEnvelope = (dynamic)JsonConvert.DeserializeObject(envelopeBody, eventEnvelopeType); - if (genericEventEnvelope == null) + var eventEnvelope = (dynamic)JsonConvert.DeserializeObject(envelopeBody, eventEnvelopeType); + if (eventEnvelope == null) { throw new InvalidOperationException( $"Failed to parse received message '{eventName}'. Raw body: '{envelopeBody}'."); } - return genericEventEnvelope; - } - private async Task ProcessEvent(string eventName, IntegrationEventEnvelope eventEnvelope) - where TEvent : IntegrationEvent - { - if (!_subscriptionsManager.IsEventRegistered(eventName)) + await ProcessEventEnvelopeAsync(eventEnvelope); + await receiverClient.CompleteAsync(message.SystemProperties.LockToken); + if (_eventPublishConfiguration.SendCompletionEvent && eventType != typeof(CompletedEvent)) { - return; + var completedEvent = new CompletedEvent(eventEnvelope.Id, eventEnvelope.PublisherId); + var completedEventEnvelope = new IntegrationEventEnvelope( + completedEvent, _eventPublishConfiguration.EventPublisherId); + await PublishEventAsync(completedEventEnvelope, senderClient); } + } + private async Task ProcessEventEnvelopeAsync(IntegrationEventEnvelope eventEnvelope) + where TEvent : IntegrationEvent + { var handlerType = typeof(IEventEnvelopeHandler<>).MakeGenericType(typeof(TEvent)); using (var scope = _serviceProvider.CreateScope()) { var handler = scope.ServiceProvider.GetRequiredService(handlerType); - await HandleParsedEventAsync((dynamic)handler, eventEnvelope); + await ((dynamic)handler).HandleAsync(eventEnvelope); } } - private static async Task HandleParsedEventAsync( - IEventEnvelopeHandler envelopeHandler, - IntegrationEventEnvelope eventEnvelope) - where TEvent : IntegrationEvent - { - await envelopeHandler.HandleAsync(eventEnvelope); - } - private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { var context = exceptionReceivedEventArgs.ExceptionReceivedContext; diff --git a/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs b/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs deleted file mode 100644 index 445a60d..0000000 --- a/Softeq.NetKit.Components.EventBus/EventSubscriptionInfo.cs +++ /dev/null @@ -1,18 +0,0 @@ -// Developed by Softeq Development Corporation -// http://www.softeq.com - -using System; - -namespace Softeq.NetKit.Components.EventBus -{ - // TODO: Is that needed? - public class EventSubscriptionInfo - { - public EventSubscriptionInfo(Type handlerType) - { - HandlerType = handlerType; - } - - public Type HandlerType { get; } - } -} \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs index 7448470..3d87691 100644 --- a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs @@ -16,7 +16,6 @@ public IntegrationEventEnvelope( { } - // TODO: Remove private IntegrationEventEnvelope( Guid id, IntegrationEvent @event, @@ -39,7 +38,6 @@ private IntegrationEventEnvelope( public string CorrelationId { get; private set; } public IntegrationEvent Event { get; private set; } - // TODO: Remove public static IntegrationEventEnvelope FromEnvelope(IntegrationEventEnvelope eventEnvelope) where TEvent : IntegrationEvent { diff --git a/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs b/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs index 18d8189..2aa6f56 100644 --- a/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs +++ b/Softeq.NetKit.Components.EventBus/Managers/EventBusSubscriptionsManager.cs @@ -10,7 +10,7 @@ namespace Softeq.NetKit.Components.EventBus.Managers { public class EventBusSubscriptionsManager : IEventBusSubscriptionsManager { - private readonly List _eventTypes = new List(); + private readonly HashSet _eventTypes = new HashSet(); public void RegisterEventType() where TEvent : IntegrationEvent { @@ -32,19 +32,22 @@ public void RemoveEventType() where TEvent : IntegrationEvent _eventTypes.Remove(typeof(TEvent)); } - private bool IsEventRegistered() where TEvent : IntegrationEvent + public bool IsEventRegistered(Type eventType) { - var eventName = GetEventName(); - return IsEventRegistered(eventName); + return _eventTypes.Any(x => x == eventType); } - public bool IsEventRegistered(string eventName) => _eventTypes.Any(x => x.Name == eventName); - - public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(x => x.Name == eventName); + public bool IsEventRegistered(string eventName) + { + return _eventTypes.Any(x => x.Name == eventName); + } - private static string GetEventName() + public Type GetEventTypeByName(string eventName) { - return typeof(TEvent).Name; + var eventType = _eventTypes.SingleOrDefault(x => x.Name == eventName); + return eventType ?? throw new ArgumentException($"Event '{eventName}' is not registered."); } + + private static string GetEventName() => typeof(TEvent).Name; } } \ No newline at end of file diff --git a/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs b/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs index 31869ab..de6f8f4 100644 --- a/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs +++ b/Softeq.NetKit.Components.EventBus/Managers/IEventBusSubscriptionsManager.cs @@ -10,6 +10,7 @@ public interface IEventBusSubscriptionsManager { void RegisterEventType() where TEvent : IntegrationEvent; void RemoveEventType() where TEvent : IntegrationEvent; + bool IsEventRegistered(Type eventType); bool IsEventRegistered(string eventName); Type GetEventTypeByName(string eventName); } From 90aebddd71eee7c954fe1401263e866fbe6cd730 Mon Sep 17 00:00:00 2001 From: Aleksey Zhukov Date: Tue, 12 Dec 2023 16:58:59 +0100 Subject: [PATCH 6/8] WIP update db migration --- .../Events/IntegrationEventEnvelope.cs | 2 +- .../IntegrationEventLogService.cs | 16 +- ...ntegrationEventLogStructureV2.Designer.cs} | 2 +- ...9_UpdateIntegrationEventLogStructureV2.cs} | 190 +++++++----------- 4 files changed, 88 insertions(+), 122 deletions(-) rename Softeq.NetKit.Integrations.EventLog/Migrations/{20230725151721_UpdateIntegrationEventLogStructureV2.Designer.cs => 20231205132729_UpdateIntegrationEventLogStructureV2.Designer.cs} (98%) rename Softeq.NetKit.Integrations.EventLog/Migrations/{20230725151721_UpdateIntegrationEventLogStructureV2.cs => 20231205132729_UpdateIntegrationEventLogStructureV2.cs} (79%) diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs index 3d87691..f486c1f 100644 --- a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs @@ -28,7 +28,7 @@ private IntegrationEventEnvelope( PublisherId = publisherId ?? throw new ArgumentNullException(nameof(publisherId)); Created = DateTimeOffset.UtcNow; SequenceId = sequenceId; - CorrelationId = correlationId ?? sequenceId ?? Id.ToString(); + CorrelationId = correlationId; } public Guid Id { get; private set; } diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs index 1b9b662..bc25742 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs @@ -21,10 +21,16 @@ public IntegrationEventLogService(IntegrationEventLogContext eventLogContext) EventLogContext = eventLogContext ?? throw new ArgumentNullException(nameof(eventLogContext)); } + public Task CreateAsync(IntegrationEventLog eventLog) + { + EventLogContext.IntegrationEventLogs.Add(eventLog); + return EventLogContext.SaveChangesAsync(); + } + public async Task GetAsync(Guid eventEnvelopeId) { - var eventLog = await EventLogContext.IntegrationEventLogs.FirstOrDefaultAsync( - log => log.EventEnvelope.Id == eventEnvelopeId); + var eventLog = await EventLogContext.IntegrationEventLogs + .FirstOrDefaultAsync(log => log.EventEnvelope.Id == eventEnvelopeId); if (eventLog == null) { throw new EventLogNotFoundException(eventEnvelopeId); @@ -53,12 +59,6 @@ public Task AnyAsync(Expression> condition return EventLogContext.IntegrationEventLogs.AnyAsync(condition); } - public Task CreateAsync(IntegrationEventLog eventLog) - { - EventLogContext.IntegrationEventLogs.Add(eventLog); - return EventLogContext.SaveChangesAsync(); - } - public Task MarkAsPublishedAsync(IntegrationEventLog eventLog) { if (eventLog == null) diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.Designer.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.Designer.cs similarity index 98% rename from Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.Designer.cs rename to Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.Designer.cs index 7b40247..3c10ec4 100644 --- a/Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.Designer.cs +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.Designer.cs @@ -10,7 +10,7 @@ namespace Softeq.NetKit.Integrations.EventLog.Migrations { [DbContext(typeof(IntegrationEventLogContext))] - [Migration("20230725151721_UpdateIntegrationEventLogStructureV2")] + [Migration("20231205132729_UpdateIntegrationEventLogStructureV2")] partial class UpdateIntegrationEventLogStructureV2 { protected override void BuildTargetModel(ModelBuilder modelBuilder) diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs similarity index 79% rename from Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.cs rename to Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs index b830f44..59ca2a3 100644 --- a/Softeq.NetKit.Integrations.EventLog/Migrations/20230725151721_UpdateIntegrationEventLogStructureV2.cs +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs @@ -11,68 +11,41 @@ protected override void Up(MigrationBuilder migrationBuilder) name: "PK_IntegrationEventLogs", schema: "dbo", table: "IntegrationEventLogs"); + migrationBuilder.DropColumn( + name: "EventTypeName", + schema: "dbo", + table: "IntegrationEventLogs"); - //migrationBuilder.DropColumn( - // name: "EventId", - // schema: "dbo", - // table: "IntegrationEventLogs"); - //migrationBuilder.AddColumn( - // name: "EventEnvelope_Id", - // schema: "dbo", - // table: "IntegrationEventLogs", - // nullable: false, - // defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); migrationBuilder.RenameColumn( name: "EventId", schema: "dbo", table: "IntegrationEventLogs", newName: "EventEnvelope_Id"); - ////////migrationBuilder.AddPrimaryKey( - //////// name: "PK_IntegrationEventLogs", - //////// schema: "dbo", - //////// table: "IntegrationEventLogs", - //////// column: "EventEnvelope_Id"); - - - //migrationBuilder.DropColumn( - // name: "Content", - // schema: "dbo", - // table: "IntegrationEventLogs"); - //migrationBuilder.AddColumn( - // name: "EventEnvelope_Event", - // schema: "dbo", - // table: "IntegrationEventLogs", - // nullable: false, - // defaultValue: ""); - migrationBuilder.RenameColumn( - name: "Content", - schema: "dbo", - table: "IntegrationEventLogs", - newName: "EventEnvelope_Event"); - - migrationBuilder.RenameColumn( name: "Created", schema: "dbo", table: "IntegrationEventLogs", newName: "EventEnvelope_Created"); - migrationBuilder.RenameColumn( name: "SessionId", schema: "dbo", table: "IntegrationEventLogs", newName: "EventEnvelope_SequenceId"); - //migrationBuilder.RenameColumn( - // name: "EventTypeName", - // schema: "dbo", - // table: "IntegrationEventLogs", - // newName: "EventEnvelope_PublisherId"); - migrationBuilder.DropColumn( - name: "EventTypeName", + + migrationBuilder.AddColumn( + name: "EventEnvelope_CorrelationId", schema: "dbo", - table: "IntegrationEventLogs"); + table: "IntegrationEventLogs", + nullable: true); + + + migrationBuilder.RenameColumn( + name: "Content", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "EventEnvelope_Event"); migrationBuilder.AddColumn( name: "EventEnvelope_PublisherId", schema: "dbo", @@ -86,19 +59,6 @@ SELECT TOP 1 JsonData.PublisherId FROM IntegrationEventLogs CROSS APPLY OPENJSON(IntegrationEventLogs.EventEnvelope_Event, N'$') WITH (PublisherId UNIQUEIDENTIFIER N'$.PublisherId') AS JsonData); UPDATE IntegrationEventLogs SET EventEnvelope_PublisherId = @PublisherId;"); - - migrationBuilder.RenameIndex( - name: "IX_IntegrationEventLogs_SessionId", - schema: "dbo", - table: "IntegrationEventLogs", - newName: "IX_IntegrationEventLogs_EventEnvelope_SequenceId"); - - migrationBuilder.RenameIndex( - name: "IX_IntegrationEventLogs_Created", - schema: "dbo", - table: "IntegrationEventLogs", - newName: "IX_IntegrationEventLogs_EventEnvelope_Created"); - migrationBuilder.AlterColumn( name: "EventEnvelope_PublisherId", schema: "dbo", @@ -106,15 +66,6 @@ FROM IntegrationEventLogs nullable: false, oldClrType: typeof(string)); - - migrationBuilder.AddColumn( - name: "EventEnvelope_CorrelationId", - schema: "dbo", - table: "IntegrationEventLogs", - nullable: true); - - - migrationBuilder.AddColumn( name: "EventLogId", @@ -129,13 +80,23 @@ FROM IntegrationEventLogs table: "IntegrationEventLogs", column: "EventLogId"); + + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_SessionId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_EventEnvelope_SequenceId"); + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_EventEnvelope_Created"); migrationBuilder.CreateIndex( name: "IX_IntegrationEventLogs_EventEnvelope_Id", schema: "dbo", table: "IntegrationEventLogs", column: "EventEnvelope_Id", unique: true); - migrationBuilder.CreateIndex( name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", schema: "dbo", @@ -145,91 +106,96 @@ FROM IntegrationEventLogs protected override void Down(MigrationBuilder migrationBuilder) { - migrationBuilder.DropPrimaryKey( - name: "PK_IntegrationEventLogs", + migrationBuilder.DropIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", schema: "dbo", table: "IntegrationEventLogs"); - migrationBuilder.DropIndex( name: "IX_IntegrationEventLogs_EventEnvelope_Id", schema: "dbo", table: "IntegrationEventLogs"); + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_Created", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_Created"); + migrationBuilder.RenameIndex( + name: "IX_IntegrationEventLogs_EventEnvelope_SequenceId", + schema: "dbo", + table: "IntegrationEventLogs", + newName: "IX_IntegrationEventLogs_SessionId"); - migrationBuilder.DropIndex( - name: "IX_IntegrationEventLogs_EventEnvelope_PublisherId", + + migrationBuilder.DropPrimaryKey( + name: "PK_IntegrationEventLogs", schema: "dbo", table: "IntegrationEventLogs"); - migrationBuilder.DropColumn( name: "EventLogId", schema: "dbo", table: "IntegrationEventLogs"); - migrationBuilder.DropColumn( - name: "EventEnvelope_CorrelationId", - schema: "dbo", - table: "IntegrationEventLogs"); - migrationBuilder.DropColumn( + migrationBuilder.RenameColumn( name: "EventEnvelope_Event", schema: "dbo", + table: "IntegrationEventLogs", + newName: "Content"); + migrationBuilder.Sql(@" +UPDATE IntegrationEventLogs +SET Content = + JSON_MODIFY( + JSON_MODIFY( + JSON_MODIFY( + JSON_MODIFY( + JSON_MODIFY( + Content, + '$.SessionId', IntegrationEventLogs.EventEnvelope_SequenceId), + '$.CorrelationId', IntegrationEventLogs.EventEnvelope_CorrelationId), + '$.PublisherId', IntegrationEventLogs.EventEnvelope_PublisherId), + '$.CreationDate', FORMAT(IntegrationEventLogs.EventEnvelope_Created, 'yyyy-MM-dd HH:mm:ss.fff zzz')), + '$.Id', CONVERT(nvarchar(36), IntegrationEventLogs.EventEnvelope_Id)); +"); + migrationBuilder.DropColumn( + name: "EventEnvelope_PublisherId", + schema: "dbo", table: "IntegrationEventLogs"); - migrationBuilder.DropColumn( - name: "EventEnvelope_Id", + name: "EventEnvelope_CorrelationId", schema: "dbo", table: "IntegrationEventLogs"); - migrationBuilder.RenameColumn( - name: "EventEnvelope_Created", + + migrationBuilder.AddColumn( + name: "EventTypeName", schema: "dbo", table: "IntegrationEventLogs", - newName: "Created"); + nullable: false, + defaultValue: ""); + //migrationBuilder.Sql("UPDATE IntegrationEventLogs SET EventTypeName = JSON_VALUE(Content, '$.type')"); + migrationBuilder.RenameColumn( name: "EventEnvelope_SequenceId", schema: "dbo", table: "IntegrationEventLogs", newName: "SessionId"); - migrationBuilder.RenameColumn( - name: "EventEnvelope_PublisherId", + name: "EventEnvelope_Created", schema: "dbo", table: "IntegrationEventLogs", - newName: "EventTypeName"); - - migrationBuilder.RenameIndex( - name: "IX_IntegrationEventLogs_EventEnvelope_SequenceId", + newName: "Created"); + migrationBuilder.RenameColumn( + name: "EventEnvelope_Id", schema: "dbo", table: "IntegrationEventLogs", - newName: "IX_IntegrationEventLogs_SessionId"); + newName: "EventId"); - migrationBuilder.RenameIndex( - name: "IX_IntegrationEventLogs_EventEnvelope_Created", - schema: "dbo", - table: "IntegrationEventLogs", - newName: "IX_IntegrationEventLogs_Created"); - migrationBuilder.AlterColumn( + migrationBuilder.DropColumn( name: "EventTypeName", schema: "dbo", - table: "IntegrationEventLogs", - nullable: false, - oldClrType: typeof(string)); - - migrationBuilder.AddColumn( - name: "EventId", - schema: "dbo", - table: "IntegrationEventLogs", - nullable: false, - defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); - - migrationBuilder.AddColumn( - name: "Content", - schema: "dbo", - table: "IntegrationEventLogs", - nullable: false, - defaultValue: ""); + table: "IntegrationEventLogs"); migrationBuilder.AddPrimaryKey( name: "PK_IntegrationEventLogs", From fdac84322194667051dfb624ad628297e09e1b6f Mon Sep 17 00:00:00 2001 From: Aleksey Zhukov Date: Thu, 14 Dec 2023 15:49:19 +0100 Subject: [PATCH 7/8] Fix db migration, update IntegrationEvntLog state change logic to allow transition from the same states for Published and PublishFailed --- .../Events/IntegrationEventEnvelope.cs | 12 +------ .../Abstract/IIntegrationEventLogService.cs | 2 +- .../EventState.cs | 2 +- .../IntegrationEventLog.cs | 8 ++--- .../IntegrationEventLogService.cs | 35 +++++-------------- ...29_UpdateIntegrationEventLogStructureV2.cs | 11 +++--- 6 files changed, 20 insertions(+), 50 deletions(-) diff --git a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs index f486c1f..03d20e7 100644 --- a/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs +++ b/Softeq.NetKit.Components.EventBus/Events/IntegrationEventEnvelope.cs @@ -37,16 +37,6 @@ private IntegrationEventEnvelope( public string SequenceId { get; private set; } public string CorrelationId { get; private set; } public IntegrationEvent Event { get; private set; } - - public static IntegrationEventEnvelope FromEnvelope(IntegrationEventEnvelope eventEnvelope) - where TEvent : IntegrationEvent - { - return new IntegrationEventEnvelope( - eventEnvelope.Id, - eventEnvelope.Event, - eventEnvelope.PublisherId, - eventEnvelope.SequenceId, - eventEnvelope.CorrelationId); - } + public bool IsSequential => !string.IsNullOrEmpty(SequenceId); } } \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs index bddd833..2fbd8f8 100644 --- a/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/Abstract/IIntegrationEventLogService.cs @@ -15,7 +15,7 @@ public interface IIntegrationEventLogService Task AnyAsync(Expression> condition); Task CreateAsync(IntegrationEventLog eventLog); Task MarkAsPublishedAsync(IntegrationEventLog eventLog); - Task MarkAsPublishedFailedAsync(IntegrationEventLog eventLog); + Task MarkAsPublishFailedAsync(IntegrationEventLog eventLog); Task MarkAsCompletedAsync(IntegrationEventLog eventLog); } } \ No newline at end of file diff --git a/Softeq.NetKit.Integrations.EventLog/EventState.cs b/Softeq.NetKit.Integrations.EventLog/EventState.cs index 9ffd1f0..b06600a 100644 --- a/Softeq.NetKit.Integrations.EventLog/EventState.cs +++ b/Softeq.NetKit.Integrations.EventLog/EventState.cs @@ -7,7 +7,7 @@ public enum EventState { NotPublished = 0, Published = 1, - PublishedFailed = 2, + PublishFailed = 2, Completed = 3 } } diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs index c6949b6..805ffdf 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLog.cs @@ -30,14 +30,14 @@ public void ChangeEventState(EventState newEventState) switch (newEventState) { case EventState.Published: - EnsureStateTransitionAllowed(EventState.NotPublished, EventState.PublishedFailed); + EnsureStateTransitionAllowed(EventState.NotPublished, EventState.Published, EventState.PublishFailed); TimesSent++; break; - case EventState.PublishedFailed: - EnsureStateTransitionAllowed(EventState.Published); + case EventState.PublishFailed: + EnsureStateTransitionAllowed(EventState.Published, EventState.PublishFailed); break; case EventState.Completed: - EnsureStateTransitionAllowed(EventState.Published, EventState.PublishedFailed); + EnsureStateTransitionAllowed(EventState.Published, EventState.PublishFailed); break; case EventState.NotPublished: default: diff --git a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs index bc25742..04422f9 100644 --- a/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs +++ b/Softeq.NetKit.Integrations.EventLog/IntegrationEventLogService.cs @@ -61,45 +61,28 @@ public Task AnyAsync(Expression> condition public Task MarkAsPublishedAsync(IntegrationEventLog eventLog) { - if (eventLog == null) - { - throw new ArgumentNullException(nameof(eventLog)); - } - - eventLog.ChangeEventState(EventState.Published); - return UpdateAsync(eventLog); + return UpdateEventStateAsync(eventLog, EventState.Published); } - public Task MarkAsPublishedFailedAsync(IntegrationEventLog eventLog) + public Task MarkAsPublishFailedAsync(IntegrationEventLog eventLog) { - if (eventLog == null) - { - throw new ArgumentNullException(nameof(eventLog)); - } - - eventLog.ChangeEventState(EventState.PublishedFailed); - return UpdateAsync(eventLog); + return UpdateEventStateAsync(eventLog, EventState.PublishFailed); } public Task MarkAsCompletedAsync(IntegrationEventLog eventLog) { - if (eventLog == null) - { - throw new ArgumentNullException(nameof(eventLog)); - } - - eventLog.ChangeEventState(EventState.Completed); - return UpdateAsync(eventLog); + return UpdateEventStateAsync(eventLog, EventState.Completed); } - private Task UpdateAsync(IntegrationEventLog integrationEventLog) + private Task UpdateEventStateAsync(IntegrationEventLog eventLog, EventState newState) { - if (integrationEventLog == null) + if (eventLog == null) { - throw new ArgumentNullException(nameof(integrationEventLog)); + throw new ArgumentNullException(nameof(eventLog)); } - EventLogContext.IntegrationEventLogs.Update(integrationEventLog); + eventLog.ChangeEventState(newState); + EventLogContext.IntegrationEventLogs.Update(eventLog); return EventLogContext.SaveChangesAsync(); } } diff --git a/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs index 59ca2a3..17924c0 100644 --- a/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs +++ b/Softeq.NetKit.Integrations.EventLog/Migrations/20231205132729_UpdateIntegrationEventLogStructureV2.cs @@ -153,7 +153,7 @@ UPDATE IntegrationEventLogs '$.SessionId', IntegrationEventLogs.EventEnvelope_SequenceId), '$.CorrelationId', IntegrationEventLogs.EventEnvelope_CorrelationId), '$.PublisherId', IntegrationEventLogs.EventEnvelope_PublisherId), - '$.CreationDate', FORMAT(IntegrationEventLogs.EventEnvelope_Created, 'yyyy-MM-dd HH:mm:ss.fff zzz')), + '$.CreationDate', FORMAT(IntegrationEventLogs.EventEnvelope_Created, 'yyyy-MM-ddTHH:mm:ss.fffffff zzz')), '$.Id', CONVERT(nvarchar(36), IntegrationEventLogs.EventEnvelope_Id)); "); migrationBuilder.DropColumn( @@ -172,7 +172,9 @@ UPDATE IntegrationEventLogs table: "IntegrationEventLogs", nullable: false, defaultValue: ""); - //migrationBuilder.Sql("UPDATE IntegrationEventLogs SET EventTypeName = JSON_VALUE(Content, '$.type')"); + migrationBuilder.Sql(@" +UPDATE IntegrationEventLogs +SET EventTypeName = SUBSTRING(JSON_VALUE(Content, '$.""$type""'), 0, CHARINDEX(',', JSON_VALUE(Content, '$.""$type""'), 0))"); migrationBuilder.RenameColumn( @@ -192,11 +194,6 @@ UPDATE IntegrationEventLogs newName: "EventId"); - migrationBuilder.DropColumn( - name: "EventTypeName", - schema: "dbo", - table: "IntegrationEventLogs"); - migrationBuilder.AddPrimaryKey( name: "PK_IntegrationEventLogs", schema: "dbo", From 67281e78b0f1fba6c87f90d10e25ccf2ca1e20fe Mon Sep 17 00:00:00 2001 From: Aleksey Zhukov Date: Thu, 14 Dec 2023 16:09:51 +0100 Subject: [PATCH 8/8] Minor cleanup --- Softeq.NetKit.Components.EventBus.Service/EventBusService.cs | 4 ++-- .../Abstract/IEventBusSubscriber.cs | 4 ++-- .../EventPublishConfiguration.cs | 2 -- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs index e4a1820..fbcbf61 100644 --- a/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs +++ b/Softeq.NetKit.Components.EventBus.Service/EventBusService.cs @@ -45,7 +45,7 @@ public EventBusService( _logger = loggerFactory.CreateLogger(GetType()); } - public async Task RegisterTopicListenerAsync() + public async Task RegisterTopicEventListenerAsync() { ValidateSubscription(); @@ -76,7 +76,7 @@ async Task RemoveDefaultRuleIfExists() } } - public void RegisterQueueListener(QueueListenerConfiguration configuration = null) + public void RegisterQueueEventListener(QueueListenerConfiguration configuration = null) { ValidateQueue(); diff --git a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs index 98eecd6..754ed22 100644 --- a/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs +++ b/Softeq.NetKit.Components.EventBus/Abstract/IEventBusSubscriber.cs @@ -8,8 +8,8 @@ namespace Softeq.NetKit.Components.EventBus.Abstract { public interface IEventBusSubscriber { - Task RegisterTopicListenerAsync(); - void RegisterQueueListener(QueueListenerConfiguration configuration = null); + Task RegisterTopicEventListenerAsync(); + void RegisterQueueEventListener(QueueListenerConfiguration configuration = null); Task RegisterEventAsync() where TEvent : IntegrationEvent; Task RemoveEventRegistrationAsync() where TEvent : IntegrationEvent; } diff --git a/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs b/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs index 70555fd..0591ab8 100644 --- a/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs +++ b/Softeq.NetKit.Components.EventBus/EventPublishConfiguration.cs @@ -18,9 +18,7 @@ public EventPublishConfiguration( } public string EventPublisherId { get; } - public bool SendCompletionEvent { get; } - public TimeSpan? EventTimeToLive { get; } } }