refactor(poa): move redis publish into PoA, native-async with cached connections #3280
Conversation
…ed connections

Background: as of the 2026-04-22 hotfix (#3278) the redis publish lived in the importer's rayon worker and used detached `std::thread::spawn` workers with an mpsc channel for quorum short-circuit. That fixed the mainnet hang but left two warts:

* the importer owned `BlockReconciliationWritePort` even though only `Source::Local` blocks were ever published — an artifact of the original vibe-coded scaffolding, not a design intent,
* the short-circuit left OS threads running until their sync redis call returned, which meant a steady trickle of leaked threads while a half-alive node stayed unresponsive.

Changes:

* Delete `BlockReconciliationWritePort` from `fuel_core_importer` and the matching field / forwarder on `BlockImporterAdapter`. The importer no longer knows about redis at all.
* Add `BlockReconciliationWritePort` to `fuel_core_poa::ports` as an async trait, co-located with the existing `BlockReconciliationReadPort`. `RP` on `MainTask` now requires both.
* `MainTask::produce_block` (and `produce_predefined_block`) call `reconciliation_port.publish_produced_block(&sealed_block).await?` before `block_importer.commit_result(...).await?`. Same publish-then-commit invariant as before; fewer hops to get there.
* `RedisLeaderLeaseAdapter` goes native-async:
  - `publish_block_on_node` uses the existing `multiplexed_connection` helper (cached per-node `redis::aio::MultiplexedConnection`), calls `invoke_async`, and wraps in `tokio::time::timeout`.
  - `publish_block_on_all_nodes` uses `FuturesUnordered` to fan out, drains until quorum is reached, and drops the remaining futures — which cooperatively cancels their in-flight RESP exchange and closes the connection-side handles. No OS threads, no leak.
  - On timeout or transport error the node's cached multiplexed connection is cleared so the next call reconnects.
* Delete `invoke_write_block_script`, the `'static` sync helper that only existed to satisfy `std::thread::spawn`'s `'static` bound.
* `repair_sub_quorum_block` becomes `async` (it calls `publish_block_on_all_nodes`); the one caller in `unreconciled_blocks` gains an `.await`.
* Tests updated: all existing `publish_produced_block__*` tests now `.await`; the hang-repro test drops its `spawn_blocking` wrapper because the publish path is already async and cancellable at the future level.
* `FakeReconciliationPort` in `poa/service_test.rs` gains a stub `BlockReconciliationWritePort` impl alongside its existing read-port impl so `MainTask`'s combined trait bound is satisfied.

Tests (leader_lock feature):

* 29 adapter tests in `service::adapters::consensus_module::poa::tests`: all pass.
* 51 service tests in `fuel-core-poa`: all pass.

Connection caching: each `RedisNode` holds a `Mutex<Option<MultiplexedConnection>>`. `multiplexed_connection` was already used for reads and lease ops; publishes now go through the same cache instead of opening a fresh TCP connection per block. On transport errors the cache is cleared so the next call reconnects cleanly.
PR Summary (Medium Risk). Overview: Redis publishing is rewritten to be native async and more bounded. Reviewed by Cursor Bugbot for commit b5a04f9.
…to PoA
The three tests covering the importer's Source::Local publish branch
(commit_result__when_source_is_local_then_publishes_to_reconciliation_writer,
execute_and_commit__when_source_is_network_then_does_not_publish_to_reconciliation_writer,
commit_result__when_publish_to_reconciliation_writer_fails_then_returns_error)
exercised behavior that no longer lives in the importer. Equivalent
coverage now lives in the PoA adapter tests in crates/fuel-core/src/
service/adapters/consensus_module/poa.rs:tests::publish_produced_block__*.
Also removes the now-unused FakeBlockReconciliationWriter helper and
the unused std::sync::{Arc, Mutex} imports, restores the missing anyhow
import that the deleted block had been pulling in, and drops a stray
#[tokio::test] left dangling above #[test] fn one_lock_at_the_same_time
by the deletion.
```diff
     C: GetTime,
     RS: WaitForReadySignal,
-    RP: BlockReconciliationReadPort + 'static,
+    RP: BlockReconciliationReadPort + BlockReconciliationWritePort + 'static,
```
nit: do these need to be separate traits?
…urability
Previously `publish_block_on_all_nodes` used `FuturesUnordered` and dropped
the unfinished futures on quorum short-circuit. That cancelled per-node
`invoke_async` calls mid-RESP for the slow nodes — quorum was satisfied
but those nodes either got no write attempt or a half-written frame on
the wire, dropping data durability and pushing recovery onto the
sub-quorum repair path.
Now each per-node publish runs in its own `tokio::spawn`ed task. The
parent only awaits the first quorum's worth of `Written` results via
mpsc. Remaining tasks continue to completion in the background so every
reachable node gets a real write attempt, bounded by `node_timeout`.
Background tasks are tokio futures (not OS threads), each holds shared
`Arc<RedisLeaderLeaseAdapter>` / `Arc<SealedBlock>` / `Arc<[u8]>`
references (no per-task heavy clone), and they're cooperatively
cancellable.
`RedisNode::cached_connection` is now `Arc<tokio::sync::Mutex<...>>` so
adapter clones share the per-node connection cache. Without that change
each spawned task would hold a separate clone with an empty cache and
open a fresh socket per publish, defeating the connection reuse.
Cost analysis for the half-alive-node scenario at 1 block/s with
node_timeout=1s and redis 1.2:
* before: 0 outstanding background tasks per dead node (cancelled) but
no write attempt landed on those nodes
* after: ~1 outstanding background task per dead node at any moment
(each lives ≤ node_timeout), every node gets its publish attempt
…Port

Per review nit on #3280: every implementor (`RedisLeaderLeaseAdapter`, `NoopReconciliationAdapter`, `ReconciliationAdapter`, `FakeReconciliationPort`) implements both halves, every `RP:` trait bound on `MainTask` requires both, and there's no caller that uses just one side. Splitting the trait bought ISP in theory but cost a duplicate import + duplicate impl block at every site.

`BlockReconciliationReadPort` and `BlockReconciliationWritePort` are merged into a single `BlockReconciliationPort` with `leader_state` / `release` / `publish_produced_block`. Trait bounds simplify from `RP: BlockReconciliationReadPort + BlockReconciliationWritePort` to `RP: BlockReconciliationPort` in all four sites. No behavior change.

All 29 adapter tests + 51 PoA service tests pass; clippy clean across `--all-targets --all-features`.
Addressed in 5c2002f: collapsed
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 5c2002f.
Bugbot review on #3280 caught: each spawned task in publish_block_on_all_nodes did `let adapter = self.clone()`, which clones the entire RedisLeaderLeaseAdapter including drop_release_guard. Lingering background tasks after a quorum short-circuit kept Arc::strong_count(drop_release_guard) > 1, so dropping the owning adapter would skip the lease-release path until the slow nodes timed out (up to ~node_timeout per dead node). In crash/panic scenarios this delayed leader failover.

Fix: make drop_release_guard an `Option<Arc<()>>`. Logical adapter clones (the existing Clone impl) carry `Some` and participate in the strong-count gate as before. Spawned-task clones in publish_block_on_all_nodes explicitly null out the guard (`adapter.drop_release_guard = None;`) so they don't pin the logical lifetime — the owning adapter's Drop sees count == 1 and triggers release immediately.

Drop impl: early-return if drop_release_guard is None (task clones never trigger release) and otherwise check strong_count on the inner Arc as before.

Adds a regression test (drop__after_publish_releases_lease_promptly_despite_inflight_background_tasks) that publishes against 3 healthy + 1 half-alive nodes, drops the adapter immediately, and asserts the lease is released on all 3 healthy nodes within a window well under node_timeout while the half-alive node's background task is still in flight.
Addressed in bf28529.
Tracks background per-node publish tasks spawned by publish_block_on_all_nodes. Steady-state should be near zero; sustained non-zero values indicate per-node publishes are not exiting (a regression in the per-call timeout, or a connection/protocol stall not covered by node_timeout). Cheap visibility for ops to alert on.

Implementation: an OutstandingPublishTaskGuard RAII type increments the gauge synchronously in the parent when constructed (so the gauge counts the task from the instant it is queued, not from when the task starts running) and decrements on drop. The guard is moved into the spawn closure so the decrement runs whether the task completes normally, panics, or is cancelled mid-await.

Adds a focused regression test (publish_produced_block__outstanding_tasks_metric_drains_to_baseline) that publishes against 3 healthy + 1 half-alive nodes, verifies the gauge climbs above baseline right after quorum short-circuit, then verifies it drains back to baseline after the half-alive node's background task times out. Also extends metrics__poa_metrics_appear_in_encoded_output_after_exercising_all_paths to assert the new metric name appears in the prometheus encoding.
Added
```rust
// Clone the adapter for the spawned task, but null out the
// drop guard so this clone doesn't pin the logical adapter
// lifetime. Without this, a lingering background task after
// quorum short-circuit would keep `Arc::strong_count(...) > 1`
// and the owning adapter's `Drop` would skip lease release.
let mut adapter = self.clone();
adapter.drop_release_guard = None;
```
I think this would be better as a helper method on self.
self.clone_non_reference_counted()
or something like that.
BUT... do we even need this reference counter at all anymore? Wasn't it because we were sharing it between the two services as separate port impls?
I think we still clone it in sub_services.rs, but that's just residual.

Summary

* Moves `BlockReconciliationWritePort` out of `fuel_core_importer::ports` and into `fuel_core_poa::ports`, making it async. The importer no longer knows anything about redis; PoA owns the publish path end to end.
* Rewrites `RedisLeaderLeaseAdapter::publish_produced_block` (and `publish_block_on_all_nodes`, `repair_sub_quorum_block`) as native async: `FuturesUnordered` fan-out, `tokio::time::timeout(node_timeout, ...)` per node, short-circuit on quorum by dropping the remaining futures. No rayon bridge, no `std::thread::spawn` leak.
* Reuses the per-node `redis::aio::MultiplexedConnection` cache for publishes so we stop opening a fresh TCP connection per block.

Context

Follow-up to #3278 (the 2026-04-22 mainnet hang hotfix). That PR kept the importer-owned publish + rayon bridge and used detached sync threads with an mpsc channel for the quorum short-circuit. Discussed with the original authors — the importer never had a reason to own publish, that was an artifact of the initial scaffolding — and confirmed with the team that moving it to PoA is the intended direction. This PR does that cleanup and goes native-async at the same time.

Behavior-equivalence

* `reconciliation_port.publish_produced_block(...).await?` is called in `MainTask::produce_block` (and `produce_predefined_block`) immediately before `block_importer.commit_result(...).await?`. If publish fails to reach quorum, the commit is skipped — identical invariant to the old `Source::Local` gate in `_commit_result`.
* Quorum semantics and the `write_block.lua` contract are unchanged.
* Cached `MultiplexedConnection`s are cleared on transport errors / timeouts, the same pattern already used by `read_latest_stream_entry_on_node` and the lease-ops paths.

Test plan

* Adapter tests in `service::adapters::consensus_module::poa::tests` pass with `--features leader_lock`, including the 2026-04-22 hang-repro (`publish_produced_block__returns_within_bound_when_one_node_is_half_alive`) — the `spawn_blocking` wrapper is gone and the publish returns cleanly in the normal timeout budget.
* Service tests in `fuel-core-poa` pass (the new trait bound required a `BlockReconciliationWritePort` impl on `FakeReconciliationPort`).
* `cargo +nightly-2025-09-28 fmt --check` clean.
* `cargo sort -w --check` clean.

Notes

* The hang-repro test no longer needs `spawn_blocking`. It still asserts a 5s wall-clock deadline; with the native-async fan-out + drop-cancel, a single half-alive node resolves the call in one timeout interval instead of contributing a leaked OS thread.