ct/l1: leveling :shipit: #30647

Open
WillemKauf wants to merge 12 commits into redpanda-data:dev from WillemKauf:leveling_next_2805_final

@WillemKauf

First 7 mechanical commits pulled out into #30646.

The next 5 commits wire up leveling into the scheduler, worker_manager, and worker classes and add ducktape tests. There is also an offshoot commit that de-pessimizes an existing code path.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

  • none

Adds three new properties used by the per-range leveling scheduler:
* `cloud_topics_leveling_interval_ms` (not yet used)
* `cloud_topics_max_concurrent_leveling_jobs_per_shard` (not yet used)
* `cloud_topics_leveling_min_extent_size_ratio` (replaces previous hardcode)
Without a cap, the `leveling_range_builder` could produce a single range
covering an unboundedly large number of consecutive undersized extents,
which would severely limit the parallelism of leveling jobs.

Caps the per-range bytes at the new cluster property
`cloud_topics_leveling_max_range_bytes` (default `1_GiB`).
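
A minimal sketch of the splitting behaviour described above, not the actual `leveling_range_builder` code. The names `extent_range` and `split_undersized_run` are hypothetical, extents are reduced to their byte sizes, and the rule that a range holding a single extent is dropped is an assumption based on the "singleton remainders are dropped" test description.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical half-open index range [first, last) into a run of extents.
struct extent_range {
    std::size_t first;
    std::size_t last;
    uint64_t bytes;
};

// Splits one run of consecutive undersized extents into ranges whose total
// size stays at or below max_range_bytes. A range that would hold a single
// extent is dropped (assumption: rewriting one extent alone merges nothing).
inline std::vector<extent_range> split_undersized_run(
  const std::vector<uint64_t>& extent_sizes, uint64_t max_range_bytes) {
    assert(max_range_bytes > 0);
    std::vector<extent_range> out;
    std::size_t first = 0;
    uint64_t bytes = 0;
    // Emits [first, last) if it holds at least two extents, then resets.
    auto flush = [&](std::size_t last) {
        if (last - first >= 2) {
            out.push_back({first, last, bytes});
        }
        first = last;
        bytes = 0;
    };
    for (std::size_t i = 0; i < extent_sizes.size(); ++i) {
        // Close the current range if adding this extent would exceed the cap.
        if (i > first && bytes + extent_sizes[i] > max_range_bytes) {
            flush(i);
        }
        bytes += extent_sizes[i];
    }
    flush(extent_sizes.size());
    return out;
}
```

With five 400-byte extents and a 1000-byte cap, this yields two ranges of two extents each and drops the trailing singleton, so the two ranges can be leveled as independent jobs.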
NFCs.

Prepare for the per-range leveling counterparts by renaming the
compaction-specific worker, worker_manager, and scheduler members.
Adds scheduler metrics:
* `leveling_queue_length`
* `leveling_ranges_completed_total`
Adds worker metrics:
* `leveling_duration_microseconds`
* `leveling_extents_reclaimed_total`
Deprecate `outstanding_ranges` and add a new `scheduled_ranges`
map which is used for determining when/how to queue leveling
range jobs.

This map attempts to solve the problem of potentially scheduling the same
range for leveling multiple times when it is re-returned from the `metastore`.

Leveling ranges for a CTP that are currently queued/inflight
(value == nullopt in the map) or were recently committed (value == some
completion timestamp) are persisted for a period of time. The `log_info_collector`
consults this to avoid re-scheduling the same range replacement while it is still
pending, or within the post-commit cooldown window. A range stays "undersized" in
the metastore until its job is finished and committed. During this window, we
could potentially re-queue the same levelable range every tick of `leveling_interval_ms`.
We evict completed entries after a period of time. Here, the period of time
is computed as a constant factor (`leveling_range_cooldown_intervals = 3`) times
`cloud_topics_leveling_interval_ms`.
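
The dedup and cooldown logic above could be sketched roughly as follows. This is an illustration under assumptions, not the PR's `scheduled_ranges` implementation: the class name `scheduled_ranges_sketch` and its methods are hypothetical, the key is simplified to a base offset within a single CTP, and eviction is done lazily on each scheduling attempt.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <iterator>
#include <map>
#include <optional>

using clock_type = std::chrono::steady_clock;

// Hypothetical tracker for one CTP, keyed by the range's base offset.
// nullopt   => the range is queued or inflight.
// timestamp => the range was committed at that time and is cooling down.
class scheduled_ranges_sketch {
public:
    explicit scheduled_ranges_sketch(std::chrono::milliseconds interval)
      : _cooldown(cooldown_intervals * interval) {
        assert(interval.count() > 0);
    }

    // Returns true if the range was newly recorded and should be queued.
    bool try_schedule(int64_t base_offset, clock_type::time_point now) {
        evict_expired(now);
        return _ranges.try_emplace(base_offset, std::nullopt).second;
    }

    // Records the commit time, which starts the cooldown window.
    void mark_committed(int64_t base_offset, clock_type::time_point now) {
        auto it = _ranges.find(base_offset);
        if (it != _ranges.end()) {
            it->second = now;
        }
    }

private:
    // Drops committed entries whose cooldown has elapsed.
    void evict_expired(clock_type::time_point now) {
        for (auto it = _ranges.begin(); it != _ranges.end();) {
            const bool expired = it->second.has_value()
                                 && now - *it->second >= _cooldown;
            it = expired ? _ranges.erase(it) : std::next(it);
        }
    }

    // Mirrors `leveling_range_cooldown_intervals = 3` from the description.
    static constexpr int cooldown_intervals = 3;
    std::chrono::milliseconds _cooldown;
    std::map<int64_t, std::optional<clock_type::time_point>> _ranges;
};
```

In this sketch a range is rejected while it is inflight and for three intervals after its commit, which covers the window in which the metastore may still report it as undersized.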
These paths would never be taken in production, since the higher
level info collection scheduling already occurs at an interval of
`cloud_topics_leveling_interval_ms` 🤦.
Copilot AI review requested due to automatic review settings May 29, 2026 02:49
@WillemKauf WillemKauf requested a review from a team as a code owner May 29, 2026 02:49
@WillemKauf WillemKauf force-pushed the leveling_next_2805_final branch from ba6fa20 to 9fcce78 Compare May 29, 2026 02:51

Copilot AI left a comment

Pull request overview

This PR wires cloud-topics L1 leveling into the existing maintenance subsystem by adding scheduler/worker plumbing, cluster tunables, metrics, and both unit + ducktape coverage. It also adds a range-size cap so long runs of undersized extents are split into bounded leveling jobs.

Changes:

  • Add new cluster configuration properties to control leveling cadence, concurrency, undersized threshold, and max bytes per leveling range.
  • Extend the scheduler/worker_manager/worker to schedule and execute leveling jobs concurrently with compaction, including new scheduler/worker metrics.
  • Add/extend unit tests and ducktape tests to validate leveling scheduling semantics, range splitting, and reclaimed-extents accounting.

Reviewed changes

Copilot reviewed 35 out of 35 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| tests/rptest/tests/cloud_topics/leveling_stress_test.py | New ducktape stress tests for fragmentation + write-pressure leveling behavior and record integrity. |
| tests/rptest/tests/cloud_topics/e2e_test.py | Add an end-to-end leveling test based on scheduler metrics (queue drain + completions). |
| src/v/config/configuration.h | Declare new leveling-related cluster properties. |
| src/v/config/configuration.cc | Define defaults/descriptions/bounds for leveling-related cluster properties. |
| src/v/cloud_topics/level_one/metastore/tests/leveling_range_builder_test.cc | Add tests ensuring long undersized runs are split by cloud_topics_leveling_max_range_bytes and singleton remainders are dropped. |
| src/v/cloud_topics/level_one/metastore/tests/BUILD | Add config/scoped_config deps for new range-builder tests. |
| src/v/cloud_topics/level_one/metastore/leveling_range_builder.h | Add max-range-bytes cap sourced from cluster config to split long leveling ranges. |
| src/v/cloud_topics/level_one/metastore/BUILD | Add config dependency for range builder. |
| src/v/cloud_topics/level_one/maintenance/worker.h | Extend worker interface/state to run leveling and compaction on separate fibers with configurable leveling concurrency. |
| src/v/cloud_topics/level_one/maintenance/worker.cc | Implement leveling fiber loop, leveling job execution path, and manager integration. |
| src/v/cloud_topics/level_one/maintenance/worker_probe.h | Add leveling duration measurement + reclaimed-extents counter API. |
| src/v/cloud_topics/level_one/maintenance/worker_probe.cc | Export leveling duration histogram and reclaimed-extents counter metrics. |
| src/v/cloud_topics/level_one/maintenance/worker_manager.h | Add leveling queue support and APIs to acquire/complete/stop leveling work and alert specific fibers. |
| src/v/cloud_topics/level_one/maintenance/worker_manager.cc | Implement leveling queue acquisition/completion and per-CTP stop requests. |
| src/v/cloud_topics/level_one/maintenance/tests/worker_manager_test.cc | Update tests for new worker futures and add coverage for skipping stale compaction queue entries. |
| src/v/cloud_topics/level_one/maintenance/tests/log_info_collector_test.cc | Add tests for scheduled-range dedup, inflight suppression, cooldown, and rescheduling behavior. |
| src/v/cloud_topics/level_one/maintenance/tests/log_collector_test.cc | Update expectation: disabling compaction should not unmanage a cloud topic (leveling still applies). |
| src/v/cloud_topics/level_one/maintenance/tests/compaction_e2e_test.cc | Update management expectations: all cloud topics are managed regardless of cleanup policy. |
| src/v/cloud_topics/level_one/maintenance/scheduler.h | Split scheduler into distinct compaction + leveling loops and add leveling queue. |
| src/v/cloud_topics/level_one/maintenance/scheduler.cc | Implement leveling scheduling loop, worker alerts per fiber, and unify management semantics for cloud topics. |
| src/v/cloud_topics/level_one/maintenance/scheduler_probe.h | Add leveling queue length + completed ranges counters. |
| src/v/cloud_topics/level_one/maintenance/scheduler_probe.cc | Export leveling queue gauge and completed ranges counter metrics. |
| src/v/cloud_topics/level_one/maintenance/meta.h | Add levelable_range_key and scheduled_ranges for dedup/cooldown tracking of leveling work. |
| src/v/cloud_topics/level_one/maintenance/log_info_collector.h | Update leveling collection documentation and spec-building signature. |
| src/v/cloud_topics/level_one/maintenance/log_info_collector.cc | Implement scheduled-range dedup/cooldown eviction and use config-driven undersized ratio. |
| src/v/cloud_topics/level_one/maintenance/log_collector.cc | Change management policy to manage all cloud topics (compaction eligibility gated downstream). |
| src/v/cloud_topics/level_one/maintenance/leveling/tests/leveling_reducer_test.cc | Update tests for leveling_sink signature (probe moved to sink). |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_source.h | Remove probe dependency and allow sink to inspect leveling ranges (friend). |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_source.cc | Update constructor to match signature change (no probe). |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_sink.h | Add probe reference + input extent accounting for reclaimed-extents metric. |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_sink.cc | Compute input extents, record reclaimed extents on successful commit, and improve logging. |
| src/v/cloud_topics/level_one/maintenance/leveling/BUILD | Adjust deps: probe moved from source to sink. |
| src/v/cloud_topics/level_one/maintenance/l1_object_sink.h | Track count of output objects registered with the metadata builder. |
| src/v/cloud_topics/level_one/maintenance/l1_object_sink.cc | Increment output-object count on successful object registration. |
| src/v/cloud_topics/level_one/maintenance/BUILD | Wire leveling source/sink deps and new utility deps (adjustable_semaphore, absl hash). |

Comment on lines +578 to +585

    if (key.tidp == tidp) {
        handle->state = compaction_job_state::hard_stop;
        vlog(
          compaction_log.debug,
          "Terminating leveling range for CTP {} (base {})",
          tidp,
          key.base_offset);
    }

Comment on lines +92 to +94

    // Soft-stops every inflight leveling range for `tidp` across all worker
    // shards that have one.
    void request_stop_leveling(log_compaction_meta_ptr);

Comment on lines +422 to +423

    // Before scheduling levelable ranges recieved from the `metastore`,
    // query the log's existing scheduled ranges.

@WillemKauf WillemKauf force-pushed the leveling_next_2805_final branch 2 times, most recently from 38726dc to c68a13a Compare May 29, 2026 03:12
Leveling is L1 maintenance that applies regardless of cleanup policy,
so the log collector now needs to collect every cloud topic.
Wire the leveling path end to end, mirroring compaction but scheduling
one job per `levelable_range` rather than one per CTP.

Add a dedicated `scheduler::leveling_scheduling_loop()` with its own
`cloud_topics_leveling_interval_ms` binding, signal semaphore, and
`_leveling_queue`. At a lower level, in the `worker_manager`,
we mirror the compaction API with `try_acquire_leveling_work()`,
`complete_leveling_work()`, `alert_leveling_workers()`, and
`request_stop_leveling()` for preemption while tracking inflight ranges
per shard. Finally, in `worker`, an independent leveling fiber dispatches
up to `cloud_topics_max_concurrent_leveling_jobs_per_shard` range rewrites
concurrently. Each inflight job carries its own soft/hard-stop handle
(keyed by tidp and base offset) so a single job, or all jobs for a tidp,
can be cancelled when a CTP is unmanaged.
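
As a rough illustration of the per-job stop handles and the concurrency cap described above, here is a simplified, single-threaded sketch. All names (`inflight_jobs_sketch`, `job_state`, `job_key`) are hypothetical, the tidp is modelled as a plain string, and the real worker uses fibers and its own state enum rather than this bookkeeping.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>

enum class job_state { running, soft_stop, hard_stop };

// Hypothetical key: a string standing in for the tidp, plus the base offset.
using job_key = std::pair<std::string, int64_t>;

class inflight_jobs_sketch {
public:
    explicit inflight_jobs_sketch(std::size_t max_concurrent)
      : _max(max_concurrent) {}

    // Admits a job only while fewer than max_concurrent jobs are inflight.
    bool try_start(const job_key& key) {
        if (_jobs.size() >= _max) {
            return false;
        }
        return _jobs.emplace(key, job_state::running).second;
    }

    void complete(const job_key& key) { _jobs.erase(key); }

    // Flags every inflight job for `tidp`; returns how many were flagged.
    std::size_t request_stop(const std::string& tidp, job_state requested) {
        assert(requested != job_state::running);
        std::size_t flagged = 0;
        for (auto& [key, state] : _jobs) {
            if (key.first == tidp) {
                state = requested;
                ++flagged;
            }
        }
        return flagged;
    }

    // A running job would poll this to notice a stop request.
    std::optional<job_state> state_of(const job_key& key) const {
        auto it = _jobs.find(key);
        if (it == _jobs.end()) {
            return std::nullopt;
        }
        return it->second;
    }

private:
    std::size_t _max;
    std::map<job_key, job_state> _jobs;
};
```

Keying by both tidp and base offset is what lets one call stop every range of an unmanaged CTP while leaving jobs for other CTPs on the same shard untouched.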
Returning on the first stale entry of the `_compaction_queue` is
pessimistic - we should continue to pop entries in case there is
work to be had behind stale entries.
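
The difference can be sketched with a generic queue helper. `pop_first_live` and the `is_stale` predicate are hypothetical; what makes a `_compaction_queue` entry stale in the real code is not shown here.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <utility>

// Pops entries from the front until a non-stale one is found. Stale entries
// are discarded along the way instead of ending the search early.
template<typename T, typename Pred>
std::optional<T> pop_first_live(std::deque<T>& queue, Pred is_stale) {
    while (!queue.empty()) {
        T entry = std::move(queue.front());
        queue.pop_front();
        if (!is_stale(entry)) {
            return entry;
        }
        // Stale: keep popping, valid work may be queued behind it.
    }
    return std::nullopt;
}
```

Returning on the first stale entry would leave a worker idle until the next alert even though valid entries sit further back in the queue.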
@WillemKauf WillemKauf force-pushed the leveling_next_2805_final branch from c68a13a to fdd2f53 Compare May 29, 2026 04:00