Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ Changelog for the bc-prometheus-ruby gem.

### Pending Release

- Add opt-in per-Resque-job histograms `resque_job_queue_latency_seconds` and `resque_job_perform_duration_seconds`, labelled by `job_class`. Recorded from the parent worker process (via `Resque.before_fork` and a `Module#prepend` around `Resque::Worker#perform_with_fork`), so no synchronous flush is needed in the forked child. Gated by `PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED` (default off).

## 0.8.1

- Prometheus client respects the enabled setting
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ require 'bigcommerce/prometheus'
Bigcommerce::Prometheus::Instrumentors::Resque.new(app: Rails.application).start
```

### Per-job metrics (opt-in)

Set `PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED=1` on Resque worker pods to enable two additional histograms recorded from the parent worker process (no per-job synchronous flush):

- `resque_job_queue_latency_seconds{job_class}` — time from `scheduled_at` (falling back to `enqueued_at`) until a worker picks the job up. Per attempt; retries-with-backoff anchor on `scheduled_at` so the intentional backoff doesn't show as latency.
- `resque_job_perform_duration_seconds{job_class}` — total Resque child lifetime (fork → `Process.waitpid` return). Includes fork overhead, Redis reconnect, after_fork hooks, perform, and exit.

These are off by default because they emit one histogram observation per job per worker pod, which adds cardinality. Opt in per service.

ActiveJob-wrapped jobs are unwrapped automatically — the `job_class` label is the user's job class name, not `ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper`.

## Configuration

After requiring the main file, you can further configure with:
Expand All @@ -58,6 +69,7 @@ After requiring the main file, you can further configure with:
| server_thread_pool_size | The number of threads used for the exporter server | `3` | `ENV['PROMETHEUS_SERVER_THREAD_POOL_SIZE']` |
| process_name | What the current process name is (used in logging) | `"unknown"` | `ENV['PROCESS']` |
| railtie_disabled | Opt out flag for Railtie; use `Bigcommerce::Prometheus::Instrumentors::Web.new(app: Rails.application).start` in your app's code to start it up yourself | `0` | `ENV['PROMETHEUS_DISABLE_RAILTIE']` |
| resque_per_job_metrics_enabled | Enable per-job queue-latency and perform-duration histograms (parent-side, no synchronous flush) | `0` | `ENV['PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED']` |

## Custom Collectors

Expand Down
3 changes: 3 additions & 0 deletions lib/bigcommerce/prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
require_relative 'prometheus/collectors/resque'
require_relative 'prometheus/type_collectors/base'
require_relative 'prometheus/type_collectors/resque'
require_relative 'prometheus/type_collectors/resque_job'
require_relative 'prometheus/integrations/active_record'
require_relative 'prometheus/type_collectors/active_record'

Expand All @@ -44,6 +45,8 @@
require_relative 'prometheus/integrations/railtie' if defined?(Rails)
require_relative 'prometheus/integrations/puma'
require_relative 'prometheus/integrations/resque'
require_relative 'prometheus/integrations/resque/job_payload'
require_relative 'prometheus/integrations/resque/job_metrics'

require_relative 'prometheus/servers/puma/server'
require_relative 'prometheus/servers/puma/rack_app'
Expand Down
1 change: 1 addition & 0 deletions lib/bigcommerce/prometheus/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module Configuration
puma_process_label: ENV.fetch('PROMETHEUS_PUMA_PROCESS_LABEL', 'web').to_s,
resque_collection_frequency: ENV.fetch('PROMETHEUS_RESQUE_COLLECTION_FREQUENCY', 30).to_i,
resque_process_label: ENV.fetch('PROMETHEUS_REQUEST_PROCESS_LABEL', 'resque').to_s,
resque_per_job_metrics_enabled: ENV.fetch('PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED', 0).to_i.positive?,

# Server configuration
not_found_body: ENV.fetch('PROMETHEUS_SERVER_NOT_FOUND_BODY', 'Not Found! The Prometheus Ruby Exporter only listens on /metrics and /send-metrics').to_s,
Expand Down
1 change: 1 addition & 0 deletions lib/bigcommerce/prometheus/instrumentors/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def start

server.add_type_collector(PrometheusExporter::Server::ActiveRecordCollector.new)
server.add_type_collector(Bigcommerce::Prometheus::TypeCollectors::Resque.new)
server.add_type_collector(Bigcommerce::Prometheus::TypeCollectors::ResqueJob.new)
@type_collectors.each do |tc|
server.add_type_collector(tc)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/bigcommerce/prometheus/integrations/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def self.start(client: nil)
client: client || ::Bigcommerce::Prometheus.client,
frequency: ::Bigcommerce::Prometheus.resque_collection_frequency
)
::Bigcommerce::Prometheus::Integrations::Resque::JobMetrics.start(
client: client || ::Bigcommerce::Prometheus.client
)
end
end
end
Expand Down
152 changes: 152 additions & 0 deletions lib/bigcommerce/prometheus/integrations/resque/job_metrics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# frozen_string_literal: true

# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
module Bigcommerce
module Prometheus
module Integrations
class Resque
##
# Per-Resque-job histogram metrics, recorded from the parent worker
# process. Hooked via a prepend around
# Resque::Worker#perform_with_fork — queue latency is captured before
# super, perform duration after.
#
# Off unless PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED=1 — emits one
# histogram observation per job per worker process, which can be high
# cardinality at scale.
#
# NOTE: queue_latency only emits for jobs whose Resque payload is
# ActiveJob-shaped. The gem reads three fields from
# `payload['args'][0]` (which must be a Hash):
#
# * job_class — the user's actual job class name; used as the
# metric label.
# * enqueued_at — ISO 8601 string; used as the queue-latency
# anchor when scheduled_at is absent.
# * scheduled_at — ISO 8601 string; preferred over enqueued_at
# when present (e.g. retries-with-backoff, so
# the intentional wait isn't counted as latency).
#
# ActiveJob produces this shape natively when jobs are enqueued via
# .perform_later — the payload is wrapped by
# ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper, which stamps
# the three fields above into `args[0]`.
#
# Vanilla Resque jobs (`class MyJob; @queue = :foo; def self.perform;
# end`, enqueued via Resque.enqueue) do not produce this shape —
# their args are raw primitive values, not a wrapping hash. For
# these jobs, queue_latency silently no-ops. perform_duration works
# for both styles regardless.
#
# Services that want queue_latency for vanilla Resque jobs can
# produce the ActiveJob-shaped payload themselves via a service-local
# wrapper class — see the PR description for options.
#
module JobMetrics
class << self
##
# Install the parent-side hooks if the per-job metrics feature is
# enabled. Idempotent: safe to call multiple times.
#
# @param [PrometheusExporter::Client] client
#
def start(client:)
return unless ::Bigcommerce::Prometheus.resque_per_job_metrics_enabled

