Skip to content

ct/l1: one last PR of pre-requesite mechanical changes for leveling#30646

Open
WillemKauf wants to merge 7 commits into
redpanda-data:devfrom
WillemKauf:leveling_next_2805_prelude
Open

ct/l1: one last PR of pre-requesite mechanical changes for leveling#30646
WillemKauf wants to merge 7 commits into
redpanda-data:devfrom
WillemKauf:leveling_next_2805_prelude

Conversation

@WillemKauf
Copy link
Copy Markdown
Contributor

  • Add some required cluster properties
  • Add some new logic to cap the size of levelable_ranges
  • rename compaction related members in the scheduler/worker_manager/worker before we add a bunch of symmetric leveling related ones
  • Add new leveling related metrics to the scheduler_probe and worker_probe
  • Rework leveling info collection using scheduled_ranges to avoid the potential problem of queuing the same leveling range several times
  • prune some dead code in the log_info_collector

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

  • none

@WillemKauf WillemKauf requested a review from andrwng May 29, 2026 02:46
@WillemKauf WillemKauf requested a review from a team as a code owner May 29, 2026 02:46
Copilot AI review requested due to automatic review settings May 29, 2026 02:46
@WillemKauf WillemKauf mentioned this pull request May 29, 2026
7 tasks
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR prepares the cloud-topics L1 maintenance subsystem for upcoming “leveling” work by adding new cluster configs, introducing a size cap for leveling ranges, renaming compaction-specific worker/scheduler members to make room for symmetric leveling equivalents, and adding initial leveling-related metrics and scheduling dedup state.

Changes:

  • Add new cloud-topics leveling cluster properties (interval, concurrency, size thresholds/caps).
  • Cap leveling range sizes in leveling_range_builder and add unit tests for range splitting behavior.
  • Rename compaction-specific scheduler/worker/manager members and add initial leveling metrics + scheduled_ranges-based dedup bookkeeping.

Reviewed changes

Copilot reviewed 29 out of 29 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
src/v/config/configuration.h Declares new leveling-related cluster configuration properties.
src/v/config/configuration.cc Registers new leveling-related cluster configuration properties with defaults/bounds.
src/v/cloud_topics/level_one/metastore/leveling_range_builder.h Adds per-range byte cap logic for leveling range generation.
src/v/cloud_topics/level_one/metastore/BUILD Adds config dependency for the range builder.
src/v/cloud_topics/level_one/metastore/tests/leveling_range_builder_test.cc Adds tests verifying capped splitting and singleton remainder dropping.
src/v/cloud_topics/level_one/metastore/tests/BUILD Adds deps for config/scoped_config used by the new tests.
src/v/cloud_topics/level_one/maintenance/worker.h Renames compaction-specific members and APIs for clarity/consistency.
src/v/cloud_topics/level_one/maintenance/worker.cc Applies corresponding renames and updates condition-variable/semaphore usage.
src/v/cloud_topics/level_one/maintenance/worker_probe.h Adds leveling histogram measurement helper + reclaimed-extents counter API.
src/v/cloud_topics/level_one/maintenance/worker_probe.cc Registers new leveling metrics (duration histogram + reclaimed counter).
src/v/cloud_topics/level_one/maintenance/worker_manager.h Renames queue/worker alert APIs to be explicitly compaction-oriented.
src/v/cloud_topics/level_one/maintenance/worker_manager.cc Applies renames and updates worker invocation wiring.
src/v/cloud_topics/level_one/maintenance/tests/worker_manager_test.cc Updates tests for renamed worker members/APIs.
src/v/cloud_topics/level_one/maintenance/tests/log_info_collector_test.cc Adds tests for scheduled_ranges de-dup/cooldown behavior and prunes dead assertions.
src/v/cloud_topics/level_one/maintenance/scheduler.h Renames scheduling loop/semaphore to compaction-specific naming.
src/v/cloud_topics/level_one/maintenance/scheduler.cc Applies renames and updates the background loop implementation.
src/v/cloud_topics/level_one/maintenance/scheduler_probe.h Adds leveling queue length + completed range counters.
src/v/cloud_topics/level_one/maintenance/scheduler_probe.cc Registers new leveling-related scheduler metrics.
src/v/cloud_topics/level_one/maintenance/meta.h Introduces levelable_range_key and replaces outstanding count with scheduled_ranges map.
src/v/cloud_topics/level_one/maintenance/log_info_collector.h Updates public API docs/signatures for leveling spec collection.
src/v/cloud_topics/level_one/maintenance/log_info_collector.cc Reworks leveling spec building and uses scheduled_ranges for de-duping/cooldown.
src/v/cloud_topics/level_one/maintenance/leveling/tests/leveling_reducer_test.cc Adjusts sink/source construction to pass probe to the sink instead of the source.
src/v/cloud_topics/level_one/maintenance/leveling/leveling_source.h Removes probe dependency from the leveling source.
src/v/cloud_topics/level_one/maintenance/leveling/leveling_source.cc Removes probe wiring from the leveling source implementation.
src/v/cloud_topics/level_one/maintenance/leveling/leveling_sink.h Adds probe dependency and tracks input extent count for reclaimed metric.
src/v/cloud_topics/level_one/maintenance/leveling/leveling_sink.cc Computes reclaimed extents and emits probe metric/logs on successful commit.
src/v/cloud_topics/level_one/maintenance/leveling/BUILD Moves worker_probe dep from leveling_source to leveling_sink.
src/v/cloud_topics/level_one/maintenance/l1_object_sink.h Adds _output_objects accounting for successful metadata-registered outputs.
src/v/cloud_topics/level_one/maintenance/l1_object_sink.cc Increments _output_objects on successful finish; adds early-return on finish failure.

