ct/l1: one last PR of pre-requesite mechanical changes for leveling#30646
ct/l1: one last PR of pre-requesite mechanical changes for leveling#30646WillemKauf wants to merge 7 commits into
ct/l1: one last PR of pre-requesite mechanical changes for leveling#30646Conversation
There was a problem hiding this comment.
Pull request overview
This PR prepares the cloud-topics L1 maintenance subsystem for upcoming “leveling” work by adding new cluster configs, introducing a size cap for leveling ranges, renaming compaction-specific worker/scheduler members to make room for symmetric leveling equivalents, and adding initial leveling-related metrics and scheduling dedup state.
Changes:
- Add new cloud-topics leveling cluster properties (interval, concurrency, size thresholds/caps).
- Cap leveling range sizes in
leveling_range_builderand add unit tests for range splitting behavior. - Rename compaction-specific scheduler/worker/manager members and add initial leveling metrics +
scheduled_ranges-based dedup bookkeeping.
Reviewed changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| src/v/config/configuration.h | Declares new leveling-related cluster configuration properties. |
| src/v/config/configuration.cc | Registers new leveling-related cluster configuration properties with defaults/bounds. |
| src/v/cloud_topics/level_one/metastore/leveling_range_builder.h | Adds per-range byte cap logic for leveling range generation. |
| src/v/cloud_topics/level_one/metastore/BUILD | Adds config dependency for the range builder. |
| src/v/cloud_topics/level_one/metastore/tests/leveling_range_builder_test.cc | Adds tests verifying capped splitting and singleton remainder dropping. |
| src/v/cloud_topics/level_one/metastore/tests/BUILD | Adds deps for config/scoped_config used by the new tests. |
| src/v/cloud_topics/level_one/maintenance/worker.h | Renames compaction-specific members and APIs for clarity/consistency. |
| src/v/cloud_topics/level_one/maintenance/worker.cc | Applies corresponding renames and updates condition-variable/semaphore usage. |
| src/v/cloud_topics/level_one/maintenance/worker_probe.h | Adds leveling histogram measurement helper + reclaimed-extents counter API. |
| src/v/cloud_topics/level_one/maintenance/worker_probe.cc | Registers new leveling metrics (duration histogram + reclaimed counter). |
| src/v/cloud_topics/level_one/maintenance/worker_manager.h | Renames queue/worker alert APIs to be explicitly compaction-oriented. |
| src/v/cloud_topics/level_one/maintenance/worker_manager.cc | Applies renames and updates worker invocation wiring. |
| src/v/cloud_topics/level_one/maintenance/tests/worker_manager_test.cc | Updates tests for renamed worker members/APIs. |
| src/v/cloud_topics/level_one/maintenance/tests/log_info_collector_test.cc | Adds tests for scheduled_ranges de-dup/cooldown behavior and prunes dead assertions. |
| src/v/cloud_topics/level_one/maintenance/scheduler.h | Renames scheduling loop/semaphore to compaction-specific naming. |
| src/v/cloud_topics/level_one/maintenance/scheduler.cc | Applies renames and updates the background loop implementation. |
| src/v/cloud_topics/level_one/maintenance/scheduler_probe.h | Adds leveling queue length + completed range counters. |
| src/v/cloud_topics/level_one/maintenance/scheduler_probe.cc | Registers new leveling-related scheduler metrics. |
| src/v/cloud_topics/level_one/maintenance/meta.h | Introduces levelable_range_key and replaces outstanding count with scheduled_ranges map. |
| src/v/cloud_topics/level_one/maintenance/log_info_collector.h | Updates public API docs/signatures for leveling spec collection. |
| src/v/cloud_topics/level_one/maintenance/log_info_collector.cc | Reworks leveling spec building and uses scheduled_ranges for de-duping/cooldown. |
| src/v/cloud_topics/level_one/maintenance/leveling/tests/leveling_reducer_test.cc | Adjusts sink/source construction to pass probe to the sink instead of the source. |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_source.h | Removes probe dependency from the leveling source. |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_source.cc | Removes probe wiring from the leveling source implementation. |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_sink.h | Adds probe dependency and tracks input extent count for reclaimed metric. |
| src/v/cloud_topics/level_one/maintenance/leveling/leveling_sink.cc | Computes reclaimed extents and emits probe metric/logs on successful commit. |
| src/v/cloud_topics/level_one/maintenance/leveling/BUILD | Moves worker_probe dep from leveling_source to leveling_sink. |
| src/v/cloud_topics/level_one/maintenance/l1_object_sink.h | Adds _output_objects accounting for successful metadata-registered outputs. |
| src/v/cloud_topics/level_one/maintenance/l1_object_sink.cc | Increments _output_objects on successful finish; adds early-return on finish failure. |
| // metastore. Logs are skipped if `leveling.info_and_ts` is still fresh. | ||
| // For freshly-sampled logs, per-range `leveling_job`s are pushed into the | ||
| // provided `leveling_queue` and `leveling.outstanding_ranges` is bumped | ||
| // accordingly. The transient `info.ranges` is cleared after queueing | ||
| // while the `collected_at` timestamp is retained so the next tick | ||
| // provided `leveling_queue`. The transient `info.ranges` is cleared after | ||
| // queueing while the `collected_at` timestamp is retained so the next tick | ||
| // respects the sampling interval. |
dd1e219 to
31b371d
Compare
|
Force push to:
|
CI test resultstest results on build#85110test results on build#85165
|
|
|
||
| // Cap per-range bytes so each leveling job stays bounded. | ||
| if (_range->bytes >= _max_acceptable_range_bytes) { | ||
| maybe_commit_range(); |
There was a problem hiding this comment.
is it possible that after committing the range which size is _max_acceptable_range_bytes the resulting extent will still be small enough for leveling to pick it up one more time?
There was a problem hiding this comment.
There's no reason ranges won't be re-eligible for leveling in the future, but the factors at play here are cloud_topics_leveling_max_range_bytes (1_GiB) and cloud_topics_reconciliation_max_object_size (80_MiB).
If there are 100 extents picked up in cloud_topics_leveling_max_range_bytes before this range is force committed by the above logic, we will attempt to make as many objects of size cloud_topics_reconciliation_max_object_size as possible (likely with some remainder).
Say the initial range forced rolled by _max_acceptable_range_bytes is 100 extents with offsets [0, 1000], and we produced 12 objects of size target (80_MiB) spanning offsets [0, 800], then one object 64_MiB big spanning offsets [801, 1000]. Then, the next time we attempt to build leveling ranges with the builder, only the extent starting from offset 801 should be eligible for leveling (in the presence of no retention of course).
31b371d to
b51c893
Compare
|
Force push to:
|
Adds three new properties used by the per-range leveling scheduler: * `cloud_topics_leveling_interval_ms` (not yet used) * `cloud_topics_max_concurrent_leveling_jobs_per_shard` (not yet used) * `cloud_topics_leveling_min_extent_size_ratio` (replaces previous hardcode)
Without a cap, the `leveling_range_builder` could produce a single range covering an unboundedly large number of consecutive undersized extents, which would several limit parallelism of leveling jobs. Caps the per-range bytes at the new cluster property `cloud_topics_leveling_max_range_bytes` (default `1_GiB`).
NFCs. Prepare for the per-range leveling counterparts by renaming the compaction-specific worker, worker_manager, and scheduler members.
Adds * `leveling_queue_length` * `leveling_ranges_completed_total`
Add: * `leveling_duration_microseconds` * `leveling_extents_reclaimed_total`
Deprecate `outstanding_ranges` and add a new `scheduled_ranges` map which is used for determining when/how to queue leveling range jobs. This map attempts to solve the problem of potentially scheduling the same range for leveling multiple times when it is re-returned from the `metastore`. Leveling ranges for a CTP that are currently queued/inflight (value == nullopt in the map) or were recently committed (value == some completion timestamp) are persisted for a period of time. The `log_info_collector` consults this to avoid re-scheduling the same range replacement while it is still pending, or within the post-commit cooldown window. A range stays "undersized" in the metastore until its job is finished and committed. During this window, we could potentially re-queue the same levelable range every tick of `leveling_interval_ms`. We evict completed entries after a period of time. Here, the period of time is computed as a constant factor (`leveling_range_cooldown_intervals = 3`) times `cloud_topics_leveling_interval_ms`.
These paths would never be taken in production, since the higher level info collection scheduling already occurs at an interval of `cloud_topics_leveling_interval_ms` 🤦.
b51c893 to
e3beeff
Compare
|
Force push to:
|
| // First, evict completed entries whose cooldown has elapsed: this keeps | ||
| // the map bounded and lets those ranges become schedulable again. The | ||
| // map has no per-range erase, so rebuild it from the surviving ranges. | ||
| const auto cooldown_ms = config::shard_local_cfg() |
There was a problem hiding this comment.
This is likely much too long of a grace period as is (default is 15minutes), will likely update after other review comments come in.
| : _min_acceptable_extent_bytes(min_acceptable_extent_bytes) {} | ||
| : _min_acceptable_extent_bytes(min_acceptable_extent_bytes) | ||
| , _max_acceptable_range_bytes( | ||
| config::shard_local_cfg().cloud_topics_leveling_max_range_bytes()) {} |
There was a problem hiding this comment.
Curious why the min is an input while the max is a config. Would it make sense to have them both be inputs and have callers evaluate the config?
There was a problem hiding this comment.
no great reason- we plumbed the min through the entire RPC instead of just probing the shard local cfg here. happy to do the same with the max value.
i always have mixed feelings on whether being lazy and resorting to singleton use of the cfg at a deep layer is a good enough solution, or if "doing the right thing" of plumbing this value through all the layers is going to make for better code in the future.
| // re-scheduling any range that *overlaps* one still pending or within the | ||
| // post-commit cooldown window. A range stays "undersized" in the |
There was a problem hiding this comment.
I'm not quite following why "the commit is not immediately visible to the next sample." If the completion timestamp represents a time just after completing the replace_objects call, I'm wondering if we still need a cooldown window, vs ignoring all leveling info that was collected before the completion time. What's the race that the cooldown is trying to prevent?
There was a problem hiding this comment.
ignoring all leveling info that came before which timestamp? these ranges can all be very disjoint in the log/queue, it wouldn't make sense to not schedule range [1500,2000] for leveling just because range [0,500] just completed leveling, right? the cooldown/grace period before clearing entries is just to try to deal with the potential concurrency of
F1. info collection RPC to metastore indicates [0,100] should be leveled
F2. node is actually in the process of leveling [0,100] already, and just finished calling replace_objects()
F2. Metastore only now knows about leveling update for range [0,100]
F2. If we didn't have some sort of cooldown, we'd probably clear the scheduled_ranges entry for [0,100] at this point
F1. checks scheduled_ranges, requeues [0,100] for leveling based on stale metastore metadata, leveling rewrites that data in the log again for 0 gain
unless i'm missing something obvious that you are commenting on, please let me know if I misunderstood anything.
There was a problem hiding this comment.
I'm not sure I follow still. Let's say that the policy were that instead of having a cooldown, when we collect new info from the metastore, remove completed ranges that were completed before the new collection timestamp (I think this might be the equivalent to cooldown = 0). I think for your example this looks like:
F1. info collection RPC to metastore indicates [0,100] should be leveled. This is recorded as timestamp T1
F2. node is actually in the process of leveling [0,100] already, and just finished calling replace_objects()
F2. Metastore only now knows about leveling update for range [0,100]. Completion timestamp is recorded as T2
F2. we don't have a cooldown, but it's also not the role of the worker to clear scheduled_ranges, so we don't clear anything
F1. checks scheduled_ranges, doesn't requeue [0,100]
...
F1. eventually info collection RPC to metastore indicates [0, 100] should be leveled? Or shouldn't, it doesn't matter for this example. This is recorded as timestamp T3
F1. scheduled_ranges that completed before T3 are removed
F1. scheduling proceeds with knowledge that if we're going to schedule something that was recently leveled, at least the decision is happening with knowledge taken at T3 > T2, and therefore reflects the leveling that just happened
levelable_rangescompactionrelated members in thescheduler/worker_manager/workerbefore we add a bunch of symmetric leveling related onesscheduler_probeandworker_probescheduled_rangesto avoid the potential problem of queuing the same leveling range several timeslog_info_collectorBackports Required
Release Notes