@client = client
install_hooks
end

##
# Push the queue-latency observation for a job that's about to be
# picked up by a worker. Anchors on scheduled_at if present (so
# retries-with-backoff don't show the intentional wait as latency),
# falling back to enqueued_at.
#
# @param [JobPayload] payload
#
def record_queue_latency(payload)
anchor = payload.anchor_time
return unless anchor

@client.send_json(
type: 'resque_job',
metric: 'queue_latency',
value: (Time.now - anchor).to_f,
custom_labels: { job_class: payload.job_class }
)
rescue StandardError => e
::Bigcommerce::Prometheus.logger&.warn(
"[bc-prometheus-ruby] resque_job queue_latency push failed: #{e.message}"
)
end

##
# Push the perform-duration observation for a completed job. Called
# from the `Resque::Worker#perform_with_fork` prepend, so it measures
# the full child lifetime (fork + reconnect + perform + exit).
#
# @param [JobPayload] payload
# @param [Float] duration in seconds
#
def record_perform_duration(payload, duration)
@client.send_json(
type: 'resque_job',
metric: 'perform_duration',
value: duration,
custom_labels: { job_class: payload.job_class }
)
rescue StandardError => e
::Bigcommerce::Prometheus.logger&.warn(
"[bc-prometheus-ruby] resque_job perform_duration push failed: #{e.message}"
)
end

private

def install_hooks
return if @hooks_installed

