From 12b01d85239a027eb81628540b3ac00a300b52c8 Mon Sep 17 00:00:00 2001 From: Willem Homan Date: Fri, 29 May 2026 10:11:44 +1000 Subject: [PATCH] feat(platform): PAYMENTS-11567 Resque metrics for time spent on queue and worker processing time --- CHANGELOG.md | 2 + README.md | 12 ++ lib/bigcommerce/prometheus.rb | 3 + lib/bigcommerce/prometheus/configuration.rb | 1 + .../prometheus/instrumentors/resque.rb | 1 + .../prometheus/integrations/resque.rb | 3 + .../integrations/resque/job_metrics.rb | 152 +++++++++++++++ .../integrations/resque/job_payload.rb | 94 +++++++++ .../prometheus/type_collectors/resque.rb | 11 +- .../prometheus/type_collectors/resque_job.rb | 86 +++++++++ .../integrations/resque/job_metrics_spec.rb | 151 +++++++++++++++ .../integrations/resque/job_payload_spec.rb | 179 ++++++++++++++++++ .../type_collectors/resque_job_spec.rb | 136 +++++++++++++ .../prometheus/type_collectors/resque_spec.rb | 34 ++-- 14 files changed, 844 insertions(+), 21 deletions(-) create mode 100644 lib/bigcommerce/prometheus/integrations/resque/job_metrics.rb create mode 100644 lib/bigcommerce/prometheus/integrations/resque/job_payload.rb create mode 100644 lib/bigcommerce/prometheus/type_collectors/resque_job.rb create mode 100644 spec/bigcommerce/prometheus/integrations/resque/job_metrics_spec.rb create mode 100644 spec/bigcommerce/prometheus/integrations/resque/job_payload_spec.rb create mode 100644 spec/bigcommerce/prometheus/type_collectors/resque_job_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b08c51..d76462a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ Changelog for the bc-prometheus-ruby gem. ### Pending Release +- Add opt-in per-Resque-job histograms `resque_job_queue_latency_seconds` and `resque_job_perform_duration_seconds`, labelled by `job_class`. Recorded from the parent worker process (via a `Module#prepend` around `Resque::Worker#perform_with_fork`), so no synchronous flush is needed in the forked child. 
Gated by `PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED` (default off). + ## 0.8.1 - Prometheus client respects the enabled setting diff --git a/README.md b/README.md index dc25274..2dfb2d8 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,17 @@ require 'bigcommerce/prometheus' Bigcommerce::Prometheus::Instrumentors::Resque.new(app: Rails.application).start ``` +### Per-job metrics (opt-in) + +Set `PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED=1` on Resque worker pods to enable two additional histograms recorded from the parent worker process (no per-job synchronous flush): + +- `resque_job_queue_latency_seconds{job_class}` — time from `scheduled_at` (falling back to `enqueued_at`) until a worker picks the job up. Per attempt; retries-with-backoff anchor on `scheduled_at` so the intentional backoff doesn't show as latency. Only emitted for ActiveJob-shaped payloads; vanilla Resque jobs (enqueued via `Resque.enqueue`) carry no timestamps and record perform duration only. +- `resque_job_perform_duration_seconds{job_class}` — total Resque child lifetime (fork → `Process.waitpid` return). Includes fork overhead, Redis reconnect, after_fork hooks, perform, and exit. + +These are off by default because every job adds up to two pushes to the exporter, and each distinct `job_class` adds its own set of histogram series, so cardinality grows with the number of job classes. Opt in per service. + +ActiveJob-wrapped jobs are unwrapped automatically — the `job_class` label is the user's job class name, not `ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper`. 
+ ## Configuration After requiring the main file, you can further configure with: @@ -58,6 +69,7 @@ After requiring the main file, you can further configure with: | server_thread_pool_size | The number of threads used for the exporter server | `3` | `ENV['PROMETHEUS_SERVER_THREAD_POOL_SIZE']` | | process_name | What the current process name is (used in logging) | `"unknown"` | `ENV['PROCESS']` | | railtie_disabled | Opt out flag for Railtie; use `Bigcommerce::Prometheus::Instrumentors::Web.new(app: Rails.application).start` in your app's code to start it up yourself | `0` | `ENV['PROMETHEUS_DISABLE_RAILTIE']` | +| resque_per_job_metrics_enabled | Enable per-job queue-latency and perform-duration histograms (parent-side, no synchronous flush) | `0` | `ENV['PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED']` | ## Custom Collectors diff --git a/lib/bigcommerce/prometheus.rb b/lib/bigcommerce/prometheus.rb index 105c1e1..0a0cbbb 100644 --- a/lib/bigcommerce/prometheus.rb +++ b/lib/bigcommerce/prometheus.rb @@ -35,6 +35,7 @@ require_relative 'prometheus/collectors/resque' require_relative 'prometheus/type_collectors/base' require_relative 'prometheus/type_collectors/resque' +require_relative 'prometheus/type_collectors/resque_job' require_relative 'prometheus/integrations/active_record' require_relative 'prometheus/type_collectors/active_record' @@ -44,6 +45,8 @@ require_relative 'prometheus/integrations/railtie' if defined?(Rails) require_relative 'prometheus/integrations/puma' require_relative 'prometheus/integrations/resque' +require_relative 'prometheus/integrations/resque/job_payload' +require_relative 'prometheus/integrations/resque/job_metrics' require_relative 'prometheus/servers/puma/server' require_relative 'prometheus/servers/puma/rack_app' diff --git a/lib/bigcommerce/prometheus/configuration.rb b/lib/bigcommerce/prometheus/configuration.rb index d7f0a48..ca5c9f1 100644 --- a/lib/bigcommerce/prometheus/configuration.rb +++ 
b/lib/bigcommerce/prometheus/configuration.rb @@ -35,6 +35,7 @@ module Configuration puma_process_label: ENV.fetch('PROMETHEUS_PUMA_PROCESS_LABEL', 'web').to_s, resque_collection_frequency: ENV.fetch('PROMETHEUS_RESQUE_COLLECTION_FREQUENCY', 30).to_i, resque_process_label: ENV.fetch('PROMETHEUS_REQUEST_PROCESS_LABEL', 'resque').to_s, + resque_per_job_metrics_enabled: ENV.fetch('PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED', 0).to_i.positive?, # Server configuration not_found_body: ENV.fetch('PROMETHEUS_SERVER_NOT_FOUND_BODY', 'Not Found! The Prometheus Ruby Exporter only listens on /metrics and /send-metrics').to_s, diff --git a/lib/bigcommerce/prometheus/instrumentors/resque.rb b/lib/bigcommerce/prometheus/instrumentors/resque.rb index bffa45b..61a73dc 100644 --- a/lib/bigcommerce/prometheus/instrumentors/resque.rb +++ b/lib/bigcommerce/prometheus/instrumentors/resque.rb @@ -46,6 +46,7 @@ def start server.add_type_collector(PrometheusExporter::Server::ActiveRecordCollector.new) server.add_type_collector(Bigcommerce::Prometheus::TypeCollectors::Resque.new) + server.add_type_collector(Bigcommerce::Prometheus::TypeCollectors::ResqueJob.new) @type_collectors.each do |tc| server.add_type_collector(tc) end diff --git a/lib/bigcommerce/prometheus/integrations/resque.rb b/lib/bigcommerce/prometheus/integrations/resque.rb index 765a0a3..53de6c7 100644 --- a/lib/bigcommerce/prometheus/integrations/resque.rb +++ b/lib/bigcommerce/prometheus/integrations/resque.rb @@ -34,6 +34,9 @@ def self.start(client: nil) client: client || ::Bigcommerce::Prometheus.client, frequency: ::Bigcommerce::Prometheus.resque_collection_frequency ) + ::Bigcommerce::Prometheus::Integrations::Resque::JobMetrics.start( + client: client || ::Bigcommerce::Prometheus.client + ) end end end diff --git a/lib/bigcommerce/prometheus/integrations/resque/job_metrics.rb b/lib/bigcommerce/prometheus/integrations/resque/job_metrics.rb new file mode 100644 index 0000000..01eef92 --- /dev/null +++ 
b/lib/bigcommerce/prometheus/integrations/resque/job_metrics.rb @@ -0,0 +1,152 @@ +# frozen_string_literal: true + +# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the +# Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +module Bigcommerce + module Prometheus + module Integrations + class Resque + ## + # Per-Resque-job histogram metrics, recorded from the parent worker + # process. Hooked via a prepend around + # Resque::Worker#perform_with_fork — queue latency is captured before + # super, perform duration after. + # + # Off unless PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED=1 — emits one + # histogram observation per job per worker process, which can be high + # cardinality at scale. + # + # NOTE: queue_latency only emits for jobs whose Resque payload is + # ActiveJob-shaped. The gem reads three fields from + # `payload['args'][0]` (which must be a Hash): + # + # * job_class — the user's actual job class name; used as the + # metric label. 
+ # * enqueued_at — ISO 8601 string; used as the queue-latency + # anchor when scheduled_at is absent. + # * scheduled_at — ISO 8601 string; preferred over enqueued_at + # when present (e.g. retries-with-backoff, so + # the intentional wait isn't counted as latency). + # + # ActiveJob produces this shape natively when jobs are enqueued via + # .perform_later — the payload is wrapped by + # ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper, which stamps + # the three fields above into `args[0]`. + # + # Vanilla Resque jobs (`class MyJob; @queue = :foo; def self.perform; + # end`, enqueued via Resque.enqueue) do not produce this shape — + # their args are raw primitive values, not a wrapping hash. For + # these jobs, queue_latency silently no-ops. perform_duration works + # for both styles regardless. + # + # Services that want queue_latency for vanilla Resque jobs can + # produce the ActiveJob-shaped payload themselves via a service-local + # wrapper class — see the PR description for options. + # + module JobMetrics + class << self + ## + # Install the parent-side hooks if the per-job metrics feature is + # enabled. Idempotent: safe to call multiple times. + # + # @param [PrometheusExporter::Client] client + # + def start(client:) + return unless ::Bigcommerce::Prometheus.resque_per_job_metrics_enabled + + @client = client + install_hooks + end + + ## + # Push the queue-latency observation for a job that's about to be + # picked up by a worker. Anchors on scheduled_at if present (so + # retries-with-backoff don't show the intentional wait as latency), + # falling back to enqueued_at. 
+ # + # @param [JobPayload] payload + # + def record_queue_latency(payload) + anchor = payload.anchor_time + return unless anchor + + @client.send_json( + type: 'resque_job', + metric: 'queue_latency', + value: (Time.now - anchor).to_f, + custom_labels: { job_class: payload.job_class } + ) + rescue StandardError => e + ::Bigcommerce::Prometheus.logger&.warn( + "[bc-prometheus-ruby] resque_job queue_latency push failed: #{e.message}" + ) + end + + ## + # Push the perform-duration observation for a completed job. Called + # from the `Resque::Worker#perform_with_fork` prepend, so it measures + # the full child lifetime (fork + reconnect + perform + exit). + # + # @param [JobPayload] payload + # @param [Float] duration in seconds + # + def record_perform_duration(payload, duration) + @client.send_json( + type: 'resque_job', + metric: 'perform_duration', + value: duration, + custom_labels: { job_class: payload.job_class } + ) + rescue StandardError => e + ::Bigcommerce::Prometheus.logger&.warn( + "[bc-prometheus-ruby] resque_job perform_duration push failed: #{e.message}" + ) + end + + private + + def install_hooks + return if @hooks_installed + + ::Resque::Worker.prepend(WorkerInstrumentation) + @hooks_installed = true + end + end + + ## + # Prepended onto Resque::Worker to capture both queue latency + # (before super) and perform duration (after super) for every + # job that goes through perform_with_fork. JobPayload is built + # once per job and shared between the two recordings. 
+ # + module WorkerInstrumentation + def perform_with_fork(job, &block) + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + payload = JobPayload.new(job) + JobMetrics.record_queue_latency(payload) + super + ensure + JobMetrics.record_perform_duration( + payload, + Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at + ) + end + end + end + end + end + end +end diff --git a/lib/bigcommerce/prometheus/integrations/resque/job_payload.rb b/lib/bigcommerce/prometheus/integrations/resque/job_payload.rb new file mode 100644 index 0000000..a9ed243 --- /dev/null +++ b/lib/bigcommerce/prometheus/integrations/resque/job_payload.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the +# Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +module Bigcommerce + module Prometheus + module Integrations + class Resque + ## + # Extracts the fields per-job metrics need from a Resque::Job's + # payload. 
Eagerly parses in #initialize and exposes plain + # attr_readers — does not hold a reference to the Resque::Job after + # construction. + # + # See JobMetrics's class-level docs for the ActiveJob-shaped payload + # contract this class consumes. + # + class JobPayload + # @return [String] the user's actual job class name. For + # ActiveJob-shaped payloads this is `args[0]['job_class']`. + # Falls back to the top-level Resque `class` field for vanilla + # Resque payloads. `'unknown'` if neither is available, or if + # the payload is malformed (nil, non-Hash, etc.). + attr_reader :job_class + + # @return [Time, nil] the queue-latency anchor. Prefers + # `scheduled_at` over `enqueued_at` (so retries-with-backoff + # don't count the intentional wait). nil for non-ActiveJob-shaped + # payloads (where `args[0]` isn't a Hash), for payloads where + # both timestamps are absent or unparseable, and for malformed + # payloads. + attr_reader :anchor_time + + # @param [Resque::Job] resque_job + def initialize(resque_job) + payload = resque_job.payload || {} + inner = extract_activejob_inner(payload) + + @job_class = extract_job_class(payload, inner) + @anchor_time = extract_anchor_time(inner) + end + + private + + # Returns the inner ActiveJob-shaped payload (`args[0]` if it's a + # Hash), or nil for any payload shape that isn't ActiveJob-wrapped + # (vanilla Resque jobs with primitive args, non-Hash payloads, + # `args` not being an Array, etc.). + def extract_activejob_inner(payload) + return nil unless payload.is_a?(Hash) + + args = payload['args'] + first = args.is_a?(Array) ? args.first : nil + first.is_a?(Hash) ? first : nil + end + + def extract_job_class(payload, inner) + inner_class = inner.is_a?(Hash) ? inner['job_class'] : nil + outer_class = payload.is_a?(Hash) ? 
payload['class'] : nil + inner_class || outer_class || 'unknown' + end + + def extract_anchor_time(inner) + return nil unless inner.is_a?(Hash) + + parse_time(inner['scheduled_at']) || parse_time(inner['enqueued_at']) + end + + def parse_time(value) + return value if value.is_a?(Time) + return nil if value.nil? || value.to_s.empty? + + Time.iso8601(value.to_s) + rescue ArgumentError + nil + end + end + end + end + end +end diff --git a/lib/bigcommerce/prometheus/type_collectors/resque.rb b/lib/bigcommerce/prometheus/type_collectors/resque.rb index d26950f..8f39e61 100644 --- a/lib/bigcommerce/prometheus/type_collectors/resque.rb +++ b/lib/bigcommerce/prometheus/type_collectors/resque.rb @@ -19,12 +19,14 @@ module Bigcommerce module Prometheus module TypeCollectors ## - # Collect resque data from collectors and parse them into metrics + # Aggregates the periodic worker/queue state pushed from + # `Bigcommerce::Prometheus::Collectors::Resque#collect`. Per-job + # histograms (`queue_latency`, `perform_duration`) live in + # `TypeCollectors::ResqueJob` so the upstream router can dispatch each + # envelope type to its own collector. # class Resque < Bigcommerce::Prometheus::TypeCollectors::Base ## - # Initialize the collector - # # @return [Hash] # def build_metrics @@ -38,9 +40,6 @@ def build_metrics } end - ## - # Collect resque metrics from input data - # def collect_metrics(data:, labels: {}) metric(:workers_total).observe(data['workers_total'], labels) metric(:jobs_failed_total).observe(data['jobs_failed_total'], labels) diff --git a/lib/bigcommerce/prometheus/type_collectors/resque_job.rb b/lib/bigcommerce/prometheus/type_collectors/resque_job.rb new file mode 100644 index 0000000..7fe706b --- /dev/null +++ b/lib/bigcommerce/prometheus/type_collectors/resque_job.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +# Copyright (c) 2019-present, BigCommerce Pty. Ltd. 
All rights reserved +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the +# Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +module Bigcommerce + module Prometheus + module TypeCollectors + ## + # Aggregates per-Resque-job histogram observations (`type: 'resque_job'`) + # pushed from worker processes by + # `Bigcommerce::Prometheus::Integrations::Resque::JobMetrics`. Separate + # from `TypeCollectors::Resque` (which handles the aggregate worker/queue + # gauges) so the upstream router can dispatch each envelope type to its + # own collector — no in-collector case-dispatch needed. + # + class ResqueJob < Bigcommerce::Prometheus::TypeCollectors::Base + ## + # Override the auto-derived type so envelopes tagged `type: 'resque_job'` + # route to this collector via `PrometheusExporter::Server::Collector`. 
+ # + def initialize(default_labels: {}) + super(type: 'resque_job', default_labels: default_labels) + end + + ## + # @return [Hash] + # + def build_metrics + { + queue_latency: build_queue_latency_histogram, + perform_duration: build_perform_duration_histogram + } + end + + ## + # Observe the histogram for the named metric. Labels have already been + # merged with `custom_labels` by `TypeCollectors::Base#collect`. + # + def collect_metrics(data:, labels: {}) + name = data['metric']&.to_sym + return unless %i[queue_latency perform_duration].include?(name) + + metric(name).observe(data['value'], labels) + end + + private + + def build_queue_latency_histogram + PrometheusExporter::Metric::Histogram.new( + 'resque_job_queue_latency_seconds', + 'Seconds between when a Resque job was due to run (scheduled_at if set, ' \ + 'falling back to enqueued_at) and when a worker process picked it up. ' \ + 'Recorded per attempt; retries-with-backoff anchor on scheduled_at, ' \ + 'excluding the intentional backoff wait. Opt-in via ' \ + 'PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED.', + buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 30, 60, 120, 300] + ) + end + + def build_perform_duration_histogram + PrometheusExporter::Metric::Histogram.new( + 'resque_job_perform_duration_seconds', + 'Total Resque child process lifetime (fork to waitpid). Includes ' \ + 'fork overhead, Redis reconnect, after_fork hooks, perform, and ' \ + 'exit. Used as the per-job throughput signal at the worker-pod ' \ + 'level. 
Opt-in via PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED.', + buckets: [0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30, 60] + ) + end + end + end + end +end diff --git a/spec/bigcommerce/prometheus/integrations/resque/job_metrics_spec.rb b/spec/bigcommerce/prometheus/integrations/resque/job_metrics_spec.rb new file mode 100644 index 0000000..1e1c500 --- /dev/null +++ b/spec/bigcommerce/prometheus/integrations/resque/job_metrics_spec.rb @@ -0,0 +1,151 @@ +# frozen_string_literal: true + +# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the +# Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +require 'spec_helper' + +# NOTE: this spec deliberately does not `require 'resque'`. Pulling Resque +# into this gem's dev bundle conflicts with the gemspec's `rack >= 3.0` +# requirement (Resque -> sinatra (old) -> rack < 3 under cold bundle +# resolution). See the PR description for the testing-gap implications. 
+# +# We test the pure logic in JobMetrics (anchor selection, payload unwrapping, +# label assembly, error rescue) by injecting a client directly via +# `instance_variable_set(:@client, ...)` rather than calling `.start`, which +# would invoke `::Resque::Worker.prepend`. The +# `.start` install behaviour itself is not tested in this gem until Resque +# can be added to the dev bundle (blocked on bumping the Sinatra dep — see +# follow-up). + +describe Bigcommerce::Prometheus::Integrations::Resque::JobMetrics do + let(:client) { instance_double(PrometheusExporter::Client, send_json: nil) } + + before do + # Inject the client directly. We can't go through `.start` because that + # would call ::Resque::Worker.prepend, which is unavailable in this gem's + # dev bundle. The production code path is the same once `@client` is set. + described_class.instance_variable_set(:@client, client) + end + + after do + described_class.instance_variable_set(:@client, nil) + end + + # A minimal stand-in for Resque::Job — the production code only ever calls + # `.payload` on it, so a plain double is sufficient. + def resque_job_double(payload) + double('Resque::Job', payload: payload) + end + + # Build a real JobPayload from a payload hash. Both record_* methods take + # a JobPayload instance (the WorkerInstrumentation prepend constructs it + # once per job and shares it between the two recordings); JobPayload's own + # parsing edge cases are covered in job_payload_spec.rb. 
+ def payload_for(payload_hash) + Bigcommerce::Prometheus::Integrations::Resque::JobPayload.new(resque_job_double(payload_hash)) + end + + def active_job_payload(job_class: 'MyJob', enqueued_at: nil, scheduled_at: nil) + { + 'class' => 'ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper', + 'args' => [ + { + 'job_class' => job_class, + 'arguments' => [], + 'enqueued_at' => enqueued_at, + 'scheduled_at' => scheduled_at + }.compact + ] + } + end + + # Payload-shape edge cases (anchor selection, ActiveJob unwrapping, time + # parsing) are covered directly in job_payload_spec.rb. Tests here focus + # on JobMetrics's distinct responsibilities: envelope shape, gating, + # error rescue, and the JobMetrics → JobPayload integration. + + describe '.record_queue_latency' do + it 'pushes the correct envelope for an ActiveJob-shaped payload' do + enqueued = (Time.now - 3).iso8601(6) + + expect(client).to receive(:send_json).with( + type: 'resque_job', + metric: 'queue_latency', + value: a_value_within(0.5).of(3), + custom_labels: { job_class: 'MyJob' } + ) + + described_class.record_queue_latency(payload_for(active_job_payload(enqueued_at: enqueued))) + end + + it 'is a no-op for a vanilla Resque payload (no anchor available, no exception)' do + payload = { 'class' => 'RawResqueJob', 'args' => [12345, 'some_string'] } + + expect(client).not_to receive(:send_json) + + expect do + described_class.record_queue_latency(payload_for(payload)) + end.not_to raise_error + end + + context 'when client.send_json raises' do + it 'rescues the error and logs a warning' do + allow(client).to receive(:send_json).and_raise(StandardError, 'boom') + expect(Bigcommerce::Prometheus.logger).to receive(:warn).with(/queue_latency push failed: boom/) + + expect do + described_class.record_queue_latency( + payload_for(active_job_payload(enqueued_at: Time.now.iso8601(6))) + ) + end.not_to raise_error + end + end + end + + describe '.record_perform_duration' do + it 'pushes the correct envelope for an 
ActiveJob-shaped payload' do + expect(client).to receive(:send_json).with( + type: 'resque_job', + metric: 'perform_duration', + value: 0.42, + custom_labels: { job_class: 'MyJob' } + ) + + described_class.record_perform_duration(payload_for(active_job_payload), 0.42) + end + + it 'labels with the raw Resque payload class for vanilla Resque jobs' do + payload = { 'class' => 'RawResqueJob', 'args' => [12345, 'some_string'] } + + expect(client).to receive(:send_json).with( + hash_including(custom_labels: { job_class: 'RawResqueJob' }) + ) + + described_class.record_perform_duration(payload_for(payload), 0.5) + end + + context 'when client.send_json raises' do + it 'rescues the error and logs a warning' do + allow(client).to receive(:send_json).and_raise(StandardError, 'boom') + expect(Bigcommerce::Prometheus.logger).to receive(:warn).with(/perform_duration push failed: boom/) + + expect do + described_class.record_perform_duration(payload_for(active_job_payload), 0.1) + end.not_to raise_error + end + end + end +end diff --git a/spec/bigcommerce/prometheus/integrations/resque/job_payload_spec.rb b/spec/bigcommerce/prometheus/integrations/resque/job_payload_spec.rb new file mode 100644 index 0000000..4984d42 --- /dev/null +++ b/spec/bigcommerce/prometheus/integrations/resque/job_payload_spec.rb @@ -0,0 +1,179 @@ +# frozen_string_literal: true + +# Copyright (c) 2019-present, BigCommerce Pty. Ltd. 
All rights reserved +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the +# Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+#
+require 'spec_helper'
+
+describe Bigcommerce::Prometheus::Integrations::Resque::JobPayload do
+  def resque_job_double(payload) # stand-in job object that only answers #payload
+    double('Resque::Job', payload: payload)
+  end
+
+  def active_job_payload(job_class: 'MyJob', enqueued_at: nil, scheduled_at: nil)
+    {
+      'class' => 'ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper',
+      'args' => [
+        {
+          'job_class' => job_class,
+          'arguments' => [],
+          'enqueued_at' => enqueued_at,
+          'scheduled_at' => scheduled_at
+        }.compact # drop keys whose value is nil (e.g. a timestamp that was not supplied)
+      ]
+    }
+  end
+
+  describe '#job_class' do
+    it 'returns the inner job_class for an ActiveJob-shaped payload' do
+      payload = active_job_payload(job_class: 'BigPay::SomePublishJob')
+
+      expect(described_class.new(resque_job_double(payload)).job_class).to eq('BigPay::SomePublishJob')
+    end
+
+    it 'returns the top-level class for a vanilla Resque payload (primitive args)' do
+      payload = {
+        'class' => 'BigPay::EnablePpcpJob',
+        'args' => [12_345, 'some_string']
+      }
+
+      expect(described_class.new(resque_job_double(payload)).job_class).to eq('BigPay::EnablePpcpJob')
+    end
+
+    it 'returns the top-level class for a vanilla Resque payload with no args' do
+      payload = { 'class' => 'BigPay::EmptyJob', 'args' => [] }
+
+      expect(described_class.new(resque_job_double(payload)).job_class).to eq('BigPay::EmptyJob')
+    end
+
+    it "returns 'unknown' for a malformed payload (no class, no usable args)" do
+      payload = { 'args' => [] }
+
+      expect(described_class.new(resque_job_double(payload)).job_class).to eq('unknown')
+    end
+
+    it "returns 'unknown' when the resque job's payload is nil" do
+      expect(described_class.new(resque_job_double(nil)).job_class).to eq('unknown')
+    end
+
+    it "returns 'unknown' when the payload is not a Hash" do
+      expect(described_class.new(resque_job_double('not a hash')).job_class).to eq('unknown')
+    end
+
+    it "falls back to the outer class when 'args' is not an Array" do
+      payload = { 'class' => 'BigPay::WeirdJob', 'args' => 'not an array' }
+
+      expect(described_class.new(resque_job_double(payload)).job_class).to eq('BigPay::WeirdJob')
+    end
+
+    it 'prefers the inner job_class over the wrapper class' do
+      # Belt-and-braces: even when the top-level class is the JobWrapper,
+      # the inner job_class wins.
+      payload = active_job_payload(job_class: 'BigPay::InnerJob')
+
+      expect(payload['class']).to eq('ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper')
+      expect(described_class.new(resque_job_double(payload)).job_class).to eq('BigPay::InnerJob')
+    end
+  end
+
+  describe '#anchor_time' do
+    it 'prefers scheduled_at when both are present (excludes the intentional backoff)' do
+      enqueued = (Time.now - 30).iso8601(6)
+      scheduled = (Time.now - 1).iso8601(6)
+
+      anchor = described_class.new(
+        resque_job_double(active_job_payload(enqueued_at: enqueued, scheduled_at: scheduled))
+      ).anchor_time
+
+      expect(anchor).to be_within(0.5).of(Time.now - 1)
+    end
+
+    it 'falls back to enqueued_at when scheduled_at is absent' do
+      enqueued = (Time.now - 3).iso8601(6)
+
+      anchor = described_class.new(
+        resque_job_double(active_job_payload(enqueued_at: enqueued))
+      ).anchor_time
+
+      expect(anchor).to be_within(0.5).of(Time.now - 3)
+    end
+
+    it 'returns nil when neither timestamp is present' do
+      anchor = described_class.new(resque_job_double(active_job_payload)).anchor_time
+
+      expect(anchor).to be_nil
+    end
+
+    it 'returns nil for a vanilla Resque payload (primitive args)' do
+      payload = {
+        'class' => 'BigPay::EnablePpcpJob',
+        'args' => [12_345, 'some_string']
+      }
+
+      expect(described_class.new(resque_job_double(payload)).anchor_time).to be_nil
+    end
+
+    it 'returns nil for a payload with no args at all' do
+      payload = { 'class' => 'BigPay::EmptyJob' }
+
+      expect(described_class.new(resque_job_double(payload)).anchor_time).to be_nil
+    end
+
+    it 'handles a Time instance directly in the payload' do
+      time = Time.now - 2
+      payload = active_job_payload
+      payload['args'][0]['enqueued_at'] = time
+
+      anchor = described_class.new(resque_job_double(payload)).anchor_time
+
+      expect(anchor).to eq(time)
+    end
+
+    it 'returns nil for an unparseable string (no exception)' do
+      payload = active_job_payload(enqueued_at: 'not a real time')
+      anchor = :unset # non-nil sentinel: be_nil below can only pass if #anchor_time itself returned nil
+      expect do
+        anchor = described_class.new(resque_job_double(payload)).anchor_time
+      end.not_to raise_error
+      expect(anchor).to be_nil
+    end
+
+    it 'returns nil for an empty string' do
+      payload = active_job_payload(enqueued_at: '')
+
+      expect(described_class.new(resque_job_double(payload)).anchor_time).to be_nil
+    end
+
+    it "returns nil when the resque job's payload is nil" do
+      expect(described_class.new(resque_job_double(nil)).anchor_time).to be_nil
+    end
+
+    it "returns nil when 'args' is not an Array" do
+      payload = { 'class' => 'BigPay::WeirdJob', 'args' => 'not an array' }
+
+      expect(described_class.new(resque_job_double(payload)).anchor_time).to be_nil
+    end
+  end
+
+  describe 'field independence under partial failure' do
+    it 'still returns job_class when the enqueued_at timestamp is unparseable' do
+      payload = active_job_payload(job_class: 'BigPay::SomeJob', enqueued_at: 'not a real time')
+      result = described_class.new(resque_job_double(payload))
+
+      expect(result.job_class).to eq('BigPay::SomeJob')
+      expect(result.anchor_time).to be_nil
+    end
+  end
+end
diff --git a/spec/bigcommerce/prometheus/type_collectors/resque_job_spec.rb b/spec/bigcommerce/prometheus/type_collectors/resque_job_spec.rb
new file mode 100644
index 0000000..757b1ea
--- /dev/null
+++ b/spec/bigcommerce/prometheus/type_collectors/resque_job_spec.rb
@@ -0,0 +1,136 @@
+# frozen_string_literal: true
+
+# Copyright (c) 2019-present, BigCommerce Pty. Ltd.
All rights reserved
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
+# Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
+# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+#
+require 'spec_helper'
+
+describe Bigcommerce::Prometheus::TypeCollectors::ResqueJob do
+  let(:type_collector) { described_class.new } # no-argument collector; the '#collect' group redefines this let
+
+  # The registered type is the production-routing contract:
+  # PrometheusExporter::Server::Collector keys @collectors by collector.type
+  # and dispatches each envelope by envelope['type']. JobMetrics.record_*
+  # hardcodes envelopes with type: 'resque_job', so this collector must
+  # register under the same string for the router to find it. Asserting it
+  # here keeps the contract from drifting silently (e.g. someone removing the
+  # explicit type: argument and falling back to the gsub auto-derivation).
+  describe '#type' do
+    it 'returns "resque_job" so the upstream router can dispatch resque_job envelopes' do
+      expect(type_collector.type).to eq('resque_job')
+    end
+  end
+
+  describe '#build_metrics' do
+    subject { type_collector.build_metrics }
+
+    it 'returns a hash of prometheus metric objects' do
+      expect(subject).to be_a(Hash)
+      expect(subject.count).to eq 2 # the two entries enumerated in the table below
+    end
+
+    {
+      queue_latency: {
+        name: 'resque_job_queue_latency_seconds',
+        class: PrometheusExporter::Metric::Histogram
+      },
+      perform_duration: {
+        name: 'resque_job_perform_duration_seconds',
+        class: PrometheusExporter::Metric::Histogram
+      }
+    }.each do |hash_key, config| # generates one example per expected metric
+      it "builds the #{hash_key} with the expected class and name" do
+        metric = subject[hash_key]
+        expect(metric).to be_a config[:class]
+        expect(metric.name).to eq config[:name]
+      end
+    end
+  end
+
+  # Exercise the public #collect entry point (defined on
+  # TypeCollectors::Base) so the spec catches any label-handling bugs in the
+  # Base → subclass dispatch — e.g. custom_labels getting merged twice.
+  describe '#collect' do
+    subject { type_collector.collect(data) } # data is supplied by each context below
+
+    let(:type_collector) { described_class.new(default_labels: default_labels) }
+    let(:default_labels) { { environment: 'development' } }
+
+    context 'with a queue_latency payload' do
+      let(:data) do
+        {
+          'type' => 'resque_job',
+          'metric' => 'queue_latency',
+          'value' => 1.5,
+          'custom_labels' => { 'job_class' => 'MyJob' }
+        }
+      end
+
+      # Base#collect merges data['custom_labels'] into labels once. The
+      # subclass must not re-merge — observing with the labels as-is is the
+      # correct behaviour. If a future refactor reintroduces a second merge,
+      # this assertion fails because the observed label hash would also
+      # include a duplicated/symbolised key.
+      it 'observes the queue_latency histogram with custom_labels merged exactly once' do
+        metrics = type_collector.instance_variable_get(:@metrics) # reach the metric objects to set expectations on them
+
+        expect(metrics[:queue_latency]).to receive(:observe).with(1.5, default_labels.merge('job_class' => 'MyJob'))
+        expect(metrics[:perform_duration]).not_to receive(:observe)
+
+        subject # run #collect now that the expectations above are in place
+      end
+    end
+
+    context 'with a perform_duration payload' do
+      let(:data) do
+        {
+          'type' => 'resque_job',
+          'metric' => 'perform_duration',
+          'value' => 0.25,
+          'custom_labels' => { 'job_class' => 'MyJob', 'event_name' => 'foo' }
+        }
+      end
+
+      it 'observes the perform_duration histogram with custom_labels merged exactly once' do
+        metrics = type_collector.instance_variable_get(:@metrics)
+
+        expect(metrics[:perform_duration]).to receive(:observe).with(0.25, default_labels.merge('job_class' => 'MyJob', 'event_name' => 'foo'))
+        expect(metrics[:queue_latency]).not_to receive(:observe)
+
+        subject # run #collect now that the expectations above are in place
+      end
+    end
+
+    context 'with a payload with an unrecognised metric name' do
+      let(:data) do
+        {
+          'type' => 'resque_job',
+          'metric' => 'mystery', # neither of the two metric keys asserted on below
+          'value' => 0.25,
+          'custom_labels' => {}
+        }
+      end
+
+      it 'is a no-op' do
+        metrics = type_collector.instance_variable_get(:@metrics)
+
+        expect(metrics[:queue_latency]).not_to receive(:observe)
+        expect(metrics[:perform_duration]).not_to receive(:observe)
+
+        subject # run #collect now that the expectations above are in place
+      end
+    end
+  end
+end
diff --git a/spec/bigcommerce/prometheus/type_collectors/resque_spec.rb b/spec/bigcommerce/prometheus/type_collectors/resque_spec.rb
index 95ca845..cc74a2c 100644
--- a/spec/bigcommerce/prometheus/type_collectors/resque_spec.rb
+++ b/spec/bigcommerce/prometheus/type_collectors/resque_spec.rb
@@ -58,22 +58,26 @@ name: 'queue_sizes', class: PrometheusExporter::Metric::Gauge, help: 'Size of each queue' - }, - + } }.each do |hash_key, config| - it "builds the #{hash_key} with stat key of #{config[:key]}" do + it "builds the #{hash_key} with the expected class and name" do metric = subject[hash_key] expect(metric).to 
be_a config[:class] expect(metric.name).to eq config[:name] - expect(metric.help).to eq config[:help] + expect(metric.help).to eq(config[:help]) if config[:help] end end end - describe '#collect_metrics' do - subject { type_collector.collect_metrics(data: data, labels: labels) } + # Exercise the public #collect entry point (defined on + # TypeCollectors::Base) so the spec catches any label-handling bugs in the + # Base → subclass dispatch — e.g. custom_labels getting merged twice. + describe '#collect' do + subject { type_collector.collect(data) } + + let(:type_collector) { described_class.new(default_labels: default_labels) } + let(:default_labels) { { environment: 'development' } } - let(:labels) { { environment: 'development' } } let(:data) do { 'workers_total' => 10, @@ -92,15 +96,15 @@ it 'properly logs metrics for all passed values' do metrics = type_collector.instance_variable_get(:@metrics) - expect(metrics[:workers_total]).to receive(:observe).with(data['workers_total'], labels) - expect(metrics[:jobs_failed_total]).to receive(:observe).with(data['jobs_failed_total'], labels) - expect(metrics[:jobs_pending_total]).to receive(:observe).with(data['jobs_pending_total'], labels) - expect(metrics[:jobs_processed_total]).to receive(:observe).with(data['jobs_processed_total'], labels) - expect(metrics[:queues_total]).to receive(:observe).with(data['queues_total'], labels) + expect(metrics[:workers_total]).to receive(:observe).with(data['workers_total'], default_labels) + expect(metrics[:jobs_failed_total]).to receive(:observe).with(data['jobs_failed_total'], default_labels) + expect(metrics[:jobs_pending_total]).to receive(:observe).with(data['jobs_pending_total'], default_labels) + expect(metrics[:jobs_processed_total]).to receive(:observe).with(data['jobs_processed_total'], default_labels) + expect(metrics[:queues_total]).to receive(:observe).with(data['queues_total'], default_labels) - expect(metrics[:queue_sizes]).to 
receive(:observe).ordered.with(data['queues']['low'], labels.merge(queue: 'low')) - expect(metrics[:queue_sizes]).to receive(:observe).ordered.with(data['queues']['medium'], labels.merge(queue: 'medium')) - expect(metrics[:queue_sizes]).to receive(:observe).ordered.with(data['queues']['high'], labels.merge(queue: 'high')) + expect(metrics[:queue_sizes]).to receive(:observe).ordered.with(data['queues']['low'], default_labels.merge(queue: 'low')) + expect(metrics[:queue_sizes]).to receive(:observe).ordered.with(data['queues']['medium'], default_labels.merge(queue: 'medium')) + expect(metrics[:queue_sizes]).to receive(:observe).ordered.with(data['queues']['high'], default_labels.merge(queue: 'high')) subject end