Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ class FileWriter {
// If there is no data appended, an empty file will be persisted.
virtual Status close(bool non_block = false) = 0;

// Non-blocking probe for a previous close(true).
// OK means close finished successfully. NeedSendAgain means close is still running.
// Other errors mean close finished with error or the writer does not support this API.
// NOTE: This method consumes the async close result when it is ready. The caller must
// use it as the only completion path for that async close; mixing it with close(false)
// or another try_finish_close consumer is not supported.
virtual Status try_finish_close() {
return Status::NotSupported("try_finish_close is not supported");
}

Status append(const Slice& data) { return appendv(&data, 1); }

virtual Status appendv(const Slice* data, size_t data_cnt) = 0;
Expand Down
14 changes: 3 additions & 11 deletions be/src/io/fs/packed_file_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,11 +851,6 @@ void PackedFileManager::process_uploading_packed_files() {
Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path,
packed_file->writer.get());

if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) {
record_ready_to_upload(packed_file);
handle_success(packed_file);
continue;
}
if (!upload_status.ok()) {
handle_failure(packed_file, upload_status);
continue;
Expand All @@ -873,16 +868,13 @@ void PackedFileManager::process_uploading_packed_files() {
continue;
}

if (packed_file->writer->state() != FileWriter::State::CLOSED) {
Status status = packed_file->writer->try_finish_close();
if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
continue;
}

Status status = packed_file->writer->close(true);
if (status.is<ErrorCode::ALREADY_CLOSED>()) {
handle_success(packed_file);
continue;
}
if (status.ok()) {
handle_success(packed_file);
continue;
}

Expand Down
91 changes: 47 additions & 44 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "io/fs/s3_file_system.h"
#include "io/fs/s3_obj_storage_client.h"
#include "runtime/exec_env.h"
#include "util/debug_points.h"
#include "util/s3_util.h"
#include "util/stopwatch.hpp"

Expand Down Expand Up @@ -126,48 +127,10 @@ void S3FileWriter::_wait_until_finish(std::string_view task_name) {
}

Status S3FileWriter::close(bool non_block) {
auto record_close_latency = [this]() {
if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
return;
}
auto now = std::chrono::steady_clock::now();
auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - *_first_append_timestamp)
.count();
s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
sampler->take_sample();
}
_close_latency_recorded = true;
};

if (state() == State::CLOSED) {
if (_async_close_pack != nullptr) {
_st = _async_close_pack->future.get();
_async_close_pack = nullptr;
// Return the final close status so that a blocking close issued after
// an async close observes the real result just like the legacy behavior.
if (!non_block && _st.ok()) {
record_close_latency();
}
return _st;
}
if (non_block) {
if (_st.ok()) {
record_close_latency();
return Status::Error<ErrorCode::ALREADY_CLOSED>(
"S3FileWriter already closed, file path {}, file key {}",
_obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
}
return _st;
}
if (_st.ok()) {
record_close_latency();
return Status::Error<ErrorCode::ALREADY_CLOSED>(
"S3FileWriter already closed, file path {}, file key {}",
_obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
}
return _st;
return Status::InternalError("S3FileWriter already closed, file path {}, file key {}",
_obj_storage_path_opts.path.native(),
_obj_storage_path_opts.key);
}
if (state() == State::ASYNC_CLOSING) {
if (non_block) {
Expand All @@ -181,7 +144,7 @@ Status S3FileWriter::close(bool non_block) {
// The next time we call close() with no matter non_block true or false, it would always return the
// '_st' value because this writer is already closed.
if (!non_block && _st.ok()) {
record_close_latency();
_record_close_latency();
}
return _st;
}
Expand All @@ -194,15 +157,49 @@ Status S3FileWriter::close(bool non_block) {
s3_file_writer_async_close_queuing << -1;
s3_file_writer_async_close_processing << 1;
_st = _close_impl();
_state = State::CLOSED;
_async_close_pack->promise.set_value(_st);
s3_file_writer_async_close_processing << -1;
});
}
_st = _close_impl();
_state = State::CLOSED;
if (!non_block && _st.ok()) {
record_close_latency();
_record_close_latency();
}
return _st;
}

void S3FileWriter::_record_close_latency() {
if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
return;
}
auto now = std::chrono::steady_clock::now();
auto latency_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(now - *_first_append_timestamp)
.count();
s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
sampler->take_sample();
}
_close_latency_recorded = true;
}