Comment thread src/v/cloud_topics/level_one/metastore/leveling_range_builder.h
Comment thread src/v/cloud_topics/level_one/maintenance/log_info_collector.cc Outdated
Comment on lines 103 to 107
// metastore. Logs are skipped if `leveling.info_and_ts` is still fresh.
// For freshly-sampled logs, per-range `leveling_job`s are pushed into the
// provided `leveling_queue` and `leveling.outstanding_ranges` is bumped
// accordingly. The transient `info.ranges` is cleared after queueing
// while the `collected_at` timestamp is retained so the next tick
// provided `leveling_queue`. The transient `info.ranges` is cleared after
// queueing while the `collected_at` timestamp is retained so the next tick
// respects the sampling interval.
Comment thread src/v/cloud_topics/level_one/maintenance/log_info_collector.cc
Comment thread src/v/cloud_topics/level_one/maintenance/log_info_collector.cc
Comment thread src/v/cloud_topics/level_one/metastore/leveling_range_builder.h
Comment thread src/v/cloud_topics/level_one/maintenance/log_info_collector.cc Outdated
Comment thread src/v/cloud_topics/level_one/maintenance/log_info_collector.h Outdated
Comment thread src/v/cloud_topics/level_one/maintenance/log_info_collector.cc
Comment thread src/v/cloud_topics/level_one/maintenance/log_info_collector.cc
@WillemKauf WillemKauf force-pushed the leveling_next_2805_prelude branch from dd1e219 to 31b371d Compare May 29, 2026 03:05
@WillemKauf
Copy link
Copy Markdown
Contributor Author

Force push to:

  • Address bot comments

@vbotbuildovich
Copy link
Copy Markdown
Collaborator

vbotbuildovich commented May 29, 2026

CI test results

