Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughIntroduces job scheduling metadata and queue pickup latency metrics. Jobs gain an Changes
Sequence Diagram(s)sequenceDiagram
participant Job Producer
participant Job Storage
participant Worker
participant Metrics
Job Producer->>Job Producer: Create Job
Job Producer->>Job Producer: Call with_scheduled_on(timestamp)
Job Producer->>Job Storage: Store job with available_at set
Note over Job Storage: Job remains queued until available_at passes
Worker->>Job Storage: Poll for available jobs
Job Storage->>Worker: Return job with available_at
Note over Worker: Calculate latency<br/>(now - available_at)
Worker->>Metrics: observe_queue_pickup_latency(queue_type, backend, latency)
Metrics->>Metrics: Record to QUEUE_PICKUP_LATENCY histogram
Worker->>Worker: Process job
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Pull request overview
Adds a new Prometheus histogram to measure queue pickup latency (time from message/job creation/scheduled-availability to consumer pickup) across both queue backends (SQS + Redis/Apalis), with logic intended to exclude intentional scheduling delay.
Changes:
- Add
queue_pickup_latency_secondshistogram andobserve_queue_pickup_latency()helper. - Instrument SQS worker to compute pickup latency using
target_scheduled_on(preferred) or SQSSentTimestamp(fallback), only on first delivery. - Add
available_attoJob<T>and populate it from producers so Redis workers can compute pickup latency excluding scheduling delay; instrument Redis worker adapters.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
src/queues/sqs/worker.rs |
Requests SentTimestamp and observes pickup latency on first SQS delivery (scheduled baseline preferred). |
src/queues/redis/worker.rs |
Adds Redis/Apalis pickup-latency observation helper and wires it into each queue handler adapter; splits status handlers by network. |
src/metrics/mod.rs |
Defines/registers queue_pickup_latency_seconds histogram and exposes an observation helper + tests. |
src/jobs/job.rs |
Adds available_at field and builder method to persist intended availability time for scheduled jobs. |
src/jobs/job_producer.rs |
Populates available_at via .with_scheduled_on(...) and extends tests to verify it’s persisted for scheduled jobs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report❌ Patch coverage is
Additional details and impacted files
Flags with carried forward coverage won't be shown. Click here to find out more. @@ Coverage Diff @@
## main #745 +/- ##
==========================================
- Coverage 90.22% 90.17% -0.06%
==========================================
Files 290 290
Lines 122082 122317 +235
==========================================
+ Hits 110153 110300 +147
- Misses 11929 12017 +88
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/jobs/job.rs (1)
48-51: Add a doc comment towith_scheduled_on.This is now part of the public scheduling/metrics contract, so it should spell out the units and serialization expectations explicitly.
As per coding guidelines, "Include relevant doc comments (///) on public functions, structs, and modules."💡 Suggested doc comment
+ /// Persists the scheduled availability time as a Unix epoch in seconds. + /// Consumers use it to exclude intentional scheduling delay from pickup latency. pub fn with_scheduled_on(mut self, scheduled_on: Option<i64>) -> Self {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/jobs/job.rs` around lines 48 - 51, Add a public doc comment (///) to the method with_scheduled_on explaining the expected units and format of the scheduled_on parameter and how it is serialized to available_at: state that scheduled_on is an Option<i64> representing an epoch timestamp (explicitly state whether seconds or milliseconds), that Some(ts) will be converted to a decimal string via ts.to_string() and stored in available_at, and that None clears/unsets available_at; include a short usage example and any downstream contract expectations (e.g., timezone/UTC) in the doc comment for the public scheduling/metrics contract.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/jobs/job.rs`:
- Around line 24-29: available_at is currently documented/serialized as epoch
seconds which forces second-only resolution for queue_pickup_latency_seconds;
change available_at to carry millisecond precision and propagate that precision
through the consumer path that computes queue_pickup_latency_seconds and any
comparisons against timestamp. Concretely: update the Job struct field
available_at to an integer millisecond epoch type (e.g. Option<i64> or
Option<i128>) and adjust serde (and the field doc) to serialize/deserialize
milliseconds, update the consumer code that reads available_at and the existing
timestamp handling so both use millisecond-resolution arithmetic when computing
queue_pickup_latency_seconds, and ensure any Redis serialization/deserialization
paths that consume available_at accept the millisecond format.
In `@src/queues/redis/worker.rs`:
- Around line 67-84: The helper observe_redis_pickup_latency currently ignores
Apalis attempt count and records latency on retries; modify it to accept an
attempt parameter (e.g., attempt: u32) and return early when attempt != 0 so
only the first attempt emits a sample, and update all call-sites that invoke
observe_redis_pickup_latency to pass the job attempt (from Apalis) into the new
parameter; keep the function name observe_redis_pickup_latency and ensure the
early-return check is performed before parsing timestamps or calling
observe_queue_pickup_latency.
In `@src/queues/sqs/worker.rs`:
- Around line 759-772: The gate in queue_pickup_baseline_ms incorrectly uses
ApproximateReceiveCount (receive_count) to detect a first logical pickup;
instead, change the logic to detect first pickup by checking the message's
logical retry state (inspect the message body/attributes for retry_attempt or
equivalent) and scheduling state (use parse_target_scheduled_on to detect
scheduled messages) rather than only receive_count; update
queue_pickup_baseline_ms to return a baseline when either the message has no
retry_attempt (or retry_attempt == 0) and parse_target_scheduled_on is present
or when SentTimestamp should be used for non-scheduled first pickups, and ensure
you don’t treat re-enqueued retry messages as first pickups; add regression
tests covering (1) standard queue retry messages that are re-enqueued with
retry_attempt set and should not be treated as first pickups, and (2) scheduled
FIFO messages whose visibility-timeout increments receive_count but whose first
eligible pickup must still be recognized via scheduling state (use the
parse_target_scheduled_on and MessageSystemAttributeName::SentTimestamp paths to
assert correct behavior).
---
Nitpick comments:
In `@src/jobs/job.rs`:
- Around line 48-51: Add a public doc comment (///) to the method
with_scheduled_on explaining the expected units and format of the scheduled_on
parameter and how it is serialized to available_at: state that scheduled_on is
an Option<i64> representing an epoch timestamp (explicitly state whether seconds
or milliseconds), that Some(ts) will be converted to a decimal string via
ts.to_string() and stored in available_at, and that None clears/unsets
available_at; include a short usage example and any downstream contract
expectations (e.g., timezone/UTC) in the doc comment for the public
scheduling/metrics contract.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 797e70e6-2327-424f-a7a5-fbb3ad78f65c
📒 Files selected for processing (5)
src/jobs/job.rssrc/jobs/job_producer.rssrc/metrics/mod.rssrc/queues/redis/worker.rssrc/queues/sqs/worker.rs
Summary
backends (SQS/Redis)
Testing Process
Checklist
Note
If you are using Relayer in your stack, consider adding your team or organization to our list of Relayer Users in the Wild!
Summary by CodeRabbit
New Features
Tests