Status S3FileWriter::try_finish_close() {
if (state() == State::CLOSED) {
return _st;
}
if (state() != State::ASYNC_CLOSING) {
return Status::NotSupported("S3FileWriter is not async closing");
}
CHECK(_async_close_pack != nullptr);
if (_async_close_pack->future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
return Status::NeedSendAgain("async close is not finished");
}
_st = _async_close_pack->future.get();
_async_close_pack = nullptr;
_state = State::CLOSED;
if (_st.ok()) {
_record_close_latency();
}
return _st;
}
Expand Down Expand Up @@ -254,6 +251,12 @@ Status S3FileWriter::_build_upload_buffer() {
Status S3FileWriter::_close_impl() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();

DBUG_EXECUTE_IF("S3FileWriter._close_impl.inject_error", {
if (_obj_storage_path_opts.key.ends_with(".dat")) {
return Status::IOError("S3FileWriter._close_impl.inject_error");
}
});

if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/s3_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class S3FileWriter final : public FileWriter {
}

Status close(bool non_block = false) override;
Status try_finish_close() override;

private:
Status _close_impl();
Expand All @@ -84,6 +85,7 @@ class S3FileWriter final : public FileWriter {
void _upload_one_part(int part_num, UploadFileBuffer& buf);
bool _complete_part_task_callback(Status s);
Status _build_upload_buffer();
void _record_close_latency();

ObjectStoragePathOptions _obj_storage_path_opts;

Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
#include <fmt/format.h>
#include <stdio.h>

#include <chrono>
#include <ctime> // time
#include <filesystem>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -139,6 +141,16 @@ Status SegmentFileCollection::close() {
}

for (auto&& [_, writer] : _file_writers) {
DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", {
auto before_state = writer->state();
for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path="
<< writer->path().native()
<< " before_state=" << static_cast<int>(before_state)
<< " after_state=" << static_cast<int>(writer->state());
});
if (writer->state() != io::FileWriter::State::CLOSED) {
RETURN_IF_ERROR(writer->close());
}
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep

#include <chrono>
#include <filesystem>
#include <memory>
#include <sstream>
Expand Down Expand Up @@ -235,6 +236,9 @@ Status SegmentFlusher::_flush_segment_writer(
return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
}

DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
{ std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });

MonotonicStopWatch inverted_index_timer;
inverted_index_timer.start();
int64_t inverted_index_file_size = 0;
Expand Down Expand Up @@ -310,6 +314,9 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
}

DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
{ std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });

MonotonicStopWatch inverted_index_timer;
inverted_index_timer.start();
int64_t inverted_index_file_size = 0;
Expand Down
72 changes: 63 additions & 9 deletions be/test/io/fs/packed_file_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,20 @@ class MockFileWriter : public FileWriter {
void set_start_close_status(Status st) { _start_close_status = std::move(st); }
void complete_async_close() {
if (_state == State::ASYNC_CLOSING) {
_state = State::CLOSED;
_async_close_ready = true;
}
}

size_t close_calls() const { return _close_calls; }
size_t try_finish_close_calls() const { return _try_finish_close_calls; }
size_t append_calls() const { return _append_calls; }
bool closed() const { return _state == State::CLOSED; }
size_t bytes_appended() const override { return _bytes_appended; }
const std::string& written_data() const { return _written; }

Status close(bool non_block = false) override {
++_close_calls;
if (_state == State::CLOSED) {
if (non_block) {
if (_close_status.ok()) {
return Status::Error<ErrorCode::ALREADY_CLOSED>(
"MockFileWriter already closed: {}", _path.native());
}
return _close_status;
}
return Status::Error<ErrorCode::ALREADY_CLOSED>("MockFileWriter already closed: {}",
_path.native());
}
Expand All @@ -82,18 +78,38 @@ class MockFileWriter : public FileWriter {
}

if (_state == State::ASYNC_CLOSING) {
return Status::InternalError("Don't submit async close multi times");
if (non_block) {
return Status::InternalError("Don't submit async close multi times");
}
_async_close_ready = true;
_async_close_consumed = true;
_state = State::CLOSED;
return _close_status;
}

if (non_block) {
_state = State::ASYNC_CLOSING;
_async_close_ready = false;
return Status::OK();
}

_state = State::CLOSED;
return _close_status;
}

Status try_finish_close() override {
++_try_finish_close_calls;
if (_state == State::CLOSED) {
return _async_close_consumed ? _close_status : Status::OK();
}
if (_state != State::ASYNC_CLOSING || !_async_close_ready) {
return Status::NeedSendAgain("async close is not finished");
}
_state = State::CLOSED;
_async_close_consumed = true;
return _close_status;
}

Status appendv(const Slice* data, size_t data_cnt) override {
if (!_append_status.ok()) {
return _append_status;
Expand All @@ -114,10 +130,14 @@ class MockFileWriter : public FileWriter {
Path _path;
size_t _bytes_appended = 0;
size_t _append_calls = 0;
size_t _close_calls = 0;
size_t _try_finish_close_calls = 0;
std::string _written;
Status _start_close_status = Status::OK();
Status _append_status = Status::OK();
Status _close_status = Status::OK();
bool _async_close_ready = false;
bool _async_close_consumed = false;
State _state = State::OPENED;
};

Expand Down Expand Up @@ -581,6 +601,40 @@ TEST_F(PackedFileManagerTest, ProcessUploadingFilesSetsFailedWhenAsyncCloseFails
EXPECT_NE(failed->last_error.find("async close fail"), std::string::npos);
}

TEST_F(PackedFileManagerTest, ProcessUploadingFilesPollsAsyncCloseWithoutBlocking) {
std::string payload = "abc";
Slice slice(payload);
auto info = default_append_info();
ASSERT_TRUE(manager->append_small_file("async_poll_fail", slice, info).ok());
ASSERT_TRUE(manager->mark_current_packed_file_for_upload(_resource_id).ok());
ASSERT_EQ(manager->uploading_packed_files_for_test().size(), 1);

auto uploading = manager->uploading_packed_files_for_test().begin()->second;
auto* writer = dynamic_cast<MockFileWriter*>(uploading->writer.get());
ASSERT_NE(writer, nullptr);
uploading->state = PackedFileManager::PackedFileState::UPLOADING;
writer->set_close_status(Status::IOError("async close poll fail"));
ASSERT_TRUE(writer->close(true).ok());
ASSERT_EQ(writer->close_calls(), 1);

manager->process_uploading_packed_files();
EXPECT_EQ(uploading->state.load(), PackedFileManager::PackedFileState::UPLOADING);
EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 1);
EXPECT_EQ(manager->uploaded_packed_files_for_test().size(), 0);
EXPECT_EQ(writer->close_calls(), 1);
EXPECT_EQ(writer->try_finish_close_calls(), 1);

writer->complete_async_close();
manager->process_uploading_packed_files();
EXPECT_EQ(writer->close_calls(), 1);
EXPECT_EQ(writer->try_finish_close_calls(), 2);
EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 0);
ASSERT_EQ(manager->uploaded_packed_files_for_test().size(), 1);
auto failed = manager->uploaded_packed_files_for_test().begin()->second;
EXPECT_EQ(failed->state.load(), PackedFileManager::PackedFileState::FAILED);
EXPECT_NE(failed->last_error.find("async close poll fail"), std::string::npos);
}

TEST_F(PackedFileManagerTest, AppendPackedFileInfoToFileTail) {
std::string payload = "abc";
Slice slice(payload);
Expand Down
Loading
Loading