fix(partitions): stop holding partition refs across .await#3557
Open
hubcio wants to merge 6 commits into
Open
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3557 +/- ##
=============================================
- Coverage 74.07% 53.57% -20.50%
Complexity 937 937
=============================================
Files 1249 1247 -2
Lines 128248 112909 -15339
Branches 104116 88820 -15296
=============================================
- Hits 94994 60489 -34505
- Misses 30219 49424 +19205
+ Partials 3035 2996 -39
🚀 New features to boost your workflow:
|
6064f97 to
3e075d6
Compare
ryankert01
approved these changes
Jun 25, 2026
The poll-read handler and the consensus tick ran as separate tasks but each held a &IggyPartition (from get_by_ns) across an .await. The pump task could meanwhile reallocate the partitions vec (ReconcileOp::InsertOwned) or take a &mut for the same namespace, dangling the held reference: a use-after-free on partition create/delete during a parked read, and &/&mut aliasing UB on ordinary concurrent produce+consume. The single-threaded runtime prevents neither. The read handler now builds an owned poll plan synchronously on the pump, then runs the disk read, straddle and offset persist+apply off it on owned data alone, so no task holds a partition ref off the pump. The consensus tick is folded into the pump select! arm, and the view-change and commit handlers address one partition by namespace. The owned poll types move to poll_plan.rs (the borrow contract documented once), and the resident decode-walk and VSR-handler prologue are de-duplicated.
Resident polls walked the journal's whole tail, but that tail holds replicated-but-uncommitted prepares ahead of the commit point, and a view change can roll them back. Serving them is a dirty read. MessageLookup now carries an inclusive `ceiling` (the commit offset); select_batch_slice stops at the first offset past it, so no poll, straddle continuation, or disk walk returns rollbackable data. Two more off-pump correctness fixes ride along, all preserving the "no partition ref across .await" contract from the previous commit: - Off-pump auto-commit upserts via fetch_max (upsert_offset_max) so a stale auto-commit racing a newer explicit StoreConsumerOffset cannot rewind the stored offset. The explicit pump path keeps store(), which may legitimately rewind. - delete_consumer_group_offset held &self across the unlink .await. It becomes reclaim_dead_group_offsets: a synchronous in-memory papaya remove that returns the owned file paths for the reconciler to unlink off-borrow, so no partition reference survives the await. Also: a resident auto-commit that needs no disk persist (no offset_path, sim/dev) applies inline on the pump instead of spawning a detached task; resident entries are cloned only when a resident tail exists; and push_selected_batch_fragments is shared by the resident and disk walks, taking the batch base as a byte offset.
3e075d6 to
dd8bbe5
Compare
The partition-ref-across-await fix made the consume poll path and the produce/commit pump run as sibling tasks over the same partition, but nothing exercised that aliasing window under real-runtime load across topologies. This adds a data-plane hammer: many producers and consumers all pound a single partition concurrently for a fixed duration, then the consumers drain and assert no message loss and a strictly contiguous offset log (every reader sees 0..total, count equals the sum of sends). Runs the four topologies via the harness matrix: single-node and 3-node cluster, each with one and two shards per node. Cluster variants are #[ignore] (heavy) and run on demand. Kept strictly data plane (poll by explicit offset, auto_commit = false, no mid-run topic or partition mutation) so it never drives the metadata consensus plane concurrently. That sidesteps a separate, still-open on_ack journal-durability race that panics the primary under concurrent metadata ops and currently gates concurrent_produce_consume under vsr.
Shard 0 opened its client listeners the instant broadcast_metadata_bundle returned, which only proves peers received the metadata bundle - not that they finished loading their on-disk partitions. Peers still scan live shared metadata in build_shard_for_thread and load each partition's segments via walk_dir. With more than one shard, a partition created by the first client lands in metadata while a peer's load scan is still running; the peer then tries to load a partition whose segment dir was never materialized and aborts the whole node with CannotReadPartitions. Deterministic in release with 2+ shards, hidden in debug by slower shard startup. Pre-existing on master. Add a reverse barrier symmetric to the metadata handoff: each peer signals once its load completes, and shard 0 drains one signal per peer before binding listeners. A partition created after the fence takes the runtime reconciler path, which creates its dir, instead of the bootstrap load path. Both sides poll the shutdown flag so a sibling failure mid-boot aborts cleanly rather than hanging.
The 2s/debug hammer passed even against a server with a boot-time shard race because the window was too short and debug timing hid it. Extend the hammer to 20s and exercise it in release so the data plane is driven hard enough to surface boot and steady-state faults across all four single-node and cluster topologies. A dead server made this dangerous: the harness root client retries connect with no cap and parked polls never re-check the consumer deadline, so a boot crash wedged the whole suite indefinitely (one run sat 47 minutes). Wrap the test body in a wall-clock timeout above MAX_TEST_DURATION so a slow-but-progressing consumer still trips its own informative deadline first, while a wedge fails fast instead of hanging CI.
The Cell import backs the borrow_active borrow tripwire (BorrowGuard), which is #[cfg(debug_assertions)] only. In release builds Cell is unused, so a plain cargo build --release warned. Gate the import to match its uses; no behavior change in either profile.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The poll-read handler and the consensus tick ran as separate
tasks but each held a &IggyPartition (from get_by_ns) across
an .await. The pump task could meanwhile reallocate the
partitions vec (ReconcileOp::InsertOwned) or take a &mut for
the same namespace, dangling the held reference: a
use-after-free on partition create/delete during a parked
read, and &/&mut aliasing UB on ordinary concurrent
produce+consume. The single-threaded runtime prevents neither.
The read handler now builds an owned poll plan synchronously
on the pump, then runs the disk read, straddle and offset
persist+apply off it on owned data alone (consumer offsets are
already Arc, the journal tail is snapshotted), so no task holds
a partition ref off the pump. The consensus tick is folded into
the pump select! arm, and the view-change and commit handlers
address one partition by namespace instead of scanning all. A
with_partition closure plus a debug tripwire enforce it.