Skip to content

feat: Track queue latency#745

Open
zeljkoX wants to merge 2 commits intomainfrom
queue-latency-metric
Open

feat: Track queue latency#745
zeljkoX wants to merge 2 commits intomainfrom
queue-latency-metric

Conversation

@zeljkoX
Copy link
Copy Markdown
Collaborator

@zeljkoX zeljkoX commented Apr 3, 2026

Summary

  • Add queue_pickup_latency_seconds Prometheus histogram tracking time from message send to consumer pickup, across all 8 queue types and both
    backends (SQS/Redis)
    • SQS uses SentTimestamp system attribute (ms precision), with target_scheduled_on fallback to exclude intentional scheduling delay;
    • Redis uses Job.timestamp with new available_at field fallback to exclude scheduling delay from status check/health check retries
    • Add available_at optional field to Job struct, set by producers when scheduled_on is provided, backward-compatible via serde(default)

Testing Process

Checklist

  • Add a reference to related issues in the PR description.
  • Add unit tests if applicable.

Note

If you are using Relayer in your stack, consider adding your team or organization to our list of Relayer Users in the Wild!

Summary by CodeRabbit

  • New Features

    • Added support for scheduling jobs with a specific availability time.
    • Introduced queue pickup latency monitoring to measure job processing delays across Redis and SQS backends.
  • Tests

    • Extended test coverage for scheduled job functionality and serialization.
    • Added tests to verify latency measurement and observation across different queue types.

@zeljkoX zeljkoX requested a review from a team as a code owner April 3, 2026 12:21
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 3, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 405498bc-8e6e-46e9-b083-cf116d92c9fb

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review

Walkthrough

Introduces job scheduling metadata and queue pickup latency metrics. Jobs gain an available_at field populated via with_scheduled_on(). New QUEUE_PICKUP_LATENCY histogram tracks time from job availability to processing. Redis and SQS workers implement latency observation using job timestamps and message attributes, with Redis splitting status handlers into EVM/Stellar variants.

Changes

Cohort / File(s) Summary
Job Scheduling Foundation
src/jobs/job.rs, src/jobs/job_producer.rs
Added available_at: Option<String> field to Job<T> struct with with_scheduled_on() method for setting scheduled timestamps. Updated all job producer implementations to invoke with_scheduled_on(scheduled_on) across transaction-request, transaction-submission, status-check, notification, token-swap, and relayer-health-check handlers. Extended test scaffolding with TestRedisStorage to track pushed/scheduled jobs and their timestamps.
Metrics Infrastructure
src/metrics/mod.rs
Introduced observe_queue_pickup_latency() public function and new QUEUE_PICKUP_LATENCY Prometheus HistogramVec with queue_type and backend labels. Added comprehensive test coverage verifying histogram sample counts across multiple backends.
Redis Worker Integration
src/queues/redis/worker.rs
Added observe_redis_pickup_latency() helper to compute latency from available_at or timestamp fields and report via metrics. Integrated latency observation into Apalis job adapter handlers. Split generic apalis_transaction_status_handler into apalis_transaction_status_evm_handler and apalis_transaction_status_stellar_handler with distinct queue-type labels for metrics differentiation.
SQS Worker Integration
src/queues/sqs/worker.rs
Introduced queue_pickup_baseline_ms() helper to extract scheduled timestamp or fall back to SQS SentTimestamp attribute. Extended SQS polling to request SentTimestamp system attribute. Added latency observation in message processing with negative value clamping, excluding scheduling delay for scheduled messages. Included unit tests for baseline selection and retry skipping.

Sequence Diagram(s)

sequenceDiagram
    participant Job Producer
    participant Job Storage
    participant Worker
    participant Metrics

    Job Producer->>Job Producer: Create Job
    Job Producer->>Job Producer: Call with_scheduled_on(timestamp)
    Job Producer->>Job Storage: Store job with available_at set
    Note over Job Storage: Job remains queued until available_at passes

    Worker->>Job Storage: Poll for available jobs
    Job Storage->>Worker: Return job with available_at
    Note over Worker: Calculate latency<br/>(now - available_at)
    Worker->>Metrics: observe_queue_pickup_latency(queue_type, backend, latency)
    Metrics->>Metrics: Record to QUEUE_PICKUP_LATENCY histogram
    Worker->>Worker: Process job
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

cla: allowlist

Suggested reviewers

  • tirumerla
  • collins-w

Poem

🐰 A job waits in the queue, marked by time,
Until its available_at chimes sublime!
We measure the latency, second by second,
From availability to pickup reckoned. 📊
Metrics now bloom as the workers dance fast! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 66.04% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat: Track queue latency' clearly and concisely summarizes the main change—adding queue pickup latency tracking across queue backends.
Description check ✅ Passed The description covers the Summary section with implementation details and notes that unit tests were added. However, the Testing Process section is empty and the related issues reference checklist item is unchecked.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch queue-latency-metric

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new Prometheus histogram to measure queue pickup latency (time from message/job creation/scheduled-availability to consumer pickup) across both queue backends (SQS + Redis/Apalis), with logic intended to exclude intentional scheduling delay.

Changes:

  • Add queue_pickup_latency_seconds histogram and observe_queue_pickup_latency() helper.
  • Instrument SQS worker to compute pickup latency using target_scheduled_on (preferred) or SQS SentTimestamp (fallback), only on first delivery.
  • Add available_at to Job<T> and populate it from producers so Redis workers can compute pickup latency excluding scheduling delay; instrument Redis worker adapters.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
src/queues/sqs/worker.rs Requests SentTimestamp and observes pickup latency on first SQS delivery (scheduled baseline preferred).
src/queues/redis/worker.rs Adds Redis/Apalis pickup-latency observation helper and wires it into each queue handler adapter; splits status handlers by network.
src/metrics/mod.rs Defines/registers queue_pickup_latency_seconds histogram and exposes an observation helper + tests.
src/jobs/job.rs Adds available_at field and builder method to persist intended availability time for scheduled jobs.
src/jobs/job_producer.rs Populates available_at via .with_scheduled_on(...) and extends tests to verify it’s persisted for scheduled jobs.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/queues/redis/worker.rs Outdated
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 3, 2026

Codecov Report

❌ Patch coverage is 63.17992% with 88 lines in your changes missing coverage. Please review.
✅ Project coverage is 90.17%. Comparing base (05c3eee) to head (3c8a5fd).
⚠️ Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
src/queues/redis/worker.rs 0.00% 81 Missing ⚠️
src/queues/sqs/worker.rs 89.65% 6 Missing ⚠️
src/metrics/mod.rs 96.42% 1 Missing ⚠️
Additional details and impacted files
Flag Coverage Δ
ai 0.00% <0.00%> (ø)
dev 90.17% <63.17%> (-0.06%) ⬇️
properties 0.01% <0.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

@@            Coverage Diff             @@
##             main     #745      +/-   ##
==========================================
- Coverage   90.22%   90.17%   -0.06%     
==========================================
  Files         290      290              
  Lines      122082   122317     +235     
==========================================
+ Hits       110153   110300     +147     
- Misses      11929    12017      +88     
Files with missing lines Coverage Δ
src/jobs/job.rs 95.30% <100.00%> (+0.33%) ⬆️
src/jobs/job_producer.rs 92.30% <100.00%> (+0.74%) ⬆️
src/metrics/mod.rs 92.85% <96.42%> (+0.79%) ⬆️
src/queues/sqs/worker.rs 55.40% <89.65%> (+1.77%) ⬆️
src/queues/redis/worker.rs 33.23% <0.00%> (-4.23%) ⬇️

... and 2 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (1)
src/jobs/job.rs (1)

48-51: Add a doc comment to with_scheduled_on.

This is now part of the public scheduling/metrics contract, so it should spell out the units and serialization expectations explicitly.

