diff --git a/db/migrate/create_events.rb b/db/migrate/create_events.rb index 728b8d3e..dc14d350 100644 --- a/db/migrate/create_events.rb +++ b/db/migrate/create_events.rb @@ -1,11 +1,11 @@ class CreateEvents < ActiveRecord::Migration[7.0] def up create_table :events do |t| + t.datetime :created_at, null: false + t.string :type, null: false, limit: 255 t.send json_column_type, :body - t.datetime :created_at, null: false - t.index :created_at t.index [:type, :created_at] end diff --git a/db/migrate/create_outboxer_events.rb b/db/migrate/create_outboxer_events.rb new file mode 100644 index 00000000..5a18dfe5 --- /dev/null +++ b/db/migrate/create_outboxer_events.rb @@ -0,0 +1,32 @@ +class CreateOutboxerEvents < ActiveRecord::Migration[7.0] + def up + create_table :outboxer_events do |t| + t.integer :order, null: false + t.datetime :created_at, null: false + + t.string :eventable_id, limit: 255, null: false + t.string :eventable_type, limit: 255, null: false + + t.string :type, null: false, limit: 255 + t.send json_column_type, :body + + t.index :created_at + t.index [:type, :created_at] + end + end + + def down + drop_table :outboxer_events if table_exists?(:outboxer_events) + end + + private + + def json_column_type + case ActiveRecord::Base.connection.adapter_name + when /PostgreSQL/ + :jsonb + else + :json + end + end +end diff --git a/lib/outboxer/database.rb b/lib/outboxer/database.rb index 26f8b9fd..1929e720 100644 --- a/lib/outboxer/database.rb +++ b/lib/outboxer/database.rb @@ -83,6 +83,7 @@ def truncate(logger: nil) if connection.adapter_name.downcase.include?("postgres") connection.execute(<<~SQL) TRUNCATE TABLE + outboxer_events, outboxer_frames, outboxer_exceptions, outboxer_threads, @@ -97,6 +98,7 @@ def truncate(logger: nil) begin connection.execute("SET FOREIGN_KEY_CHECKS = 0;") + connection.execute("TRUNCATE TABLE outboxer_events;") connection.execute("TRUNCATE TABLE outboxer_frames;") connection.execute("TRUNCATE TABLE outboxer_exceptions;") connection.execute("TRUNCATE TABLE outboxer_threads;") diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index c4c570f8..52c0487d 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -71,17 +71,22 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil, ActiveRecord::Base.transaction do message = Models::Message.create!( status: Status::QUEUED, - messageable_id: id, messageable_type: type, + messageable_id: id, queued_at: current_utc_time, updated_at: current_utc_time ) + event = Models::QueuedEvent.create!( + order: lock_version, created_at: current_utc_time, + eventable: message, + body: { hostname: hostname, process_id: process_id, thread_id: thread_id }) + Models::Thread.update_message_counts_by!( hostname: hostname, process_id: process_id, thread_id: thread_id, queued_message_count: 1, current_utc_time: current_utc_time) - { id: message.id, lock_version: message.lock_version } + { id: message.id, lock_version: message.lock_version, event_id: event.id } end end end @@ -242,6 +247,11 @@ def publishing(hostname: Socket.gethostname, Status::PUBLISHING, current_utc_time, current_utc_time ]) + event = Models::PublishingEvent.create!( + order: message_row[1] + 1, created_at: current_utc_time, + eventable_type: Models::Message.class, eventable_id: message_row[0], + body: { hostname: hostname, process_id: process_id, thread_id: thread_id }) + Models::Thread.update_message_counts_by!( hostname: hostname, process_id: process_id, thread_id: thread_id, queued_message_count: -1, publishing_message_count: 1, @@ -251,7 +261,8 @@ def publishing(hostname: Socket.gethostname, id: message_row[0], lock_version: message_row[1] + 1, messageable_type: message_row[2], - messageable_id: message_row[3] + messageable_id: message_row[3], + event_id: event.id } end end @@ -346,6 +357,12 @@ def publishing_failed(id:, lock_version:, error: nil, end end + event = Models::PublishingFailedEvent.create!( + order: lock_version, + created_at: current_utc_time, + eventable: message, + body: { exception: exception }) + Models::Thread.update_message_counts_by!( hostname: hostname, process_id: process_id, thread_id: thread_id, publishing_message_count: -1, failed_message_count: 1, @@ -353,7 +370,8 @@ def publishing_failed(id:, lock_version:, error: nil, { id: message.id, - lock_version: message.lock_version + lock_version: message.lock_version, + event_id: event.id } end end diff --git a/lib/outboxer/models/event.rb b/lib/outboxer/models/event.rb new file mode 100644 index 00000000..3a829fd5 --- /dev/null +++ b/lib/outboxer/models/event.rb @@ -0,0 +1,2 @@ +class Event < ActiveRecord::Base +end diff --git a/lib/outboxer/models/message_publishing_event.rb b/lib/outboxer/models/message_publishing_event.rb new file mode 100644 index 00000000..844db936 --- /dev/null +++ b/lib/outboxer/models/message_publishing_event.rb @@ -0,0 +1,6 @@ +module Outboxer + module Models + class MessagePublishingEvent < Event + end + end +end diff --git a/lib/outboxer/models/message_publishing_failed_event.rb b/lib/outboxer/models/message_publishing_failed_event.rb new file mode 100644 index 00000000..844db936 --- /dev/null +++ b/lib/outboxer/models/message_publishing_failed_event.rb @@ -0,0 +1,6 @@ +module Outboxer + module Models + class MessagePublishingEvent < Event + end + end +end diff --git a/lib/outboxer/models/message_queued_event.rb b/lib/outboxer/models/message_queued_event.rb new file mode 100644 index 00000000..fb7a284b --- /dev/null +++ b/lib/outboxer/models/message_queued_event.rb @@ -0,0 +1,6 @@ +module Outboxer + module Models + class MessageQueuedEvent < Event + end + end +end diff --git a/tasks/database.rake b/tasks/database.rake index a4af1324..79e68171 100644 --- a/tasks/database.rake +++ b/tasks/database.rake @@ -46,6 +46,9 @@ namespace :outboxer do db_config = Outboxer::Database.config(environment: environment, pool: 1) ActiveRecord::Base.establish_connection(db_config) + require_relative "../db/migrate/create_outboxer_events" + CreateOutboxerEvents.new.up + require_relative "../db/migrate/create_outboxer_messages" CreateOutboxerMessages.new.up