Skip to content

Publisher Integration guide

Adam Mikulasev edited this page Dec 19, 2025 · 32 revisions

This page details how to implement the Outboxer::Publisher.publish_message block for popular background job libraries and message publishers.


Sidekiq

Enable superfetch

If you're using Sidekiq Pro, it is strongly recommended to enable superfetch to prevent Sidekiq from losing jobs during a crash.

# Turn on super_fetch for the Sidekiq server process.
Sidekiq.configure_server { |config| config.super_fetch! }

Handle async

# Enqueue a Sidekiq job for each message. The job class is looked up by name:
# "<messageable_type>CreatedJob".
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
  type = message[:messageable_type]
  id = message[:messageable_id]

  Object.const_get("#{type}CreatedJob").perform_async(type, id)
end

Action Cable

Backend channel

# Action Cable channel that streams the "event_created" broadcasting to subscribers.
class EventCreatedChannel < ApplicationCable::Channel
  # Called when a consumer subscribes; starts streaming from "event_created".
  def subscribed
    stream_from("event_created")
  end
end

Frontend subscription

// Subscribe to EventCreatedChannel and log every payload received on it.
const eventCreatedHandlers = {
  received: function (data) {
    console.log("event:", data);
  },
};

consumer.subscriptions.create("EventCreatedChannel", eventCreatedHandlers);

AWS SQS

require "aws-sdk-sqs"

# Send a JSON message to the "event-created" SQS queue for every message whose
# messageable_type ends with "Event". Other messages are ignored.
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
  next unless message[:messageable_type].end_with?("Event")

  # The SQS resource and queue handle are memoized per thread in Thread.current.
  sqs = Thread.current[:aws_sqs_resource] ||= Aws::SQS::Resource.new(region: "us-east-1")
  queue = Thread.current[:aws_sqs_queue] ||= sqs.create_queue(queue_name: "event-created")

  payload = {
    id: message[:messageable_id],
    type: message[:messageable_type]
  }
  queue.send_message(message_body: payload.to_json)
end

RabbitMQ / Bunny

require "bunny"

# Publish a persistent JSON message to RabbitMQ (default exchange, routing key
# "event.created") for every message whose messageable_type ends with "Event",
# and wait for the broker's publisher confirm before returning.
#
# Raises RuntimeError if the broker does not confirm the message.
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
  if message[:messageable_type].end_with?("Event")
    # Connection and channel are memoized per thread in Thread.current, so
    # each publisher thread uses its own channel.
    bunny_connection = Thread.current[:bunny_connection] ||= Bunny.new.tap(&:start)
    bunny_channel = Thread.current[:bunny_channel] ||= bunny_connection.create_channel

    # Publisher confirms only need to be enabled once per channel.
    bunny_channel.confirm_select unless bunny_channel.using_publisher_confirmations?

    bunny_channel.default_exchange.publish(
      {
        id: message[:messageable_id],
        type: message[:messageable_type]
      }.to_json,
      routing_key: "event.created",
      persistent: true
    )

    # wait_for_confirms returns false when the broker nacked a message.
    # Raise so that a rejected publish is not silently treated as a success.
    unless bunny_channel.wait_for_confirms
      raise "RabbitMQ did not confirm message " \
            "#{message[:messageable_type]}/#{message[:messageable_id]}"
    end
  end
end

Kafka (ruby-kafka)

require "kafka"

# Produce a JSON message to the "event_created" Kafka topic for every message
# whose messageable_type ends with "Event", and deliver it before returning.
#
# Brokers are read from the KAFKA_BROKERS environment variable as a
# comma-separated list (e.g. "kafka1:9092,kafka2:9092"); it defaults to
# "localhost:9092" when unset.
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
  if message[:messageable_type].end_with?("Event")
    # ruby-kafka needs seed brokers to construct a client; a bare Kafka.new
    # has nothing to connect to.
    kafka_client = Thread.current[:kafka_client] ||=
      Kafka.new(ENV.fetch("KAFKA_BROKERS", "localhost:9092").split(","))

    # Client and producer are memoized per thread in Thread.current.
    kafka_producer = Thread.current[:kafka_producer] ||= kafka_client.producer(required_acks: :all)

    kafka_producer.produce(
      {
        id: message[:messageable_id],
        type: message[:messageable_type]
      }.to_json,
      topic: "event_created",
      # Partition keys are strings; coerce in case the id is numeric.
      key: message[:messageable_id].to_s
    )

    # Flush the buffered message to the brokers now.
    kafka_producer.deliver_messages
  end
end

Redis Streams

require "redis"
require "connection_pool"

# Pool of Redis connections; its size matches the publisher concurrency used below.
REDIS_POOL = ConnectionPool.new(size: 5) { Redis.new }

# Append an entry to the "event_created" Redis stream for every message whose
# messageable_type ends with "Event". Other messages are ignored.
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
  next unless message[:messageable_type].end_with?("Event")

  fields = {
    "id" => message[:messageable_id],
    "type" => message[:messageable_type]
  }

  # Check a connection out of the pool only for the duration of the XADD.
  REDIS_POOL.with { |redis| redis.xadd("event_created", fields) }
end

Google Cloud Pub/Sub

require "google/cloud/pubsub"

# Publish a JSON message to the "event-created" Pub/Sub topic for every message
# whose messageable_type ends with "Event". Other messages are ignored.
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
  next unless message[:messageable_type].end_with?("Event")

  # The client and topic handle are memoized per thread in Thread.current.
  pubsub = Thread.current[:gcp_pubsub_client] ||= Google::Cloud::Pubsub.new
  topic = Thread.current[:gcp_pubsub_topic] ||= pubsub.topic("event-created")

  payload = {
    id: message[:messageable_id],
    type: message[:messageable_type]
  }
  topic.publish(payload.to_json)
end

ActiveMQ / STOMP

require "stomp"

# Publish a persistent JSON message to "/queue/event.created" over STOMP for
# every message whose messageable_type ends with "Event". Other messages are ignored.
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
  next unless message[:messageable_type].end_with?("Event")

  # The STOMP client is memoized per thread in Thread.current.
  stomp = Thread.current[:stomp_client] ||= Stomp::Client.new

  body = {
    id: message[:messageable_id],
    type: message[:messageable_type]
  }.to_json
  headers = { "persistent" => "true" }

  stomp.publish("/queue/event.created", body, headers)
end

NATS JetStream

require "nats/io/client"

# Publish a JSON message to the "event.created" subject via NATS JetStream for
# every message whose messageable_type ends with "Event". Other messages are ignored.
Outboxer::Publisher.publish_message(concurrency: 5) do |publisher, message|
  next unless message[:messageable_type].end_with?("Event")

  # The connection and JetStream context are memoized per thread in Thread.current.
  nats = Thread.current[:nats_connection] ||= NATS::IO::Client.new.tap(&:connect)
  jetstream = Thread.current[:nats_jetstream] ||= nats.jetstream

  payload = {
    id: message[:messageable_id],
    type: message[:messageable_type]
  }
  jetstream.publish("event.created", payload.to_json)
end

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.


License

MIT