💡 Suggested doc comment
+    /// Persists the scheduled availability time as a Unix epoch in seconds.
+    /// Consumers use it to exclude intentional scheduling delay from pickup latency.
     pub fn with_scheduled_on(mut self, scheduled_on: Option<i64>) -> Self {
As per coding guidelines, "Include relevant doc comments (///) on public functions, structs, and modules."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/jobs/job.rs` around lines 48 - 51, Add a public doc comment (///) to the
method with_scheduled_on explaining the expected units and format of the
scheduled_on parameter and how it is serialized to available_at: state that
scheduled_on is an Option<i64> representing an epoch timestamp (explicitly state
whether seconds or milliseconds), that Some(ts) will be converted to a decimal
string via ts.to_string() and stored in available_at, and that None
clears/unsets available_at; include a short usage example and any downstream
contract expectations (e.g., timezone/UTC) in the doc comment for the public
scheduling/metrics contract.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/jobs/job.rs`:
- Around line 24-29: available_at is currently documented/serialized as epoch
seconds which forces second-only resolution for queue_pickup_latency_seconds;
change available_at to carry millisecond precision and propagate that precision
through the consumer path that computes queue_pickup_latency_seconds and any
comparisons against timestamp. Concretely: update the Job struct field
available_at to an integer millisecond epoch type (e.g. Option<i64> or
Option<i128>) and adjust serde (and the field doc) to serialize/deserialize
milliseconds, update the consumer code that reads available_at and the existing
timestamp handling so both use millisecond-resolution arithmetic when computing
queue_pickup_latency_seconds, and ensure any Redis serialization/deserialization
paths that consume available_at accept the millisecond format.

In `@src/queues/redis/worker.rs`:
- Around line 67-84: The helper observe_redis_pickup_latency currently ignores
Apalis attempt count and records latency on retries; modify it to accept an
attempt parameter (e.g., attempt: u32) and return early when attempt != 0 so
only the first attempt emits a sample, and update all call-sites that invoke
observe_redis_pickup_latency to pass the job attempt (from Apalis) into the new
parameter; keep the function name observe_redis_pickup_latency and ensure the
early-return check is performed before parsing timestamps or calling
observe_queue_pickup_latency.

In `@src/queues/sqs/worker.rs`:
- Around line 759-772: The gate in queue_pickup_baseline_ms incorrectly uses
ApproximateReceiveCount (receive_count) to detect a first logical pickup;
instead, change the logic to detect first pickup by checking the message's
logical retry state (inspect the message body/attributes for retry_attempt or
equivalent) and scheduling state (use parse_target_scheduled_on to detect
scheduled messages) rather than only receive_count; update
queue_pickup_baseline_ms to return a baseline when either the message has no
retry_attempt (or retry_attempt == 0) and parse_target_scheduled_on is present
or when SentTimestamp should be used for non-scheduled first pickups, and ensure
you don’t treat re-enqueued retry messages as first pickups; add regression
tests covering (1) standard queue retry messages that are re-enqueued with
retry_attempt set and should not be treated as first pickups, and (2) scheduled
FIFO messages whose visibility-timeout increments receive_count but whose first
eligible pickup must still be recognized via scheduling state (use the
parse_target_scheduled_on and MessageSystemAttributeName::SentTimestamp paths to
assert correct behavior).

---

Nitpick comments:
In `@src/jobs/job.rs`:
- Around line 48-51: Add a public doc comment (///) to the method
with_scheduled_on explaining the expected units and format of the scheduled_on
parameter and how it is serialized to available_at: state that scheduled_on is
an Option<i64> representing an epoch timestamp (explicitly state whether seconds
or milliseconds), that Some(ts) will be converted to a decimal string via
ts.to_string() and stored in available_at, and that None clears/unsets
available_at; include a short usage example and any downstream contract
expectations (e.g., timezone/UTC) in the doc comment for the public
scheduling/metrics contract.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 797e70e6-2327-424f-a7a5-fbb3ad78f65c

📥 Commits

Reviewing files that changed from the base of the PR and between 05c3eee and 34ebb38.

📒 Files selected for processing (5)
  • src/jobs/job.rs
  • src/jobs/job_producer.rs
  • src/metrics/mod.rs
  • src/queues/redis/worker.rs
  • src/queues/sqs/worker.rs

Comment thread src/jobs/job.rs
Comment thread src/queues/redis/worker.rs
Comment thread src/queues/sqs/worker.rs Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants