Conversation
📝 Walkthrough

This PR transforms a monolithic Rust project into a modular workspace architecture with five specialized crates. It introduces payment webhook normalization, Redis-backed event queueing, HTTP backend communication, Docker containerization, and supporting infrastructure, enabling webhook ingestion, asynchronous event processing, and transaction forwarding.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant WebhookServer as Webhook Server<br/>(Axum)
    participant Queue as Event Queue<br/>(Redis)
    participant Worker
    participant BackendClient as Backend Client<br/>(HTTP)
    participant Backend as Backend API

    Client->>WebhookServer: POST /webhook<br/>(JSON payload)
    WebhookServer->>WebhookServer: normalize_webhook_payload()<br/>(extract provider)
    alt Normalization Success
        WebhookServer->>Queue: publish(TransactionEvent)
        Queue-->>WebhookServer: OK
        WebhookServer-->>Client: 200 OK
    else Normalization Failure
        WebhookServer-->>Client: 400 Bad Request
    end
    Worker->>Queue: consume()
    Queue-->>Worker: TransactionEvent
    Worker->>BackendClient: send_transaction(event)
    BackendClient->>Backend: POST /webhook<br/>(serialized payload)
    Backend-->>BackendClient: 200 OK
    BackendClient-->>Worker: OK
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 13
Note: Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
🟡 Minor comments (8)
.githooks/pre-commit-1-5 (1)

1-5: ⚠️ Potential issue | 🟡 Minor

Hook activation is undocumented — developers won't benefit from it without manual setup.

Git doesn't use `.githooks/` automatically; each developer must run:

```sh
git config core.hooksPath .githooks
```

This step should be mentioned in the README (e.g., in a "Development Setup" section) or in a one-time `make setup`/`scripts/setup.sh` target; otherwise the hook silently does nothing for anyone who doesn't know to configure it.

Would you like me to open a new issue to track adding a setup script or README section that configures `core.hooksPath`?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.githooks/pre-commit around lines 1 - 5, The pre-commit hook script (REPO_ROOT variable in the script) will not run for developers unless core.hooksPath is configured; add a documented setup step and/or an automated setup target that runs git config core.hooksPath .githooks so the hook is activated. Update the repository README (Development Setup) to describe the one-time command and add a new make target or scripts/setup.sh that executes git config core.hooksPath .githooks (and optionally checks/creates the .githooks directory), and mention this change near any existing contributor/dev onboarding notes.

.env.example-1-5 (1)
1-5: ⚠️ Potential issue | 🟡 Minor

Two `.env.example` defaults may trip up local developers.

- `APP_ENV=staging` — Developers who copy this file and run locally without changing `APP_ENV` will unknowingly point at a live staging environment. A `development` or `local` default is safer.
- `REDIS_URL=redis://redis:6379` — The hostname `redis` is the Docker Compose service name and won't resolve outside a Compose network. Add a comment indicating the local equivalent (`redis://localhost:6379`).

📝 Suggested fix

```diff
-APP_ENV=staging
+APP_ENV=development
 BACKEND_STAGING_URL=https://staging.ourpocket.com
 BACKEND_PROD_URL=https://api.ourpocket.com
-REDIS_URL=redis://redis:6379
+# For Docker Compose: redis://redis:6379
+REDIS_URL=redis://localhost:6379
 REDIS_QUEUE_KEY=ourpocket:transactions
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.env.example around lines 1 - 5, Change the potentially dangerous defaults in .env.example: set APP_ENV to a local-safe value like "development" (instead of "staging") so local copies don't point at staging, and update the REDIS_URL entry to include a clarifying inline comment that the Compose service name "redis" only resolves in Docker (e.g., indicate the local equivalent is "redis://localhost:6379"); keep the REDIS_QUEUE_KEY as-is but ensure the comment is adjacent to REDIS_URL so developers see the intended local override.

README.md-12-12 (1)
12-12: ⚠️ Potential issue | 🟡 Minor

Inaccurate description — coverage script fails (not skips) when tool is missing.

The phrasing "if installed" implies a graceful no-op, but `scripts/coverage.sh` actually exits with a non-zero error code when `cargo-llvm-cov` is absent.

📝 Suggested wording fix

```diff
-`./scripts/coverage.sh` runs coverage with `cargo llvm-cov` if installed.
+`./scripts/coverage.sh` runs coverage with `cargo llvm-cov` (requires `cargo install cargo-llvm-cov`).
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@README.md` at line 12, The README line incorrectly implies ./scripts/coverage.sh is a no-op when cargo-llvm-cov is absent; update the wording to state the script requires cargo-llvm-cov and will exit with a non-zero error if the tool is not installed (referencing ./scripts/coverage.sh and cargo-llvm-cov) — e.g., change "runs coverage with `cargo llvm-cov` if installed" to a sentence that clearly says the script requires `cargo-llvm-cov` and will fail/exit with an error when it is missing.

crates/webhook-server/Cargo.toml-1-4 (1)
1-4: ⚠️ Potential issue | 🟡 Minor

Set `rust-version = "1.85"` in Cargo.toml per Rust 2024 edition migration guidance.

While the current Docker image (`rust:slim-trixie`, Rust 1.93.1) already exceeds the minimum requirement for `edition = "2024"` (Rust 1.85.0+), the Rust project recommends explicitly setting `rust-version = "1.85"` in `Cargo.toml` to document the MSRV and prevent accidental use of older toolchains. Additionally, consider pinning the Docker base image tag (e.g., `rust:1.93.1-slim-trixie` instead of the floating `rust:slim-trixie`) for reproducible builds.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/webhook-server/Cargo.toml` around lines 1 - 4, Add the minimum supported Rust version to Cargo.toml by adding rust-version = "1.85" inside the [package] table (alongside name, version, edition) so the manifest documents the MSRV; also update any Docker base-image references elsewhere to a pinned tag (e.g., change rust:slim-trixie to rust:1.93.1-slim-trixie) for reproducible builds.

crates/domain/src/paystack.rs-56-87 (1)
56-87: ⚠️ Potential issue | 🟡 Minor

Test uses a string `"id"` value, masking the real-world type mismatch.

The test payload has `"id": "cust-psk-1"` (string), but real Paystack webhooks send `"id": 46123456` (integer). Add a test with a numeric `customer.id` to verify the production path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/domain/src/paystack.rs` around lines 56 - 87, Add a new test variant to validate numeric customer IDs by copying or extending normalize_paystack_basic_payload to pass a payload where "customer": { "id": 46123456 } (an integer) and asserting normalize_paystack_payload returns the same event fields (transaction_ref, user_id, application_id, status, amount, category) while ensuring user_id handling in normalize_paystack_payload accepts numeric IDs (convert to string if necessary). Target the test function name (e.g., normalize_paystack_numeric_customer_id) and the normalize_paystack_payload function to verify/adjust parsing logic so it correctly handles integer customer.id values.

crates/domain/src/paystack.rs-11-15 (1)
11-15: ⚠️ Potential issue | 🟡 Minor

The fallback `data["data"]["reference"]` is unreachable.

`data` is already resolved from `json.get("data")` (Line 9), so `data["data"]["reference"]` looks up `json["data"]["data"]["reference"]` — a path that doesn't exist in Paystack payloads. This fallback will never match.

Proposed fix — remove the dead fallback or replace with the intended path

```diff
 let transaction_ref = data["reference"]
     .as_str()
-    .or_else(|| data["data"]["reference"].as_str())
     .unwrap_or("")
     .to_string();
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/domain/src/paystack.rs` around lines 11 - 15, The fallback data["data"]["reference"] in the transaction_ref extraction is unreachable because data is already json.get("data"); remove that dead fallback and instead try the top-level reference field from the original JSON (e.g., check json["reference"] as a second option) or simply drop the extra lookup; update the transaction_ref construction (the let transaction_ref = ... expression) to use data["reference"].as_str().or_else(|| json["reference"].as_str()).unwrap_or("").to_string() (or just data["reference"].as_str().unwrap_or("").to_string() if you prefer to remove the top-level check).

crates/domain/src/transaction.rs-14-26 (1)
14-26: ⚠️ Potential issue | 🟡 Minor

`normalize_payload` is exported but unused in the webhook dispatch flow.

`normalize_webhook_payload` in `lib.rs` dispatches to `normalize_flutterwave_payload` or `normalize_paystack_payload` — it never calls `normalize_payload`. The default case returns an error instead of falling back to this function. If this is intended as a generic/fallback normalizer, wire it as the default case in the match. Otherwise, remove the function and its re-export, as it's only referenced in its own test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/domain/src/transaction.rs` around lines 14 - 26, The exported function normalize_payload is never used by normalize_webhook_payload (which only dispatches to normalize_flutterwave_payload or normalize_paystack_payload) so either wire normalize_payload as the default/fallback normalizer in the match inside normalize_webhook_payload in lib.rs or remove normalize_payload and its re-export and associated test; to wire it, update the default arm of the match in normalize_webhook_payload to call normalize_payload(raw) (ensure signature matches and import visibility), otherwise delete the normalize_payload function and its pub re-export so there are no unused exports or dead tests referencing it.

crates/queue/src/lib.rs-158-202 (1)
158-202: ⚠️ Potential issue | 🟡 Minor

Redis test silently skips all post-connection failures, masking real bugs.

The early-return pattern at lines 185–199 means that if `publish` or `consume` returns an error for any reason other than Redis unavailability (e.g., serialization failure, wrong key, BRPOP error), the test silently passes without reaching the `assert_eq!`. Only the initial availability probes (lines 164–172) should return early; subsequent steps should use `expect` or `unwrap`:

💡 Suggested fix

```diff
-    if queue1.publish(event.clone()).await.is_err() {
-        return;
-    }
+    queue1.publish(event.clone()).await.expect("publish should succeed");
     drop(queue1);
     let queue2 = match RedisQueue::new(&redis_url, queue_key) {
         Ok(q) => q,
         Err(_) => return,
     };
-    let consumed = match queue2.consume().await {
-        Ok(ev) => ev,
-        Err(_) => return,
-    };
+    let consumed = queue2.consume().await.expect("consume should succeed");
```
🧹 Nitpick comments (12)
docker-compose.yml (1)
25-28: Consider adding a healthcheck for Redis to ensure dependent services start only when Redis is ready.
`depends_on` only waits for the container to start, not for Redis to be accepting connections. A healthcheck avoids race conditions on startup.

♻️ Proposed improvement

```diff
 redis:
   image: redis:7-alpine
   ports:
     - "6379:6379"
+  healthcheck:
+    test: ["CMD", "redis-cli", "ping"]
+    interval: 5s
+    timeout: 3s
+    retries: 5
```

Then update `depends_on` in webhook-server and worker to use condition:

```yaml
depends_on:
  redis:
    condition: service_healthy
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose.yml` around lines 25 - 28, Add a Docker healthcheck for the redis service in docker-compose.yml so other services wait for Redis to accept connections; update the redis service block (service name "redis") to include a healthcheck using a small command like "redis-cli ping" with sensible interval/retries/timeouts, and then change the depends_on clauses in the "webhook-server" and "worker" service definitions to use condition: service_healthy for redis so they only start when the redis healthcheck passes.

scripts/start-all.sh (1)
4-8: Add a signal trap to clean up child processes.

If the script is interrupted (Ctrl-C) or terminated, the backgrounded `cargo run` processes will be orphaned. Add a `trap` to forward signals.

♻️ Proposed fix

```diff
 cd "$(dirname "$0")/.."
+
+cleanup() {
+  kill "$SERVER_PID" "$WORKER_PID" 2>/dev/null
+  wait "$SERVER_PID" "$WORKER_PID" 2>/dev/null
+}
+trap cleanup EXIT INT TERM
+
 cargo run -p webhook-server &
 SERVER_PID=$!
 cargo run -p worker &
 WORKER_PID=$!
 wait "$SERVER_PID" "$WORKER_PID"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/start-all.sh` around lines 4 - 8, Add a signal trap that forwards termination signals to the background cargo processes and ensures they are cleaned up: after launching the processes and setting SERVER_PID and WORKER_PID, define a cleanup function that checks those variables and sends a TERM (and if necessary a KILL) to "$SERVER_PID" and "$WORKER_PID" and then waits for them, and register that function with trap for INT, TERM, and EXIT so signals (Ctrl-C) are forwarded and children are not orphaned; update the script around the SERVER_PID/WORKER_PID logic and the final wait call to use this cleanup trap.

crates/rpc-client/Cargo.toml (1)
6-15: Consider removing `async-trait` — native async traits are sufficient for this codebase.

Since Rust 1.75+, `async fn` in traits is natively supported. The `async-trait` crate is required only for dyn-dispatched trait objects. The `BackendClient` trait is used exclusively via static dispatch (generic bounds like `C: BackendClient`), never as `Box<dyn>` or trait objects. With the Rust 2024 edition, you can replace the `#[async_trait]` macros on the trait definition and implementations with native async syntax — no functional change, just simpler code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/rpc-client/Cargo.toml` around lines 6 - 15, Remove the unnecessary async-trait dependency from crates/rpc-client/Cargo.toml and replace all uses of the #[async_trait] macro on the BackendClient trait and its implementations with native async trait syntax: remove the attribute, declare async fns directly in the trait (async fn ...) and in each impl use native async fn implementations for the same method names; ensure there are no Box<dyn BackendClient> usages (only generic C: BackendClient) before removing the crate.

crates/rpc-client/src/lib.rs (1)
57-66: `TransactionRequestDto` duplicates `TransactionEvent` field-for-field.

`TransactionEvent` already derives `Serialize`. Unless the wire format intentionally diverges from the domain model, you can serialize `TransactionEvent` directly and remove the DTO + manual mapping.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/rpc-client/src/lib.rs` around lines 57 - 66, TransactionRequestDto duplicates TransactionEvent field-for-field; remove TransactionRequestDto and stop manual mapping by serializing TransactionEvent directly wherever TransactionRequestDto is constructed or passed (search for uses of TransactionRequestDto and the mapping code that builds it), ensure TransactionEvent derives Serialize (it already does) and update any function signatures or RPC payload types expecting TransactionRequestDto to accept/serialize TransactionEvent instead, adjusting tests and callers accordingly.

crates/domain/src/lib.rs (2)
9-11: JSON is parsed twice — once here for provider routing, then again inside each normalizer.

Both `normalize_webhook_payload` and the provider-specific functions call `serde_json::from_str(raw)`. Consider refactoring the normalizers to accept a pre-parsed `serde_json::Value` to avoid the redundant parse. Not urgent, but worth a follow-up if payloads are large.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/domain/src/lib.rs` around lines 9 - 11, The code currently parses the raw webhook JSON twice: in normalize_webhook_payload and again inside each provider-specific normalizer; refactor by parsing raw once into a serde_json::Value in normalize_webhook_payload and change provider normalizer function signatures (e.g., any functions named like normalize_{provider}_payload or similar) to accept a &serde_json::Value (or owned Value) instead of &str, then call those normalizers with the pre-parsed Value and remove their internal serde_json::from_str calls; update all callers (including normalize_webhook_payload’s provider routing) to pass the Value and adjust error handling/return types accordingly.
9-17: Include the unrecognized provider value in the error for debuggability.

When provider detection fails, the error `"unsupported provider"` gives no hint about what was actually received. This makes troubleshooting bad payloads harder.

Proposed fix

```diff
-        _ => Err(anyhow::Error::msg("unsupported provider")),
+        other => Err(anyhow::anyhow!("unsupported provider: {other:?}")),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/domain/src/lib.rs` around lines 9 - 17, The error returned from normalize_webhook_payload when the provider is not matched should include the actual provider string for debugging; update the final match arm in normalize_webhook_payload to return an anyhow error that interpolates the provider value (use the existing provider variable, which is lowercased) into the message instead of the static "unsupported provider" text so logs show e.g. "unsupported provider: {provider}".

crates/domain/src/transaction.rs (1)
3-12: `f64` for monetary `amount` can lose precision on large values.

Floating-point cannot represent all decimal currency amounts exactly. For a pass-through relay this is likely acceptable, but if any arithmetic (discounts, fees, rounding) is added later, values like `0.1` or amounts exceeding 2^53 will be imprecise. Consider using an integer (cents/kobo) or a decimal type if exact monetary arithmetic is anticipated.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/domain/src/transaction.rs` around lines 3 - 12, The TransactionEvent struct uses f64 for the amount field which can lose precision for currency; change the type of amount in TransactionEvent to an exact representation (e.g., i64 representing minor units/cents or rust_decimal::Decimal) and update serializers/deserializers and any callers/converters that construct or consume TransactionEvent to use the new type; specifically, modify the amount field on TransactionEvent, adjust serde annotations or feature flags if using rust_decimal (e.g., serde feature) or ensure conversion helpers exist where amounts are produced/parsed so existing code compiles and preserves exact monetary values.

crates/domain/src/flutterwave.rs (1)
6-50: Consider extracting shared normalization logic between Flutterwave and Paystack.

Both normalizers follow an identical structure: parse JSON → resolve `data` → extract fields with fallbacks → build `TransactionEvent`. The only differences are the field names. A shared helper (or a config-driven normalizer) could reduce duplication. Not blocking, but worth considering as more providers are added.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/domain/src/flutterwave.rs` around lines 6 - 50, The Flutterwave normalizer (normalize_flutterwave_payload) duplicates logic used by the Paystack normalizer: parse JSON, resolve `data`, extract fields with fallbacks, and construct TransactionEvent; refactor by extracting shared helpers (e.g., a resolve_data(&Value) -> &Value, extract_str_field(&Value, keys: &[&str]) -> String, extract_f64_field(&Value, keys: &[&str]) -> f64) and a small builder function that takes the resolved data and a mapping of provider-specific keys to populate TransactionEvent; then update normalize_flutterwave_payload (and the Paystack normalizer) to use these helpers to reduce duplication while preserving the same fallbacks and use of TransactionEvent and metadata.

crates/domain/src/paystack.rs (1)
31-34: `as_f64()` already handles JSON integers — the `or_else` with `as_i64` is redundant.

`serde_json::Value::as_f64()` returns `Some` for both JSON floats and integers that fit in `f64`. The `or_else` branch is dead code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/domain/src/paystack.rs` around lines 31 - 34, The amount extraction uses redundant branches because serde_json::Value::as_f64() already returns Some for JSON integers; remove the unnecessary or_else(...) chain and simplify the amount assignment to use data["amount"].as_f64().unwrap_or(0.0) (referencing the amount variable in paystack.rs) so the dead-code path with as_i64() is eliminated.

crates/webhook-server/src/main.rs (1)
41-57: Error context is silently discarded — add logging before mapping to status codes.

Both the normalization error and the publish error are mapped to bare status codes with `map_err(|_| ...)`. In production, you'll have no visibility into why requests are failing. Log the error before converting.

Proposed improvement

```diff
-    let event = normalize_webhook_payload(&body).map_err(|_| StatusCode::BAD_REQUEST)?;
+    let event = normalize_webhook_payload(&body).map_err(|err| {
+        eprintln!("webhook normalization error: {err:#}");
+        StatusCode::BAD_REQUEST
+    })?;
     state
         .queue
         .publish(event)
         .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+        .map_err(|err| {
+            eprintln!("queue publish error: {err:#}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/webhook-server/src/main.rs` around lines 41 - 57, The webhook_handler currently discards error details by using map_err(|_| ...) for normalize_webhook_payload and state.queue.publish; update webhook_handler to capture the error values from normalize_webhook_payload(...) and state.queue.publish(...).await, log the error details (e.g., with tracing::error! or log::error!) including contextual info (request body or event type) before converting to StatusCode::BAD_REQUEST or StatusCode::INTERNAL_SERVER_ERROR, and then map or return the appropriate status code; reference normalize_webhook_payload for the parsing step and state.queue.publish for the publish step so you log the exact errors from those calls.

crates/queue/src/lib.rs (2)
43-43: `Error::msg(err.to_string())` discards the error chain — prefer idiomatic `anyhow` conversion.

This pattern appears throughout the file (lines 43, 58, 68, 79, 86, 106, 108). All the source error types (`redis::RedisError`, `serde_json::Error`, `mpsc::SendError`) implement `std::error::Error`, so `anyhow` can wrap them directly without stringifying. Use `.map_err(anyhow::Error::from)` or simply `?` in contexts where the function already returns `Result<_, anyhow::Error>`:

♻️ Example fix (applies to all occurrences)

```diff
-    let client = redis::Client::open(redis_url).map_err(|err| Error::msg(err.to_string()))?;
+    let client = redis::Client::open(redis_url).map_err(anyhow::Error::from)?;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/queue/src/lib.rs` at line 43, The current calls like redis::Client::open(redis_url).map_err(|err| Error::msg(err.to_string())) (and the similar patterns around serde_json::from_str and tx.send) convert errors to strings and drop the original error chain; replace these with idiomatic anyhow propagation—either drop the map_err and use `?` if the surrounding function returns Result<_, anyhow::Error>, or change the mapper to `.map_err(anyhow::Error::from)` (or `Into::into`) so the original error type (e.g., redis::RedisError, serde_json::Error, mpsc::SendError) is preserved; update calls in the code paths that construct the redis client (redis::Client::open), JSON parsing (serde_json::from_str), and channel sends (tx.send) accordingly.
9-17: Consider replacing `#[async_trait]` with native `async fn` in traits (Rust ≥ 1.75).

`async_trait` allocates a `Box<dyn Future>` on every call. Since Rust 1.75 (December 28, 2023), `async fn` in traits is stable natively through return-position `impl Trait` in traits (RPITIT), eliminating that overhead and simplifying the code.

This requires removing the `async_trait` crate dependency and the six `#[async_trait]` attributes (on trait definitions at lines 9, 14 and trait implementations at lines 52, 62, 72, 92), while keeping the async method signatures unchanged.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/queue/src/lib.rs` around lines 9 - 17, Remove the #[async_trait] usages and dependency and convert the traits and their impls to use native async trait methods: keep the async method signatures for EventQueue::publish and EventQueueConsumer::consume but delete the #[async_trait] attribute on the trait definitions and on all impl blocks that implement those traits (i.e., remove the six #[async_trait] occurrences mentioned), delete the async_trait crate from Cargo.toml, and ensure the impls implement async fn publish(...) and async fn consume(...) directly (toolchain >= 1.75).
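For illustration, a self-contained sketch of a queue trait after dropping `#[async_trait]`; `MemoryQueue` and the tiny `block_on` executor are invented for the demo, not taken from the codebase. One caveat the comments above don't mention: native `async fn` in traits does not add a `Send` bound to the returned future, which matters if these futures are ever passed to `tokio::spawn`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Mutex;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Native `async fn` in a trait — no #[async_trait], no boxing per call.
trait EventQueue {
    async fn publish(&self, event: String) -> Result<(), String>;
}

// Illustrative in-memory implementation.
struct MemoryQueue {
    events: Mutex<Vec<String>>,
}

impl EventQueue for MemoryQueue {
    async fn publish(&self, event: String) -> Result<(), String> {
        self.events.lock().map_err(|e| e.to_string())?.push(event);
        Ok(())
    }
}

// Minimal executor, enough to drive futures that complete without I/O;
// a real service would use tokio instead.
fn block_on<F: Future>(fut: F) -> F::Output {
    static VTABLE: RawWakerVTable = RawWakerVTable::new(
        |_| RawWaker::new(std::ptr::null(), &VTABLE),
        |_| {},
        |_| {},
        |_| {},
    );
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The method signatures are unchanged from the `#[async_trait]` version; only the attribute and the hidden `Box::pin` wrapper go away.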
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.githooks/pre-commit:
- Around line 17-18: Replace the auto-staging step that uses "git add -u" with a
formatter check so we don't clobber a developer's intentional partial staging:
remove the "git add -u" invocation in the pre-commit script and instead run
"cargo fmt --all" in check mode (e.g., pass the -- --check flag to cargo fmt)
and fail the hook when formatting is required so the developer can re-stage
explicitly; update the pre-commit flow around the existing cargo fmt invocation
to exit non-zero on formatting issues and print instructions to run cargo fmt
manually.
In `@crates/domain/src/paystack.rs`:
- Around line 17-21: The code currently uses data["customer"]["id"].as_str()
which returns None for numeric IDs; update the user_id extraction to first try
as_str(), then try as_i64()/as_u64() (or as_u64 then as_i64) and convert that
numeric value to a string, and only then fall back to
data["customer"]["email"].as_str(); apply the same change for the analogous
user_id extraction in flutterwave.rs so numeric customer IDs are preserved.
In `@crates/queue/Cargo.toml`:
- Line 12: The redis dependency is pinned to an old 0.25 release; bump it to a
modern 1.0.x (recommend "1.0.3") in Cargo.toml so you get recent fixes and
security updates—no code changes expected since the code already uses modern
async APIs like get_multiplexed_async_connection() and query_async(), so simply
update the redis version string to 1.0.3 in the crate's dependencies entry.
In `@crates/queue/src/lib.rs`:
- Around line 36-50: RedisQueue currently opens a new TCP connection on every
publish/consume because it only stores redis::Client; change RedisQueue to cache
a reusable async connection (e.g., store redis::aio::ConnectionManager or a
redis::aio::MultiplexedConnection) as a field (replace or add a conn field)
created once in RedisQueue::new, then update EventQueue::publish to use
self.conn.clone() instead of calling get_multiplexed_async_connection(), and
ensure EventQueueConsumer::consume uses a dedicated non-multiplexed blocking
connection or a separate connection manager for blocking BRPOP to avoid
exhausting Redis server connections.
- Around line 92-111: In RedisQueue::consume (EventQueueConsumer impl) switch
from a multiplexed connection to a dedicated non-multiplexed connection by using
client.get_async_connection() instead of get_multiplexed_async_connection(), and
change the BRPOP call to use a finite timeout (e.g., arg(5) instead of arg(0))
so the blocking call returns periodically and the caller (worker_loop) can
observe shutdown/cancellation; keep the existing error mapping/serde logic but
acquire and use the non-multiplexed connection for the blocking BRPOP.
In `@crates/rpc-client/src/lib.rs`:
- Line 83: The HTTP POST currently ignores HTTP error statuses because
send().await? only fails on network errors; in the method containing
self.http_client.post(url).json(&payload).send().await? capture the response
into a variable, call response.error_for_status()? to convert 4xx/5xx into an
Err, and propagate that error (e.g., using ?). Update the code around the
self.http_client.post(...).json(&payload).send().await? expression to perform
this additional check so HTTP error responses are not treated as success.
- Around line 48-54: The HttpBackendClient::new currently constructs
reqwest::Client with reqwest::Client::new(), which leaves no request timeout and
can hang; update HttpBackendClient::new to build the client with a default
timeout (e.g., use
reqwest::Client::builder().timeout(Duration::from_secs(10)).build().unwrap() or
return a Result) so http_client has a finite timeout, and ensure you import
std::time::Duration and handle build() errors appropriately; reference
HttpBackendClient::new, http_client, and reqwest::Client::builder() when making
the change.
- Around line 93-128: Tests app_config_selects_environment_from_env and
backend_url_reads_from_env_with_fallbacks mutate process-wide env via
std::env::set_var/remove_var, which is unsafe for parallel tests; fix by
serializing or avoiding env mutation: annotate the two test functions
(app_config_selects_environment_from_env and
backend_url_reads_from_env_with_fallbacks) with a serializing attribute from the
serial_test crate (e.g., #[serial]) or refactor to call
AppConfig::from_env()/AppConfig::backend_url() with injected config rather than
using std::env::set_var/remove_var, and remove the unsafe blocks around env
manipulation if you choose serialization or eliminate env mutation entirely.
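If pulling in `serial_test` is undesirable, one std-only alternative is a process-wide lock that serializes env mutation and restores the previous value afterwards; `with_env_var` is a hypothetical helper, not existing code:

```rust
use std::env;
use std::sync::Mutex;

// Serializes env mutation across threads/tests and restores prior state.
static ENV_LOCK: Mutex<()> = Mutex::new(());

fn with_env_var<T>(key: &str, value: &str, f: impl FnOnce() -> T) -> T {
    let _guard = ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    let previous = env::var(key).ok();
    // SAFETY: mutation is serialized by ENV_LOCK; set_var/remove_var are
    // `unsafe` in edition 2024 because unsynchronized env mutation races.
    unsafe { env::set_var(key, value) };
    let result = f();
    match previous {
        Some(old) => unsafe { env::set_var(key, old) },
        None => unsafe { env::remove_var(key) },
    }
    result
}
```

Tests would then wrap their `AppConfig::from_env()` calls in `with_env_var(...)` instead of mutating the environment directly.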
In `@crates/webhook-server/src/main.rs`:
- Around line 41-57: The webhook_handler currently accepts any POST body; add
signature verification before calling normalize_webhook_payload and publishing
to state.queue: extract provider headers (e.g., "X-Paystack-Signature" and
"verif-hash") from the incoming request, retrieve the corresponding secret from
the AppState (e.g., paystack_secret/flutterwave_secret on State(AppState<Q>)),
compute the HMAC (Paystack uses HMAC-SHA512 over the raw body) and perform a
constant-time comparison with the header value, returning
StatusCode::UNAUTHORIZED on mismatch or missing header; only then call
normalize_webhook_payload and state.queue.publish(event). Ensure you reference
webhook_handler, normalize_webhook_payload, and state.queue.publish when making
changes.
In `@crates/worker/Cargo.toml`:
- Line 4: The crate uses edition = "2024" in Cargo.toml but the repository lacks
a pinned toolchain; add a rust-toolchain.toml at the repo root containing
channel = "1.85" (or newer) to ensure Rust 1.85+ is used for builds and CI, so
tools compiling code tied to edition = "2024" will not fail on older compilers.
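The pinned toolchain file is a two-line addition at the repo root; the channel value is the reviewer's suggested minimum:

```toml
# rust-toolchain.toml (repo root)
[toolchain]
channel = "1.85"
```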
In `@crates/worker/src/main.rs`:
- Around line 23-32: The worker_loop currently returns on any error from
queue.consume() or backend_client.send_transaction(), so change it to catch
errors, log them (using the crate's logger, e.g., tracing::error/warn), and
retry transient failures with exponential backoff (use tokio::time::sleep,
doubling delay up to a cap and a configurable max_retries), while continuing the
outer loop after backoff instead of returning; keep a separate short-circuit for
non-transient/fatal errors if your EventQueueConsumer or BackendClient exposes
error kinds (match on error type or provide is_transient()), and only propagate
or terminate on those fatal cases. Ensure to reference worker_loop,
queue.consume, and backend_client.send_transaction when adding the retry/backoff
and logging.
In `@docker-compose.yml`:
- Around line 1-11: The docker-compose service "webhook-server" lacks a
dependency on Redis, which can cause it to start before Redis is ready and lead
to connection failures in the Redis-backed queue (RedisQueue); update the
"webhook-server" service to include a depends_on entry referencing the Redis
service (e.g., add depends_on: - redis) so Docker Compose will start Redis
before webhook-server and prevent race conditions during startup.
In `@Dockerfile`:
- Around line 1-25: The Dockerfile hardcodes the binary name ("webhook-server")
and never declares the build arg BIN_NAME, causing services that pass BIN_NAME
(e.g., worker) to build/run the wrong binary; add ARG BIN_NAME with a default
(e.g., webhook-server) and replace occurrences of "webhook-server" in the cargo
build command (-p webhook-server), the copy source path
(/app/target/release/webhook-server), and the ENTRYPOINT (currently
/usr/local/bin/webhook-app) so they use the BIN_NAME value and keep the final
copied file/name consistent (e.g., copy from /app/target/release/${BIN_NAME} to
/usr/local/bin/${BIN_NAME} and set ENTRYPOINT to that path).
---
Duplicate comments:
In `@crates/domain/src/flutterwave.rs`:
- Around line 17-21: The current user_id extraction uses as_str() on
data["customer"]["id"] which returns None for numeric IDs; update the user_id
assignment (the user_id variable that reads data["customer"]["id"] and
data["customer"]["email"]) to first attempt to get the id as a number (e.g.,
as_i64/as_u64) and convert it to a string, then fall back to
data["customer"]["email"].as_str(), and finally default to "" — mirror the same
fix applied in paystack.rs so numeric customer.id values are handled correctly.
---
Nitpick comments:
In `@crates/domain/src/flutterwave.rs`:
- Around line 6-50: The Flutterwave normalizer (normalize_flutterwave_payload)
duplicates logic used by the Paystack normalizer: parse JSON, resolve `data`,
extract fields with fallbacks, and construct TransactionEvent; refactor by
extracting shared helpers (e.g., a resolve_data(&Value) -> &Value,
extract_str_field(&Value, keys: &[&str]) -> String, extract_f64_field(&Value,
keys: &[&str]) -> f64) and a small builder function that takes the resolved data
and a mapping of provider-specific keys to populate TransactionEvent; then
update normalize_flutterwave_payload (and the Paystack normalizer) to use these
helpers to reduce duplication while preserving the same fallbacks and use of
TransactionEvent and metadata.
In `@crates/domain/src/lib.rs`:
- Around line 9-11: The code currently parses the raw webhook JSON twice: in
normalize_webhook_payload and again inside each provider-specific normalizer;
refactor by parsing raw once into a serde_json::Value in
normalize_webhook_payload and change provider normalizer function signatures
(e.g., any functions named like normalize_{provider}_payload or similar) to
accept a &serde_json::Value (or owned Value) instead of &str, then call those
normalizers with the pre-parsed Value and remove their internal
serde_json::from_str calls; update all callers (including
normalize_webhook_payload’s provider routing) to pass the Value and adjust error
handling/return types accordingly.
- Around line 9-17: The error returned from normalize_webhook_payload when the
provider is not matched should include the actual provider string for debugging;
update the final match arm in normalize_webhook_payload to return an anyhow
error that interpolates the provider value (use the existing provider variable,
which is lowercased) into the message instead of the static "unsupported
provider" text so logs show e.g. "unsupported provider: {provider}".
In `@crates/domain/src/paystack.rs`:
- Around line 31-34: The amount extraction uses redundant branches because
serde_json::Value::as_f64() already returns Some for JSON integers; remove the
unnecessary or_else(...) chain and simplify the amount assignment to use
data["amount"].as_f64().unwrap_or(0.0) (referencing the amount variable in
paystack.rs) so the dead-code path with as_i64() is eliminated.
In `@crates/domain/src/transaction.rs`:
- Around line 3-12: The TransactionEvent struct uses f64 for the amount field
which can lose precision for currency; change the type of amount in
TransactionEvent to an exact representation (e.g., i64 representing minor
units/cents or rust_decimal::Decimal) and update serializers/deserializers and
any callers/converters that construct or consume TransactionEvent to use the new
type; specifically, modify the amount field on TransactionEvent, adjust serde
annotations or feature flags if using rust_decimal (e.g., serde feature) or
ensure conversion helpers exist where amounts are produced/parsed so existing
code compiles and preserves exact monetary values.
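For the i64 minor-units option, the conversion boundary can be sketched like this; the helper names are hypothetical and kobo is used as the illustrative minor unit:

```rust
/// Convert a provider amount in major units (e.g. naira) into exact
/// minor units (e.g. kobo) for storage in TransactionEvent.
fn to_minor_units(major: f64) -> i64 {
    // round() guards against float artifacts like 4999.9899999...
    (major * 100.0).round() as i64
}

/// Convert back for display only; arithmetic should stay in i64.
fn from_minor_units(minor: i64) -> f64 {
    minor as f64 / 100.0
}

fn main() {
    // 4999.99 naira is stored exactly as 499999 kobo.
    println!("{}", to_minor_units(4999.99));
    println!("{}", from_minor_units(499999));
}
```

The conversion happens once at the normalization edge; everything downstream (queue, backend client) then carries an exact integer.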
In `@crates/queue/src/lib.rs`:
- Line 43: The current calls like redis::Client::open(redis_url).map_err(|err|
Error::msg(err.to_string())) (and the similar patterns around
serde_json::from_str and tx.send) convert errors to strings and drop the
original error chain; replace these with idiomatic anyhow propagation—either
drop the map_err and use `?` if the surrounding function returns Result<_,
anyhow::Error>, or change the mapper to `.map_err(anyhow::Error::from)` (or
`Into::into`) so the original error type (e.g., redis::RedisError,
serde_json::Error, mpsc::SendError) is preserved; update calls in the code paths
that construct the redis client (redis::Client::open), JSON parsing
(serde_json::from_str), and channel sends (tx.send) accordingly.
- Around line 9-17: Remove the #[async_trait] usages and dependency and convert
the traits and their impls to use native async trait methods: keep the async
method signatures for EventQueue::publish and EventQueueConsumer::consume but
delete the #[async_trait] attribute on the trait definitions and on all impl
blocks that implement those traits (i.e., remove the six #[async_trait]
occurrences mentioned), delete the async_trait crate from Cargo.toml, and ensure
the impls implement async fn publish(...) and async fn consume(...) directly
(toolchain >= 1.75).
In `@crates/rpc-client/Cargo.toml`:
- Around line 6-15: Remove the unnecessary async-trait dependency from
crates/rpc-client/Cargo.toml and replace all uses of the #[async_trait] macro on
the BackendClient trait and its implementations with native async trait syntax:
remove the attribute, declare async fns directly in the trait (async fn ...) and
in each impl use native async fn implementations for the same method names;
ensure there are no Box<dyn BackendClient> usages (only generic C:
BackendClient) before removing the crate.
In `@crates/rpc-client/src/lib.rs`:
- Around line 57-66: TransactionRequestDto duplicates TransactionEvent
field-for-field; remove TransactionRequestDto and stop manual mapping by
serializing TransactionEvent directly wherever TransactionRequestDto is
constructed or passed (search for uses of TransactionRequestDto and the mapping
code that builds it), ensure TransactionEvent derives Serialize (it already
does) and update any function signatures or RPC payload types expecting
TransactionRequestDto to accept/serialize TransactionEvent instead, adjusting
tests and callers accordingly.
In `@crates/webhook-server/src/main.rs`:
- Around line 41-57: The webhook_handler currently discards error details by
using map_err(|_| ...) for normalize_webhook_payload and state.queue.publish;
update webhook_handler to capture the error values from
normalize_webhook_payload(...) and state.queue.publish(...).await, log the error
details (e.g., with tracing::error! or log::error!) including contextual info
(request body or event type) before converting to StatusCode::BAD_REQUEST or
StatusCode::INTERNAL_SERVER_ERROR, and then map or return the appropriate status
code; reference normalize_webhook_payload for the parsing step and
state.queue.publish for the publish step so you log the exact errors from those
calls.
In `@docker-compose.yml`:
- Around line 25-28: Add a Docker healthcheck for the redis service in
docker-compose.yml so other services wait for Redis to accept connections;
update the redis service block (service name "redis") to include a healthcheck
using a small command like "redis-cli ping" with sensible
interval/retries/timeouts, and then change the depends_on clauses in the
"webhook-server" and "worker" service definitions to use condition:
service_healthy for redis so they only start when the redis healthcheck passes.
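Combining the healthcheck and gating suggestions, the compose file could look roughly like this; intervals and image tags are illustrative, and only the service names `redis`, `webhook-server`, and `worker` come from the review above:

```yaml
services:
  redis:
    image: redis:7-alpine
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  webhook-server:
    build: .
    depends_on:
      redis:
        condition: service_healthy

  worker:
    build:
      context: .
      args:
        BIN_NAME: worker
    depends_on:
      redis:
        condition: service_healthy
```

With `condition: service_healthy`, Compose waits for the `redis-cli ping` check to pass rather than merely for the container process to start.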
In `@scripts/start-all.sh`:
- Around line 4-8: Add a signal trap that forwards termination signals to the
background cargo processes and ensures they are cleaned up: after launching the
processes and setting SERVER_PID and WORKER_PID, define a cleanup function that
checks those variables and sends a TERM (and if necessary a KILL) to
"$SERVER_PID" and "$WORKER_PID" and then waits for them, and register that
function with trap for INT, TERM, and EXIT so signals (Ctrl-C) are forwarded and
children are not orphaned; update the script around the SERVER_PID/WORKER_PID
logic and the final wait call to use this cleanup trap.
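A runnable sketch of that trap pattern. The two `cargo run` invocations are stood in for by `sleep` so the cleanup path can be demonstrated without the workspace:

```shell
#!/usr/bin/env bash
set -euo pipefail

# Stand-ins for `cargo run -p webhook-server` and `cargo run -p worker`.
sleep 60 & SERVER_PID=$!
sleep 60 & WORKER_PID=$!

cleanup() {
  # Forward termination to both children, then reap them.
  kill -TERM "$SERVER_PID" "$WORKER_PID" 2>/dev/null || true
  wait "$SERVER_PID" "$WORKER_PID" 2>/dev/null || true
  echo "[start-all] children cleaned up"
}
# Ctrl-C, TERM, and normal exit all route through cleanup,
# so the cargo processes are never orphaned.
trap cleanup INT TERM EXIT

# The real script would `wait "$SERVER_PID" "$WORKER_PID"` here; exiting
# immediately in this sketch just demonstrates the EXIT trap firing.
```

Registering the same function for `EXIT` as well as the signals covers the case where the script dies from `set -e` rather than an external signal.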
```sh
echo "[pre-commit] Staging formatted files..."
git add -u
```
git add -u silently stages all untracked modifications, clobbering intentional partial staging.
After cargo fmt --all runs, git add -u restages every tracked modified file — not just the formatter's changes. If a developer has deliberately left some hunks/files unstaged (to be committed separately), those will be swept into the commit without their consent.
The safer pattern is to fail fast with --check and let the developer re-stage explicitly, rather than auto-staging on their behalf:
🔒 Proposed fix

```diff
-echo "[pre-commit] Running cargo fmt..."
-cargo fmt --all
+echo "[pre-commit] Checking cargo fmt..."
+if ! cargo fmt --all -- --check; then
+  echo "[pre-commit] Formatting issues found. Run 'cargo fmt --all' and re-stage your changes."
+  exit 1
+fi

 echo "[pre-commit] Running tests..."
 if [ -x "./scripts/test.sh" ]; then
   ./scripts/test.sh
 else
   cargo test --workspace
 fi

-echo "[pre-commit] Staging formatted files..."
-git add -u
-
 echo "[pre-commit] All checks passed."
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.githooks/pre-commit around lines 17 - 18, Replace the auto-staging step
that uses "git add -u" with a formatter check so we don't clobber a developer's
intentional partial staging: remove the "git add -u" invocation in the
pre-commit script and instead run "cargo fmt --all" in check mode (e.g., pass
the -- --check flag to cargo fmt) and fail the hook when formatting is required
so the developer can re-stage explicitly; update the pre-commit flow around the
existing cargo fmt invocation to exit non-zero on formatting issues and print
instructions to run cargo fmt manually.
```rust
let user_id = data["customer"]["id"]
    .as_str()
    .or_else(|| data["customer"]["email"].as_str())
    .unwrap_or("")
    .to_string();
```
customer.id is an integer in real Paystack webhooks — as_str() will always return None.
Paystack's API returns customer.id as a numeric value (e.g., 46123456), not a string. as_str() on a JSON number yields None, so this will silently fall through to the email every time, losing the actual customer ID.
Handle numeric IDs explicitly:
Proposed fix

```diff
-let user_id = data["customer"]["id"]
-    .as_str()
-    .or_else(|| data["customer"]["email"].as_str())
-    .unwrap_or("")
-    .to_string();
+let user_id = data["customer"]["id"]
+    .as_str()
+    .map(String::from)
+    .or_else(|| data["customer"]["id"].as_i64().map(|v| v.to_string()))
+    .or_else(|| data["customer"]["email"].as_str().map(String::from))
+    .unwrap_or_default();
```

The same issue exists in `crates/domain/src/flutterwave.rs` at lines 17-21 — Flutterwave also returns `customer.id` as an integer.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/domain/src/paystack.rs` around lines 17 - 21, The code currently uses
data["customer"]["id"].as_str() which returns None for numeric IDs; update the
user_id extraction to first try as_str(), then try as_i64()/as_u64() (or as_u64
then as_i64) and convert that numeric value to a string, and only then fall back
to data["customer"]["email"].as_str(); apply the same change for the analogous
user_id extraction in flutterwave.rs so numeric customer IDs are preserved.
```toml
domain = { path = "../domain" }
tokio = { version = "1", features = ["sync"] }
serde_json = "1"
redis = { version = "0.25", features = ["aio", "tokio-comp"] }
```
🧩 Analysis chain
🏁 Script executed:

```bash
# First, check the current state of Cargo.toml and understand the context
cat -n crates/queue/Cargo.toml | head -20
```

Repository: ourpocket/webhook
Length of output: 412

🏁 Script executed:

```bash
# Search for all redis API usage in the queue crate to understand compatibility requirements
rg -n --type=rust "redis::" crates/queue/src/
```

Length of output: 544

🏁 Script executed:

```bash
# Also check for any other async/await patterns or specific redis API calls
rg -n --type=rust "Client|ConnectionManager|cmd|aio" crates/queue/src/ | head -30
```

Length of output: 492

🏁 Script executed:

```bash
# Check if there are any tests or examples that use redis
find crates/queue -type f -name "*.rs" | xargs wc -l | sort -n
```

Length of output: 88
🌐 Web query:

redis crate crates.io current version 2026

💡 Result:

As of Feb 22, 2026, the current (latest) published version of the Rust redis crate on crates.io is 1.0.3 (released 2026-01-30). [1] [2]

Cargo.toml:

```toml
redis = "1.0.3"
```

Sources: docs.rs changelog for redis 1.0.3 [1] and the corresponding Cargo.toml showing version = "1.0.3" [2].
🌐 Web query:

redis rust crate 0.30 vs 1.0 breaking changes AsyncCommands

💡 Result:

Key breaking changes that commonly affect AsyncCommands users when upgrading redis from 0.30.x (2025-04-22) to 1.0.x (1.0.0 released 2025-12-05):

- `aio::Connection` removed (0.30.0): if you were using the old deprecated `redis::aio::Connection`, it's gone. Switch to `MultiplexedConnection` (e.g. `Client::get_multiplexed_async_connection()`) and/or `ConnectionManager` instead. [1]
- Async runtime support changes (0.30.0): 0.30 added Smol runtime support (marked as a breaking change). This typically means you must ensure the right feature flags are enabled (`tokio-comp` or `smol-comp`) and you might need to resolve runtime-selection differences if you previously relied on older defaults. [1]
- `AsyncCommands` trait bounds are stricter in 1.0:
  - The trait is `ConnectionLike + Send + Sized`, and the blanket impl is for `T: ConnectionLike + Send + Sync + Sized`. [2]
  - Many methods now require args like `K: ToSingleRedisArg + Send + Sync + 'a` (and similar), which can break code that passed non-`Send`/`Sync` argument types (or relied on looser bounds). [2]
- Iterator-related breaking change (1.0 migration): iterators are "safe by default" now (return `RedisResult<Value>` instead of `Value`), which can surface in async usage via `scan*`/`AsyncIter` patterns. [3]
- 1.0 cleanup removes deprecated surface area: the 1.0.0 RC series explicitly removed deprecated features/functions as part of the 1.0 prep, which can break builds if you were still using deprecated APIs around async. [4]

Practical fix checklist

- Replace any `redis::aio::Connection` usage with `get_multiplexed_async_connection()` / `MultiplexedConnection` (or `ConnectionManager`). [1]
- Ensure your Cargo features match your runtime (`tokio-comp` or `smol-comp`). [1]
- If you hit new compiler errors around `.get(...)`, `.set(...)`, etc., make sure the types you pass as keys/args are `Send + Sync` (or convert to owned `String`/`Vec<u8>`/etc.), matching the updated `AsyncCommands` bounds. [2]

Sources

- redis `CHANGELOG.md` – 0.30.0 breaking changes (Smol support; remove deprecated `aio::Connection`). https://docs.rs/crate/redis/latest/source/CHANGELOG.md#L1651 [1]
- `AsyncCommands` trait docs (bounds and method signatures). https://docs.rs/redis/latest/redis/trait.AsyncCommands.html [2]
- `lib.rs` "Upgrading to version 1" notes (iterators safe by default). https://docs.rs/redis/latest/src/redis/lib.rs.html [3]
- redis `CHANGELOG.md` – 1.0.0-rc.0 prep: remove deprecated features/functions. https://docs.rs/crate/redis/latest/source/CHANGELOG.md#L1538 [4]
🏁 Script executed:

```bash
# Let me also check the actual async usage pattern in the queue lib
sed -n '30,60p' crates/queue/src/lib.rs
```

Length of output: 797

🏁 Script executed:

```bash
# Check the BRPOP usage and AsyncCommands context
sed -n '95,110p' crates/queue/src/lib.rs
```

Length of output: 585
Update redis dependency from 0.25 to 1.0 or 1.0.3 (current stable).
The redis crate is significantly outdated. As of February 2026, the latest stable version is 1.0.3 (released January 30, 2026). Pinning to 0.25 excludes multiple years of bug fixes, security improvements, and API enhancements.
The good news: your codebase already uses the modern async pattern (get_multiplexed_async_connection() and query_async()) that is fully compatible with redis 1.0.3. No code changes are needed—this is a straightforward dependency bump.
🔧 Suggested fix

```diff
-redis = { version = "0.25", features = ["aio", "tokio-comp"] }
+redis = { version = "1.0", features = ["aio", "tokio-comp"] }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/queue/Cargo.toml` at line 12, The redis dependency is pinned to an old
0.25 release; bump it to a modern 1.0.x (recommend "1.0.3") in Cargo.toml so you
get recent fixes and security updates—no code changes expected since the code
already uses modern async APIs like get_multiplexed_async_connection() and
query_async(), so simply update the redis version string to 1.0.3 in the crate's
dependencies entry.
```rust
pub struct RedisQueue {
    client: redis::Client,
    queue_key: String,
}

impl RedisQueue {
    pub fn new(redis_url: &str, queue_key: impl Into<String>) -> Result<Self, Error> {
        let client = redis::Client::open(redis_url).map_err(|err| Error::msg(err.to_string()))?;

        Ok(Self {
            client,
            queue_key: queue_key.into(),
        })
    }
}
```
RedisQueue opens a new TCP connection on every publish and consume call.
Because RedisQueue holds only a redis::Client and no cached connection, both EventQueue::publish and EventQueueConsumer::consume call get_multiplexed_async_connection() on every invocation. For async connections, connection pooling isn't necessary—the MultiplexedConnection is cheaply cloneable and can be safely reused from multiple threads, so a single connection should be created and reused. Under any meaningful throughput this will exhaust Redis server connections.
Cache the connection (or a ConnectionManager for auto-reconnect) in the struct:
♻️ Suggested refactor

```diff
+use redis::aio::MultiplexedConnection;
+
 #[derive(Clone)]
 pub struct RedisQueue {
-    client: redis::Client,
+    conn: MultiplexedConnection,
     queue_key: String,
 }

 impl RedisQueue {
-    pub fn new(redis_url: &str, queue_key: impl Into<String>) -> Result<Self, Error> {
-        let client = redis::Client::open(redis_url).map_err(|err| Error::msg(err.to_string()))?;
-        Ok(Self {
-            client,
-            queue_key: queue_key.into(),
-        })
+    pub async fn new(redis_url: &str, queue_key: impl Into<String>) -> Result<Self, Error> {
+        let client = redis::Client::open(redis_url).map_err(|err| Error::msg(err.to_string()))?;
+        let conn = client
+            .get_multiplexed_async_connection()
+            .await
+            .map_err(|err| Error::msg(err.to_string()))?;
+        Ok(Self {
+            conn,
+            queue_key: queue_key.into(),
+        })
     }
 }
```

Then use `self.conn.clone()` in `publish` instead of creating a new connection. For `consume` (blocking BRPOP), see the separate comment about using a dedicated non-multiplexed connection.

Alternatively, use the `connection-manager` feature for transparent reconnection:

```rust
use redis::aio::ConnectionManager;

pub struct RedisQueue {
    conn: ConnectionManager,
    queue_key: String,
}
```
Verify each finding against the current code and only fix it if needed.
In `@crates/queue/src/lib.rs` around lines 36 - 50, RedisQueue currently opens a
new TCP connection on every publish/consume because it only stores
redis::Client; change RedisQueue to cache a reusable async connection (e.g.,
store redis::aio::ConnectionManager or a redis::aio::MultiplexedConnection) as a
field (replace or add a conn field) created once in RedisQueue::new, then update
EventQueue::publish to use self.conn.clone() instead of calling
get_multiplexed_async_connection(), and ensure EventQueueConsumer::consume uses
a dedicated non-multiplexed blocking connection or a separate connection manager
for blocking BRPOP to avoid exhausting Redis server connections.
```rust
#[async_trait]
impl EventQueueConsumer for RedisQueue {
    async fn consume(&self) -> Result<TransactionEvent, Error> {
        let mut conn = self
            .client
            .get_multiplexed_async_connection()
            .await
            .map_err(|err| Error::msg(err.to_string()))?;

        let (_key, payload): (String, String) = redis::cmd("BRPOP")
            .arg(&self.queue_key)
            .arg(0)
            .query_async(&mut conn)
            .await
            .map_err(|err| Error::msg(err.to_string()))?;

        let event = serde_json::from_str(&payload).map_err(|err| Error::msg(err.to_string()))?;

        Ok(event)
    }
```
🧩 Analysis chain
🏁 Script executed:

```bash
# First, let's verify the file exists and read the actual code
cat -n crates/queue/src/lib.rs | head -120 | tail -40
```

Length of output: 1437

🏁 Script executed:

```bash
# Also check the full file to understand the struct and context
wc -l crates/queue/src/lib.rs
```

Length of output: 88

🏁 Script executed:

```bash
# Search for how consume() is used in the codebase
rg "\.consume\(" -A 3 -B 2
```

Length of output: 1178

🏁 Script executed:

```bash
# Look for RedisQueue definition and any client creation patterns
rg "RedisQueue" -A 5 -B 2 | head -100
```

Length of output: 4734

🏁 Script executed:

```bash
# Check if there are any shutdown or cancellation patterns
rg "cancel|shutdown|timeout|select!" -B 2 -A 2
```

Length of output: 336

🏁 Script executed:

```bash
# Look for how multiplexed connections are used elsewhere
rg "get_multiplexed_async_connection" -B 2 -A 2
```

Length of output: 917

🏁 Script executed:

```bash
# Check if there's a Cargo.lock or Cargo.toml to see redis-rs version
cat crates/queue/Cargo.toml
```

Length of output: 328
🌐 Web query:

redis-rs multiplexed connection blocking commands BRPOP documentation

💡 Result:

In redis-rs, a `MultiplexedConnection` does expose async blocking-list APIs like `brpop(...)` / `brpoplpush(...)` via `AsyncCommands` / `AsyncTypedCommands` (see the MultiplexedConnection docs and method list). However, using blocking commands (e.g., BRPOP, BLPOP, XREAD ... BLOCK) on a multiplexed connection is fundamentally incompatible with sharing that same underlying TCP connection for other work: while one request is blocked waiting for a server reply, Redis won't process/return replies for other commands on that same connection, effectively stalling the multiplexer. [1], [2], [4]

What the redis-rs docs recommend / warn about:

- `MultiplexedConnection` is cloneable and cancellation-safe, but dropping the future doesn't cancel the command already sent; with blocking commands, that can mean the underlying connection won't be closed/released until the server finally answers (potentially "stuck" until timeout / unblocked), with extra caveats if the connection driver task isn't managed canonically. [1]
- The project README explicitly notes: multiplexing won't help with blocking commands; if you need blocking + non-blocking concurrently, send blocking commands on a different connection (or use pooling / separate dedicated connections). [2]
- This limitation is also discussed directly in redis-rs issue #1236 ("isn't possible to share a MultiplexedConnection and use blocking operations"). [3]

Practical takeaway: use a separate dedicated (non-multiplexed) connection (or a pool where each blocking consumer gets its own connection) for BRPOP, and keep your MultiplexedConnection for normal, non-blocking commands. [2]

Sources: [1] MultiplexedConnection docs on docs.rs, [2] redis-rs GitHub README (async / multiplexing section), [3] redis-rs issue #1236, [4] Redis blog post explaining why blocking commands stall multiplexers.
🌐 Web query:

redis-rs issue #1236 multiplexed connection blocking

💡 Result:

redis-rs issue #1236 is about a mismatch between `aio::MultiplexedConnection` and blocking Redis commands (e.g., XREAD ... BLOCK, BLPOP, etc.):

- A `MultiplexedConnection` is designed to be cloned and used concurrently over one underlying socket, but blocking commands "block the connection" at the Redis server level, so you cannot safely share that same underlying connection for other work while it's blocked. The repo docs explicitly recommend using another connection for blocking commands. [1]
- Additionally, `MultiplexedConnection` is cancellation-safe in the Rust-future sense (you can drop a request future), but that does not cancel the request already sent to Redis. As a result, if a blocking command is in-flight, the underlying connection may not be released/closed until Redis replies, even if you drop all clones—this is called out directly in the type docs/source with a link to #1236. [2][3]
- The issue author asks whether the (deprecated) basic async `Connection` should be "undeprecated" for these cases and notes it would also help with custom pooling and explicit shutdown of the driver task. [4]

Practical takeaway / recommended path:

- Keep using `MultiplexedConnection` for normal (non-blocking) commands, but run blocking commands on a separate dedicated connection (often via a pool or a separately-managed connection). [1]
- If you need an immediate workaround specifically around lifecycle/closing behavior, people have used patterns like wrapping the multiplexed driver task and aborting it on drop (example workaround code). [5]

Sources

- redis-rs repo docs: "Multiplexing won't help if blocking commands are used… send the blocking commands on another connection." [1]
- `MultiplexedConnection` docs warning about blocking commands preventing resource release + link to #1236. [2]
- Same warning in the `multiplexed_connection.rs` source. [3]
- Issue #1236 description (blocking ops + desire for basic connection / explicit close). [4]
- Workaround gist referencing #1236 (custom pool/wrapper to abort driver). [5]
🏁 Script executed:

```bash
# Look at the worker_loop to understand the full context
cat -n crates/worker/src/main.rs | grep -A 20 "async fn worker_loop"
```

Length of output: 1543
Use a dedicated non-multiplexed connection for BRPOP, and add a finite timeout for graceful shutdown.
Wrong connection type: Multiplexed connections are fundamentally incompatible with blocking Redis commands. While the type exposes async blocking APIs, the underlying connection will be stalled until the blocking command resolves, preventing other concurrent work. Additionally, per redis-rs documentation and issue #1236, dropping the future doesn't cancel an already-sent blocking command, so the underlying connection resource may not be released even after all clones of MultiplexedConnection are dropped. Use client.get_async_connection() for the consumer path.
Infinite blocking: arg(0) blocks forever. With the current loop { queue.consume().await?; ... } pattern in worker_loop(), there is no way to gracefully shut down the worker or observe cancellation signals without externally terminating the connection. Use a finite timeout (e.g., 5 seconds) so the caller can loop and observe shutdown signals.
♻️ Suggested refactor

```diff
 #[async_trait]
 impl EventQueueConsumer for RedisQueue {
     async fn consume(&self) -> Result<TransactionEvent, Error> {
-        let mut conn = self
-            .client
-            .get_multiplexed_async_connection()
-            .await
-            .map_err(|err| Error::msg(err.to_string()))?;
+        // Use a dedicated (non-multiplexed) connection for blocking commands.
+        let mut conn = self
+            .client
+            .get_async_connection()
+            .await
+            .map_err(|err| Error::msg(err.to_string()))?;

-        let (_key, payload): (String, String) = redis::cmd("BRPOP")
-            .arg(&self.queue_key)
-            .arg(0)
-            .query_async(&mut conn)
-            .await
-            .map_err(|err| Error::msg(err.to_string()))?;
+        let result: Option<(String, String)> = redis::cmd("BRPOP")
+            .arg(&self.queue_key)
+            .arg(5) // finite timeout; caller should loop
+            .query_async(&mut conn)
+            .await
+            .map_err(|err| Error::msg(err.to_string()))?;
+        let (_key, payload) = result
+            .ok_or_else(|| Error::msg("BRPOP timed out"))?;

         let event = serde_json::from_str(&payload).map_err(|err| Error::msg(err.to_string()))?;
         Ok(event)
     }
 }
```
Verify each finding against the current code and only fix it if needed.
In `@crates/queue/src/lib.rs` around lines 92 - 111, In RedisQueue::consume
(EventQueueConsumer impl) switch from a multiplexed connection to a dedicated
non-multiplexed connection by using client.get_async_connection() instead of
get_multiplexed_async_connection(), and change the BRPOP call to use a finite
timeout (e.g., arg(5) instead of arg(0)) so the blocking call returns
periodically and the caller (worker_loop) can observe shutdown/cancellation;
keep the existing error mapping/serde logic but acquire and use the
non-multiplexed connection for the blocking BRPOP.
```rust
async fn webhook_handler<Q>(
    State(state): State<AppState<Q>>,
    body: String,
) -> Result<StatusCode, StatusCode>
where
    Q: EventQueue + Clone + 'static,
{
    let event = normalize_webhook_payload(&body).map_err(|_| StatusCode::BAD_REQUEST)?;

    state
        .queue
        .publish(event)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    Ok(StatusCode::OK)
}
```
Missing webhook signature verification — anyone can POST fake events.
Neither Paystack (X-Paystack-Signature, HMAC-SHA512) nor Flutterwave (verif-hash) signatures are validated. Without this, the /webhook endpoint is open to spoofed events, which could inject fraudulent transactions into the queue.
This should be addressed before going to production. At minimum, extract the signature header and verify the HMAC against the raw body using the provider's secret key.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/webhook-server/src/main.rs` around lines 41 - 57, The webhook_handler
currently accepts any POST body; add signature verification before calling
normalize_webhook_payload and publishing to state.queue: extract provider
headers (e.g., "X-Paystack-Signature" and "verif-hash") from the incoming
request, retrieve the corresponding secret from the AppState (e.g.,
paystack_secret/flutterwave_secret on State(AppState<Q>)), compute the HMAC
(Paystack uses HMAC-SHA512 over the raw body) and perform a constant-time
comparison with the header value, returning StatusCode::UNAUTHORIZED on mismatch
or missing header; only then call normalize_webhook_payload and
state.queue.publish(event). Ensure you reference webhook_handler,
normalize_webhook_payload, and state.queue.publish when making changes.
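The comparison step the prompt describes can be sketched in plain Rust. This is only an illustration, not code from the PR: `constant_time_eq` is a hypothetical helper, and computing the HMAC-SHA512 digest itself would come from a library such as the `hmac` and `sha2` crates.

```rust
// Sketch of the signature-comparison step only. Computing the HMAC-SHA512
// digest of the raw body would be done with an HMAC library; here we assume
// both sides are already hex-encoded byte slices.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Accumulate XOR differences so the comparison takes the same time
    // regardless of where the first mismatch occurs.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

fn main() {
    let expected = "3f7a"; // stand-in for a computed hex digest
    assert!(constant_time_eq(expected.as_bytes(), b"3f7a"));
    assert!(!constant_time_eq(expected.as_bytes(), b"3f7b"));
    println!("ok");
}
```

A plain `==` on the signature strings can short-circuit at the first differing byte, which is what makes timing attacks possible; the accumulate-then-compare shape avoids that.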
```toml
[package]
name = "worker"
version = "0.1.0"
edition = "2024"
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Check edition consistency across all workspace crates
echo "=== Editions in workspace ==="
fd Cargo.toml --type f | xargs grep -H 'edition'
echo ""
echo "=== Rust toolchain file ==="
cat rust-toolchain.toml 2>/dev/null || cat rust-toolchain 2>/dev/null || echo "No rust-toolchain file found"
```
Repository: ourpocket/webhook
Length of output: 369
Add rust-toolchain.toml to pin Rust 1.85+ for edition 2024.
All crates already use edition = "2024" consistently, but there's no rust-toolchain.toml to enforce the required Rust 1.85+. Without this pinned requirement, builds on older Rust versions will fail. Create a rust-toolchain.toml at the repository root with channel = "1.85" (or newer) to prevent toolchain incompatibility issues in CI and local development.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/worker/Cargo.toml` at line 4, The crate uses edition = "2024" in
Cargo.toml but the repository lacks a pinned toolchain; add a
rust-toolchain.toml at the repo root containing channel = "1.85" (or newer) to
ensure Rust 1.85+ is used for builds and CI, so tools compiling code tied to
edition = "2024" will not fail on older compilers.
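A minimal pin along the lines the comment suggests — assuming the file sits at the repository root and the `"1.85"` channel is acceptable:

```toml
# rust-toolchain.toml (repository root) — hypothetical sketch
[toolchain]
channel = "1.85"
```

rustup picks this file up automatically, so local builds and CI resolve the same compiler version without extra flags.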
```rust
async fn worker_loop<Q, C>(queue: Q, backend_client: C) -> Result<()>
where
    Q: EventQueueConsumer,
    C: BackendClient,
{
    loop {
        let event = queue.consume().await?;
        backend_client.send_transaction(event).await?;
    }
}
```
Any transient error kills the worker permanently — add retry/backoff and logging.
worker_loop propagates all errors from consume() and send_transaction() directly, exiting the process on the first failure. A momentary Redis timeout or a single backend 5xx will take the worker down with no recovery.
At minimum, log errors and continue the loop for transient failures:
Sketch of a more resilient loop
```diff
+use std::time::Duration;
+
 async fn worker_loop<Q, C>(queue: Q, backend_client: C) -> Result<()>
 where
     Q: EventQueueConsumer,
     C: BackendClient,
 {
     loop {
-        let event = queue.consume().await?;
-        backend_client.send_transaction(event).await?;
+        let event = match queue.consume().await {
+            Ok(e) => e,
+            Err(err) => {
+                eprintln!("queue consume error: {err:#}");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+                continue;
+            }
+        };
+
+        if let Err(err) = backend_client.send_transaction(event).await {
+            eprintln!("send_transaction error: {err:#}");
+            // TODO: consider re-enqueuing or dead-letter logic
+        }
     }
 }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
use std::time::Duration;

async fn worker_loop<Q, C>(queue: Q, backend_client: C) -> Result<()>
where
    Q: EventQueueConsumer,
    C: BackendClient,
{
    loop {
        let event = match queue.consume().await {
            Ok(e) => e,
            Err(err) => {
                eprintln!("queue consume error: {err:#}");
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        if let Err(err) = backend_client.send_transaction(event).await {
            eprintln!("send_transaction error: {err:#}");
            // TODO: consider re-enqueuing or dead-letter logic
        }
    }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/worker/src/main.rs` around lines 23 - 32, The worker_loop currently
returns on any error from queue.consume() or backend_client.send_transaction(),
so change it to catch errors, log them (using the crate's logger, e.g.,
tracing::error/warn), and retry transient failures with exponential backoff (use
tokio::time::sleep, doubling delay up to a cap and a configurable max_retries),
while continuing the outer loop after backoff instead of returning; keep a
separate short-circuit for non-transient/fatal errors if your EventQueueConsumer
or BackendClient exposes error kinds (match on error type or provide
is_transient()), and only propagate or terminate on those fatal cases. Ensure to
reference worker_loop, queue.consume, and backend_client.send_transaction when
adding the retry/backoff and logging.
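The "doubling delay up to a cap" the prompt asks for can be isolated into a small pure function. The name `backoff_delay` and the 30-second cap are illustrative choices, not part of the reviewed code:

```rust
use std::time::Duration;

// Hypothetical helper: exponential backoff capped at 30s (1s, 2s, 4s, ...).
fn backoff_delay(attempt: u32) -> Duration {
    // 2^attempt seconds; the shift amount is capped to avoid overflow,
    // and the result is capped at the maximum delay.
    let secs = 1u64 << attempt.min(5);
    Duration::from_secs(secs.min(30))
}

fn main() {
    assert_eq!(backoff_delay(0), Duration::from_secs(1));
    assert_eq!(backoff_delay(3), Duration::from_secs(8));
    assert_eq!(backoff_delay(10), Duration::from_secs(30)); // capped
    println!("ok");
}
```

In the worker loop this would feed `tokio::time::sleep(backoff_delay(attempt))` with `attempt` reset to zero after a successful consume.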
```yaml
services:
  webhook-server:
    build:
      context: .
      dockerfile: Dockerfile
      args:
        BIN_NAME: webhook-server
    ports:
      - "3000:3000"
    env_file:
      - .env
```
webhook-server is missing depends_on: redis.
Per the AI summary, webhook-server uses a Redis-backed queue (RedisQueue). Without this dependency, the webhook-server container may start before Redis is available, causing connection failures.
🐛 Proposed fix
```diff
 webhook-server:
   build:
     context: .
     dockerfile: Dockerfile
     args:
       BIN_NAME: webhook-server
   ports:
     - "3000:3000"
   env_file:
     - .env
+  depends_on:
+    - redis
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```yaml
services:
  webhook-server:
    build:
      context: .
      dockerfile: Dockerfile
      args:
        BIN_NAME: webhook-server
    ports:
      - "3000:3000"
    env_file:
      - .env
    depends_on:
      - redis
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docker-compose.yml` around lines 1 - 11, The docker-compose service
"webhook-server" lacks a dependency on Redis, which can cause it to start before
Redis is ready and lead to connection failures in the Redis-backed queue
(RedisQueue); update the "webhook-server" service to include a depends_on entry
referencing the Redis service (e.g., add depends_on: - redis) so Docker Compose
will start Redis before webhook-server and prevent race conditions during
startup.
```dockerfile
FROM rust:slim-trixie AS builder

WORKDIR /app

# Copy manifests and code
COPY Cargo.toml Cargo.lock ./
COPY crates ./crates

RUN cargo build --release -p webhook-server

FROM debian:trixie-slim

RUN useradd -m appuser

COPY --from=builder /app/target/release/webhook-server /usr/local/bin/webhook-app

RUN chmod +x /usr/local/bin/webhook-app && chown appuser:appuser /usr/local/bin/webhook-app

USER appuser
ENV RUST_LOG=info

ENTRYPOINT ["/usr/local/bin/webhook-app"]
```
BIN_NAME build arg is never declared or used — worker container will run webhook-server instead.
docker-compose.yml passes BIN_NAME: worker for the worker service, but this Dockerfile never declares ARG BIN_NAME and hardcodes webhook-server in the build command (Line 10), binary copy path (Line 17), and entrypoint (Line 25). The worker container will silently build and run the wrong binary.
🐛 Proposed fix: parameterize with ARG
```diff
 FROM rust:slim-trixie AS builder
+ARG BIN_NAME=webhook-server
 WORKDIR /app
 # Copy manifests and code
 COPY Cargo.toml Cargo.lock ./
 COPY crates ./crates
-RUN cargo build --release -p webhook-server
+RUN cargo build --release -p ${BIN_NAME}
 FROM debian:trixie-slim
+ARG BIN_NAME=webhook-server
 RUN useradd -m appuser
-COPY --from=builder /app/target/release/webhook-server /usr/local/bin/webhook-app
+COPY --from=builder /app/target/release/${BIN_NAME} /usr/local/bin/webhook-app
 RUN chmod +x /usr/local/bin/webhook-app && chown appuser:appuser /usr/local/bin/webhook-app
 USER appuser
 ENV RUST_LOG=info
 ENTRYPOINT ["/usr/local/bin/webhook-app"]
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```dockerfile
FROM rust:slim-trixie AS builder
ARG BIN_NAME=webhook-server
WORKDIR /app
# Copy manifests and code
COPY Cargo.toml Cargo.lock ./
COPY crates ./crates
RUN cargo build --release -p ${BIN_NAME}

FROM debian:trixie-slim
ARG BIN_NAME=webhook-server
RUN useradd -m appuser
COPY --from=builder /app/target/release/${BIN_NAME} /usr/local/bin/webhook-app
RUN chmod +x /usr/local/bin/webhook-app && chown appuser:appuser /usr/local/bin/webhook-app
USER appuser
ENV RUST_LOG=info
ENTRYPOINT ["/usr/local/bin/webhook-app"]
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@Dockerfile` around lines 1 - 25, The Dockerfile hardcodes the binary name
("webhook-server") and never declares the build arg BIN_NAME, causing services
that pass BIN_NAME (e.g., worker) to build/run the wrong binary; add ARG
BIN_NAME with a default (e.g., webhook-server) and replace occurrences of
"webhook-server" in the cargo build command (-p webhook-server), the copy source
path (/app/target/release/webhook-server), and the ENTRYPOINT (currently
/usr/local/bin/webhook-app) so they use the BIN_NAME value and keep the final
copied file/name consistent (e.g., copy from /app/target/release/${BIN_NAME} to
/usr/local/bin/${BIN_NAME} and set ENTRYPOINT to that path).
Summary by CodeRabbit
New Features
Chores