test results on build#85110
test_status test_class test_method test_arguments test_kind job_url passed reason test_history
FLAKY(PASS) NodePreRestartProbeTest pre_restart_probe_test null integration https://buildkite.com/redpanda/redpanda/builds/85110#019e71c9-24f8-43e7-9dcb-a15e8bd6156f 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0000, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=NodePreRestartProbeTest&test_method=pre_restart_probe_test
FLAKY(PASS) WriteCachingFailureInjectionE2ETest test_crash_all {"use_transactions": false} integration https://buildkite.com/redpanda/redpanda/builds/85110#019e71cc-491d-44a5-b59b-8825dff7f971 9/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0943, p0=0.6285, reject_threshold=0.0100. adj_baseline=0.2570, p1=0.2287, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=WriteCachingFailureInjectionE2ETest&test_method=test_crash_all
test results on build#85165
test_status test_class test_method test_arguments test_kind job_url passed reason test_history
FLAKY(PASS) ControllerForcedReconfiguration_Size5 test_cluster_recovery {"scenario": "Simple"} integration https://buildkite.com/redpanda/redpanda/builds/85165#019e75aa-12e1-4d38-bb9e-d171498d174b 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0000, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ControllerForcedReconfiguration_Size5&test_method=test_cluster_recovery
FLAKY(PASS) ShadowLinkingReplicationTests test_auto_prefix_trimming {"source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "cloud", "with_failures": true} integration https://buildkite.com/redpanda/redpanda/builds/85165#019e75aa-12da-4123-9b8b-2f61186317f4 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0013, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_auto_prefix_trimming
FLAKY(PASS) DataTransformsLoggingTest test_logs_volume null integration https://buildkite.com/redpanda/redpanda/builds/85165#019e75aa-12da-4123-9b8b-2f61186317f4 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0000, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=DataTransformsLoggingTest&test_method=test_logs_volume


// Cap per-range bytes so each leveling job stays bounded.
if (_range->bytes >= _max_acceptable_range_bytes) {
maybe_commit_range();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible that after committing the range which size is _max_acceptable_range_bytes the resulting extent will still be small enough for leveling to pick it up one more time?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no reason ranges won't be re-eligible for leveling in the future, but the factors at play here are cloud_topics_leveling_max_range_bytes (1_GiB) and cloud_topics_reconciliation_max_object_size (80_MiB).

If there are 100 extents picked up in cloud_topics_leveling_max_range_bytes before this range is force committed by the above logic, we will attempt to make as many objects of size cloud_topics_reconciliation_max_object_size as possible (likely with some remainder).

Say the initial range forced rolled by _max_acceptable_range_bytes is 100 extents with offsets [0, 1000], and we produced 12 objects of size target (80_MiB) spanning offsets [0, 800], then one object 64_MiB big spanning offsets [801, 1000]. Then, the next time we attempt to build leveling ranges with the builder, only the extent starting from offset 801 should be eligible for leveling (in the presence of no retention of course).

@WillemKauf WillemKauf force-pushed the leveling_next_2805_prelude branch from 31b371d to b51c893 Compare May 29, 2026 21:04
@WillemKauf
Copy link
Copy Markdown
Contributor Author

Force push to:

  • Rebase to upstream/dev

Adds three new properties used by the per-range leveling scheduler:
* `cloud_topics_leveling_interval_ms` (not yet used)
* `cloud_topics_max_concurrent_leveling_jobs_per_shard` (not yet used)
* `cloud_topics_leveling_min_extent_size_ratio` (replaces previous hardcode)
Without a cap, the `leveling_range_builder` could produce a single range
covering an unboundedly large number of consecutive undersized extents,
which would several limit parallelism of leveling jobs.

Caps the per-range bytes at the new cluster property
`cloud_topics_leveling_max_range_bytes` (default `1_GiB`).
NFCs.

Prepare for the per-range leveling counterparts by renaming the
compaction-specific worker, worker_manager, and scheduler members.
Adds
* `leveling_queue_length`
* `leveling_ranges_completed_total`
Add:
* `leveling_duration_microseconds`
* `leveling_extents_reclaimed_total`
Deprecate `outstanding_ranges` and add a new `scheduled_ranges`
map which is used for determining when/how to queue leveling
range jobs.

This map attempts to solve the problem of potentially scheduling the same
range for leveling multiple times when it is re-returned from the `metastore`.

Leveling ranges for a CTP that are currently queued/inflight
(value == nullopt in the map) or were recently committed (value == some
completion timestamp) are persisted for a period of time. The `log_info_collector`
consults this to avoid re-scheduling the same range replacement while it is still
pending, or within the post-commit cooldown window. A range stays "undersized" in
the metastore until its job is finished and committed. During this window, we
could potentially re-queue the same levelable range every tick of `leveling_interval_ms`.
We evict completed entries after a period of time. Here, the period of time
is computed as a constant factor (`leveling_range_cooldown_intervals = 3`) times
`cloud_topics_leveling_interval_ms`.
These paths would never be taken in production, since the higher
level info collection scheduling already occurs at an interval of
`cloud_topics_leveling_interval_ms` 🤦.
@WillemKauf WillemKauf force-pushed the leveling_next_2805_prelude branch from b51c893 to e3beeff Compare May 29, 2026 21:05
@WillemKauf
Copy link
Copy Markdown
Contributor Author

Force push to:

  • Use offset_interval_map in reworking of log info collection for leveling

// First, evict completed entries whose cooldown has elapsed: this keeps
// the map bounded and lets those ranges become schedulable again. The
// map has no per-range erase, so rebuild it from the surviving ranges.
const auto cooldown_ms = config::shard_local_cfg()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is likely much too long of a grace period as is (default is 15minutes), will likely update after other review comments come in.

: _min_acceptable_extent_bytes(min_acceptable_extent_bytes) {}
: _min_acceptable_extent_bytes(min_acceptable_extent_bytes)
, _max_acceptable_range_bytes(
config::shard_local_cfg().cloud_topics_leveling_max_range_bytes()) {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why the min is an input while the max is a config. Would it make sense to have them both be inputs and have callers evaluate the config?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no great reason- we plumbed the min through the entire RPC instead of just probing the shard local cfg here. happy to do the same with the max value.

i always have mixed feelings on whether being lazy and resorting to singleton use of the cfg at a deep layer is a good enough solution, or if "doing the right thing" of plumbing this value through all the layers is going to make for better code in the future.

Comment on lines +75 to +76
// re-scheduling any range that *overlaps* one still pending or within the
// post-commit cooldown window. A range stays "undersized" in the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite following why "the commit is not immediately visible to the next sample." If the completion timestamp represents a time just after completing the replace_objects call, I'm wondering if we still need a cooldown window, vs ignoring all leveling info that was collected before the completion time. What's the race that the cooldown is trying to prevent?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignoring all leveling info that came before which timestamp? these ranges can all be very disjoint in the log/queue, it wouldn't make sense to not schedule range [1500,2000] for leveling just because range [0,500] just completed leveling, right? the cooldown/grace period before clearing entries is just to try to deal with the potential concurrency of

F1. info collection RPC to metastore indicates [0,100] should be leveled
F2. node is actually in the process of leveling [0,100] already, and just finished calling replace_objects()
F2. Metastore only now knows about leveling update for range [0,100]
F2. If we didn't have some sort of cooldown, we'd probably clear the scheduled_ranges entry for [0,100] at this point
F1. checks scheduled_ranges, requeues [0,100] for leveling based on stale metastore metadata, leveling rewrites that data in the log again for 0 gain

unless i'm missing something obvious that you are commenting on, please let me know if I misunderstood anything.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow still. Let's say that the policy were that instead of having a cooldown, when we collect new info from the metastore, remove completed ranges that were completed before the new collection timestamp (I think this might be the equivalent to cooldown = 0). I think for your example this looks like:

F1. info collection RPC to metastore indicates [0,100] should be leveled. This is recorded as timestamp T1
F2. node is actually in the process of leveling [0,100] already, and just finished calling replace_objects()
F2. Metastore only now knows about leveling update for range [0,100]. Completion timestamp is recorded as T2
F2. we don't have a cooldown, but it's also not the role of the worker to clear scheduled_ranges, so we don't clear anything
F1. checks scheduled_ranges, doesn't requeue [0,100]
...
F1. eventually info collection RPC to metastore indicates [0, 100] should be leveled? Or shouldn't, it doesn't matter for this example. This is recorded as timestamp T3
F1. scheduled_ranges that completed before T3 are removed
F1. scheduling proceeds with knowledge that if we're going to schedule something that was recently leveled, at least the decision is happening with knowledge taken at T3 > T2, and therefore reflects the leveling that just happened

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants