diff --git a/cmake/SetupYdbCppSDK.cmake b/cmake/SetupYdbCppSDK.cmake index c67349fa38d6..94dd2d7842d1 100644 --- a/cmake/SetupYdbCppSDK.cmake +++ b/cmake/SetupYdbCppSDK.cmake @@ -25,6 +25,10 @@ write_package_stub(jwt-cpp) set(RAPIDJSON_INCLUDE_DIRS "${USERVER_THIRD_PARTY_DIRS}/rapidjson/include") +if(Protobuf_INCLUDE_DIR) + set(Protobuf_INCLUDE_DIR "${Protobuf_INCLUDE_DIR}" CACHE PATH "" FORCE) +endif() + if(TARGET userver-api-common-protos) set(YDB_SDK_GOOGLE_COMMON_PROTOS_TARGET userver-api-common-protos) else() @@ -37,6 +41,7 @@ cpmaddpackage( GIT_TAG v3.13.0 GITHUB_REPOSITORY ydb-platform/ydb-cpp-sdk GIT_SHALLOW TRUE + PATCHES ydb-cpp-sdk_protobuf_include.patch OPTIONS "Brotli_VERSION ${Brotli_VERSION}" "RAPIDJSON_INCLUDE_DIRS ${RAPIDJSON_INCLUDE_DIRS}" "YDB_SDK_GOOGLE_COMMON_PROTOS_TARGET ${YDB_SDK_GOOGLE_COMMON_PROTOS_TARGET}" "YDB_SDK_EXAMPLES OFF" ) diff --git a/cmake/ydb-cpp-sdk_protobuf_include.patch b/cmake/ydb-cpp-sdk_protobuf_include.patch new file mode 100644 index 000000000000..cc043cd8c4db --- /dev/null +++ b/cmake/ydb-cpp-sdk_protobuf_include.patch @@ -0,0 +1,45 @@ +From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 +From: userver +Date: Tue, 25 Feb 2026 00:00:00 +0000 +Subject: [PATCH] Fix protobuf 4.24 compatibility + +1. cmake/protobuf: Add Protobuf well-known types include path so that + protoc can find google/protobuf/struct.proto when Protobuf is fetched + via CPM. + +2. operation.h: Remove include of google/protobuf/stubs/status.h which + was removed in protobuf 4.22+. The include is unnecessary because + MessageToJsonString now returns absl::Status. + +--- + cmake/protobuf.cmake | 4 ++++ + include/ydb-cpp-sdk/client/types/operation/operation.h | 1 - + 2 files changed, 4 insertions(+), 1 deletion(-) + +diff --git a/cmake/protobuf.cmake b/cmake/protobuf.cmake +index 1111111..2222222 100644 +--- a/cmake/protobuf.cmake ++++ b/cmake/protobuf.cmake +@@ -55,6 +55,10 @@ function(_ydb_sdk_init_proto_library_impl Tgt USE_API_COMMON_PROTOS) + + set(proto_incls ${YDB_SDK_SOURCE_DIR}) + ++ if(Protobuf_INCLUDE_DIR) ++ list(APPEND proto_incls ${Protobuf_INCLUDE_DIR}) ++ endif() ++ + if (USE_API_COMMON_PROTOS) + target_link_libraries(${Tgt} PUBLIC + api-common-protos +diff --git a/include/ydb-cpp-sdk/client/types/operation/operation.h b/include/ydb-cpp-sdk/client/types/operation/operation.h +index 3333333..4444444 100644 +--- a/include/ydb-cpp-sdk/client/types/operation/operation.h ++++ b/include/ydb-cpp-sdk/client/types/operation/operation.h +@@ -6,7 +6,6 @@ + + #include + +-#include + #include + #include + diff --git a/samples/ydb_service/views/upsert-2rows/post/view.cpp b/samples/ydb_service/views/upsert-2rows/post/view.cpp index 6c980363c978..0a0bf4eaa209 100644 --- a/samples/ydb_service/views/upsert-2rows/post/view.cpp +++ b/samples/ydb_service/views/upsert-2rows/post/view.cpp @@ -27,29 +27,31 @@ VALUES ($id_key, $name_key, $service_key, $channel_key, CurrentUtcTimestamp(), $ ydb::Query::LogMode::kNameOnly, }; - auto trx = Ydb().Begin("trx", ydb::TransactionMode::kSerializableRW); - - for (auto i : {1, 2}) { - auto response = trx.Execute( - kUpsertQuery, // - "$id_key", - request["id"].As() + std::to_string(i), // - "$name_key", - ydb::Utf8{request["name"].As() + std::to_string(i)}, // - "$service_key", - request["service"].As(), // - "$channel_key", - request["channel"].As(), // - "$state_key", - request["state"].As>() // - ); - - if (response.GetCursorCount() != 0) { - throw std::runtime_error("Unexpected response data"); + Ydb().RetryTx("trx", {.tx_mode = ydb::TransactionMode::kSerializableRW}, + [&](ydb::TxActor& tx) { + for (auto i : {1, 2}) { + auto response = tx.Execute( + kUpsertQuery, // + "$id_key", + request["id"].As() + std::to_string(i), // + "$name_key", + ydb::Utf8{request["name"].As() + std::to_string(i)}, // + "$service_key", + request["service"].As(), // + "$channel_key", + request["channel"].As(), // + "$state_key", + request["state"].As>() // + ); + + if (response.GetCursorCount() != 0) { + throw std::runtime_error("Unexpected response data"); + } + } + + return ydb::TxAction::kCommit; } - } - - trx.Commit(); + ); return formats::json::MakeObject(); } diff --git a/ydb/functional_tests/basic/views/upsert-row/post/view.cpp b/ydb/functional_tests/basic/views/upsert-row/post/view.cpp index 373bf37f5d86..13c49ecf8d72 100644 --- a/ydb/functional_tests/basic/views/upsert-row/post/view.cpp +++ b/ydb/functional_tests/basic/views/upsert-row/post/view.cpp @@ -34,26 +34,29 @@ UpsertRowHandler::HandleRequestJsonThrow(const server::http::HttpRequest&, const const { engine::SleepFor(std::chrono::milliseconds(10)); - auto trx = Ydb().Begin("trx", ydb::TransactionMode::kSerializableRW); - auto response = trx.Execute( - kUpsertQuery, // - "$id_key", - request["id"].As(), // - "$name_key", - ydb::Utf8{request["name"].As()}, // - "$service_key", - request["service"].As(), // - "$channel_key", - request["channel"].As(), // - "$state_key", - request["state"].As>() // - ); + Ydb().RetryTx("trx", {.tx_mode = ydb::TransactionMode::kSerializableRW}, + [&](ydb::TxActor& tx) { + auto response = tx.Execute( + kUpsertQuery, // + "$id_key", + request["id"].As(), // + "$name_key", + ydb::Utf8{request["name"].As()}, // + "$service_key", + request["service"].As(), // + "$channel_key", + request["channel"].As(), // + "$state_key", + request["state"].As>() // + ); - if (response.GetCursorCount()) { - throw std::runtime_error("Unexpected response data"); - } + if (response.GetCursorCount()) { + throw std::runtime_error("Unexpected response data"); + } - trx.Commit(); + return ydb::TxAction::kCommit; + } + ); return formats::json::MakeObject(); } diff --git a/ydb/include/userver/ydb/builder.hpp b/ydb/include/userver/ydb/builder.hpp index 6c49da3029f1..4410128cf35f 100644 --- a/ydb/include/userver/ydb/builder.hpp +++ b/ydb/include/userver/ydb/builder.hpp @@ -37,6 +37,7 @@ class PreparedArgsBuilder final { private: friend class Transaction; friend class TableClient; + friend class TxActor; struct PreparedArgsWithKey; NYdb::TParams Build() && { return std::move(builder_).Build(); } diff --git a/ydb/include/userver/ydb/io/structs.hpp b/ydb/include/userver/ydb/io/structs.hpp index d2539b0effa9..30fdd19a321e 100644 --- a/ydb/include/userver/ydb/io/structs.hpp +++ b/ydb/include/userver/ydb/io/structs.hpp @@ -13,6 +13,8 @@ #include #include +#include + #include #include #include diff --git a/ydb/include/userver/ydb/settings.hpp b/ydb/include/userver/ydb/settings.hpp index 72403cb702ad..783079656db9 100644 --- a/ydb/include/userver/ydb/settings.hpp +++ b/ydb/include/userver/ydb/settings.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -33,6 +34,26 @@ struct QuerySettings final { std::optional collect_query_stats{std::nullopt}; }; +struct RequestSettings final { + std::chrono::milliseconds timeout_ms{0}; + + std::string trace_id{}; +}; + +using ExecuteSettings = RequestSettings; +using CommitSettings = RequestSettings; +using RollbackSettings = RequestSettings; + +struct RetryTxSettings final { + TransactionMode tx_mode{TransactionMode::kSerializableRW}; + std::chrono::milliseconds timeout_ms{0}; + std::uint32_t retries{10}; + bool is_idempotent{false}; + + CommitSettings commit_settings; + RollbackSettings rollback_settings; +}; + } // namespace ydb namespace formats::parse { diff --git a/ydb/include/userver/ydb/table.hpp b/ydb/include/userver/ydb/table.hpp index 10a05b5d42ce..e1655173415c 100644 --- a/ydb/include/userver/ydb/table.hpp +++ b/ydb/include/userver/ydb/table.hpp @@ -33,6 +33,7 @@ namespace impl { struct Stats; struct TableSettings; class Driver; +template struct RequestContext; enum class IsStreaming : bool {}; } // namespace impl @@ -48,7 +49,8 @@ using DescribeTableSettings = NYdb::NTable::TDescribeTableSettings; using DropTableSettings = NYdb::NTable::TDropTableSettings; using ScanQuerySettings = NYdb::NTable::TStreamExecScanQuerySettings; -/// @brief A dynamic transaction name for @see TableClient::Begin. +/// @brief A dynamic transaction name for @see TableClient::Begin or +/// @see TableClient::RetryTx. /// /// @warning Make sure that transaction name has low cardinality. /// If transaction name is unique for every call, per-transaction metrics will overflow metrics quota, @@ -119,19 +121,45 @@ class TableClient final { ); /// @} - /// @name Transactions + /// @name Transactions with retry + /// @brief Execute a transactional function with automatic retries. + /// + /// The user-provided function receives a TxActor for executing queries + /// and returns TxAction::kCommit or TxAction::kRollback. On transient + /// errors the whole function is retried automatically. + /// + /// @code + /// client.RetryTx("my_tx", {.retries = 3}, + /// [](ydb::TxActor& tx) { + /// tx.Execute(query, "$id", 1); + /// return ydb::TxAction::kCommit; + /// }); + /// @endcode + /// + /// @{ + void RetryTx(utils::StringLiteral transaction_name, RetryTxSettings retry_settings, RetryTxFunction fn); + + /// @warning Make sure that `transaction_name` has low cardinality. + void RetryTx(DynamicTransactionName transaction_name, RetryTxSettings retry_settings, RetryTxFunction fn); + /// @} + + /// @name Transactions (deprecated) /// @brief Begin a transaction with the specified name. The settings are used /// for the `BEGIN` statement. + /// @deprecated Use RetryTx instead for automatic retry support. /// @see ydb::Transaction /// /// @{ + [[deprecated("Use RetryTx instead")]] Transaction Begin(utils::StringLiteral transaction_name, OperationSettings settings = {}); /// @warning Make sure that `transaction_name` has low cardinality. /// If `transaction_name` is unique for every call, per-transaction metrics will overflow metrics quota, /// and metrics will become unusable. + [[deprecated("Use RetryTx instead")]] Transaction Begin(DynamicTransactionName transaction_name, OperationSettings settings = {}); + [[deprecated("Use RetryTx instead")]] Transaction Begin(utils::StringLiteral transaction_name, TransactionMode tx_mode); /// @} @@ -233,6 +261,8 @@ class TableClient final { private: friend class Transaction; + friend class TxActor; + template friend struct impl::RequestContext; std::string JoinDbPath(std::string_view path) const; @@ -248,7 +278,7 @@ class TableClient final { // (TTableClient&, const std::string& full_path, const Settings&) // -> NThreading::TFuture // ExecuteSchemeQueryImpl -> T - template + template auto ExecuteWithPathImpl( std::string_view path, std::string_view operation_name, diff --git a/ydb/include/userver/ydb/transaction.hpp b/ydb/include/userver/ydb/transaction.hpp index 7d28fe3d5750..67231ce0b7a8 100644 --- a/ydb/include/userver/ydb/transaction.hpp +++ b/ydb/include/userver/ydb/transaction.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -18,8 +19,84 @@ USERVER_NAMESPACE_BEGIN namespace ydb { +/// @brief Transaction actor for use with TableClient::RetryTx. +/// +/// Provides only query execution within a transaction. Commit and rollback +/// are controlled by returning TxAction from the retry function. +/// https://ydb.tech/docs/en/concepts/transactions +class TxActor { +public: + TxActor(const TxActor&) = delete; + TxActor& operator=(const TxActor&) = delete; + TxActor(TxActor&&) noexcept = delete; + TxActor& operator=(TxActor&&) = delete; + + /// Execute a single data query as a part of the transaction. Query parameters + /// are passed in `Args` as "string key - value" pairs: + /// + /// @code + /// tx.Execute(query, "name1", value1, "name2", value2, ...); + /// @endcode + /// + /// Use ydb::PreparedArgsBuilder for storing a generic buffer of query params + /// if needed. + /// + /// @{ + template + ExecuteResponse Execute(const Query& query, Args&&... args); + + template + ExecuteResponse Execute(ExecuteSettings settings, const Query& query, Args&&... args); + + ExecuteResponse Execute(ExecuteSettings settings, const Query& query, PreparedArgsBuilder&& builder); + /// @} + + PreparedArgsBuilder GetBuilder() const; + +private: + friend class TableClient; + + TxActor( + TableClient& table_client, + NYdb::NQuery::TTransaction ydb_tx, + std::string name + ) noexcept; + + TableClient& table_client_; + std::string name_; + impl::StatsScope stats_scope_; + tracing::Span span_; + NYdb::NQuery::TTransaction ydb_tx_; +}; + +template +ExecuteResponse TxActor::Execute(const Query& query, Args&&... args) { + auto builder = GetBuilder(); + builder.AddParams(std::forward(args)...); + return Execute(ExecuteSettings{}, query, std::move(builder)); +} + +template +ExecuteResponse TxActor::Execute(ExecuteSettings settings, const Query& query, Args&&... args) { + auto builder = GetBuilder(); + builder.AddParams(std::forward(args)...); + return Execute(std::move(settings), query, std::move(builder)); +} + +/// Action to take after the retry function completes. +enum class TxAction { + kCommit, + kRollback, +}; + +/// Signature for the function passed to TableClient::RetryTx. +using RetryTxFunction = std::function; + /// @brief YDB Transaction /// +/// @deprecated Use TableClient::RetryTx instead of manually managing +/// transactions with Begin/Commit/Rollback. +/// /// https://ydb.tech/docs/en/concepts/transactions class Transaction final { public: diff --git a/ydb/src/ydb/component.cpp b/ydb/src/ydb/component.cpp index e9ae4fce39c2..b2885af9298a 100644 --- a/ydb/src/ydb/component.cpp +++ b/ydb/src/ydb/component.cpp @@ -5,6 +5,8 @@ #include #include +#include + #include #include #include diff --git a/ydb/src/ydb/impl/future.cpp b/ydb/src/ydb/impl/future.cpp index 641bcd9e666b..cc386006b94a 100644 --- a/ydb/src/ydb/impl/future.cpp +++ b/ydb/src/ydb/impl/future.cpp @@ -1,14 +1,17 @@ #include +#include + USERVER_NAMESPACE_BEGIN namespace ydb::impl { void HandleOnceRetry(utils::RetryBudget& retry_budget, NYdb::EStatus status) { - if (IsRetryableStatus(status) && retry_budget.CanRetry()) { - retry_budget.AccountFail(); - } else if (status == NYdb::EStatus::SUCCESS) { + auto retry_step = RetryStep::GetNext({}, status, 0); + if (status == NYdb::EStatus::SUCCESS) { retry_budget.AccountOk(); + } else if (retry_step.backoff.has_value() && retry_budget.CanRetry()) { + retry_budget.AccountFail(); } } diff --git a/ydb/src/ydb/impl/future.hpp b/ydb/src/ydb/impl/future.hpp index 88568220fac6..9e52f809c9de 100644 --- a/ydb/src/ydb/impl/future.hpp +++ b/ydb/src/ydb/impl/future.hpp @@ -9,7 +9,7 @@ #include #include -#include +//#include USERVER_NAMESPACE_BEGIN @@ -41,11 +41,11 @@ T GetFutureValueUnchecked(NThreading::TFuture&& future) { return future.ExtractValue(); } -template +template T GetFutureValueChecked( NThreading::TFuture&& future, std::string_view operation_name, - RequestContext& request_context + RequestContext& request_context ) { auto status = impl::GetFutureValueUnchecked(std::move(future)); if (!status.IsSuccess()) { @@ -64,12 +64,12 @@ T GetFutureValueChecked(NThreading::TFuture&& future, std::string_view operat return status; } -template +template T GetFutureValueChecked( NThreading::TFuture&& future, std::string_view operation_name, utils::RetryBudget& retry_budget, - RequestContext& request_context + RequestContext& request_context ) { auto status = GetFutureValueUnchecked(std::move(future)); HandleOnceRetry(retry_budget, status.GetStatus()); diff --git a/ydb/src/ydb/impl/operation_settings.hpp b/ydb/src/ydb/impl/operation_settings.hpp index 86ef06d82543..8459d9a24851 100644 --- a/ydb/src/ydb/impl/operation_settings.hpp +++ b/ydb/src/ydb/impl/operation_settings.hpp @@ -34,9 +34,22 @@ void ApplyToRequestSettings( } template +void ApplyToRequestSettings( + NYdb::TRequestSettings& result, + const RequestSettings& settings, + engine::Deadline deadline +) { + result.ClientTimeout(GetBoundTimeout(settings.timeout_ms, deadline)); + + if (!settings.trace_id.empty()) { + result.TraceId(impl::ToString(settings.trace_id)); + } +} + +template void ApplyToRequestSettings( NYdb::TOperationRequestSettings& result, - const OperationSettings& settings, + const Settings& settings, engine::Deadline deadline ) { result.OperationTimeout(GetBoundTimeout(settings.operation_timeout_ms, deadline)); @@ -54,8 +67,8 @@ void ApplyToRequestSettings( } } -template -T PrepareRequestSettings(const OperationSettings& settings, engine::Deadline deadline) { +template +T PrepareRequestSettings(const Settings& settings, engine::Deadline deadline) { T result; impl::ApplyToRequestSettings(result, settings, deadline); return result; diff --git a/ydb/src/ydb/impl/request_context.cpp b/ydb/src/ydb/impl/request_context.cpp index 355b5d66aa9f..821196741b06 100644 --- a/ydb/src/ydb/impl/request_context.cpp +++ b/ydb/src/ydb/impl/request_context.cpp @@ -18,9 +18,35 @@ namespace ydb::impl { namespace { +template +void AddSpecificTags(tracing::Span& span, const Settings& settings); + +template <> +void AddSpecificTags(tracing::Span& span, const OperationSettings& settings) { + UASSERT(settings.retries.has_value()); + span.AddTag("max_retries", *settings.retries); + span.AddTag("get_session_timeout_ms", settings.get_session_timeout_ms.count()); + span.AddTag("operation_timeout_ms", settings.operation_timeout_ms.count()); + span.AddTag("cancel_after_ms", settings.cancel_after_ms.count()); + span.AddTag("client_timeout_ms", settings.client_timeout_ms.count()); +} + +template <> +void AddSpecificTags(tracing::Span& span, const RequestSettings& settings) { + span.AddTag("timeout_ms", settings.timeout_ms.count()); +} + +template <> +void AddSpecificTags(tracing::Span& span, const RetryTxSettings& settings) { + span.AddTag("timeout_ms", settings.timeout_ms.count()); + span.AddTag("retries", settings.retries); + span.AddTag("is_idempotent", settings.is_idempotent); +} + +template tracing::Span MakeSpan( const Query& query, - OperationSettings& settings, + Settings& settings, tracing::Span* custom_parent_span, utils::impl::SourceLocation location ) { @@ -29,7 +55,7 @@ tracing::Span MakeSpan( ? custom_parent_span->CreateChild("ydb_query", location) : tracing::Span("ydb_query", location); - settings.trace_id = span.GetTraceId(); + // settings.trace_id = span.GetTraceId(); const auto optional_name_view = query.GetOptionalNameView(); switch (query.GetLogMode()) { @@ -47,12 +73,7 @@ tracing::Span MakeSpan( break; } - UASSERT(settings.retries.has_value()); - span.AddTag("max_retries", *settings.retries); - span.AddTag("get_session_timeout_ms", settings.get_session_timeout_ms.count()); - span.AddTag("operation_timeout_ms", settings.operation_timeout_ms.count()); - span.AddTag("cancel_after_ms", settings.cancel_after_ms.count()); - span.AddTag("client_timeout_ms", settings.client_timeout_ms.count()); + AddSpecificTags(span, settings); if (optional_name_view) { try { @@ -65,7 +86,17 @@ tracing::Span MakeSpan( return span; } +template void PrepareSettings( + const Query& query, + const dynamic_config::Snapshot& config_snapshot, + Settings& os, + impl::IsStreaming is_streaming, + const OperationSettings& default_settings +); + +template <> +void PrepareSettings( const Query& query, const dynamic_config::Snapshot& config_snapshot, OperationSettings& os, @@ -136,6 +167,28 @@ void PrepareSettings( } } +template <> +void PrepareSettings( + [[maybe_unused]] const Query& query, + [[maybe_unused]] const dynamic_config::Snapshot& config_snapshot, + [[maybe_unused]] RequestSettings& os, + [[maybe_unused]] impl::IsStreaming is_streaming, + [[maybe_unused]] const OperationSettings& default_settings +) { + // TODO: to think about default settings for RequestSettings +} + +template <> +void PrepareSettings( + [[maybe_unused]] const Query& query, + [[maybe_unused]] const dynamic_config::Snapshot& config_snapshot, + [[maybe_unused]] RetryTxSettings& os, + [[maybe_unused]] impl::IsStreaming is_streaming, + [[maybe_unused]] const OperationSettings& default_settings +) { + // TODO: to think about default settings for RetryTxSettings +} + engine::Deadline GetDeadline(tracing::Span& span, const dynamic_config::Snapshot& config_snapshot) { if (config_snapshot[::dynamic_config::YDB_DEADLINE_PROPAGATION_VERSION] != impl::kDeadlinePropagationExperimentVersion) @@ -165,10 +218,11 @@ engine::Deadline GetDeadline(tracing::Span& span, const dynamic_config::Snapshot } // namespace -RequestContext::RequestContext( +template +RequestContext::RequestContext( TableClient& l_table_client, const Query& query, - OperationSettings&& settings, + Settings&& settings, IsStreaming is_streaming, tracing::Span* custom_parent_span, const utils::impl::SourceLocation& location @@ -180,13 +234,14 @@ RequestContext::RequestContext( config_snapshot(table_client.config_source_.GetSnapshot()), // Note: comma operator is used to insert code between initializations. span(( - PrepareSettings(query, config_snapshot, this->settings, is_streaming, table_client.default_settings_), - MakeSpan(query, this->settings, custom_parent_span, location) + PrepareSettings(query, config_snapshot, this->settings, is_streaming, table_client.default_settings_), + MakeSpan(query, this->settings, custom_parent_span, location) )), deadline(GetDeadline(span, config_snapshot)) {} -void RequestContext::HandleError(const NYdb::TStatus& status) { +template +void RequestContext::HandleError(const NYdb::TStatus& status) { if (engine::current_task::ShouldCancel()) { return; } @@ -202,13 +257,18 @@ void RequestContext::HandleError(const NYdb::TStatus& status) { } } -RequestContext::~RequestContext() { +template +RequestContext::~RequestContext() { if (engine::current_task::ShouldCancel() && !is_error) { stats_scope.OnCancelled(); span.AddTag("cancelled", true); } } +template struct RequestContext; +template struct RequestContext; +template struct RequestContext; + } // namespace ydb::impl USERVER_NAMESPACE_END diff --git a/ydb/src/ydb/impl/request_context.hpp b/ydb/src/ydb/impl/request_context.hpp index 1673d83ccc84..3af6da5cd01d 100644 --- a/ydb/src/ydb/impl/request_context.hpp +++ b/ydb/src/ydb/impl/request_context.hpp @@ -18,11 +18,12 @@ USERVER_NAMESPACE_BEGIN namespace ydb::impl { +template struct RequestContext final { RequestContext( TableClient& client, const Query& query, - OperationSettings&& settings, + Settings&& settings, IsStreaming is_streaming = IsStreaming{false}, tracing::Span* custom_parent_span = nullptr, const utils::impl::SourceLocation& location = utils::impl::SourceLocation::Current() @@ -33,7 +34,7 @@ struct RequestContext final { ~RequestContext(); TableClient& table_client; - OperationSettings settings; + Settings settings; const int initial_uncaught_exceptions; StatsScope stats_scope; dynamic_config::Snapshot config_snapshot; diff --git a/ydb/src/ydb/impl/retry.cpp b/ydb/src/ydb/impl/retry.cpp index 71ab627dff73..90f4fb049493 100644 --- a/ydb/src/ydb/impl/retry.cpp +++ b/ydb/src/ydb/impl/retry.cpp @@ -1,5 +1,7 @@ #include +#include + #include #include @@ -8,37 +10,87 @@ USERVER_NAMESPACE_BEGIN namespace ydb::impl { -NYdb::NRetry::TRetryOperationSettings PrepareRetrySettings( +namespace { + +constexpr std::chrono::milliseconds kMaxBackoff = std::chrono::hours(1); + +std::chrono::milliseconds CalcBackoffTime(const BackoffSettings& settings, std::uint32_t retry_number) { + using BackoffDuration = std::chrono::duration; + + std::uint32_t backoff_slots = 1 << std::min(retry_number, settings.ceiling); + + double uncertainty_ratio = std::max(std::min(settings.uncertain_ratio, 1.0), 0.0); + double uncertainty_multiplier = (utils::RandRange(0.0, 1.0) * uncertainty_ratio) - uncertainty_ratio + 1.0; + + auto backoff = BackoffDuration(settings.slot_duration_ms) * backoff_slots * uncertainty_multiplier; + auto backoff_ms = std::chrono::duration_cast(backoff); + + return std::max(std::min(backoff_ms, kMaxBackoff), std::chrono::milliseconds::zero()); +} + +} // namespace + +template <> +CommonRetrySettings PrepareRetrySettings( const OperationSettings& operation_settings, const utils::RetryBudget& retry_budget, engine::Deadline deadline ) { - NYdb::NRetry::TRetryOperationSettings retry_settings; - - UASSERT(operation_settings.retries.has_value()); - retry_settings.MaxRetries(retry_budget.CanRetry() ? operation_settings.retries.value() : 0); - - retry_settings.GetSessionClientTimeout(GetBoundTimeout(operation_settings.get_session_timeout_ms, deadline)); + return CommonRetrySettings{ + .timeout_ms = GetBoundTimeout(operation_settings.client_timeout_ms, deadline), + .get_session_timeout_ms = GetBoundTimeout(operation_settings.get_session_timeout_ms, deadline), + .retries = retry_budget.CanRetry() ? operation_settings.retries.value() : 0, + }; +} - return retry_settings; +template <> +CommonRetrySettings PrepareRetrySettings( + const RetryTxSettings& retry_tx_settings, + const utils::RetryBudget& retry_budget, + engine::Deadline deadline +) { + return CommonRetrySettings{ + .timeout_ms = GetBoundTimeout(retry_tx_settings.timeout_ms, deadline), + .retries = retry_budget.CanRetry() ? retry_tx_settings.retries : 0, + .is_idempotent = retry_tx_settings.is_idempotent, + }; } -bool IsRetryableStatus(NYdb::EStatus status) { +RetryStep RetryStep::GetNext(const CommonRetrySettings& retry_settings, NYdb::EStatus status, std::uint32_t retry_number) { switch (status) { case NYdb::EStatus::ABORTED: - case NYdb::EStatus::UNAVAILABLE: + return {.backoff = std::chrono::milliseconds::zero(), .reset_session = false}; + case NYdb::EStatus::OVERLOADED: - case NYdb::EStatus::BAD_SESSION: case NYdb::EStatus::CLIENT_RESOURCE_EXHAUSTED: - return true; + return {.backoff = CalcBackoffTime(retry_settings.slow_backoff_settings, retry_number), .reset_session = false}; + + case NYdb::EStatus::UNAVAILABLE: + return {.backoff = CalcBackoffTime(retry_settings.fast_backoff_settings, retry_number), .reset_session = false}; + + case NYdb::EStatus::BAD_SESSION: + case NYdb::EStatus::SESSION_BUSY: + return {.backoff = std::chrono::milliseconds::zero(), .reset_session = true}; + + case NYdb::EStatus::UNDETERMINED: + if (retry_settings.is_idempotent) { + return {.backoff = CalcBackoffTime(retry_settings.fast_backoff_settings, retry_number), .reset_session = false}; + } else { + return {.backoff = std::nullopt, .reset_session = false}; + } + + case NYdb::EStatus::TRANSPORT_UNAVAILABLE: + if (retry_settings.is_idempotent) { + return {.backoff = CalcBackoffTime(retry_settings.fast_backoff_settings, retry_number), .reset_session = true}; + } else { + return {.backoff = std::nullopt, .reset_session = false}; + } default: - return false; + return {.backoff = std::nullopt, .reset_session = false}; } } -NYdb::TStatus MakeNonRetryableStatus() { return NYdb::TStatus{NYdb::EStatus::BAD_REQUEST, NYdb::NIssue::TIssues{}}; } - } // namespace ydb::impl USERVER_NAMESPACE_END diff --git a/ydb/src/ydb/impl/retry.hpp b/ydb/src/ydb/impl/retry.hpp index 9da53aba094c..5511d8a1ca50 100644 --- a/ydb/src/ydb/impl/retry.hpp +++ b/ydb/src/ydb/impl/retry.hpp @@ -7,25 +7,55 @@ #include #include +#include #include #include +#include #include USERVER_NAMESPACE_BEGIN namespace ydb::impl { -NYdb::NRetry::TRetryOperationSettings PrepareRetrySettings( - const OperationSettings& settings, +struct BackoffSettings { + std::chrono::milliseconds slot_duration_ms{1000}; + std::uint32_t ceiling{6}; + double uncertain_ratio{0.5}; +}; + +struct CommonRetrySettings { + std::chrono::milliseconds timeout_ms{std::chrono::milliseconds::max()}; + std::chrono::milliseconds get_session_timeout_ms{5000}; + std::uint32_t retries{10}; + bool is_idempotent{false}; + + BackoffSettings fast_backoff_settings{ + .slot_duration_ms = std::chrono::milliseconds(5), + .ceiling = 10, + .uncertain_ratio = 0.5, + }; + + BackoffSettings slow_backoff_settings{ + .slot_duration_ms = std::chrono::seconds(1), + .ceiling = 6, + .uncertain_ratio = 0.5, + }; +}; + +template +CommonRetrySettings PrepareRetrySettings( + const Settings& settings, const utils::RetryBudget& retry_budget, engine::Deadline deadline ); -// See TRetryContextBase for an understanding of error handling -bool IsRetryableStatus(NYdb::EStatus status); +struct RetryStep { + static RetryStep GetNext(const CommonRetrySettings& retry_settings, NYdb::EStatus status, std::uint32_t retry_number); -NYdb::TStatus MakeNonRetryableStatus(); + std::optional backoff = std::nullopt; + bool reset_session = false; +}; template NYdb::TAsyncStatus RetryOperation(NYdb::NTable::TTableClient& table_client, Args&&... args) { @@ -37,130 +67,98 @@ NYdb::TAsyncStatus RetryOperation(NYdb::NQuery::TQueryClient& query_client, Args return query_client.RetryQuery(std::forward(args)...); } -template -class RetryHandler : public std::enable_shared_from_this> { +template +class RetryHandler { public: using TSession = typename TClient::TSession; - using ArgType = std::conditional_t, TSession, TClient&>; - using AsyncResultType = std::invoke_result_t; - using ResultType = typename AsyncResultType::value_type; RetryHandler( TClient& client, utils::RetryBudget& retry_budget, - const NYdb::NRetry::TRetryOperationSettings& retry_settings, + const CommonRetrySettings& retry_settings, Fn&& fn ) : client_{client}, retry_budget_{retry_budget}, retry_settings_{retry_settings}, + get_session_settings_{GetSessionSettings().ClientTimeout(retry_settings_.get_session_timeout_ms)}, fn_{std::move(fn)} {} - AsyncResultType Execute() { - auto internal_retry_status = RetryOperation( - client_, - [handler = this->shared_from_this()](ArgType arg) { - return handler->InternalRetryIteration(std::forward(arg)); - }, - retry_settings_ - ); - - return internal_retry_status - .Apply([handler = this->shared_from_this()](const NYdb::TAsyncStatus& internal_async_status) { - return handler->TransformInternalRetryStatus(internal_async_status); - }); - } - -private: - NYdb::TAsyncStatus InternalRetryIteration(ArgType arg) { - const auto async_result = fn_(std::forward(arg)); - - return async_result.Apply([handler = this->shared_from_this()](const auto& async_result) { - return handler->HandleResult(async_result); - }); - } - - NYdb::TStatus HandleResult(const AsyncResultType& async_result) { - async_result.TryRethrow(); - - // Alternatively, we could just copy the TFuture, causing pointless refcounting overhead. - // - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) - result_.emplace(const_cast(async_result).ExtractValue()); - - if (result_->IsSuccess()) { - retry_budget_.AccountOk(); - } else if (IsRetryableStatus(result_->GetStatus())) { - if (retry_budget_.CanRetry()) { - retry_budget_.AccountFail(); - } else { - return MakeNonRetryableStatus(); + void Execute() { + std::optional session; + engine::Deadline deadline = engine::Deadline::FromDuration(retry_settings_.timeout_ms); + for (std::uint32_t i = 0; i <= retry_settings_.retries && !deadline.IsReached(); ++i) { + try { + if constexpr (std::is_invocable_v) { + if (!session) { + auto get_session_future = client_.GetSession(get_session_settings_); + session = GetFutureValueChecked(std::move(get_session_future), "GetSession").GetSession(); + } + fn_(*session); + } else { + fn_(client_); + } + retry_budget_.AccountOk(); + } catch (const YdbResponseError& e) { + auto [backoff, reset_session] = RetryStep::GetNext(retry_settings_, e.GetStatus().GetStatus(), i); + if (reset_session) { + session.reset(); + } + if (backoff.has_value()) { + retry_budget_.AccountFail(); + engine::SleepUntil(std::min(engine::Deadline::FromDuration(*backoff), deadline)); + } else { + throw; + } } } - - // NOLINTNEXTLINE(cppcoreguidelines-slicing) - return NYdb::TStatus{*result_}; - } - - ResultType TransformInternalRetryStatus(const NYdb::TAsyncStatus& internal_retry_status) { - internal_retry_status.TryRethrow(); - - if (!result_.has_value()) { - throw DeadlineExceededError(fmt::format( - "Timed out before the initial attempt in RetryOperation, internal status {}: {}", - static_cast>(internal_retry_status.GetValue().GetStatus()), - internal_retry_status.GetValue().GetIssues().ToOneLineString() - )); - } - - return std::move(*result_); } +private: TClient& client_; utils::RetryBudget& retry_budget_; - NYdb::NRetry::TRetryOperationSettings retry_settings_; + CommonRetrySettings retry_settings_; + GetSessionSettings get_session_settings_; Fn fn_; - - std::optional result_; }; -// Fn: (NYdb::NTable::TSession) -> NThreading::TFuture +// Fn: (NYdb::NTable::TSession) -> void // OR -// (NYdb::NTable::TTableClient&) -> NThreading::TFuture -// RetryOperation -> NThreading::TFuture +// (NYdb::NTable::TTableClient&) -> void +// RetryOperation -> void template -auto RetryOperation(impl::RequestContext& request_context, Fn&& fn) { +void RetryOperation(impl::RequestContext& request_context, Fn&& fn) { static_assert(std::is_invocable_v || std::is_invocable_v); auto& client = request_context.table_client.GetNativeTableClient(); auto& retry_budget = request_context.table_client.GetRetryBudget(); - auto retry_handler = std::make_shared>( + auto retry_handler = std::make_shared>( client, retry_budget, - PrepareRetrySettings(request_context.settings, retry_budget, request_context.deadline), + PrepareRetrySettings(request_context.settings, retry_budget, request_context.deadline), std::forward(fn) ); - return retry_handler->Execute(); + retry_handler->Execute(); } -// Fn: (NYdb::NQuery::TSession) -> NThreading::TFuture +// Fn: (NYdb::NQuery::TSession) -> void // OR -// (NYdb::NQuery::TQueryClient&) -> NThreading::TFuture -// RetryQuery -> NThreading::TFuture -template -auto RetryQuery(impl::RequestContext& request_context, Fn&& fn) { +// (NYdb::NQuery::TQueryClient&) -> void +// RetryQuery -> void +template +void RetryQuery(impl::RequestContext& request_context, Fn&& fn) { static_assert(std::is_invocable_v || std::is_invocable_v); auto& client = request_context.table_client.GetNativeQueryClient(); auto& retry_budget = request_context.table_client.GetRetryBudget(); - auto retry_handler = std::make_shared>( + auto retry_handler = std::make_shared>( client, retry_budget, - PrepareRetrySettings(request_context.settings, retry_budget, request_context.deadline), + PrepareRetrySettings(request_context.settings, retry_budget, request_context.deadline), std::forward(fn) ); - return retry_handler->Execute(); + retry_handler->Execute(); } } // namespace ydb::impl diff --git a/ydb/src/ydb/table.cpp b/ydb/src/ydb/table.cpp index 851b6f63cc5a..2c65925fb92e 100644 --- a/ydb/src/ydb/table.cpp +++ b/ydb/src/ydb/table.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include USERVER_NAMESPACE_BEGIN @@ -88,7 +89,7 @@ TableClient::~TableClient() { } } -template +template auto TableClient::ExecuteWithPathImpl( std::string_view path, std::string_view operation_name, @@ -104,19 +105,23 @@ auto TableClient::ExecuteWithPathImpl( const Query query{"", Query::Name{operation_name}}; impl::RequestContext context{*this, query, std::move(settings)}; - auto future = impl::RetryOperation( + std::unique_ptr result; + + impl::RetryOperation( context, [func = std::forward(func), full_path = JoinDbPath(path), query_settings = std::forward(query_settings), - settings = context.settings, - deadline = context.deadline](FuncArg arg) mutable { - impl::ApplyToRequestSettings(query_settings, settings, deadline); - return func(std::forward(arg), full_path, query_settings); + operation_name = std::move(operation_name), + &context, + &result](FuncArg arg) mutable { + impl::ApplyToRequestSettings(query_settings, context.settings, context.deadline); + auto future = func(std::forward(arg), full_path, query_settings); + result = std::make_unique(impl::GetFutureValueChecked(std::move(future), operation_name, context)); } ); - return impl::GetFutureValueChecked(std::move(future), operation_name, context); + return std::move(*result); } void TableClient::BulkUpsert( @@ -125,7 +130,7 @@ void TableClient::BulkUpsert( OperationSettings settings, BulkUpsertSettings query_settings ) { - ExecuteWithPathImpl( + ExecuteWithPathImpl( table, "BulkUpsert", std::move(settings), @@ -147,18 +152,21 @@ ReadTableResults TableClient::ReadTable( const Query query{"", Query::Name{"ReadTable"}}; impl::RequestContext context{*this, query, std::move(settings), impl::IsStreaming{true}}; - auto future = impl::RetryOperation( + std::unique_ptr result; + + impl::RetryOperation( context, [full_path = JoinDbPath(table), read_settings = std::move(read_settings), - settings = context.settings, - deadline = context.deadline](NYdb::NTable::TSession session) mutable { - impl::ApplyToRequestSettings(read_settings, settings, deadline); - return session.ReadTable(impl::ToString(full_path), read_settings); + &context, + &result](NYdb::NTable::TSession session) mutable { + impl::ApplyToRequestSettings(read_settings, context.settings, context.deadline); + auto future = session.ReadTable(impl::ToString(full_path), read_settings); + result = std::make_unique(impl::GetFutureValueChecked(std::move(future), "ReadTable", context)); } ); - return ReadTableResults{impl::GetFutureValueChecked(std::move(future), "ReadTable", context)}; + return std::move(*result); } ScanQueryResults TableClient::ExecuteScanQuery( @@ -169,19 +177,22 @@ ScanQueryResults TableClient::ExecuteScanQuery( ) { impl::RequestContext context{*this, query, std::move(settings), impl::IsStreaming{true}}; - auto future = impl::RetryOperation( + std::unique_ptr result; + + impl::RetryOperation( context, [query, params = std::move(builder).Build(), scan_settings = std::move(scan_settings), - settings = context.settings, - deadline = context.deadline](NYdb::NTable::TTableClient& table_client) mutable { - impl::ApplyToRequestSettings(scan_settings, settings, deadline); - return table_client.StreamExecuteScanQuery(impl::ToString(query.GetStatementView()), params, scan_settings); + &context, + &result](NYdb::NTable::TTableClient& table_client) mutable { + impl::ApplyToRequestSettings(scan_settings, context.settings, context.deadline); + auto future = table_client.StreamExecuteScanQuery(impl::ToString(query.GetStatementView()), params, scan_settings); + result = std::make_unique(impl::GetFutureValueChecked(std::move(future), "ExecuteScanQuery", context)); } ); - return ScanQueryResults{impl::GetFutureValueChecked(std::move(future), "ExecuteScanQuery", context)}; + return std::move(*result); } void TableClient::Select1() { @@ -198,7 +209,7 @@ NYdb::NQuery::TQueryClient& TableClient::GetNativeQueryClient() { return *query_ utils::RetryBudget& TableClient::GetRetryBudget() { return driver_->GetRetryBudget(); } void TableClient::MakeDirectory(const std::string& path, MakeDirectorySettings query_settings) { - ExecuteWithPathImpl( + ExecuteWithPathImpl( path, "MakeDirectory", /*settings=*/{}, @@ -210,7 +221,7 @@ void TableClient::MakeDirectory(const std::string& path, MakeDirectorySettings q } void TableClient::RemoveDirectory(const std::string& path, RemoveDirectorySettings query_settings) { - ExecuteWithPathImpl( + ExecuteWithPathImpl( path, "RemoveDirectory", /*settings=*/{}, @@ -227,7 +238,7 @@ NYdb::NScheme::TDescribePathResult TableClient::DescribePath( std::string_view path, DescribePathSettings query_settings ) { - return ExecuteWithPathImpl( + return ExecuteWithPathImpl( path, "DescribePath", /*settings=*/{}, @@ -242,7 +253,7 @@ NYdb::NTable::TDescribeTableResult TableClient::DescribeTable( std::string_view path, DescribeTableSettings query_settings ) { - return ExecuteWithPathImpl( + return ExecuteWithPathImpl( path, "DescribeTable", /*settings=*/{}, @@ -257,7 +268,7 @@ NYdb::NScheme::TListDirectoryResult TableClient::ListDirectory( std::string_view path, ListDirectorySettings query_settings ) { - return ExecuteWithPathImpl( + return ExecuteWithPathImpl( path, "ListDirectory", /*settings=*/{}, @@ -273,7 +284,7 @@ void TableClient::CreateTable( NYdb::NTable::TTableDescription&& table_desc, CreateTableSettings query_settings ) { - ExecuteWithPathImpl( + ExecuteWithPathImpl( path, "CreateTable", /*settings=*/{}, @@ -287,7 +298,7 @@ void TableClient::CreateTable( } void TableClient::DropTable(std::string_view path, DropTableSettings query_settings) { - ExecuteWithPathImpl( + ExecuteWithPathImpl( path, "DropTable", /*settings=*/{}, @@ -313,18 +324,28 @@ Transaction TableClient::Begin(DynamicTransactionName transaction_name, Operatio impl::RequestContext context{*this, query, std::move(settings)}; auto tx_settings = MakeTxSettings(context.settings.tx_mode.value()); - auto future = impl::RetryQuery( + std::unique_ptr result; + + impl::RetryQuery( context, - [tx_settings = std::move(tx_settings), - settings = context.settings, - deadline = context.deadline](NYdb::NQuery::TSession session) { - const auto exec_settings = impl::PrepareRequestSettings(settings, deadline); - return session.BeginTransaction(tx_settings, exec_settings); + [this, + tx_settings = std::move(tx_settings), + transaction_name = std::move(transaction_name), + settings = std::move(settings), + &context, + &result](NYdb::NQuery::TSession session) mutable { + const auto exec_settings = impl::PrepareRequestSettings(context.settings, context.deadline); + auto future = session.BeginTransaction(tx_settings, exec_settings); + result = std::make_unique( + *this, + impl::GetFutureValueChecked(std::move(future), "BeginTransaction", context).GetTransaction(), + transaction_name.GetUnderlying(), + std::move(settings) + ); } ); - auto status = impl::GetFutureValueChecked(std::move(future), "BeginTransaction", context); - return Transaction(*this, status.GetTransaction(), transaction_name.GetUnderlying(), std::move(settings)); + return std::move(*result); } void TableClient::ExecuteSchemeQuery(const std::string& query) { @@ -332,16 +353,15 @@ void TableClient::ExecuteSchemeQuery(const std::string& query) { OperationSettings settings{}; impl::RequestContext context{*this, nameless_query, std::move(settings)}; - auto retry_future = impl::RetryOperation( + impl::RetryOperation( context, - [query, settings = context.settings, deadline = context.deadline](NYdb::NTable::TSession session) { + [query, &context](NYdb::NTable::TSession session) { const auto exec_settings = impl::PrepareRequestSettings< - NYdb::NTable::TExecSchemeQuerySettings>(settings, deadline); - return session.ExecuteSchemeQuery(impl::ToString(query), exec_settings); + NYdb::NTable::TExecSchemeQuerySettings>(context.settings, context.deadline); + auto future = session.ExecuteSchemeQuery(impl::ToString(query), exec_settings); + return impl::GetFutureValueChecked(std::move(future), "ExecuteSchemeQuery", context); } ); - - impl::GetFutureValueChecked(std::move(retry_future), "ExecuteSchemeQuery", context); } ExecuteResponse TableClient::ExecuteDataQuery( @@ -360,21 +380,24 @@ ExecuteResponse TableClient::ExecuteDataQuery( ) { impl::RequestContext context{*this, query, std::move(settings)}; - auto future = impl::RetryQuery( + std::unique_ptr response; + + impl::RetryQuery( context, [query, params = std::move(builder).Build(), exec_settings = impl::ToExecuteQuerySettings(query_settings), - settings = context.settings, - deadline = context.deadline](NYdb::NQuery::TSession session) mutable { - impl::ApplyToRequestSettings(exec_settings, settings, deadline); - const auto tx_settings = MakeTxSettings(settings.tx_mode.value()); + &response, + &context](NYdb::NQuery::TSession session) mutable { + impl::ApplyToRequestSettings(exec_settings, context.settings, context.deadline); + const auto tx_settings = MakeTxSettings(context.settings.tx_mode.value()); const auto tx = NYdb::NQuery::TTxControl::BeginTx(tx_settings).CommitTx(); - return session.ExecuteQuery(impl::ToString(query.GetStatementView()), tx, params, exec_settings); + auto future = session.ExecuteQuery(impl::ToString(query.GetStatementView()), tx, params, exec_settings); + response = std::make_unique(impl::GetFutureValueChecked(std::move(future), "ExecuteDataQuery", context)); } ); - return ExecuteResponse{impl::GetFutureValueChecked(std::move(future), "ExecuteDataQuery", context)}; + return std::move(*response); } ExecuteResponse TableClient::ExecuteQuery( @@ -400,21 +423,79 @@ ExecuteResponse TableClient::ExecuteQuery( impl::RequestContext context{*this, query, std::move(settings)}; - auto future = impl::RetryQuery( + std::unique_ptr response; + + impl::RetryQuery( context, [query, params = std::move(builder).Build(), exec_settings = std::move(exec_settings), - settings = context.settings, - deadline = context.deadline](NYdb::NQuery::TSession session) mutable { - impl::ApplyToRequestSettings(exec_settings, settings, deadline); - const auto tx_settings = MakeTxSettings(settings.tx_mode.value()); + &response, + &context](NYdb::NQuery::TSession session) mutable { + impl::ApplyToRequestSettings(exec_settings, context.settings, context.deadline); + const auto tx_settings = MakeTxSettings(context.settings.tx_mode.value()); const auto tx = NYdb::NQuery::TTxControl::BeginTx(tx_settings).CommitTx(); - return session.ExecuteQuery(impl::ToString(query.GetStatementView()), tx, params, exec_settings); + + auto future = session.ExecuteQuery(impl::ToString(query.GetStatementView()), tx, params, exec_settings); + response = std::make_unique(impl::GetFutureValueChecked(std::move(future), "ExecuteQuery", context)); } ); - return ExecuteResponse{impl::GetFutureValueChecked(std::move(future), "ExecuteQuery", context)}; + return std::move(*response); +} + +void TableClient::RetryTx( + utils::StringLiteral transaction_name, + RetryTxSettings retry_settings, + RetryTxFunction fn +) { + RetryTx(DynamicTransactionName{transaction_name.data()}, std::move(retry_settings), std::move(fn)); +} + +void TableClient::RetryTx( + DynamicTransactionName transaction_name, + RetryTxSettings retry_settings, + RetryTxFunction fn +) { + const Query query{"", Query::Name{"RetryTx"}}; + auto tx_settings = MakeTxSettings(retry_settings.tx_mode); + impl::RequestContext context{*this, query, std::move(retry_settings)}; + + impl::RetryQuery( + context, + [this, + fn = std::move(fn), + tx_name = std::move(transaction_name), + tx_settings = std::move(tx_settings), + commit_settings = std::move(retry_settings.commit_settings), + rollback_settings = std::move(retry_settings.rollback_settings) + ](NYdb::NQuery::TSession session) mutable { + NYdb::NQuery::TBeginTxSettings begin_settings; + + auto begin_future = session.BeginTransaction(tx_settings, begin_settings); + auto begin_result = impl::GetFutureValueChecked(std::move(begin_future), "RetryTx"); + + auto ydb_tx = begin_result.GetTransaction(); + TxActor tx_actor(*this, std::move(ydb_tx), tx_name.GetUnderlying()); + auto action = fn(tx_actor); + + if (action == TxAction::kCommit) { + const Query commit_query{"", Query::Name{"Commit"}}; + impl::RequestContext commit_context{*this, commit_query, std::move(commit_settings)}; + + auto commit_settings = impl::PrepareRequestSettings(commit_context.settings, commit_context.deadline); + auto commit_future = tx_actor.ydb_tx_.Commit(commit_settings); + impl::GetFutureValueChecked(std::move(commit_future), "Commit", commit_context); + } else { + const Query rollback_query{"", Query::Name{"Rollback"}}; + impl::RequestContext rollback_context{*this, rollback_query, std::move(rollback_settings)}; + + auto rollback_settings = impl::PrepareRequestSettings(rollback_context.settings, rollback_context.deadline); + auto rollback_future = tx_actor.ydb_tx_.Rollback(rollback_settings); + impl::GetFutureValueChecked(std::move(rollback_future), "Rollback", rollback_context); + } + } + ); } std::string TableClient::JoinDbPath(std::string_view path) const { return impl::JoinPath(driver_->GetDbPath(), path); } diff --git a/ydb/src/ydb/transaction.cpp b/ydb/src/ydb/transaction.cpp index 5b248d626c65..2af5a7a17f26 100644 --- a/ydb/src/ydb/transaction.cpp +++ b/ydb/src/ydb/transaction.cpp @@ -18,6 +18,47 @@ USERVER_NAMESPACE_BEGIN namespace ydb { +TxActor::TxActor( + TableClient& table_client, + NYdb::NQuery::TTransaction ydb_tx, + std::string name +) noexcept + : table_client_(table_client), + name_(std::move(name)), + stats_scope_(impl::StatsScope::TransactionTag{}, *table_client_.stats_, name_), + span_("ydb_retry_transaction"), + ydb_tx_(std::move(ydb_tx)) { + span_.DetachFromCoroStack(); + span_.AddTag("transaction_name", name_); +} + +ExecuteResponse TxActor::Execute(ExecuteSettings settings, const Query& query, PreparedArgsBuilder&& builder) { + impl::RequestContext context{table_client_, query, std::move(settings), impl::IsStreaming{false}, &span_}; + auto internal_params = std::move(builder).Build(); + + auto exec_settings = impl::PrepareRequestSettings(settings, context.deadline); + + const auto tx = NYdb::NQuery::TTxControl::Tx(ydb_tx_); + auto execute_fut = ydb_tx_.GetSession().ExecuteQuery( + impl::ToString(query.GetStatementView()), + tx, + std::move(internal_params), + exec_settings + ); + + auto status = impl::GetFutureValueChecked( + std::move(execute_fut), + "TxActor::Execute", + context + ); + + return ExecuteResponse(std::move(status)); +} + +PreparedArgsBuilder TxActor::GetBuilder() const { + return table_client_.GetBuilder(); +} + Transaction::Transaction( TableClient& table_client, NYdb::NQuery::TTransaction ydb_tx, diff --git a/ydb/tests/execute_test.cpp b/ydb/tests/execute_test.cpp index 4ddb2bbb080f..a2772f0fe326 100644 --- a/ydb/tests/execute_test.cpp +++ b/ydb/tests/execute_test.cpp @@ -470,6 +470,59 @@ UTEST_F(YdbExecute, TransactionIsQueryFromCache) { } } +UTEST_F(YdbExecute, RetryTxCommit) { + CreateTable("retry_tx_commit", false); + + GetTableClient().RetryTx("test_retry_tx", ydb::RetryTxSettings{ + .tx_mode = ydb::TransactionMode::kSerializableRW, + .retries = 3, + }, [](ydb::TxActor& tx) { + tx.Execute(ydb::Query{R"( + UPSERT INTO retry_tx_commit (key, value_str, value_int) + VALUES ("key1", "value1", 1), ("key2", "value2", 2); + )"}); + return ydb::TxAction::kCommit; + }); + + auto response = GetTableClient().ExecuteQuery(ydb::Query{R"( + SELECT key, value_str, value_int + FROM retry_tx_commit + ORDER BY key; + )"}); + auto cursor = response.GetSingleCursor(); + ASSERT_EQ(cursor.size(), 2); + auto it = cursor.begin(); + auto row1 = *it; + AssertNullableColumn(row1, "key", "key1"); + AssertNullableColumn(row1, "value_str", "value1"); + ++it; + auto row2 = *it; + AssertNullableColumn(row2, "key", "key2"); + AssertNullableColumn(row2, "value_str", "value2"); +} + +UTEST_F(YdbExecute, RetryTxRollback) { + CreateTable("retry_tx_rollback", false); + + GetTableClient().RetryTx("test_retry_tx_rollback", ydb::RetryTxSettings{ + .tx_mode = ydb::TransactionMode::kSerializableRW, + .retries = 3, + }, [](ydb::TxActor& tx) { + tx.Execute(ydb::Query{R"( + UPSERT INTO retry_tx_rollback (key, value_str, value_int) + VALUES ("key1", "value1", 1); + )"}); + return ydb::TxAction::kRollback; + }); + + auto response = GetTableClient().ExecuteQuery(ydb::Query{R"( + SELECT key FROM retry_tx_rollback; + )"}); + ASSERT_EQ(response.GetCursorCount(), 1); + auto cursor = response.GetSingleCursor(); + ASSERT_EQ(cursor.size(), 0); +} + TYPED_UTEST(YdbExecuteTpl, PrepareQueryError) { this->CreateTable("prepare_error", true); diff --git a/ydb/tests/retry_test.cpp b/ydb/tests/retry_test.cpp index c5580d3ca924..4493451d00a4 100644 --- a/ydb/tests/retry_test.cpp +++ b/ydb/tests/retry_test.cpp @@ -11,14 +11,18 @@ namespace { class RetryOperationFixture : public ydb::ClientFixtureBase { public: - template - auto RetryOperationSync(std::size_t retries, Func func) { + template + FuncResult RetryOperationSync(std::size_t retries, Func func) { auto settings = MakeOperationSettings(retries); ydb::impl::RequestContext context{GetTableClient(), ydb::Query{}, std::move(settings)}; - auto future = ydb::impl::RetryOperation(context, std::move(func)); + std::unique_ptr result; - return ydb::impl::GetFutureValueUnchecked(std::move(future)); + ydb::impl::RetryOperation(context, [func = std::move(func), &result](NYdb::NTable::TSession session) mutable { + result = std::make_unique(func(session)); + }); + + return std::move(*result); } private: @@ -38,8 +42,8 @@ constexpr NYdb::EStatus kSuccess = NYdb::EStatus::SUCCESS; constexpr NYdb::EStatus kRetryableStatus = NYdb::EStatus::ABORTED; constexpr NYdb::EStatus kNonRetryableStatus = NYdb::EStatus::BAD_REQUEST; -inline NThreading::TFuture MakeStatusFuture(NYdb::EStatus status) { - return NThreading::MakeFuture(NYdb::TStatus{status, NYdb::NIssue::TIssues{}}); +inline NYdb::TStatus MakeStatusFuture(NYdb::EStatus status) { + return NYdb::TStatus(status, NYdb::NIssue::TIssues{}); } class TestOperationResults final : public NYdb::TStatus { @@ -61,11 +65,10 @@ class TestOperationResults final : public NYdb::TStatus { UTEST_F(RetryOperationFixture, HandleOfInheritorsOfTStatus) { const std::string data = "qwerty"; - const auto res = RetryOperationSync( + const auto res = RetryOperationSync( /*retries=*/0, [&data](NYdb::NTable::TSession) { - return NThreading::MakeFuture< - TestOperationResults>(TestOperationResults{NYdb::TStatus{kSuccess, NYdb::NIssue::TIssues{}}, data}); + return TestOperationResults{NYdb::TStatus{kSuccess, NYdb::NIssue::TIssues{}}, data}; } ); ASSERT_EQ(res.GetData(), data); @@ -73,7 +76,7 @@ UTEST_F(RetryOperationFixture, HandleOfInheritorsOfTStatus) { UTEST_F(RetryOperationFixture, Success) { std::size_t attempts = 0; - const auto res = RetryOperationSync(/*retries=*/3, [&attempts](NYdb::NTable::TSession) { + const auto res = RetryOperationSync(/*retries=*/3, [&attempts](NYdb::NTable::TSession) { attempts++; return MakeStatusFuture(kSuccess); }); @@ -84,7 +87,7 @@ UTEST_F(RetryOperationFixture, Success) { UTEST_F(RetryOperationFixture, NonRetry) { std::size_t attempts = 0; - const auto res = RetryOperationSync(/*retries=*/3, [&attempts](NYdb::NTable::TSession) { + const auto res = RetryOperationSync(/*retries=*/3, [&attempts](NYdb::NTable::TSession) { attempts++; return MakeStatusFuture(kNonRetryableStatus); }); @@ -95,7 +98,7 @@ UTEST_F(RetryOperationFixture, NonRetry) { UTEST_F(RetryOperationFixture, SuccessOnTheLastAttempt) { constexpr std::uint32_t kRetries = 5; std::size_t attempts = 0; - const auto res = RetryOperationSync(/*retries=*/kRetries, [&attempts](NYdb::NTable::TSession) { + const auto res = RetryOperationSync(/*retries=*/kRetries, [&attempts](NYdb::NTable::TSession) { attempts++; if (attempts < kRetries) { return MakeStatusFuture(kRetryableStatus); @@ -109,7 +112,7 @@ UTEST_F(RetryOperationFixture, SuccessOnTheLastAttempt) { UTEST_F(RetryOperationFixture, AttemptsIsRetriesPlusOne) { constexpr std::uint32_t kRetries = 5; std::size_t attempts = 0; - const auto res = RetryOperationSync( + const auto res = RetryOperationSync( /*retries=*/kRetries, [&attempts](NYdb::NTable::TSession) { attempts++; @@ -124,7 +127,7 @@ UTEST_F(RetryOperationFixture, RetriesLimit) { // ydb-sdk has own maximum for retries, so we want to step over this constexpr std::uint32_t kRetries = 1000; std::size_t attempts = 0; - const auto res = RetryOperationSync( + const auto res = RetryOperationSync( /*retries=*/1000, [&attempts](NYdb::NTable::TSession) { attempts++; @@ -140,7 +143,7 @@ UTEST_F(RetryOperationFixture, RetriesLimit) { UTEST_F(RetryOperationFixture, Exception) { UASSERT_THROW_MSG( - RetryOperationSync( + RetryOperationSync( /*retries=*/0, [](NYdb::NTable::TSession) { throw std::runtime_error{"error"};