Implementing Handlers

Matt Towers edited this page Jul 8, 2016 · 10 revisions

Once you've finished creating and configuring a daemon to process your activities, you're ready to start implementing handlers for the activities published by your application.

A stream handler may be implemented as a simple Ruby class that meets the following requirements:

  1. Include the RealSelf::Handler module that matches the document type of the expected activity.
  2. Implement the handle method.
  3. Call register_handler with the routing key and activity type that the handler consumes.
  4. (Optional) Implement custom error handling with an Enclosure.

A minimal handler that meets the first three requirements:

class MyActivityHandler
  include RealSelf::Handler::Activity

  ##
  # @param RealSelf::Stream::Activity
  def handle activity
    RealSelf::logger.info "[#{Time.now}] HANDLER HANDLING: #{activity}"

  end

  register_handler 'user.upload.photo'
end

Handler Modules

The included handler module should match the document/activity type that the handler will process. Available options are:

RealSelf::Handler::Activity
RealSelf::Handler::StreamActivity
RealSelf::Handler::Digest

initialize

Implementing an initialize method is required only if you wish to pass custom arguments, such as a database connection or environment flags, to the handler class when it is instantiated. The parameters are passed as Ruby 2.0-style keyword arguments, as defined in your worker configuration.

def initialize(db_client:, debug:)
  ...
end
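
As a rough illustration of how keyword arguments reach the constructor, the sketch below splats a configuration hash into `new`. The `ReportHandler` class, the `FakeDbClient` struct, and the `handler_params` hash are hypothetical names chosen for this example. In a real daemon the worker configuration performs the instantiation for you, and the handler would also include a RealSelf::Handler module, which is omitted here so that the snippet runs on its own.

```ruby
# Hypothetical stand-in for a real database client.
FakeDbClient = Struct.new(:name)

# Hypothetical handler; a real one would also include a RealSelf::Handler module.
class ReportHandler
  attr_reader :db_client, :debug

  # Required keyword arguments: omitting either one raises ArgumentError.
  def initialize(db_client:, debug:)
    @db_client = db_client
    @debug     = debug
  end
end

# Assumed shape of the parameters defined in the worker configuration.
handler_params = { db_client: FakeDbClient.new('primary'), debug: true }

# The double splat turns the hash into keyword arguments.
handler = ReportHandler.new(**handler_params)
```

Because both keywords are required, a mismatch between the configuration and the constructor signature fails at instantiation rather than later, when the first message arrives.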

handle

All handlers must implement a handle method that takes a single argument whose type will match the included handler module type.

| Handler Type | Parameter Type |
| --- | --- |
| RealSelf::Handler::Activity | RealSelf::Stream::Activity |
| RealSelf::Handler::StreamActivity | RealSelf::Stream::StreamActivity |
| RealSelf::Handler::Digest | RealSelf::Stream::Digest |

The return values from handle follow the Sneakers Message Handling Semantics and can be one of the following:

| Response | Description |
| --- | --- |
| :ack | The message was successfully handled. |
| :requeue | The message was NOT handled. Requeue for another attempt.* |
| :reject | The message was NOT handled. Discard the message and continue.** |

* NOTE: If you are using the MaxRetry handler, :requeue messages will NOT cause the retry count to be incremented. Messages are simply rerouted back to the originating queue.

** If you are using the MaxRetry handler in your Daemon Configuration, :reject messages will be requeued and retried for a specified number of attempts. When all attempts are exhausted, the message will be routed to an error queue. See also: Retry
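
The mapping from outcomes to return values can be sketched as follows. This is a minimal illustration, not the library's code: the `PhotoHandler` class, its `processor:` argument, and the two error classes are hypothetical, and the RealSelf::Handler include and register_handler call are left out so that the snippet is self-contained.

```ruby
# Hypothetical error types standing in for whatever your handler's
# dependencies raise; they are not part of the library.
class ServiceUnavailable < StandardError; end
class MalformedPayload   < StandardError; end

# Hypothetical handler; a real one would include a RealSelf::Handler module
# and call register_handler.
class PhotoHandler
  def initialize(processor:)
    @processor = processor # any object responding to #call
  end

  def handle(activity)
    @processor.call(activity)
    :ack       # handled successfully
  rescue ServiceUnavailable
    :requeue   # not handled; return to the originating queue
  rescue MalformedPayload
    :reject    # not handled; discarded, or retried if MaxRetry is configured
  end
end
```

Since :requeue does not increment the MaxRetry count, a failure that never clears would be redelivered indefinitely under this mapping, so it is probably best reserved for conditions you expect to resolve on their own.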

register_handler

The register_handler method is provided by the included handler module and serves to bind your RabbitMQ queue to the appropriate routing keys and route messages with matching keys to the appropriate handler(s). The method takes a single argument that matches the routing key and activity prototype associated with the current handler and must be called at class-load time.

  • A single handler class may register for multiple routing keys by calling this method once for each key.
  • If multiple handlers register the same routing key, handle will be called on each handler in an unspecified order.

class MyHandler
  include RealSelf::Handler::Activity

  def handle activity
    # ...
  end

  register_handler 'user.upload.photo'
  register_handler 'user.upload.video'
end
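
To make the registration semantics concrete, here is a toy model of class-load-time registration. Everything in it (`SketchHandler`, `REGISTRY`, `dispatch`, and the two handler classes) is a hypothetical stand-in written for this page; it is not how the RealSelf::Handler modules are implemented, and it does not talk to RabbitMQ.

```ruby
# Hypothetical stand-in for the registration behaviour of the handler
# modules; this is NOT the library's implementation.
module SketchHandler
  # routing key => list of handler classes
  REGISTRY = Hash.new { |hash, key| hash[key] = [] }

  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Runs while the class body is evaluated, i.e. at class-load time.
    def register_handler(routing_key)
      REGISTRY[routing_key] << self
    end
  end

  # Deliver a message to every handler registered for the routing key.
  def self.dispatch(routing_key, message)
    REGISTRY[routing_key].map { |klass| klass.new.handle(message) }
  end
end

class PhotoLogger
  include SketchHandler

  def handle(_activity)
    :ack
  end

  register_handler 'user.upload.photo'
  register_handler 'user.upload.video'
end

class PhotoIndexer
  include SketchHandler

  def handle(_activity)
    :ack
  end

  register_handler 'user.upload.photo'
end
```

In this toy model, handlers happen to run in registration order. The real library makes no such promise, so handlers that share a routing key should not depend on one another's side effects.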

Enclosures

To facilitate centralized error and exception handling, handlers may optionally take advantage of an Enclosure from within the handle method. An enclosure is simply a module with a method that takes a Ruby block: it wraps your handle logic so that you can rescue known exceptions in one place and reject or requeue the message as appropriate. This is particularly useful if your handler makes outbound calls to an external REST service, where certain errors may be transient and you simply wish to retry later. See Worker Configuration and sample-daemon for more information and examples.

module MyEnclosure
  def self.handle
    begin
      yield
      :ack

    rescue SomeUnrecoverableError => sue
      RealSelf::logger.error "Handler Error - #{sue.message}"
      :error # route the message to the error queue

    rescue SomeTransientError => ste
      RealSelf::logger.warn "Handler Warning - #{ste.message}"
      :reject # route to the retry queue
    end

  # Use catch-all rescue with caution
  rescue => e
    RealSelf::logger.error "Unknown Handler Error - #{e.message}"
    :error # route the message to the error queue
  end
end
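
One plausible way to use an enclosure from inside handle is sketched below, assuming the enclosure is invoked explicitly; depending on your Worker Configuration, the daemon may wire it up for you instead. `SketchEnclosure`, `ProfileSyncHandler`, the `client:` argument, and the `LOG` constant are hypothetical, and a plain Ruby Logger stands in for RealSelf::logger so that the snippet runs without the library.

```ruby
require 'logger'

# Hypothetical error type; substitute whatever your REST client raises.
class SomeTransientError < StandardError; end

# Stand-in for RealSelf::logger so the sketch runs without the library.
LOG = Logger.new($stderr)

# Trimmed-down enclosure in the style of the example above.
module SketchEnclosure
  def self.handle
    yield
    :ack
  rescue SomeTransientError => e
    LOG.warn "Handler Warning - #{e.message}"
    :reject # route to the retry queue
  end
end

# Hypothetical handler; a real one would include a RealSelf::Handler module
# and call register_handler.
class ProfileSyncHandler
  def initialize(client:)
    @client = client # any object responding to #call
  end

  def handle(activity)
    # The enclosure's return value becomes the handler's return value.
    SketchEnclosure.handle do
      @client.call(activity)
    end
  end
end
```

Keeping the rescue clauses in the enclosure means every handler that uses it maps the same exception to the same response, and the handler body contains only the work itself.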

Examples

Activity Handler

The following example will listen for activities with the prototype and routing key of user.upload.photo and log the activity as JSON.

class MyActivityHandler
  include RealSelf::Handler::Activity

  ##
  # @param RealSelf::Stream::Activity
  def handle activity
    RealSelf::logger.info "[#{Time.now}] HANDLER HANDLING: #{activity}"

  end

  register_handler 'user.upload.photo'
end

StreamActivity Handler

The following example will listen for stream activities with the prototype and routing keys of user.upload.photo and user.upload.video and log the activity as JSON.

class MyActivityHandler
  include RealSelf::Handler::StreamActivity

  ##
  # @param RealSelf::Stream::StreamActivity
  def handle stream_activity
    RealSelf::logger.info "[#{Time.now}] HANDLER HANDLING: #{stream_activity}"

  end

  register_handler 'user.upload.photo'
  register_handler 'user.upload.video'
end

Digest Handler

The following example will listen for digests with the prototype and routing key of cron.send.digest and log the digest as JSON.

class MyActivityHandler
  include RealSelf::Handler::Digest

  ##
  # @param RealSelf::Stream::Digest
  def handle digest
    RealSelf::logger.info "[#{Time.now}] HANDLER HANDLING: #{digest}"

  end

  register_handler 'cron.send.digest'
end
