diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 0cda2b519c4c47..de4fc6f577a17c 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -72,6 +72,16 @@ class FileWriter { // If there is no data appended, an empty file will be persisted. virtual Status close(bool non_block = false) = 0; + // Non-blocking probe for a previous close(true). + // OK means close finished successfully. NeedSendAgain means close is still running. + // Other errors mean close finished with error or the writer does not support this API. + // NOTE: This method consumes the async close result when it is ready. The caller must + // use it as the only completion path for that async close; mixing it with close(false) + // or another try_finish_close consumer is not supported. + virtual Status try_finish_close() { + return Status::NotSupported("try_finish_close is not supported"); + } + Status append(const Slice& data) { return appendv(&data, 1); } virtual Status appendv(const Slice* data, size_t data_cnt) = 0; diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp index 6bff9eff6f3d9b..654f2d933fde0c 100644 --- a/be/src/io/fs/packed_file_manager.cpp +++ b/be/src/io/fs/packed_file_manager.cpp @@ -851,11 +851,6 @@ void PackedFileManager::process_uploading_packed_files() { Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path, packed_file->writer.get()); - if (upload_status.is()) { - record_ready_to_upload(packed_file); - handle_success(packed_file); - continue; - } if (!upload_status.ok()) { handle_failure(packed_file, upload_status); continue; @@ -873,16 +868,13 @@ void PackedFileManager::process_uploading_packed_files() { continue; } - if (packed_file->writer->state() != FileWriter::State::CLOSED) { + Status status = packed_file->writer->try_finish_close(); + if (status.is()) { continue; } - Status status = packed_file->writer->close(true); - if (status.is()) { - handle_success(packed_file); - continue; - } if (status.ok()) { + handle_success(packed_file); continue; } diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index a1b72b8dc98c5d..eec1e4c3a6073b 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -41,6 +41,7 @@ #include "io/fs/s3_file_system.h" #include "io/fs/s3_obj_storage_client.h" #include "runtime/exec_env.h" +#include "util/debug_points.h" #include "util/s3_util.h" #include "util/stopwatch.hpp" @@ -126,48 +127,10 @@ void S3FileWriter::_wait_until_finish(std::string_view task_name) { } Status S3FileWriter::close(bool non_block) { - auto record_close_latency = [this]() { - if (_close_latency_recorded || !_first_append_timestamp.has_value()) { - return; - } - auto now = std::chrono::steady_clock::now(); - auto latency_ms = std::chrono::duration_cast( - now - *_first_append_timestamp) - .count(); - s3_file_writer_first_append_to_close_ms_recorder << latency_ms; - if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) { - sampler->take_sample(); - } - _close_latency_recorded = true; - }; - if (state() == State::CLOSED) { - if (_async_close_pack != nullptr) { - _st = _async_close_pack->future.get(); - _async_close_pack = nullptr; - // Return the final close status so that a blocking close issued after - // an async close observes the real result just like the legacy behavior. - if (!non_block && _st.ok()) { - record_close_latency(); - } - return _st; - } - if (non_block) { - if (_st.ok()) { - record_close_latency(); - return Status::Error( - "S3FileWriter already closed, file path {}, file key {}", - _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key); - } - return _st; - } - if (_st.ok()) { - record_close_latency(); - return Status::Error( - "S3FileWriter already closed, file path {}, file key {}", - _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key); - } - return _st; + return Status::InternalError("S3FileWriter already closed, file path {}, file key {}", + _obj_storage_path_opts.path.native(), + _obj_storage_path_opts.key); } if (state() == State::ASYNC_CLOSING) { if (non_block) { @@ -181,7 +144,7 @@ Status S3FileWriter::close(bool non_block) { // The next time we call close() with no matter non_block true or false, it would always return the // '_st' value because this writer is already closed. if (!non_block && _st.ok()) { - record_close_latency(); + _record_close_latency(); } return _st; } @@ -194,7 +157,6 @@ Status S3FileWriter::close(bool non_block) { s3_file_writer_async_close_queuing << -1; s3_file_writer_async_close_processing << 1; _st = _close_impl(); - _state = State::CLOSED; _async_close_pack->promise.set_value(_st); s3_file_writer_async_close_processing << -1; }); @@ -202,7 +164,42 @@ Status S3FileWriter::close(bool non_block) { _st = _close_impl(); _state = State::CLOSED; if (!non_block && _st.ok()) { - record_close_latency(); + _record_close_latency(); + } + return _st; +} + +void S3FileWriter::_record_close_latency() { + if (_close_latency_recorded || !_first_append_timestamp.has_value()) { + return; + } + auto now = std::chrono::steady_clock::now(); + auto latency_ms = + std::chrono::duration_cast(now - *_first_append_timestamp) + .count(); + s3_file_writer_first_append_to_close_ms_recorder << latency_ms; + if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) { + sampler->take_sample(); + } + _close_latency_recorded = true; +} + +Status S3FileWriter::try_finish_close() { + if (state() == State::CLOSED) { + return _st; + } + if (state() != State::ASYNC_CLOSING) { + return Status::NotSupported("S3FileWriter is not async closing"); + } + CHECK(_async_close_pack != nullptr); + if (_async_close_pack->future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { + return Status::NeedSendAgain("async close is not finished"); + } + _st = _async_close_pack->future.get(); + _async_close_pack = nullptr; + _state = State::CLOSED; + if (_st.ok()) { + _record_close_latency(); } return _st; } @@ -254,6 +251,12 @@ Status S3FileWriter::_build_upload_buffer() { Status S3FileWriter::_close_impl() { VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native(); + DBUG_EXECUTE_IF("S3FileWriter._close_impl.inject_error", { + if (_obj_storage_path_opts.key.ends_with(".dat")) { + return Status::IOError("S3FileWriter._close_impl.inject_error"); + } + }); + if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); } diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h index 38e68b14c4d9a7..f31a9edef2c387 100644 --- a/be/src/io/fs/s3_file_writer.h +++ b/be/src/io/fs/s3_file_writer.h @@ -71,6 +71,7 @@ class S3FileWriter final : public FileWriter { } Status close(bool non_block = false) override; + Status try_finish_close() override; private: Status _close_impl(); @@ -84,6 +85,7 @@ class S3FileWriter final : public FileWriter { void _upload_one_part(int part_num, UploadFileBuffer& buf); bool _complete_part_task_callback(Status s); Status _build_upload_buffer(); + void _record_close_latency(); ObjectStoragePathOptions _obj_storage_path_opts; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 8e0794ce5d3018..16f1419dba6766 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -23,11 +23,13 @@ #include #include +#include #include // time #include #include #include #include +#include #include #include @@ -139,6 +141,16 @@ Status SegmentFileCollection::close() { } for (auto&& [_, writer] : _file_writers) { + DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", { + auto before_state = writer->state(); + for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path=" + << writer->path().native() + << " before_state=" << static_cast(before_state) + << " after_state=" << static_cast(writer->state()); + }); if (writer->state() != io::FileWriter::State::CLOSED) { RETURN_IF_ERROR(writer->close()); } diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index b8a434d5aadaf9..8d06737d362678 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -20,6 +20,7 @@ // IWYU pragma: no_include #include // IWYU pragma: keep +#include #include #include #include @@ -235,6 +236,9 @@ Status SegmentFlusher::_flush_segment_writer( return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); } + DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep", + { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); }); + MonotonicStopWatch inverted_index_timer; inverted_index_timer.start(); int64_t inverted_index_file_size = 0; @@ -310,6 +314,9 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr( - "MockFileWriter already closed: {}", _path.native()); - } - return _close_status; - } return Status::Error("MockFileWriter already closed: {}", _path.native()); } @@ -82,11 +78,18 @@ class MockFileWriter : public FileWriter { } if (_state == State::ASYNC_CLOSING) { - return Status::InternalError("Don't submit async close multi times"); + if (non_block) { + return Status::InternalError("Don't submit async close multi times"); + } + _async_close_ready = true; + _async_close_consumed = true; + _state = State::CLOSED; + return _close_status; } if (non_block) { _state = State::ASYNC_CLOSING; + _async_close_ready = false; return Status::OK(); } @@ -94,6 +97,19 @@ class MockFileWriter : public FileWriter { return _close_status; } + Status try_finish_close() override { + ++_try_finish_close_calls; + if (_state == State::CLOSED) { + return _async_close_consumed ? _close_status : Status::OK(); + } + if (_state != State::ASYNC_CLOSING || !_async_close_ready) { + return Status::NeedSendAgain("async close is not finished"); + } + _state = State::CLOSED; + _async_close_consumed = true; + return _close_status; + } + Status appendv(const Slice* data, size_t data_cnt) override { if (!_append_status.ok()) { return _append_status; @@ -114,10 +130,14 @@ class MockFileWriter : public FileWriter { Path _path; size_t _bytes_appended = 0; size_t _append_calls = 0; + size_t _close_calls = 0; + size_t _try_finish_close_calls = 0; std::string _written; Status _start_close_status = Status::OK(); Status _append_status = Status::OK(); Status _close_status = Status::OK(); + bool _async_close_ready = false; + bool _async_close_consumed = false; State _state = State::OPENED; }; @@ -581,6 +601,40 @@ TEST_F(PackedFileManagerTest, ProcessUploadingFilesSetsFailedWhenAsyncCloseFails EXPECT_NE(failed->last_error.find("async close fail"), std::string::npos); } +TEST_F(PackedFileManagerTest, ProcessUploadingFilesPollsAsyncCloseWithoutBlocking) { + std::string payload = "abc"; + Slice slice(payload); + auto info = default_append_info(); + ASSERT_TRUE(manager->append_small_file("async_poll_fail", slice, info).ok()); + ASSERT_TRUE(manager->mark_current_packed_file_for_upload(_resource_id).ok()); + ASSERT_EQ(manager->uploading_packed_files_for_test().size(), 1); + + auto uploading = manager->uploading_packed_files_for_test().begin()->second; + auto* writer = dynamic_cast(uploading->writer.get()); + ASSERT_NE(writer, nullptr); + uploading->state = PackedFileManager::PackedFileState::UPLOADING; + writer->set_close_status(Status::IOError("async close poll fail")); + ASSERT_TRUE(writer->close(true).ok()); + ASSERT_EQ(writer->close_calls(), 1); + + manager->process_uploading_packed_files(); + EXPECT_EQ(uploading->state.load(), PackedFileManager::PackedFileState::UPLOADING); + EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 1); + EXPECT_EQ(manager->uploaded_packed_files_for_test().size(), 0); + EXPECT_EQ(writer->close_calls(), 1); + EXPECT_EQ(writer->try_finish_close_calls(), 1); + + writer->complete_async_close(); + manager->process_uploading_packed_files(); + EXPECT_EQ(writer->close_calls(), 1); + EXPECT_EQ(writer->try_finish_close_calls(), 2); + EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 0); + ASSERT_EQ(manager->uploaded_packed_files_for_test().size(), 1); + auto failed = manager->uploaded_packed_files_for_test().begin()->second; + EXPECT_EQ(failed->state.load(), PackedFileManager::PackedFileState::FAILED); + EXPECT_NE(failed->last_error.find("async close poll fail"), std::string::npos); +} + TEST_F(PackedFileManagerTest, AppendPackedFileInfoToFileTail) { std::string payload = "abc"; Slice slice(payload); diff --git a/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy b/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy new file mode 100644 index 00000000000000..7407e5eb27d7f0 --- /dev/null +++ b/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_packed_file_async_close_error", "p0, nonConcurrent") { + if (!isCloudMode()) { + return + } + + def closeErrorPoint = "S3FileWriter._close_impl.inject_error" + def afterFinalizeSleepPoint = "SegmentFlusher._flush_segment_writer.after_finalize.sleep" + def waitDatClosedPoint = "SegmentFileCollection.close.wait_dat_closed" + + sql """ DROP TABLE IF EXISTS test_packed_file_async_close_error """ + sql """ + CREATE TABLE IF NOT EXISTS test_packed_file_async_close_error ( + `k1` int NULL, + `v1` varchar(32) NULL, + INDEX idx_v1 (`v1`) USING INVERTED PROPERTIES("parser" = "english") + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + + setBeConfigTemporary([ + "enable_packed_file": "false", + "enable_vertical_segment_writer": "true", + "small_file_threshold_bytes": "1048576", + "packed_file_size_threshold_bytes": "1048576", + "packed_file_time_threshold_ms": "1" + ]) { + try { + sql """ SET enable_file_cache = false """ + GetDebugPoint().disableDebugPointForAllBEs(closeErrorPoint) + GetDebugPoint().disableDebugPointForAllBEs(afterFinalizeSleepPoint) + GetDebugPoint().disableDebugPointForAllBEs(waitDatClosedPoint) + GetDebugPoint().enableDebugPointForAllBEs(closeErrorPoint) + GetDebugPoint().enableDebugPointForAllBEs(afterFinalizeSleepPoint) + GetDebugPoint().enableDebugPointForAllBEs(waitDatClosedPoint) + + streamLoad { + table "test_packed_file_async_close_error" + set "column_separator", "," + inputText "1,a\n2,b\n" + time 120000 + + check { result, exception, startTime, endTime -> + def msg = exception == null ? result : exception.getMessage() + logger.info("stream load result with injected S3 close error: ${msg}") + assertTrue(exception == null, "stream load should succeed before async packed file upload fails: ${msg}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + } + } + + } finally { + GetDebugPoint().disableDebugPointForAllBEs(closeErrorPoint) + GetDebugPoint().disableDebugPointForAllBEs(afterFinalizeSleepPoint) + GetDebugPoint().disableDebugPointForAllBEs(waitDatClosedPoint) + } + } +}