Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 99 additions & 15 deletions src/v/cloud_storage/anomalies_detector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,29 @@ ss::future<anomalies_detector::result> anomalies_detector::run(
_result = result{};
_received_quota = quota_total;

vlog(_logger.debug, "Downloading partition manifest ...");
vlog(
_logger.debug,
"Downloading partition manifest ... scrub_from={} quota.max_ops={} "
"quota.max_segs={} force_seg_checks={}",
scrub_from,
quota_total.max_num_operations,
quota_total.max_num_segments,
force_segment_api_checks);

partition_manifest_downloader dl(
_bucket, _remote_path_provider, _ntp, _initial_rev, _remote);
partition_manifest manifest(_ntp, _initial_rev);
auto dl_result = co_await dl.download_manifest(rtc_node, &manifest);
++_result.ops;
if (dl_result.has_error()) {
vlog(_logger.debug, "Failed downloading partition manifest ...");
_result.status = scrub_status::failed;
vlog(
_logger.debug,
"Failed downloading partition manifest, exiting scrub: error={} "
"ops={} segs={}",
dl_result.error(),
_result.ops,
_result.segments_visited);
Comment on lines +62 to +68
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch - added the error_outcome to the failed-manifest log line via dl_result.error(). cloud_storage::error_outcome already has an fmt::formatter (defined in src/v/cloud_storage/types.h), so it logs cleanly.

co_return _result;
}
if (
Expand All @@ -62,6 +75,15 @@ ss::future<anomalies_detector::result> anomalies_detector::run(
co_return _result;
}

vlog(
_logger.debug,
"Partition manifest loaded: start_offset={} last_offset={} "
"num_segments={} num_spillovers={}",
manifest.get_start_offset(),
manifest.get_last_offset(),
manifest.size(),
manifest.get_spillover_map().size());

std::deque<ss::sstring> spill_manifest_paths;
const auto& spillovers = manifest.get_spillover_map();
for (auto iter = spillovers.begin(), end_it = spillovers.end();
Expand Down Expand Up @@ -169,6 +191,16 @@ ss::future<anomalies_detector::result> anomalies_detector::run(
_result.status = scrub_status::partial;
}

vlog(
_logger.debug,
"Scrub run complete: status={} last_scrubbed_offset={} ops={} segs={} "
"scrub_from_was_set={}",
_result.status,
_result.last_scrubbed_offset,
_result.ops,
_result.segments_visited,
scrub_from.has_value());

co_return _result;
}

Expand Down Expand Up @@ -203,8 +235,12 @@ anomalies_detector::check_manifest(
const existence_query_context& query_ctx) {
vlog(
_logger.debug,
"Checking manifest {}",
manifest.get_manifest_path(_remote_path_provider));
"Checking manifest {} range=[{},{}] num_segs={} scrub_from={}",
manifest.get_manifest_path(_remote_path_provider),
manifest.get_start_offset(),
manifest.get_last_offset(),
manifest.size(),
scrub_from);
if (
scrub_from
&& (manifest.get_start_offset() > *scrub_from || manifest.get_last_offset() == scrub_from)) {
Expand All @@ -217,7 +253,6 @@ anomalies_detector::check_manifest(
manifest.get_last_offset(),
manifest.get_manifest_path(_remote_path_provider),
scrub_from);

co_return stop_detector::no;
}

Expand All @@ -237,29 +272,53 @@ anomalies_detector::check_manifest(
vlog(
_logger.debug,
"Manifest with offset range [{}, {}] num segments {} ({}) skipped, "
"not enough object quota to visit any segment. Skipping ...",
"not enough object quota to visit any segment "
"(visitable_tail={}, get_visitable_segments={}). Skipping ...",
manifest.get_start_offset(),
manifest.get_last_offset(),
manifest.size(),
manifest.get_manifest_path(_remote_path_provider));
manifest.get_manifest_path(_remote_path_provider),
visitable_tail_segments,
get_visitable_segments());
co_return stop_detector::no;
}
std::optional<segment_meta> previous_seg_meta;
auto manifest_end = manifest.end();
bool seek_succeeded = false;
if (scrub_from && manifest.get_last_offset() > scrub_from) {
if (
auto iter = manifest.segment_containing(*scrub_from);
iter != manifest_end) {
previous_seg_meta = *iter;
seg_iter = std::move(++iter);
seek_succeeded = true;
}
}

vlog(
_logger.debug,
"Starting segment loop start_index={} visitable_tail={} "
"scrub_from={} seek_succeeded={} starting_at_end={}",
start_index,
visitable_tail_segments,
scrub_from,
seek_succeeded,
seg_iter == manifest_end);

size_t visited_in_this_call = 0;
for (; seg_iter != manifest_end; ++seg_iter) {
if (should_stop()) {
_result.status = scrub_status::partial;
vlog(
_logger.debug,
"Stopping mid-segment-loop: visited_in_call={} "
"last_scrubbed_offset={} total_segs_visited={} ops={}",
visited_in_this_call,
_result.last_scrubbed_offset,
_result.segments_visited,
_result.ops);
co_return stop_detector::yes;
}
++visited_in_this_call;

const auto& seg_meta = *seg_iter;
const auto segment_path = remote_segment_path{
Expand All @@ -280,8 +339,13 @@ anomalies_detector::check_manifest(

vlog(
_logger.debug,
"Finished checking manifest {}",
manifest.get_manifest_path(_remote_path_provider));
"Finished checking manifest {} visited_in_call={} "
"last_scrubbed_offset={} total_segs_visited={} ops={}",
manifest.get_manifest_path(_remote_path_provider),
visited_in_this_call,
_result.last_scrubbed_offset,
_result.segments_visited,
_result.ops);
co_return stop_detector::no;
}

Expand Down Expand Up @@ -317,17 +381,37 @@ size_t anomalies_detector::get_visitable_segments() const {

bool anomalies_detector::should_stop() const {
if (_as.abort_requested()) {
vlog(
_logger.trace,
"should_stop=true reason=abort_requested "
"ops={} segs={} last_scrubbed_offset={}",
_result.ops,
_result.segments_visited,
_result.last_scrubbed_offset);
return true;
}

if (
archival::run_quota_t{_result.ops} > _received_quota.max_num_operations
|| segment_depth_t{_result.segments_visited}
>= _received_quota.max_num_segments) {
const bool ops_over = archival::run_quota_t{_result.ops}
> _received_quota.max_num_operations;
const bool segs_over = segment_depth_t{_result.segments_visited}
>= _received_quota.max_num_segments;
if (ops_over || segs_over) {
// Allow the scrubbing of one segment even if that means
// going above the quota in order to ensure some forward
// progress in all quota cases.
return _result.last_scrubbed_offset.has_value();
const bool stop = _result.last_scrubbed_offset.has_value();
vlog(
_logger.trace,
"should_stop={} reason={} ops={}/{} segs={}/{} "
"last_scrubbed_offset={}",
stop,
ops_over ? "ops_quota" : "segs_quota",
_result.ops,
_received_quota.max_num_operations,
_result.segments_visited,
_received_quota.max_num_segments,
_result.last_scrubbed_offset);
Comment thread
travisdowns marked this conversation as resolved.
return stop;
}

return false;
Expand Down
9 changes: 8 additions & 1 deletion tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -6328,7 +6328,14 @@ def wait_for_internal_scrub(
# transfer we end up waiting for the next full scrub cycle,
# see CORE-14424
"cloud_storage_partial_scrub_interval_ms": 100,
"cloud_storage_full_scrub_interval_ms": 10 * 1000,
# Effectively disable periodic re-scrubs for the lifetime
# of this helper: once a partition completes its full
# scrub it must stay "done" until the wait succeeds,
# otherwise it gets re-enqueued on every housekeeping
# cycle and starves the laggards of the shared op quota
# (CORE-15146). 24h is well past any plausible test
# runtime, so the value is effectively infinite here.
"cloud_storage_full_scrub_interval_ms": 24 * 60 * 60 * 1000,
"cloud_storage_scrubbing_interval_jitter_ms": 100,
"cloud_storage_background_jobs_quota": 5000,
"cloud_storage_housekeeping_interval_ms": 100,
Expand Down
Loading