-
Notifications
You must be signed in to change notification settings - Fork 1
Publisher Integration guide
Adam Mikulasev edited this page Dec 19, 2025
·
32 revisions
This page details how to implement the Outboxer::Publisher.publish_message block for popular background job libraries and message publishers.
If you're using Sidekiq Pro, it is strongly recommended to enable superfetch to prevent Sidekiq from loosing jobs during a crash.
Sidekiq.configure_server do |config|
config.super_fetch!
endOutboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
job_class_name = "#{message[:messageable_type]}CreatedJob"
job_class = Object.const_get(job_class_name)
job_class.perform_async(message[:messageable_type], message[:messageable_id])
endclass EventCreatedChannel < ApplicationCable::Channel
def subscribed
stream_from "event_created"
end
endconsumer.subscriptions.create("EventCreatedChannel", {
received(data) {
console.log("event:", data);
},
});require "aws-sdk-sqs"
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
if message[:messageable_type].end_with?("Event")
aws_sqs_resource = Thread.current[:aws_sqs_resource] ||= Aws::SQS::Resource.new(region: "us-east-1")
aws_sqs_queue = Thread.current[:aws_sqs_queue] ||= aws_sqs_resource.create_queue(queue_name: "event-created")
aws_sqs_queue.send_message(
message_body: {
id: message[:messageable_id],
type: message[:messageable_type]
}.to_json
)
end
endrequire "bunny"
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
if message[:messageable_type].end_with?("Event")
bunny_connection = Thread.current[:bunny_connection] ||= Bunny.new.tap(&:start)
bunny_channel = Thread.current[:bunny_channel] ||= bunny_connection.create_channel
bunny_channel.confirm_select unless bunny_channel.using_publisher_confirmations?
bunny_channel.default_exchange.publish(
{
id: message[:messageable_id],
type: message[:messageable_type]
}.to_json,
routing_key: "event.created",
persistent: true
)
bunny_channel.wait_for_confirms
end
endrequire "kafka"
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
if message[:messageable_type].end_with?("Event")
kafka_client = Thread.current[:kafka_client] ||= Kafka.new
kafka_producer = Thread.current[:kafka_producer] ||= kafka_client.producer(required_acks: :all)
kafka_producer.produce(
{
id: message[:messageable_id],
type: message[:messageable_type]
}.to_json,
topic: "event_created",
key: message[:messageable_id]
)
kafka_producer.deliver_messages
end
endrequire "redis"
require "connection_pool"
REDIS_POOL = ConnectionPool.new(size: 5) { Redis.new }
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
if message[:messageable_type].end_with?("Event")
REDIS_POOL.with do |redis_conn|
redis_conn.xadd("event_created",
{
"id" => message[:messageable_id],
"type" => message[:messageable_type]
}
)
end
end
endrequire "google/cloud/pubsub"
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
if message[:messageable_type].end_with?("Event")
gcp_pubsub_client = Thread.current[:gcp_pubsub_client] ||= Google::Cloud::Pubsub.new
gcp_pubsub_topic = Thread.current[:gcp_pubsub_topic] ||= gcp_pubsub_client.topic("event-created")
gcp_pubsub_topic.publish(
{
id: message[:messageable_id],
type: message[:messageable_type]
}.to_json
)
end
endrequire "stomp"
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
if message[:messageable_type].end_with?("Event")
stomp_client = Thread.current[:stomp_client] ||= Stomp::Client.new
stomp_client.publish(
"/queue/event.created",
{
id: message[:messageable_id],
type: message[:messageable_type]
}.to_json,
{ "persistent" => "true" }
)
end
endrequire "nats/io/client"
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
if message[:messageable_type].end_with?("Event")
nats_connection = Thread.current[:nats_connection] ||= NATS::IO::Client.new.tap { |c| c.connect }
nats_jetstream = Thread.current[:nats_jetstream] ||= nats_connection.jetstream
nats_jetstream.publish(
"event.created",
{
id: message[:messageable_id],
type: message[:messageable_type]
}.to_json
)
end
endPull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
MIT