::Resque::Worker.prepend(WorkerInstrumentation)
@hooks_installed = true
end
end

##
# Prepended onto Resque::Worker to capture both queue latency
# (before super) and perform duration (after super) for every
# job that goes through perform_with_fork. JobPayload is built
# once per job and shared between the two recordings.
#
module WorkerInstrumentation
def perform_with_fork(job, &block)
started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
payload = JobPayload.new(job)
JobMetrics.record_queue_latency(payload)
super
ensure
JobMetrics.record_perform_duration(
payload,
Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
)
end
end
end
end
end
end
end
94 changes: 94 additions & 0 deletions lib/bigcommerce/prometheus/integrations/resque/job_payload.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# frozen_string_literal: true

# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
module Bigcommerce
module Prometheus
module Integrations
class Resque
##
# Extracts the fields per-job metrics need from a Resque::Job's
# payload. Eagerly parses in #initialize and exposes plain
# attr_readers — does not hold a reference to the Resque::Job after
# construction.
#
# See JobMetrics's class-level docs for the ActiveJob-shaped payload
# contract this class consumes.
#
class JobPayload
# @return [String] the user's actual job class name. For
# ActiveJob-shaped payloads this is `args[0]['job_class']`.
# Falls back to the top-level Resque `class` field for vanilla
# Resque payloads. `'unknown'` if neither is available, or if
# the payload is malformed (nil, non-Hash, etc.).
attr_reader :job_class

# @return [Time, nil] the queue-latency anchor. Prefers
# `scheduled_at` over `enqueued_at` (so retries-with-backoff
# don't count the intentional wait). nil for non-ActiveJob-shaped
# payloads (where `args[0]` isn't a Hash), for payloads where
# both timestamps are absent or unparseable, and for malformed
# payloads.
attr_reader :anchor_time

# @param [Resque::Job] resque_job
def initialize(resque_job)
payload = resque_job.payload || {}
inner = extract_activejob_inner(payload)

@job_class = extract_job_class(payload, inner)
@anchor_time = extract_anchor_time(inner)
end

private

# Returns the inner ActiveJob-shaped payload (`args[0]` if it's a
# Hash), or nil for any payload shape that isn't ActiveJob-wrapped
# (vanilla Resque jobs with primitive args, non-Hash payloads,
# `args` not being an Array, etc.).
def extract_activejob_inner(payload)
return nil unless payload.is_a?(Hash)

args = payload['args']
first = args.is_a?(Array) ? args.first : nil
first.is_a?(Hash) ? first : nil
end

def extract_job_class(payload, inner)
inner_class = inner.is_a?(Hash) ? inner['job_class'] : nil
outer_class = payload.is_a?(Hash) ? payload['class'] : nil
inner_class || outer_class || 'unknown'
end

def extract_anchor_time(inner)
return nil unless inner.is_a?(Hash)

parse_time(inner['scheduled_at']) || parse_time(inner['enqueued_at'])
end

def parse_time(value)
return value if value.is_a?(Time)
return nil if value.nil? || value.to_s.empty?

Time.iso8601(value.to_s)
rescue ArgumentError
nil
end
end
end
end
end
end
11 changes: 5 additions & 6 deletions lib/bigcommerce/prometheus/type_collectors/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ module Bigcommerce
module Prometheus
module TypeCollectors
##
# Collect resque data from collectors and parse them into metrics
# Aggregates the periodic worker/queue state pushed from
# `Bigcommerce::Prometheus::Collectors::Resque#collect`. Per-job
# histograms (`queue_latency`, `perform_duration`) live in
# `TypeCollectors::ResqueJob` so the upstream router can dispatch each
# envelope type to its own collector.
#
class Resque < Bigcommerce::Prometheus::TypeCollectors::Base
##
# Initialize the collector
#
# @return [Hash]
#
def build_metrics
Expand All @@ -38,9 +40,6 @@ def build_metrics
}
end

##
# Collect resque metrics from input data
#
def collect_metrics(data:, labels: {})
metric(:workers_total).observe(data['workers_total'], labels)
metric(:jobs_failed_total).observe(data['jobs_failed_total'], labels)
Expand Down
Loading