ct/l1: leveling
#30647
Open
WillemKauf wants to merge 12 commits into
Open
Conversation
Adds three new properties used by the per-range leveling scheduler: * `cloud_topics_leveling_interval_ms` (not yet used) * `cloud_topics_max_concurrent_leveling_jobs_per_shard` (not yet used) * `cloud_topics_leveling_min_extent_size_ratio` (replaces previous hardcode)
Without a cap, the `leveling_range_builder` could produce a single range covering an unboundedly large number of consecutive undersized extents, which would several limit parallelism of leveling jobs. Caps the per-range bytes at the new cluster property `cloud_topics_leveling_max_range_bytes` (default `1_GiB`).
NFCs. Prepare for the per-range leveling counterparts by renaming the compaction-specific worker, worker_manager, and scheduler members.
Adds * `leveling_queue_length` * `leveling_ranges_completed_total`
Add: * `leveling_duration_microseconds` * `leveling_extents_reclaimed_total`
Deprecate `outstanding_ranges` and add a new `scheduled_ranges` map which is used for determining when/how to queue leveling range jobs. This map attempts to solve the problem of potentially scheduling the same range for leveling multiple times when it is re-returned from the `metastore`. Leveling ranges for a CTP that are currently queued/inflight (value == nullopt in the map) or were recently committed (value == some completion timestamp) are persisted for a period of time. The `log_info_collector` consults this to avoid re-scheduling the same range replacement while it is still pending, or within the post-commit cooldown window. A range stays "undersized" in the metastore until its job is finished and committed. During this window, we could potentially re-queue the same levelable range every tick of `leveling_interval_ms`. We evict completed entries after a period of time. Here, the period of time is computed as a constant factor (`leveling_range_cooldown_intervals = 3`) times `cloud_topics_leveling_interval_ms`.
These paths would never be taken in production, since the higher level info collection scheduling already occurs at an interval of `cloud_topics_leveling_interval_ms` 🤦.
ba6fa20 to
9fcce78
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
This PR wires cloud-topics L1 leveling into the existing maintenance subsystem by adding scheduler/worker plumbing, cluster tunables, metrics, and both unit + ducktape coverage. It also adds a range-size cap so long runs of undersized extents are split into bounded leveling jobs.
Changes:
- Add new cluster configuration properties to control leveling cadence, concurrency, undersized threshold, and max bytes per leveling range.
- Extend the scheduler/worker_manager/worker to schedule and execute leveling jobs concurrently with compaction, including new scheduler/worker metrics.
- Add/extend unit tests and ducktape tests to validate leveling scheduling semantics, range splitting, and reclaimed-extents accounting.
Reviewed changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/rptest/tests/cloud_topics/leveling_stress_test.py | New ducktape stress tests for fragmentation + write-pressure leveling behavior and record integrity. |
| tests/rptest/tests/cloud_topics/e2e_test.py | Add an end-to-end leveling test based on scheduler metrics (queue drain + completions). |
| src/v/config/configuration.h | Declare new leveling-related cluster properties. |
| src/v/config/configuration.cc | Define defaults/descriptions/bounds for leveling-related cluster properties. |
| src/v/cloud_topics/level_one/metastore/tests/leveling_range_builder_test.cc | Add tests ensuring long undersized runs are split by cloud_topics_leveling_max_range_bytes and singleton remainders are dropped. |
| src/v/cloud_topics/level_one/metastore/tests/BUILD | Add config/scoped_config deps for new range-builder tests. |
| src/v/cloud_topics/level_one/metastore/leveling_range_builder.h | Add max-range-bytes cap sourced from cluster config to split long leveling ranges. |
| src/v/cloud_topics/level_one/metastore/BUILD | Add config dependency for range builder. |
| src/v/cloud_topics/level_one/maintenance/worker.h | Extend worker interface/state to run leveling and compaction on separate fibers with configurable leveling concurrency. |
| src/v/cloud_topics/level_one/maintenance/worker.cc | Implement leveling fiber loop, leveling job execution path, and manager integration. |
| src/v/cloud_topics/level_one/maintenance/worker_probe.h | Add leveling duration measurement + reclaimed-extents counter API. |
| src/v/cloud_topics/level_one/maintenance/worker_probe.cc | Export leveling duration histogram and reclaimed-extents counter metrics. |
| src/v/cloud_topics/level_one/maintenance/worker_manager.h | Add leveling queue support and APIs to acquire/complete/stop leveling work and alert specific fibers. |
| src/v/cloud_topics/level_one/maintenance/worker_manager.cc | Implement leveling queue acquisition/completion and per-CTP stop requests. |
| src/v/cloud_topics/level_one/maintenance/tests/worker_manager_test.cc | Update tests for new worker futures and add coverage for skipping stale compaction queue entries. |
| src/v/cloud_topics/level_one/maintenance/tests/log_info_collector_test.cc | Add tests for scheduled-range dedup, inflight suppression, cooldown, and rescheduling behavior. |
| src/v/cloud_topics/level_one/maintenance/tests/log_collector_test.cc | Update expectation: disabling compaction should not unmanage a cloud topic (leveling still applies). |
| src/v/cloud_topics/level_one/maintenance/tests/compaction_e2e_test.cc | Update management expectations: all cloud topics are managed regardless of cleanup policy. |
| src/v/cloud_topics/level_one/maintenance/scheduler.h | Split scheduler into distinct compaction + leveling loops and add leveling queue. |
| src/v/cloud_topics/level_one/maintenance/scheduler.cc | Implement leveling scheduling loop, worker alerts per fiber, and unify management semantics for cloud topics. |
| src/v/cloud_topics/level_one/maintenance/scheduler_probe.h | Add leveling queue length + completed ranges counters. |
| src/v/cloud_topics/level_one/maintenance/scheduler_probe.cc | Export leveling queue gauge and completed ranges counter metrics. |
| src/v/cloud_topics/level_one/maintenance/meta.h | Add levelable_range_key and scheduled_ranges for dedup/cooldown tracking of leveling work. |
| src/v/cloud_topics/level_one/maintenance/log_info_collector.h | Update leveling collection documentation and spec-building signature. |
| src/v/cloud_topics/level_one/maintenance/log_info_collector.cc | Implement scheduled-range dedup/cooldown eviction and use config-driven undersized ratio. |
| src/v/cloud_topics/level_one/maintenance/log_collector.cc | Change management policy to manage all cloud topics (compaction eligibility gated downstream). |
| src/v/cloud_topics/level_one/maintenance/leveling/tests/leveling_reducer_test.cc | Update tests for leveling_sink signature (probe moved to sink). |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_source.h | Remove probe dependency and allow sink to inspect leveling ranges (friend). |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_source.cc | Update constructor to match signature change (no probe). |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_sink.h | Add probe reference + input extent accounting for reclaimed-extents metric. |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_sink.cc | Compute input extents, record reclaimed extents on successful commit, and improve logging. |
| src/v/cloud_topics/level_one/maintenance/leveling/BUILD | Adjust deps: probe moved from source to sink. |
| src/v/cloud_topics/level_one/maintenance/l1_object_sink.h | Track count of output objects registered with the metadata builder. |
| src/v/cloud_topics/level_one/maintenance/l1_object_sink.cc | Increment output-object count on successful object registration. |
| src/v/cloud_topics/level_one/maintenance/BUILD | Wire leveling source/sink deps and new utility deps (adjustable_semaphore, absl hash). |
Comment on lines
+578
to
+585
| if (key.tidp == tidp) { | ||
| handle->state = compaction_job_state::hard_stop; | ||
| vlog( | ||
| compaction_log.debug, | ||
| "Terminating leveling range for CTP {} (base {})", | ||
| tidp, | ||
| key.base_offset); | ||
| } |
Comment on lines
+92
to
+94
| // Soft-stops every inflight leveling range for `tidp` across all worker | ||
| // shards that have one. | ||
| void request_stop_leveling(log_compaction_meta_ptr); |
Comment on lines
+422
to
+423
| // Before scheduling levelable ranges recieved from the `metastore`, | ||
| // query the log's existing scheduled ranges. |
38726dc to
c68a13a
Compare
Leveling is L1 maintenance that applies regardless of cleanup policy- so the log collector now needs to collect every cloud topic.
Wire the leveling path end to end, mirroring compaction but scheduling one job per `levelable_range` rather than one per CTP. Add a dedicated `scheduler::leveling_scheduling_loop()` with its own `cloud_topics_leveling_interval_ms` binding, signal semaphore, and `_leveling_queue`. At a lower level, in the `worker_manager`, we mirror the compaction API with `try_acquire_leveling_work()`, `complete_leveling_work()`, `alert_leveling_workers()`, and `request_stop_leveling()` for preemption while tracking inflight ranges per shard. Finally, in `worker`, an independent leveling fiber dispatches up to `cloud_topics_max_concurrent_leveling_jobs_per_shard` range rewrites concurrently. Each inflight job carries its own soft/hard-stop handle (keyed by tidp and base offset) so a single job, or all jobs for a tidp, can be cancelled when a CTP is unmanaged.
Returning on the first stale entry of the `_compaction_queue` is pessimistic - we should continue to pop entries in case there is work to be had behind stale entries.
c68a13a to
fdd2f53
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
First 7 mechanical commits pulled out into #30646.
The next 5 commits wire up leveling into the
scheduler,worker_manager, andworkerclasses and add ducktape tests. Also an off-shoot commit which de-pessimizes an existing code path.Backports Required
Release Notes