From cadce9fa7bd0912382744db7d2fe1b9effc68bd5 Mon Sep 17 00:00:00 2001 From: Yixuan Wang Date: Thu, 30 Apr 2026 17:45:19 +0800 Subject: [PATCH] [fix](be) Poll packed file async close without blocking (#62938) fix: https://github.com/apache/doris/pull/57770 This change adds a non-blocking try_finish_close() hook to FileWriter and implements it for S3FileWriter. PackedFileManager now polls the async close result instead of calling close(true) repeatedly while packed files are uploading. The change avoids re-submitting async close work and lets packed file upload state transition only after the previous async close has actually completed. It also propagates async close failures back into the packed file state so failed uploads are recorded correctly. A ut test was added to verify that PackedFileManager polls async close without blocking or issuing an extra close call, and that async close failures are handled as packed file upload failures. --- be/src/io/fs/file_writer.h | 10 ++ be/src/io/fs/packed_file_manager.cpp | 14 +-- be/src/io/fs/s3_file_writer.cpp | 91 ++++++++++--------- be/src/io/fs/s3_file_writer.h | 2 + be/src/olap/rowset/beta_rowset_writer.cpp | 12 +++ be/src/olap/rowset/segment_creator.cpp | 7 ++ be/test/io/fs/packed_file_manager_test.cpp | 72 +++++++++++++-- .../test_packed_file_async_close_error.groovy | 78 ++++++++++++++++ 8 files changed, 222 insertions(+), 64 deletions(-) create mode 100644 regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 0cda2b519c4c47..de4fc6f577a17c 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -72,6 +72,16 @@ class FileWriter { // If there is no data appended, an empty file will be persisted. virtual Status close(bool non_block = false) = 0; + // Non-blocking probe for a previous close(true). + // OK means close finished successfully. NeedSendAgain means close is still running. + // Other errors mean close finished with error or the writer does not support this API. + // NOTE: This method consumes the async close result when it is ready. The caller must + // use it as the only completion path for that async close; mixing it with close(false) + // or another try_finish_close consumer is not supported. + virtual Status try_finish_close() { + return Status::NotSupported("try_finish_close is not supported"); + } + Status append(const Slice& data) { return appendv(&data, 1); } virtual Status appendv(const Slice* data, size_t data_cnt) = 0; diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp index 6bff9eff6f3d9b..654f2d933fde0c 100644 --- a/be/src/io/fs/packed_file_manager.cpp +++ b/be/src/io/fs/packed_file_manager.cpp @@ -851,11 +851,6 @@ void PackedFileManager::process_uploading_packed_files() { Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path, packed_file->writer.get()); - if (upload_status.is()) { - record_ready_to_upload(packed_file); - handle_success(packed_file); - continue; - } if (!upload_status.ok()) { handle_failure(packed_file, upload_status); continue; @@ -873,16 +868,13 @@ void PackedFileManager::process_uploading_packed_files() { continue; } - if (packed_file->writer->state() != FileWriter::State::CLOSED) { + Status status = packed_file->writer->try_finish_close(); + if (status.is()) { continue; } - Status status = packed_file->writer->close(true); - if (status.is()) { - handle_success(packed_file); - continue; - } if (status.ok()) { + handle_success(packed_file); continue; } diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index a1b72b8dc98c5d..eec1e4c3a6073b 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -41,6 +41,7 @@ #include "io/fs/s3_file_system.h" #include "io/fs/s3_obj_storage_client.h" #include "runtime/exec_env.h" +#include "util/debug_points.h" #include "util/s3_util.h" #include "util/stopwatch.hpp" @@ -126,48 +127,10 @@ void S3FileWriter::_wait_until_finish(std::string_view task_name) { } Status S3FileWriter::close(bool non_block) { - auto record_close_latency = [this]() { - if (_close_latency_recorded || !_first_append_timestamp.has_value()) { - return; - } - auto now = std::chrono::steady_clock::now(); - auto latency_ms = std::chrono::duration_cast( - now - *_first_append_timestamp) - .count(); - s3_file_writer_first_append_to_close_ms_recorder << latency_ms; - if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) { - sampler->take_sample(); - } - _close_latency_recorded = true; - }; - if (state() == State::CLOSED) { - if (_async_close_pack != nullptr) { - _st = _async_close_pack->future.get(); - _async_close_pack = nullptr; - // Return the final close status so that a blocking close issued after - // an async close observes the real result just like the legacy behavior. - if (!non_block && _st.ok()) { - record_close_latency(); - } - return _st; - } - if (non_block) { - if (_st.ok()) { - record_close_latency(); - return Status::Error( - "S3FileWriter already closed, file path {}, file key {}", - _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key); - } - return _st; - } - if (_st.ok()) { - record_close_latency(); - return Status::Error( - "S3FileWriter already closed, file path {}, file key {}", - _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key); - } - return _st; + return Status::InternalError("S3FileWriter already closed, file path {}, file key {}", + _obj_storage_path_opts.path.native(), + _obj_storage_path_opts.key); } if (state() == State::ASYNC_CLOSING) { if (non_block) { @@ -181,7 +144,7 @@ Status S3FileWriter::close(bool non_block) { // The next time we call close() with no matter non_block true or false, it would always return the // '_st' value because this writer is already closed. if (!non_block && _st.ok()) { - record_close_latency(); + _record_close_latency(); } return _st; } @@ -194,7 +157,6 @@ Status S3FileWriter::close(bool non_block) { s3_file_writer_async_close_queuing << -1; s3_file_writer_async_close_processing << 1; _st = _close_impl(); - _state = State::CLOSED; _async_close_pack->promise.set_value(_st); s3_file_writer_async_close_processing << -1; }); @@ -202,7 +164,42 @@ Status S3FileWriter::close(bool non_block) { _st = _close_impl(); _state = State::CLOSED; if (!non_block && _st.ok()) { - record_close_latency(); + _record_close_latency(); + } + return _st; +} + +void S3FileWriter::_record_close_latency() { + if (_close_latency_recorded || !_first_append_timestamp.has_value()) { + return; + } + auto now = std::chrono::steady_clock::now(); + auto latency_ms = + std::chrono::duration_cast(now - *_first_append_timestamp) + .count(); + s3_file_writer_first_append_to_close_ms_recorder << latency_ms; + if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) { + sampler->take_sample(); + } + _close_latency_recorded = true; +} + +Status S3FileWriter::try_finish_close() { + if (state() == State::CLOSED) { + return _st; + } + if (state() != State::ASYNC_CLOSING) { + return Status::NotSupported("S3FileWriter is not async closing"); + } + CHECK(_async_close_pack != nullptr); + if (_async_close_pack->future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { + return Status::NeedSendAgain("async close is not finished"); + } + _st = _async_close_pack->future.get(); + _async_close_pack = nullptr; + _state = State::CLOSED; + if (_st.ok()) { + _record_close_latency(); } return _st; } @@ -254,6 +251,12 @@ Status S3FileWriter::_build_upload_buffer() { Status S3FileWriter::_close_impl() { VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native(); + DBUG_EXECUTE_IF("S3FileWriter._close_impl.inject_error", { + if (_obj_storage_path_opts.key.ends_with(".dat")) { + return Status::IOError("S3FileWriter._close_impl.inject_error"); + } + }); + if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); } diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h index 38e68b14c4d9a7..f31a9edef2c387 100644 --- a/be/src/io/fs/s3_file_writer.h +++ b/be/src/io/fs/s3_file_writer.h @@ -71,6 +71,7 @@ class S3FileWriter final : public FileWriter { } Status close(bool non_block = false) override; + Status try_finish_close() override; private: Status _close_impl(); @@ -84,6 +85,7 @@ class S3FileWriter final : public FileWriter { void _upload_one_part(int part_num, UploadFileBuffer& buf); bool _complete_part_task_callback(Status s); Status _build_upload_buffer(); + void _record_close_latency(); ObjectStoragePathOptions _obj_storage_path_opts; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 8e0794ce5d3018..16f1419dba6766 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -23,11 +23,13 @@ #include #include +#include #include // time #include #include #include #include +#include #include #include @@ -139,6 +141,16 @@ Status SegmentFileCollection::close() { } for (auto&& [_, writer] : _file_writers) { + DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", { + auto before_state = writer->state(); + for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path=" + << writer->path().native() + << " before_state=" << static_cast(before_state) + << " after_state=" << static_cast(writer->state()); + }); if (writer->state() != io::FileWriter::State::CLOSED) { RETURN_IF_ERROR(writer->close()); } diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index b8a434d5aadaf9..8d06737d362678 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -20,6 +20,7 @@ // IWYU pragma: no_include #include // IWYU pragma: keep +#include #include #include #include @@ -235,6 +236,9 @@ Status SegmentFlusher::_flush_segment_writer( return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); } + DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep", + { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); }); + MonotonicStopWatch inverted_index_timer; inverted_index_timer.start(); int64_t inverted_index_file_size = 0; @@ -310,6 +314,9 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr( - "MockFileWriter already closed: {}", _path.native()); - } - return _close_status; - } return Status::Error("MockFileWriter already closed: {}", _path.native()); } @@ -82,11 +78,18 @@ class MockFileWriter : public FileWriter { } if (_state == State::ASYNC_CLOSING) { - return Status::InternalError("Don't submit async close multi times"); + if (non_block) { + return Status::InternalError("Don't submit async close multi times"); + } + _async_close_ready = true; + _async_close_consumed = true; + _state = State::CLOSED; + return _close_status; } if (non_block) { _state = State::ASYNC_CLOSING; + _async_close_ready = false; return Status::OK(); } @@ -94,6 +97,19 @@ class MockFileWriter : public FileWriter { return _close_status; } + Status try_finish_close() override { + ++_try_finish_close_calls; + if (_state == State::CLOSED) { + return _async_close_consumed ? _close_status : Status::OK(); + } + if (_state != State::ASYNC_CLOSING || !_async_close_ready) { + return Status::NeedSendAgain("async close is not finished"); + } + _state = State::CLOSED; + _async_close_consumed = true; + return _close_status; + } + Status appendv(const Slice* data, size_t data_cnt) override { if (!_append_status.ok()) { return _append_status; @@ -114,10 +130,14 @@ class MockFileWriter : public FileWriter { Path _path; size_t _bytes_appended = 0; size_t _append_calls = 0; + size_t _close_calls = 0; + size_t _try_finish_close_calls = 0; std::string _written; Status _start_close_status = Status::OK(); Status _append_status = Status::OK(); Status _close_status = Status::OK(); + bool _async_close_ready = false; + bool _async_close_consumed = false; State _state = State::OPENED; }; @@ -581,6 +601,40 @@ TEST_F(PackedFileManagerTest, ProcessUploadingFilesSetsFailedWhenAsyncCloseFails EXPECT_NE(failed->last_error.find("async close fail"), std::string::npos); } +TEST_F(PackedFileManagerTest, ProcessUploadingFilesPollsAsyncCloseWithoutBlocking) { + std::string payload = "abc"; + Slice slice(payload); + auto info = default_append_info(); + ASSERT_TRUE(manager->append_small_file("async_poll_fail", slice, info).ok()); + ASSERT_TRUE(manager->mark_current_packed_file_for_upload(_resource_id).ok()); + ASSERT_EQ(manager->uploading_packed_files_for_test().size(), 1); + + auto uploading = manager->uploading_packed_files_for_test().begin()->second; + auto* writer = dynamic_cast(uploading->writer.get()); + ASSERT_NE(writer, nullptr); + uploading->state = PackedFileManager::PackedFileState::UPLOADING; + writer->set_close_status(Status::IOError("async close poll fail")); + ASSERT_TRUE(writer->close(true).ok()); + ASSERT_EQ(writer->close_calls(), 1); + + manager->process_uploading_packed_files(); + EXPECT_EQ(uploading->state.load(), PackedFileManager::PackedFileState::UPLOADING); + EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 1); + EXPECT_EQ(manager->uploaded_packed_files_for_test().size(), 0); + EXPECT_EQ(writer->close_calls(), 1); + EXPECT_EQ(writer->try_finish_close_calls(), 1); + + writer->complete_async_close(); + manager->process_uploading_packed_files(); + EXPECT_EQ(writer->close_calls(), 1); + EXPECT_EQ(writer->try_finish_close_calls(), 2); + EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 0); + ASSERT_EQ(manager->uploaded_packed_files_for_test().size(), 1); + auto failed = manager->uploaded_packed_files_for_test().begin()->second; + EXPECT_EQ(failed->state.load(), PackedFileManager::PackedFileState::FAILED); + EXPECT_NE(failed->last_error.find("async close poll fail"), std::string::npos); +} + TEST_F(PackedFileManagerTest, AppendPackedFileInfoToFileTail) { std::string payload = "abc"; Slice slice(payload); diff --git a/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy b/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy new file mode 100644 index 00000000000000..7407e5eb27d7f0 --- /dev/null +++ b/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_packed_file_async_close_error", "p0, nonConcurrent") { + if (!isCloudMode()) { + return + } + + def closeErrorPoint = "S3FileWriter._close_impl.inject_error" + def afterFinalizeSleepPoint = "SegmentFlusher._flush_segment_writer.after_finalize.sleep" + def waitDatClosedPoint = "SegmentFileCollection.close.wait_dat_closed" + + sql """ DROP TABLE IF EXISTS test_packed_file_async_close_error """ + sql """ + CREATE TABLE IF NOT EXISTS test_packed_file_async_close_error ( + `k1` int NULL, + `v1` varchar(32) NULL, + INDEX idx_v1 (`v1`) USING INVERTED PROPERTIES("parser" = "english") + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + + setBeConfigTemporary([ + "enable_packed_file": "false", + "enable_vertical_segment_writer": "true", + "small_file_threshold_bytes": "1048576", + "packed_file_size_threshold_bytes": "1048576", + "packed_file_time_threshold_ms": "1" + ]) { + try { + sql """ SET enable_file_cache = false """ + GetDebugPoint().disableDebugPointForAllBEs(closeErrorPoint) + GetDebugPoint().disableDebugPointForAllBEs(afterFinalizeSleepPoint) + GetDebugPoint().disableDebugPointForAllBEs(waitDatClosedPoint) + GetDebugPoint().enableDebugPointForAllBEs(closeErrorPoint) + GetDebugPoint().enableDebugPointForAllBEs(afterFinalizeSleepPoint) + GetDebugPoint().enableDebugPointForAllBEs(waitDatClosedPoint) + + streamLoad { + table "test_packed_file_async_close_error" + set "column_separator", "," + inputText "1,a\n2,b\n" + time 120000 + + check { result, exception, startTime, endTime -> + def msg = exception == null ? result : exception.getMessage() + logger.info("stream load result with injected S3 close error: ${msg}") + assertTrue(exception == null, "stream load should succeed before async packed file upload fails: ${msg}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + } + } + + } finally { + GetDebugPoint().disableDebugPointForAllBEs(closeErrorPoint) + GetDebugPoint().disableDebugPointForAllBEs(afterFinalizeSleepPoint) + GetDebugPoint().disableDebugPointForAllBEs(waitDatClosedPoint) + } + } +}