diff --git a/CMakeLists.txt b/CMakeLists.txt index 86eaef64720..6df450c510c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,8 @@ project(YDB-CPP-SDK VERSION ${YDB_SDK_VERSION} LANGUAGES C CXX ASM) option(YDB_SDK_INSTALL "Install YDB C++ SDK" Off) option(YDB_SDK_TESTS "Build YDB C++ SDK tests" Off) option(YDB_SDK_EXAMPLES "Build YDB C++ SDK examples" On) +option(YDB_SDK_ENABLE_OTEL_METRICS "Build OpenTelemetry metrics plugin" Off) +option(YDB_SDK_ENABLE_OTEL_TRACE "Build OpenTelemetry trace plugin" Off) set(YDB_SDK_GOOGLE_COMMON_PROTOS_TARGET "" CACHE STRING "Name of cmake target preparing google common proto library") option(YDB_SDK_USE_RAPID_JSON "Search for rapid json library in system" ON) @@ -58,6 +60,7 @@ add_subdirectory(library/cpp) add_subdirectory(include/ydb-cpp-sdk/client) add_subdirectory(src) add_subdirectory(util) +add_subdirectory(plugins) #_ydb_sdk_validate_public_headers() diff --git a/cmake/external_libs.cmake b/cmake/external_libs.cmake index dc46fdb1d5e..9d2500bffb4 100644 --- a/cmake/external_libs.cmake +++ b/cmake/external_libs.cmake @@ -14,6 +14,10 @@ find_package(Brotli 1.1.0 REQUIRED) find_package(jwt-cpp REQUIRED) find_package(double-conversion REQUIRED) +if (YDB_SDK_ENABLE_OTEL_METRICS OR YDB_SDK_ENABLE_OTEL_TRACE) + find_package(opentelemetry-cpp REQUIRED) +endif() + # RapidJSON if (YDB_SDK_USE_RAPID_JSON) find_package(RapidJSON REQUIRED) diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index b6864dcf809..0f1ac3dfca4 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -3,6 +3,8 @@ #include "fwd.h" #include +#include +#include #include #include #include @@ -166,6 +168,12 @@ class TDriverConfig { //! If not set, default executor will be used. TDriverConfig& SetExecutor(std::shared_ptr executor); + //! Set external metrics registry implementation. + TDriverConfig& SetMetricRegistry(std::shared_ptr registry); + + //! 
namespace NYdb::inline V3::NMetrics {

//! Key/value labels attached to a metric instance, e.g. {{"operation", "ExecuteQuery"}}.
using TLabels = std::map<std::string, std::string>;

//! Counter that can only be incremented.
class ICounter {
public:
    virtual ~ICounter() = default;

    //! Increments the counter by one.
    virtual void Inc() = 0;
};

//! Value that can be moved by a delta or overwritten.
class IGauge {
public:
    virtual ~IGauge() = default;

    //! Adds delta to the gauge.
    virtual void Add(double delta) = 0;

    //! Replaces the gauge value with value.
    virtual void Set(double value) = 0;
};

//! Distribution of recorded values.
class IHistogram {
public:
    virtual ~IHistogram() = default;

    //! Records a single observation.
    virtual void Record(double value) = 0;
};

//! Factory of metric instruments; implemented by a metrics backend plugin
//! and handed to the SDK via TDriverConfig::SetMetricRegistry.
class IMetricRegistry {
public:
    virtual ~IMetricRegistry() = default;

    //! Returns a counter identified by name and labels.
    virtual std::shared_ptr<ICounter> Counter(const std::string& name, const TLabels& labels = {}) = 0;

    //! Returns a gauge identified by name and labels.
    virtual std::shared_ptr<IGauge> Gauge(const std::string& name, const TLabels& labels = {}) = 0;

    //! Returns a histogram identified by name and labels.
    //! buckets describes the requested bucket layout; an empty vector leaves the choice to the backend.
    virtual std::shared_ptr<IHistogram> Histogram(const std::string& name, const std::vector<double>& buckets, const TLabels& labels = {}) = 0;
};

} // namespace NYdb::inline V3::NMetrics
attributes = {}) = 0; +}; + +class ITracer { +public: + virtual ~ITracer() = default; + virtual std::shared_ptr StartSpan(const std::string& name, ESpanKind kind = ESpanKind::INTERNAL) = 0; +}; + +class ITraceProvider { +public: + virtual ~ITraceProvider() = default; + virtual std::shared_ptr GetTracer(const std::string& name) = 0; +}; + +} // namespace NYdb::NTrace diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt new file mode 100644 index 00000000000..0d232800455 --- /dev/null +++ b/plugins/CMakeLists.txt @@ -0,0 +1,2 @@ +add_subdirectory(metrics) +add_subdirectory(trace) diff --git a/plugins/metrics/CMakeLists.txt b/plugins/metrics/CMakeLists.txt new file mode 100644 index 00000000000..6d50a5111e7 --- /dev/null +++ b/plugins/metrics/CMakeLists.txt @@ -0,0 +1,3 @@ +if (YDB_SDK_ENABLE_OTEL_METRICS) + add_subdirectory(otel EXCLUDE_FROM_ALL) +endif() diff --git a/plugins/metrics/otel/CMakeLists.txt b/plugins/metrics/otel/CMakeLists.txt new file mode 100644 index 00000000000..e26b1931984 --- /dev/null +++ b/plugins/metrics/otel/CMakeLists.txt @@ -0,0 +1,17 @@ +_ydb_sdk_add_library(open_telemetry_metrics) +target_sources(open_telemetry_metrics PRIVATE + src/metrics.cpp +) +target_include_directories(open_telemetry_metrics PUBLIC + $ + $ +) +target_link_libraries(open_telemetry_metrics PUBLIC + client-metrics + client-resources + opentelemetry-cpp::api + opentelemetry-cpp::metrics +) +_ydb_sdk_make_client_component(OpenTelemetryMetrics open_telemetry_metrics) + +_ydb_sdk_install_headers(${CMAKE_INSTALL_INCLUDEDIR} DIRECTORY include/) diff --git a/plugins/metrics/otel/include/ydb-cpp-sdk/open_telemetry/metrics.h b/plugins/metrics/otel/include/ydb-cpp-sdk/open_telemetry/metrics.h new file mode 100644 index 00000000000..054d040f97d --- /dev/null +++ b/plugins/metrics/otel/include/ydb-cpp-sdk/open_telemetry/metrics.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +#include + +namespace opentelemetry::metrics { +class MeterProvider; +} + +namespace NYdb::inline 
V3::NMetrics { + +std::shared_ptr CreateOtelMetricRegistry( + opentelemetry::nostd::shared_ptr meterProvider); + +} // namespace NYdb::NMetrics diff --git a/plugins/metrics/otel/src/metrics.cpp b/plugins/metrics/otel/src/metrics.cpp new file mode 100644 index 00000000000..6b9f14be362 --- /dev/null +++ b/plugins/metrics/otel/src/metrics.cpp @@ -0,0 +1,156 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace NYdb::inline V3::NMetrics { + +namespace { + +using namespace opentelemetry; + +common::KeyValueIterableView MakeAttributes(const TLabels& labels) { + return common::KeyValueIterableView(labels); +} + +class TOtelCounter : public ICounter { +public: + TOtelCounter(nostd::shared_ptr> counter, const TLabels& labels) + : Counter_(std::move(counter)) + , Labels_(labels) + {} + + void Inc() override { + Counter_->Add(1, MakeAttributes(Labels_), context::RuntimeContext::GetCurrent()); + } + +private: + nostd::shared_ptr> Counter_; + TLabels Labels_; +}; + +class TOtelUpDownCounterGauge : public IGauge { +public: + TOtelUpDownCounterGauge(nostd::shared_ptr> counter, const TLabels& labels) + : Counter_(std::move(counter)) + , Labels_(labels) + {} + + void Add(double delta) override { + Counter_->Add(delta, MakeAttributes(Labels_), context::RuntimeContext::GetCurrent()); + Value_ += delta; + } + + void Set(double value) override { + Counter_->Add(value - Value_, MakeAttributes(Labels_), context::RuntimeContext::GetCurrent()); + Value_ = value; + } + +private: + nostd::shared_ptr> Counter_; + TLabels Labels_; + double Value_ = 0; +}; + +class TOtelHistogram : public IHistogram { +public: + TOtelHistogram(nostd::shared_ptr> histogram, const TLabels& labels) + : Histogram_(std::move(histogram)) + , Labels_(labels) + {} + + void Record(double value) override { + Histogram_->Record(value, MakeAttributes(Labels_), context::RuntimeContext::GetCurrent()); + } + +private: + nostd::shared_ptr> Histogram_; + TLabels Labels_; +}; 
+ +class TOtelMetricRegistry : public IMetricRegistry { +public: + TOtelMetricRegistry(nostd::shared_ptr meterProvider) + : MeterProvider_(std::move(meterProvider)) + , Meter_(MeterProvider_->GetMeter("ydb-cpp-sdk", GetSdkSemver())) + {} + + std::shared_ptr Counter(const std::string& name, const TLabels& labels) override { + auto counter = Meter_->CreateUInt64Counter(name); + return std::make_shared(std::move(counter), labels); + } + + std::shared_ptr Gauge(const std::string& name, const TLabels& labels) override { + auto counter = Meter_->CreateDoubleUpDownCounter(name); + return std::make_shared(std::move(counter), labels); + } + + std::shared_ptr Histogram(const std::string& name, const std::vector& buckets, const TLabels& labels) override { + ConfigureHistogramBuckets(name, buckets); + auto histogram = Meter_->CreateDoubleHistogram(name); + return std::make_shared(std::move(histogram), labels); + } + +private: + void ConfigureHistogramBuckets(const std::string& name, const std::vector& buckets) { + if (buckets.empty()) { + return; + } + + auto* sdkProvider = dynamic_cast(MeterProvider_.get()); + if (!sdkProvider) { + return; + } + + { + std::lock_guard lock(HistogramViewsLock_); + if (!HistogramViews_.insert(name).second) { + return; + } + } + + auto selector = std::make_unique( + sdk::metrics::InstrumentType::kHistogram, + name, + "" + ); + auto meterSelector = std::make_unique( + "ydb-cpp-sdk", + GetSdkSemver(), + {} + ); + + auto histogramConfig = std::make_shared(); + histogramConfig->boundaries_ = buckets; + + auto view = std::make_unique( + {}, + {}, + sdk::metrics::AggregationType::kHistogram, + histogramConfig + ); + + sdkProvider->AddView(std::move(selector), std::move(meterSelector), std::move(view)); + } + + nostd::shared_ptr MeterProvider_; + nostd::shared_ptr Meter_; + std::mutex HistogramViewsLock_; + std::unordered_set HistogramViews_; +}; + +} // namespace + +std::shared_ptr CreateOtelMetricRegistry( + opentelemetry::nostd::shared_ptr 
meterProvider) +{ + return std::make_shared(std::move(meterProvider)); +} + +} // namespace NYdb::NMetrics diff --git a/plugins/trace/CMakeLists.txt b/plugins/trace/CMakeLists.txt new file mode 100644 index 00000000000..ef231ab7103 --- /dev/null +++ b/plugins/trace/CMakeLists.txt @@ -0,0 +1,3 @@ +if (YDB_SDK_ENABLE_OTEL_TRACE) + add_subdirectory(otel EXCLUDE_FROM_ALL) +endif() diff --git a/plugins/trace/otel/CMakeLists.txt b/plugins/trace/otel/CMakeLists.txt new file mode 100644 index 00000000000..6816d8ff7c6 --- /dev/null +++ b/plugins/trace/otel/CMakeLists.txt @@ -0,0 +1,16 @@ +_ydb_sdk_add_library(open_telemetry_trace) +target_sources(open_telemetry_trace PRIVATE + src/trace.cpp +) +target_include_directories(open_telemetry_trace PUBLIC + $ + $ +) +target_link_libraries(open_telemetry_trace PUBLIC + client-trace + opentelemetry-cpp::api + opentelemetry-cpp::trace +) +_ydb_sdk_make_client_component(OpenTelemetryTrace open_telemetry_trace) + +_ydb_sdk_install_headers(${CMAKE_INSTALL_INCLUDEDIR} DIRECTORY include/) diff --git a/plugins/trace/otel/include/ydb-cpp-sdk/open_telemetry/trace.h b/plugins/trace/otel/include/ydb-cpp-sdk/open_telemetry/trace.h new file mode 100644 index 00000000000..13fb952335d --- /dev/null +++ b/plugins/trace/otel/include/ydb-cpp-sdk/open_telemetry/trace.h @@ -0,0 +1,12 @@ +#pragma once + +#include + +#include + +namespace NYdb::inline V3::NTrace { + +std::shared_ptr CreateOtelTraceProvider( + opentelemetry::nostd::shared_ptr tracerProvider); + +} // namespace NYdb::NTrace diff --git a/plugins/trace/otel/src/trace.cpp b/plugins/trace/otel/src/trace.cpp new file mode 100644 index 00000000000..41b1df64793 --- /dev/null +++ b/plugins/trace/otel/src/trace.cpp @@ -0,0 +1,97 @@ +#include + +#include +#include +#include + +namespace NYdb::inline V3::NTrace { + +namespace { + +using namespace opentelemetry; + +trace::SpanKind MapSpanKind(ESpanKind kind) { + switch (kind) { + case ESpanKind::INTERNAL: return trace::SpanKind::kInternal; + case 
ESpanKind::SERVER: return trace::SpanKind::kServer; + case ESpanKind::CLIENT: return trace::SpanKind::kClient; + case ESpanKind::PRODUCER: return trace::SpanKind::kProducer; + case ESpanKind::CONSUMER: return trace::SpanKind::kConsumer; + } + return trace::SpanKind::kInternal; +} + +class TOtelSpan : public ISpan { +public: + TOtelSpan(nostd::shared_ptr span) + : Span_(std::move(span)) + {} + + void End() override { + Span_->End(); + } + + void SetAttribute(const std::string& key, const std::string& value) override { + Span_->SetAttribute(key, value); + } + + void SetAttribute(const std::string& key, int64_t value) override { + Span_->SetAttribute(key, value); + } + + void AddEvent(const std::string& name, const std::map& attributes) override { + if (attributes.empty()) { + Span_->AddEvent(name); + } else { + std::vector> attrs; + attrs.reserve(attributes.size()); + for (const auto& [k, v] : attributes) { + attrs.emplace_back(nostd::string_view(k), common::AttributeValue(nostd::string_view(v))); + } + Span_->AddEvent(name, attrs); + } + } + +private: + nostd::shared_ptr Span_; +}; + +class TOtelTracer : public ITracer { +public: + TOtelTracer(nostd::shared_ptr tracer) + : Tracer_(std::move(tracer)) + {} + + std::shared_ptr StartSpan(const std::string& name, ESpanKind kind) override { + trace::StartSpanOptions options; + options.kind = MapSpanKind(kind); + return std::make_shared(Tracer_->StartSpan(name, options)); + } + +private: + nostd::shared_ptr Tracer_; +}; + +class TOtelTraceProvider : public ITraceProvider { +public: + TOtelTraceProvider(nostd::shared_ptr tracerProvider) + : TracerProvider_(std::move(tracerProvider)) + {} + + std::shared_ptr GetTracer(const std::string& name) override { + return std::make_shared(TracerProvider_->GetTracer(name)); + } + +private: + nostd::shared_ptr TracerProvider_; +}; + +} // namespace + +std::shared_ptr CreateOtelTraceProvider( + opentelemetry::nostd::shared_ptr tracerProvider) +{ + return 
std::make_shared(std::move(tracerProvider)); +} + +} // namespace NYdb::NTrace diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3dff7094058..b251a041380 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,3 +1,3 @@ add_subdirectory(api) add_subdirectory(client) -add_subdirectory(library) \ No newline at end of file +add_subdirectory(library) diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index e7f448e8675..ce5e4938058 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -12,6 +12,7 @@ add_subdirectory(iam) add_subdirectory(iam_private) add_subdirectory(impl) add_subdirectory(import) +add_subdirectory(metrics) add_subdirectory(monitoring) add_subdirectory(operation) add_subdirectory(params) @@ -25,5 +26,6 @@ add_subdirectory(scheme) add_subdirectory(ss_tasks) add_subdirectory(table) add_subdirectory(topic) +add_subdirectory(trace) add_subdirectory(types) add_subdirectory(value) diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index 84f6188e819..97022547244 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -53,6 +53,8 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t GetMaxMessageSize() const override { return MaxMessageSize; } const TLog& GetLog() const override { return Log; } std::shared_ptr GetExecutor() const override { return Executor; } + std::shared_ptr GetExternalMetricRegistry() const override { return MetricRegistry; } + std::shared_ptr GetTraceProvider() const override { return TraceProvider; } std::string Endpoint; size_t NetworkThreadsNum = 2; @@ -84,6 +86,8 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t MaxMessageSize = 0; TLog Log; // Null by default. 
std::shared_ptr Executor; + std::shared_ptr MetricRegistry; + std::shared_ptr TraceProvider; }; TDriverConfig::TDriverConfig(const std::string& connectionString) @@ -243,6 +247,16 @@ TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr executor) { return *this; } +TDriverConfig& TDriverConfig::SetMetricRegistry(std::shared_ptr registry) { + Impl_->MetricRegistry = std::move(registry); + return *this; +} + +TDriverConfig& TDriverConfig::SetTraceProvider(std::shared_ptr provider) { + Impl_->TraceProvider = std::move(provider); + return *this; +} + //////////////////////////////////////////////////////////////////////////////// std::shared_ptr CreateInternalInterface(const TDriver connection) { @@ -296,6 +310,8 @@ TDriverConfig TDriver::GetConfig() const { config.SetMaxOutboundMessageSize(Impl_->MaxOutboundMessageSize_); config.SetMaxMessageSize(Impl_->MaxMessageSize_); config.Impl_->Log = Impl_->Log; + config.SetMetricRegistry(Impl_->GetExternalMetricRegistry()); + config.SetTraceProvider(Impl_->GetTraceProvider()); return config; } diff --git a/src/client/impl/CMakeLists.txt b/src/client/impl/CMakeLists.txt index 9e04f134b37..8dfc3fa865b 100644 --- a/src/client/impl/CMakeLists.txt +++ b/src/client/impl/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(endpoints) add_subdirectory(executor) add_subdirectory(internal) +add_subdirectory(observability) add_subdirectory(session) add_subdirectory(stats) diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.cpp b/src/client/impl/internal/grpc_connections/grpc_connections.cpp index 5ca5d78998f..1bbd04f7c87 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.cpp +++ b/src/client/impl/internal/grpc_connections/grpc_connections.cpp @@ -169,6 +169,8 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL , ChannelPool_(TcpKeepAliveSettings_, params->GetSocketIdleTimeout(), TcpNoDelay_) #endif + , 
MetricRegistry_(params->GetExternalMetricRegistry()) + , TraceProvider_(params->GetTraceProvider()) , NetworkThreadsNum_(params->GetNetworkThreadsNum()) , UsePerChannelTcpConnection_(params->GetUsePerChannelTcpConnection()) , GRpcClientLow_(NetworkThreadsNum_) @@ -436,6 +438,14 @@ void TGRpcConnectionsImpl::RegisterExtensionApi(IExtensionApi* api) { ExtensionApis_.emplace_back(api); } +std::shared_ptr TGRpcConnectionsImpl::GetExternalMetricRegistry() const { + return MetricRegistry_; +} + +std::shared_ptr TGRpcConnectionsImpl::GetTraceProvider() const { + return TraceProvider_; +} + void TGRpcConnectionsImpl::SetDiscoveryMutator(IDiscoveryMutatorApi::TMutatorCb&& cb) { std::lock_guard lock(ExtensionsLock_); DiscoveryMutatorCb = std::move(cb); diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.h b/src/client/impl/internal/grpc_connections/grpc_connections.h index 630dc051332..b5627052118 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/internal/grpc_connections/grpc_connections.h @@ -18,6 +18,14 @@ namespace NYdb::inline V3 { +namespace NMetrics { + class IMetricRegistry; +} // namespace NMetrics + +namespace NTrace { + class ITraceProvider; +} // namespace NTrace + constexpr TDeadline::Duration GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY = std::chrono::seconds(10); constexpr TDeadline::Duration INITIAL_DEFERRED_CALL_DELAY = std::chrono::milliseconds(10); // The delay before first deferred service call constexpr TDeadline::Duration GET_ENDPOINTS_TIMEOUT = std::chrono::seconds(10); // Time wait for ListEndpoints request, after this time we pass error to client @@ -581,6 +589,9 @@ class TGRpcConnectionsImpl ::NMonitoring::TMetricRegistry* GetMetricRegistry() override; void RegisterExtension(IExtension* extension); void RegisterExtensionApi(IExtensionApi* api); + std::shared_ptr GetExternalMetricRegistry() const; + std::shared_ptr GetTraceProvider() const; + void 
SetDiscoveryMutator(IDiscoveryMutatorApi::TMutatorCb&& cb); const TLog& GetLog() const override; @@ -716,6 +727,8 @@ class TGRpcConnectionsImpl std::vector> Extensions_; std::vector> ExtensionApis_; + std::shared_ptr MetricRegistry_; + std::shared_ptr TraceProvider_; IDiscoveryMutatorApi::TMutatorCb DiscoveryMutatorCb; diff --git a/src/client/impl/internal/grpc_connections/params.h b/src/client/impl/internal/grpc_connections/params.h index 1112bfdf533..11e41ffad6a 100644 --- a/src/client/impl/internal/grpc_connections/params.h +++ b/src/client/impl/internal/grpc_connections/params.h @@ -11,6 +11,14 @@ namespace NYdb::inline V3 { +namespace NMetrics { + class IMetricRegistry; +} // namespace NMetrics + +namespace NTrace { + class ITraceProvider; +} // namespace NTrace + class IConnectionsParams { public: virtual ~IConnectionsParams() = default; @@ -38,6 +46,8 @@ class IConnectionsParams { virtual uint64_t GetMaxOutboundMessageSize() const = 0; virtual uint64_t GetMaxMessageSize() const = 0; virtual std::shared_ptr GetExecutor() const = 0; + virtual std::shared_ptr GetExternalMetricRegistry() const = 0; + virtual std::shared_ptr GetTraceProvider() const = 0; }; } // namespace NYdb diff --git a/src/client/impl/observability/CMakeLists.txt b/src/client/impl/observability/CMakeLists.txt new file mode 100644 index 00000000000..3330f1f1358 --- /dev/null +++ b/src/client/impl/observability/CMakeLists.txt @@ -0,0 +1,15 @@ +_ydb_sdk_add_library(impl-observability) + +target_link_libraries(impl-observability PUBLIC + yutil + logger + client-impl-ydb_stats + client-trace +) + +target_sources(impl-observability PRIVATE + operation_metrics.cpp + operation_span.cpp +) + +_ydb_sdk_install_targets(TARGETS impl-observability) diff --git a/src/client/impl/observability/operation_metrics.cpp b/src/client/impl/observability/operation_metrics.cpp new file mode 100644 index 00000000000..9c027627fda --- /dev/null +++ b/src/client/impl/observability/operation_metrics.cpp @@ -0,0 +1,33 @@ 
+#include "operation_metrics.h" + +namespace NYdb::inline V3::NObservability { + +TOperationMetrics::TOperationMetrics(NSdkStats::TStatCollector::TClientOperationStatCollector& collector, + const std::string& operationName) + : Collector_(collector) + , OperationName_(operationName) + , StartTime_(std::chrono::steady_clock::now()) +{ + Collector_.IncRequestCount(OperationName_); +} + +TOperationMetrics::~TOperationMetrics() noexcept { + End(EStatus::CLIENT_INTERNAL_ERROR); +} + +void TOperationMetrics::End(EStatus status) noexcept { + if (Ended_) { + return; + } + Ended_ = true; + + auto durationMs = std::chrono::duration_cast( + std::chrono::steady_clock::now() - StartTime_).count(); + Collector_.RecordLatency(OperationName_, TDuration::MilliSeconds(durationMs)); + + if (status != EStatus::SUCCESS) { + Collector_.IncErrorCount(OperationName_, status); + } +} + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/operation_metrics.h b/src/client/impl/observability/operation_metrics.h new file mode 100644 index 00000000000..c914d2491b8 --- /dev/null +++ b/src/client/impl/observability/operation_metrics.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include +#include + +namespace NYdb::inline V3::NObservability { + +class TOperationMetrics { +public: + TOperationMetrics(NSdkStats::TStatCollector::TClientOperationStatCollector& collector, + const std::string& operationName); + ~TOperationMetrics() noexcept; + + void End(EStatus status) noexcept; + +private: + NSdkStats::TStatCollector::TClientOperationStatCollector& Collector_; + std::string OperationName_; + std::chrono::steady_clock::time_point StartTime_; + bool Ended_ = false; +}; + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/operation_span.cpp b/src/client/impl/observability/operation_span.cpp new file mode 100644 index 00000000000..35276877a50 --- /dev/null +++ b/src/client/impl/observability/operation_span.cpp @@ -0,0 +1,141 @@ +#include 
constexpr int DefaultGrpcPort = 2135;

//! Splits an endpoint string into host and port.
//!
//! Accepted forms:
//!   "host"          -> host,  DefaultGrpcPort
//!   "host:port"     -> host,  port
//!   "[v6addr]"      -> v6addr, DefaultGrpcPort
//!   "[v6addr]:port" -> v6addr, port
//!   "v6addr"        -> v6addr, DefaultGrpcPort (more than one ':' and no brackets)
//!
//! A port that is empty, non-numeric or outside 0..65535 is replaced by
//! DefaultGrpcPort. An empty endpoint yields an empty host.
//!
//! @param endpoint  endpoint string to parse
//! @param host      receives the host part (brackets removed for IPv6)
//! @param port      receives the port, or DefaultGrpcPort when absent/invalid
void ParseEndpoint(const std::string& endpoint, std::string& host, int& port) {
    // Strict decimal parser: digits only, bounded to the valid TCP port range.
    const auto parsePort = [](const std::string& text) {
        constexpr int MaxPort = 65535;
        if (text.empty()) {
            return DefaultGrpcPort;
        }
        int value = 0;
        for (const char c : text) {
            if (c < '0' || c > '9') {
                return DefaultGrpcPort;
            }
            value = value * 10 + (c - '0');
            if (value > MaxPort) {
                return DefaultGrpcPort; // also keeps `value` from overflowing
            }
        }
        return value;
    };

    port = DefaultGrpcPort;
    host = endpoint;

    if (endpoint.empty()) {
        return;
    }

    // IPv6 bracket notation: [addr]:port
    if (endpoint.front() == '[') {
        const auto bracketEnd = endpoint.find(']');
        if (bracketEnd != std::string::npos) {
            host = endpoint.substr(1, bracketEnd - 1);
            if (bracketEnd + 2 < endpoint.size() && endpoint[bracketEnd + 1] == ':') {
                port = parsePort(endpoint.substr(bracketEnd + 2));
            }
            return;
        }
    }

    const auto lastColon = endpoint.rfind(':');
    if (lastColon == std::string::npos) {
        return; // bare host name
    }
    if (endpoint.find(':') != lastColon) {
        // Several colons without brackets: a bare IPv6 literal. Splitting at the
        // last colon would cut the address, so treat the whole string as the host.
        return;
    }

    host = endpoint.substr(0, lastColon);
    port = parsePort(endpoint.substr(lastColon + 1));
}
{ + SafeLogSpanError(Log_, "failed to initialize span"); + Span_.reset(); + } +} + +TOperationSpan::~TOperationSpan() noexcept { + if (Span_) { + try { + Span_->End(); + } catch (...) { + SafeLogSpanError(Log_, "failed to end span"); + } + } +} + +void TOperationSpan::SetPeerEndpoint(const std::string& endpoint) noexcept { + if (!Span_ || endpoint.empty()) { + return; + } + try { + std::string host; + int port; + ParseEndpoint(endpoint, host, port); + Span_->SetAttribute("network.peer.address", host); + Span_->SetAttribute("network.peer.port", static_cast(port)); + } catch (...) { + SafeLogSpanError(Log_, "failed to set peer endpoint"); + } +} + +void TOperationSpan::AddEvent(const std::string& name, const std::map& attributes) noexcept { + if (!Span_) { + return; + } + try { + Span_->AddEvent(name, attributes); + } catch (...) { + SafeLogSpanError(Log_, "failed to add event"); + } +} + +void TOperationSpan::End(EStatus status) noexcept { + if (Span_) { + try { + Span_->SetAttribute("db.response.status_code", static_cast(status)); + if (status != EStatus::SUCCESS) { + Span_->SetAttribute("error.type", ToString(status)); + } + Span_->End(); + } catch (...) 
{ + SafeLogSpanError(Log_, "failed to finalize span"); + } + Span_.reset(); + } +} + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/operation_span.h b/src/client/impl/observability/operation_span.h new file mode 100644 index 00000000000..00edd4cb7de --- /dev/null +++ b/src/client/impl/observability/operation_span.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include + +#include + +#include +#include +#include + +namespace NYdb::inline V3::NObservability { + +class TOperationSpan { +public: + TOperationSpan(std::shared_ptr tracer, const std::string& operationName, + const std::string& endpoint, const TLog& log); + ~TOperationSpan() noexcept; + + void SetPeerEndpoint(const std::string& endpoint) noexcept; + void AddEvent(const std::string& name, const std::map& attributes = {}) noexcept; + + void End(EStatus status) noexcept; + +private: + TLog Log_; + std::shared_ptr Span_; +}; + +} // namespace NYdb::NObservability diff --git a/src/client/impl/stats/CMakeLists.txt b/src/client/impl/stats/CMakeLists.txt index 498104196cd..15866af4bc6 100644 --- a/src/client/impl/stats/CMakeLists.txt +++ b/src/client/impl/stats/CMakeLists.txt @@ -4,6 +4,7 @@ target_link_libraries(client-impl-ydb_stats PUBLIC yutil grpc-client monlib-metrics + client-metrics ) target_sources(client-impl-ydb_stats PRIVATE diff --git a/src/client/impl/stats/stats.h b/src/client/impl/stats/stats.h index d545764c887..532e42f0754 100644 --- a/src/client/impl/stats/stats.h +++ b/src/client/impl/stats/stats.h @@ -1,13 +1,17 @@ #pragma once #include +#include #include #include #include +#include #include #include +#include +#include namespace NYdb::inline V3 { namespace NSdkStats { @@ -226,6 +230,89 @@ struct TStatCollector { std::string ClientType_; }; + struct TClientOperationStatCollector { + + TClientOperationStatCollector() = default; + + TClientOperationStatCollector(::NMonitoring::TMetricRegistry* registry, + const std::string& database, + const std::string& 
clientType, + std::shared_ptr externalRegistry = nullptr) + : MetricRegistryPtr_(registry) + , Database_(database) + , ClientType_(clientType) + , ExternalRegistry_(std::move(externalRegistry)) + { + OtelPrefix_ = "ydb."; + std::string lower = clientType; + std::transform(lower.begin(), lower.end(), lower.begin(), ::tolower); + OtelPrefix_ += lower; + } + + void IncRequestCount(const std::string& operation) { + if (auto registry = MetricRegistryPtr_.Get()) { + registry->Rate({ {"database", Database_}, {"ydb_client", ClientType_}, + {"operation", operation}, {"sensor", "Request/Count"} })->Inc(); + } + if (ExternalRegistry_) { + NMetrics::TLabels labels = {{"operation", operation}}; + auto counter = ExternalRegistry_->Counter(OtelPrefix_ + ".requests", labels); + if (counter) { + counter->Inc(); + } + ExternalRegistry_->Counter(OtelPrefix_ + ".errors", labels); + } + } + + void IncErrorCount(const std::string& operation, const EStatus& status) { + if (auto registry = MetricRegistryPtr_.Get()) { + std::string statusName = TStringBuilder() << status; + registry->Rate({ {"database", Database_}, {"ydb_client", ClientType_}, + {"operation", operation}, {"status", statusName}, {"sensor", "Request/Errors"} })->Inc(); + } + if (ExternalRegistry_) { + NMetrics::TLabels labels = {{"operation", operation}}; + auto counter = ExternalRegistry_->Counter(OtelPrefix_ + ".errors", labels); + if (counter) { + counter->Inc(); + } + } + } + + void RecordLatency(const std::string& operation, TDuration duration) { + if (auto registry = MetricRegistryPtr_.Get()) { + registry->HistogramRate({ {"database", Database_}, {"ydb_client", ClientType_}, + {"operation", operation}, {"sensor", "Request/OperationLatency"} }, + ::NMonitoring::ExponentialHistogram(20, 2, 1))->Record(duration.MilliSeconds()); + } + if (ExternalRegistry_) { + static const std::vector LatencyBuckets = { + 1, 2, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000 + }; + NMetrics::TLabels labels = {{"operation", 
operation}}; + auto histogram = ExternalRegistry_->Histogram(OtelPrefix_ + ".latency_ms", LatencyBuckets, labels); + if (histogram) { + histogram->Record(static_cast(duration.MilliSeconds())); + } + } + } + + void SetExternalRegistry(std::shared_ptr registry) { + ExternalRegistry_ = std::move(registry); + } + + bool HasAnyRegistry() const { + return MetricRegistryPtr_.Get() != nullptr || ExternalRegistry_ != nullptr; + } + + private: + TAtomicPointer<::NMonitoring::TMetricRegistry> MetricRegistryPtr_; + std::string Database_; + std::string ClientType_; + std::string OtelPrefix_; + std::shared_ptr ExternalRegistry_; + }; + struct TClientStatCollector { TClientStatCollector(::NMonitoring::TRate* cacheMiss = nullptr @@ -233,13 +320,15 @@ struct TStatCollector { , ::NMonitoring::THistogram* paramsSize = nullptr , ::NMonitoring::TRate* sessionRemoved = nullptr , ::NMonitoring::TRate* requestMigrated = nullptr - , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector()) + , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector() + , TClientOperationStatCollector operationStatCollector = TClientOperationStatCollector()) : CacheMiss(cacheMiss) , QuerySize(querySize) , ParamsSize(paramsSize) , SessionRemovedDueBalancing(sessionRemoved) , RequestMigrated(requestMigrated) , RetryOperationStatCollector(retryOperationStatCollector) + , OperationStatCollector(operationStatCollector) { } ::NMonitoring::TRate* CacheMiss; @@ -248,6 +337,7 @@ struct TStatCollector { ::NMonitoring::TRate* SessionRemovedDueBalancing; ::NMonitoring::TRate* RequestMigrated; TClientRetryOperationStatCollector RetryOperationStatCollector; + TClientOperationStatCollector OperationStatCollector; }; TStatCollector(const std::string& database, TMetricRegistry* sensorsRegistry) @@ -376,10 +466,13 @@ struct TStatCollector { {"sensor", "Request/ParamsSize"} }, ::NMonitoring::ExponentialHistogram(10, 2, 32)); return 
TClientStatCollector(cacheMiss, querySize, paramsSize, sessionRemovedDueBalancing, requestMigrated, - TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_, clientType)); + TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_, clientType), + TClientOperationStatCollector(MetricRegistryPtr_.Get(), Database_, clientType)); } - return TClientStatCollector(); + return TClientStatCollector(nullptr, nullptr, nullptr, nullptr, nullptr, + TClientRetryOperationStatCollector(), + TClientOperationStatCollector(nullptr, Database_, clientType)); } bool IsCollecting() { diff --git a/src/client/metrics/CMakeLists.txt b/src/client/metrics/CMakeLists.txt new file mode 100644 index 00000000000..e681a846b26 --- /dev/null +++ b/src/client/metrics/CMakeLists.txt @@ -0,0 +1,7 @@ +_ydb_sdk_add_library(client-metrics) + +target_sources(client-metrics PRIVATE + metrics.cpp +) + +_ydb_sdk_make_client_component(Metrics client-metrics) diff --git a/src/client/metrics/metrics.cpp b/src/client/metrics/metrics.cpp new file mode 100644 index 00000000000..341917291bb --- /dev/null +++ b/src/client/metrics/metrics.cpp @@ -0,0 +1 @@ +#include diff --git a/src/client/query/CMakeLists.txt b/src/client/query/CMakeLists.txt index 6677d402d4d..3cc7401200b 100644 --- a/src/client/query/CMakeLists.txt +++ b/src/client/query/CMakeLists.txt @@ -11,6 +11,8 @@ target_link_libraries(client-ydb_query PUBLIC client-ydb_driver client-ydb_query-impl client-ydb_result + client-metrics + client-trace client-types-operation api-protos api-grpc diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index ccf90f1175c..b9ce909d498 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -15,7 +15,10 @@ #include #include #include +#include +#include #include +#include #include @@ -67,6 +70,14 @@ class TQueryClient::TImpl: public TClientImplCommon, public { SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector("Query")); 
SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector("Query")); + + if (auto externalRegistry = Connections_->GetExternalMetricRegistry()) { + OperationStatCollector_.SetExternalRegistry(externalRegistry); + } + + if (auto traceProvider = Connections_->GetTraceProvider()) { + Tracer_ = traceProvider->GetTracer("ydb-cpp-sdk-query"); + } } ~TImpl() { @@ -77,6 +88,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public QuerySizeHistogram_.Set(collector.QuerySize); ParamsSizeHistogram_.Set(collector.ParamsSize); RetryOperationStatCollector_ = collector.RetryOperationStatCollector; + OperationStatCollector_ = collector.OperationStatCollector; } TAsyncExecuteQueryIterator StreamExecuteQuery(const std::string& query, const TTxControl& txControl, @@ -94,8 +106,25 @@ class TQueryClient::TImpl: public TClientImplCommon, public { CollectQuerySize(query); CollectParamsSize(params ? ¶ms->GetProtoMap() : nullptr); + + auto span = std::make_shared(Tracer_, "ExecuteQuery", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "ExecuteQuery"); + return TExecQueryImpl::ExecuteQuery( - Connections_, DbDriverState_, query, txControl, params, settings, session); + Connections_, DbDriverState_, query, txControl, params, settings, session) + .Apply([span, metrics](TAsyncExecuteQueryResult future) { + try { + auto result = future.GetValue(); + span->SetPeerEndpoint(result.GetEndpoint()); + span->End(result.GetStatus()); + metrics->End(result.GetStatus()); + return result; + } catch (...) 
{ + span->End(EStatus::CLIENT_INTERNAL_ERROR); + metrics->End(EStatus::CLIENT_INTERNAL_ERROR); + throw; + } + }); } NThreading::TFuture ExecuteScript(const std::string& script, const std::optional& params, const TExecuteScriptSettings& settings) { @@ -162,20 +191,31 @@ class TQueryClient::TImpl: public TClientImplCommon, public auto promise = NThreading::NewPromise(); - auto responseCb = [promise, session] + auto span = std::make_shared(Tracer_, "Rollback", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "Rollback"); + + auto responseCb = [promise, session, span, metrics] (Ydb::Query::RollbackTransactionResponse* response, TPlainStatus status) mutable { try { + span->SetPeerEndpoint(status.Endpoint); if (response) { NYdb::NIssue::TIssues opIssues; NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); TStatus rollbackTxStatus(TPlainStatus{static_cast(response->status()), std::move(opIssues), status.Endpoint, std::move(status.Metadata)}); + span->End(rollbackTxStatus.GetStatus()); + metrics->End(rollbackTxStatus.GetStatus()); + promise.SetValue(std::move(rollbackTxStatus)); } else { + span->End(status.Status); + metrics->End(status.Status); promise.SetValue(TStatus(std::move(status))); } } catch (...) 
{ + span->End(EStatus::CLIENT_INTERNAL_ERROR); + metrics->End(EStatus::CLIENT_INTERNAL_ERROR); promise.SetException(std::current_exception()); } }; @@ -203,21 +243,32 @@ class TQueryClient::TImpl: public TClientImplCommon, public auto promise = NThreading::NewPromise(); - auto responseCb = [promise, session] + auto span = std::make_shared(Tracer_, "Commit", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "Commit"); + + auto responseCb = [promise, session, span, metrics] (Ydb::Query::CommitTransactionResponse* response, TPlainStatus status) mutable { try { + span->SetPeerEndpoint(status.Endpoint); if (response) { NYdb::NIssue::TIssues opIssues; NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); TStatus commitTxStatus(TPlainStatus{static_cast(response->status()), std::move(opIssues), status.Endpoint, std::move(status.Metadata)}); + span->End(commitTxStatus.GetStatus()); + metrics->End(commitTxStatus.GetStatus()); + TCommitTransactionResult commitTxResult(std::move(commitTxStatus)); promise.SetValue(std::move(commitTxResult)); } else { + span->End(status.Status); + metrics->End(status.Status); promise.SetValue(TCommitTransactionResult(TStatus(std::move(status)))); } } catch (...) 
{ + span->End(EStatus::CLIENT_INTERNAL_ERROR); + metrics->End(EStatus::CLIENT_INTERNAL_ERROR); promise.SetException(std::current_exception()); } }; @@ -425,10 +476,13 @@ class TQueryClient::TImpl: public TClientImplCommon, public TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings) { class TQueryClientGetSessionCtx : public NSessionPool::IGetSessionCtx { public: - TQueryClientGetSessionCtx(std::shared_ptr client, const TCreateSessionSettings& settings) + TQueryClientGetSessionCtx(std::shared_ptr client, const TCreateSessionSettings& settings, + std::shared_ptr span, std::shared_ptr metrics) : Promise(NThreading::NewPromise()) , Client(client) , RpcSettings(TRpcRequestSettings::Make(settings)) + , Span(span) + , Metrics(metrics) {} TAsyncCreateSessionResult GetFuture() { @@ -437,6 +491,12 @@ class TQueryClient::TImpl: public TClientImplCommon, public void ReplyError(TStatus status) override { TSession session; + if (Span) { + Span->End(status.GetStatus()); + } + if (Metrics) { + Metrics->End(status.GetStatus()); + } ScheduleReply(TCreateSessionResult(std::move(status), std::move(session))); } @@ -449,14 +509,28 @@ class TQueryClient::TImpl: public TClientImplCommon, public ) ); + if (Span) { + Span->End(EStatus::SUCCESS); + } + if (Metrics) { + Metrics->End(EStatus::SUCCESS); + } ScheduleReply(std::move(val)); } void ReplyNewSession() override { Client->CreateAttachedSession(RpcSettings).Subscribe( - [promise{std::move(Promise)}](TAsyncCreateSessionResult future) mutable + [promise{std::move(Promise)}, span = Span, metrics = Metrics](TAsyncCreateSessionResult future) mutable { - promise.SetValue(future.ExtractValue()); + auto val = future.ExtractValue(); + if (span) { + span->SetPeerEndpoint(val.GetEndpoint()); + span->End(val.GetStatus()); + } + if (metrics) { + metrics->End(val.GetStatus()); + } + promise.SetValue(std::move(val)); }); } @@ -481,9 +555,13 @@ class TQueryClient::TImpl: public TClientImplCommon, public NThreading::TPromise 
Promise; std::shared_ptr Client; const TRpcRequestSettings RpcSettings; + std::shared_ptr Span; + std::shared_ptr Metrics; }; - auto ctx = std::make_unique(shared_from_this(), settings); + auto span = std::make_shared(Tracer_, "CreateSession", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "CreateSession"); + auto ctx = std::make_unique(shared_from_this(), settings, span, metrics); auto future = ctx->GetFuture(); SessionPool_.GetSession(std::move(ctx)); @@ -552,6 +630,8 @@ class TQueryClient::TImpl: public TClientImplCommon, public } private: + std::shared_ptr Tracer_; + NSdkStats::TStatCollector::TClientOperationStatCollector OperationStatCollector_; NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector_; NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram_; NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram_; diff --git a/src/client/query/impl/CMakeLists.txt b/src/client/query/impl/CMakeLists.txt index 76b112b2254..1397c58648d 100644 --- a/src/client/query/impl/CMakeLists.txt +++ b/src/client/query/impl/CMakeLists.txt @@ -7,6 +7,7 @@ target_link_libraries(client-ydb_query-impl PUBLIC client-ydb_common_client-impl client-ydb_proto client-ydb_result + impl-observability ) target_sources(client-ydb_query-impl PRIVATE diff --git a/src/client/table/impl/CMakeLists.txt b/src/client/table/impl/CMakeLists.txt index 8f53d386fc6..7d6be58a1e2 100644 --- a/src/client/table/impl/CMakeLists.txt +++ b/src/client/table/impl/CMakeLists.txt @@ -9,6 +9,7 @@ target_link_libraries(client-ydb_table-impl library-operation_id client-impl-ydb_endpoints impl-session + impl-observability client-ydb_table-query_stats PRIVATE OpenSSL::SSL diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 6529f03c187..221dd99c04c 100644 --- a/src/client/table/impl/table_client.cpp +++ 
b/src/client/table/impl/table_client.cpp @@ -22,12 +22,22 @@ TTableClient::TImpl::TImpl(std::shared_ptr&& connections, , Settings_(settings) , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_) { - if (!DbDriverState_->StatCollector.IsCollecting()) { - return; + if (auto traceProvider = Connections_->GetTraceProvider()) { + Tracer_ = traceProvider->GetTracer("ydb-cpp-sdk-table"); } - SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector("Table")); - SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector("Table")); + auto clientStatCollector = DbDriverState_->StatCollector.GetClientStatCollector("Table"); + + if (DbDriverState_->StatCollector.IsCollecting()) { + SetStatCollector(clientStatCollector); + SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector("Table")); + } + + OperationStatCollector_ = clientStatCollector.OperationStatCollector; + + if (auto externalRegistry = Connections_->GetExternalMetricRegistry()) { + OperationStatCollector_.SetExternalRegistry(externalRegistry); + } } TTableClient::TImpl::~TImpl() { @@ -759,11 +769,19 @@ TAsyncStatus TTableClient::TImpl::ExecuteSchemeQuery(const TSession& session, co request.set_session_id(TStringType{session.GetId()}); request.set_yql_text(TStringType{query}); + auto span = std::make_shared(Tracer_, "ExecuteSchemeQuery", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "ExecuteSchemeQuery"); + return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncExecuteSchemeQuery, rpcSettings - ); + ).Apply([span, metrics](TAsyncStatus future) { + auto status = future.GetValue(); + span->End(status.GetStatus()); + metrics->End(status.GetStatus()); + return status; + }); } TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSession& session, const TTxSettings& txSettings, @@ -776,9 +794,12 @@ TAsyncBeginTransactionResult 
TTableClient::TImpl::BeginTransaction(const TSessio request.set_session_id(TStringType{session.GetId()}); SetTxSettings(txSettings, request.mutable_tx_settings()); + auto span = std::make_shared(Tracer_, "BeginTransaction", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "BeginTransaction"); + auto promise = NewPromise(); - auto extractor = [promise, session] + auto extractor = [promise, session, span, metrics] (google::protobuf::Any* any, TPlainStatus status) mutable { std::string txId; if (any) { @@ -789,6 +810,8 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio TBeginTransactionResult beginTxResult(TStatus(std::move(status)), TTransaction(session, txId)); + span->End(beginTxResult.GetStatus()); + metrics->End(beginTxResult.GetStatus()); promise.SetValue(std::move(beginTxResult)); }; @@ -815,9 +838,12 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess request.set_tx_id(TStringType{txId}); request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); + auto span = std::make_shared(Tracer_, "CommitTransaction", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "CommitTransaction"); + auto promise = NewPromise(); - auto extractor = [promise] + auto extractor = [promise, span, metrics] (google::protobuf::Any* any, TPlainStatus status) mutable { std::optional queryStats; if (any) { @@ -830,6 +856,8 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess } TCommitTransactionResult commitTxResult(TStatus(std::move(status)), queryStats); + span->End(commitTxResult.GetStatus()); + metrics->End(commitTxResult.GetStatus()); promise.SetValue(std::move(commitTxResult)); }; @@ -855,11 +883,19 @@ TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, c request.set_session_id(TStringType{session.GetId()}); 
request.set_tx_id(TStringType{txId}); + auto span = std::make_shared(Tracer_, "RollbackTransaction", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "RollbackTransaction"); + return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction, rpcSettings - ); + ).Apply([span, metrics](TAsyncStatus future) { + auto status = future.GetValue(); + span->End(status.GetStatus()); + metrics->End(status.GetStatus()); + return status; + }); } TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSession& session, const std::string& query, @@ -1100,6 +1136,7 @@ void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TCli ParamsSizeHistogram.Set(collector.ParamsSize); RetryOperationStatCollector = collector.RetryOperationStatCollector; SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing); + OperationStatCollector_ = collector.OperationStatCollector; } TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings) { @@ -1128,10 +1165,15 @@ TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, *mutable_rows->mutable_type() = rows.GetType().GetProto(); } + auto span = std::make_shared(Tracer_, "BulkUpsert", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "BulkUpsert"); + auto promise = NewPromise(); - auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable { + auto extractor = [promise, span, metrics](google::protobuf::Any* any, TPlainStatus status) mutable { Y_UNUSED(any); TBulkUpsertResult val(TStatus(std::move(status))); + span->End(val.GetStatus()); + metrics->End(val.GetStatus()); promise.SetValue(std::move(val)); }; @@ -1174,12 +1216,17 @@ TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, } 
request.set_data(TStringType{data}); + auto span = std::make_shared(Tracer_, "BulkUpsert", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "BulkUpsert"); + auto promise = NewPromise(); - auto extractor = [promise] + auto extractor = [promise, span, metrics] (google::protobuf::Any* any, TPlainStatus status) mutable { Y_UNUSED(any); TBulkUpsertResult val(TStatus(std::move(status))); + span->End(val.GetStatus()); + metrics->End(val.GetStatus()); promise.SetValue(std::move(val)); }; diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index 4c0f607d748..9304aa0e890 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -18,6 +18,9 @@ #include "request_migrator.h" #include "readers.h" +#include +#include + #include @@ -237,6 +240,9 @@ class TTableClient::TImpl: public TClientImplCommon, public SetQueryCachePolicy(query, settings, request.mutable_query_cache_policy()); + auto span = std::make_shared(Tracer_, "ExecuteDataQuery", DbDriverState_->DiscoveryEndpoint, DbDriverState_->Log); + auto metrics = std::make_shared(OperationStatCollector_, "ExecuteDataQuery"); + auto promise = NewPromise(); bool keepInCache = settings.KeepInQueryCache_ && settings.KeepInQueryCache_.value(); @@ -249,7 +255,7 @@ class TTableClient::TImpl: public TClientImplCommon, public // - capture pointer // - call free just before SetValue call auto sessionPtr = new TSession(session); - auto extractor = [promise, sessionPtr, query, fromCache, keepInCache] + auto extractor = [promise, sessionPtr, query, fromCache, keepInCache, span, metrics] (google::protobuf::Any* any, TPlainStatus status) mutable { std::vector res; std::optional tx; @@ -288,6 +294,9 @@ class TTableClient::TImpl: public TClientImplCommon, public TDataQueryResult dataQueryResult(TStatus(std::move(status)), std::move(res), tx, dataQuery, fromCache, queryStats); + 
span->End(dataQueryResult.GetStatus()); + metrics->End(dataQueryResult.GetStatus()); + delete sessionPtr; tx.reset(); dataQuery.reset(); @@ -330,6 +339,8 @@ class TTableClient::TImpl: public TClientImplCommon, public NSdkStats::TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing; private: + std::shared_ptr Tracer_; + NSdkStats::TStatCollector::TClientOperationStatCollector OperationStatCollector_; NSessionPool::TSessionPool SessionPool_; TRequestMigrator RequestMigrator_; static const TKeepAliveSettings KeepAliveSettings; diff --git a/src/client/trace/CMakeLists.txt b/src/client/trace/CMakeLists.txt new file mode 100644 index 00000000000..86a8f8d4208 --- /dev/null +++ b/src/client/trace/CMakeLists.txt @@ -0,0 +1,7 @@ +_ydb_sdk_add_library(client-trace) + +target_sources(client-trace PRIVATE + trace.cpp +) + +_ydb_sdk_make_client_component(Trace client-trace) diff --git a/src/client/trace/trace.cpp b/src/client/trace/trace.cpp new file mode 100644 index 00000000000..6bf5bc664f0 --- /dev/null +++ b/src/client/trace/trace.cpp @@ -0,0 +1 @@ +#include diff --git a/tests/common/fake_metric_registry.h b/tests/common/fake_metric_registry.h new file mode 100644 index 00000000000..60ff1414633 --- /dev/null +++ b/tests/common/fake_metric_registry.h @@ -0,0 +1,122 @@ +#pragma once + +#include + +#include +#include +#include +#include + +namespace NYdb::NTests { + +class TFakeCounter : public NMetrics::ICounter { +public: + void Inc() override { + Count_.fetch_add(1, std::memory_order_relaxed); + } + + int64_t Get() const { + return Count_.load(std::memory_order_relaxed); + } + +private: + std::atomic Count_{0}; +}; + +class TFakeHistogram : public NMetrics::IHistogram { +public: + void Record(double value) override { + std::lock_guard lock(Mutex_); + Values_.push_back(value); + } + + std::vector GetValues() const { + std::lock_guard lock(Mutex_); + return Values_; + } + + size_t Count() const { + std::lock_guard lock(Mutex_); + return Values_.size(); + } + 
+private: + mutable std::mutex Mutex_; + std::vector Values_; +}; + +class TFakeGauge : public NMetrics::IGauge { +public: + void Add(double delta) override { Value_ += delta; } + void Set(double value) override { Value_ = value; } + double Get() const { return Value_; } + +private: + double Value_ = 0.0; +}; + +struct TMetricKey { + std::string Name; + NMetrics::TLabels Labels; + + bool operator==(const TMetricKey& other) const = default; + bool operator<(const TMetricKey& other) const { + if (Name != other.Name) return Name < other.Name; + return Labels < other.Labels; + } +}; + +class TFakeMetricRegistry : public NMetrics::IMetricRegistry { +public: + std::shared_ptr Counter(const std::string& name, const NMetrics::TLabels& labels) override { + std::lock_guard lock(Mutex_); + auto key = TMetricKey{name, labels}; + auto it = Counters_.find(key); + if (it != Counters_.end()) { + return it->second; + } + auto counter = std::make_shared(); + Counters_[key] = counter; + return counter; + } + + std::shared_ptr Gauge(const std::string& name, const NMetrics::TLabels& labels) override { + std::lock_guard lock(Mutex_); + auto key = TMetricKey{name, labels}; + auto gauge = std::make_shared(); + Gauges_[key] = gauge; + return gauge; + } + + std::shared_ptr Histogram(const std::string& name, const std::vector& /*buckets*/, const NMetrics::TLabels& labels) override { + std::lock_guard lock(Mutex_); + auto key = TMetricKey{name, labels}; + auto it = Histograms_.find(key); + if (it != Histograms_.end()) { + return it->second; + } + auto histogram = std::make_shared(); + Histograms_[key] = histogram; + return histogram; + } + + std::shared_ptr GetCounter(const std::string& name, const NMetrics::TLabels& labels = {}) const { + std::lock_guard lock(Mutex_); + auto it = Counters_.find(TMetricKey{name, labels}); + return it != Counters_.end() ? 
it->second : nullptr; + } + + std::shared_ptr GetHistogram(const std::string& name, const NMetrics::TLabels& labels = {}) const { + std::lock_guard lock(Mutex_); + auto it = Histograms_.find(TMetricKey{name, labels}); + return it != Histograms_.end() ? it->second : nullptr; + } + +private: + mutable std::mutex Mutex_; + std::map> Counters_; + std::map> Gauges_; + std::map> Histograms_; +}; + +} // namespace NYdb::NTests diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index d5a1d709245..8aa28839a63 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -1,6 +1,7 @@ add_subdirectory(auth) add_subdirectory(basic_example) add_subdirectory(bulk_upsert) +add_subdirectory(metrics) add_subdirectory(server_restart) add_subdirectory(sessions) add_subdirectory(sessions_pool) diff --git a/tests/integration/metrics/CMakeLists.txt b/tests/integration/metrics/CMakeLists.txt new file mode 100644 index 00000000000..6c9bb8b3abd --- /dev/null +++ b/tests/integration/metrics/CMakeLists.txt @@ -0,0 +1,12 @@ +add_ydb_test(NAME metrics_it GTEST + INCLUDE_DIRS + ${YDB_SDK_SOURCE_DIR} + SOURCES + main.cpp + LINK_LIBRARIES + yutil + YDB-CPP-SDK::Query + client-metrics + LABELS + integration +) diff --git a/tests/integration/metrics/main.cpp b/tests/integration/metrics/main.cpp new file mode 100644 index 00000000000..cab8f71cdc4 --- /dev/null +++ b/tests/integration/metrics/main.cpp @@ -0,0 +1,273 @@ +#include +#include +#include + +#include + +using namespace NYdb; +using namespace NYdb::NQuery; +using namespace NYdb::NTests; + +namespace { + +struct TRunArgs { + TDriver Driver; + std::shared_ptr Registry; +}; + +TRunArgs MakeRunArgs() { + std::string endpoint = std::getenv("YDB_ENDPOINT"); + std::string database = std::getenv("YDB_DATABASE"); + + auto registry = std::make_shared(); + + auto driverConfig = TDriverConfig() + .SetEndpoint(endpoint) + .SetDatabase(database) + .SetAuthToken(std::getenv("YDB_TOKEN") ? 
std::getenv("YDB_TOKEN") : "") + .SetMetricRegistry(registry); + + TDriver driver(driverConfig); + return {driver, registry}; +} + +std::shared_ptr GetCounter( + const std::shared_ptr& registry, + const std::string& name, + const std::string& operation) +{ + return registry->GetCounter(name, {{"operation", operation}}); +} + +std::shared_ptr GetHistogram( + const std::shared_ptr& registry, + const std::string& name, + const std::string& operation) +{ + return registry->GetHistogram(name, {{"operation", operation}}); +} + +} // namespace + +TEST(QueryMetricsIntegration, ExecuteQuerySuccessRecordsMetrics) { + auto [driver, registry] = MakeRunArgs(); + TQueryClient client(driver); + + auto session = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); + + auto result = session.GetSession().ExecuteQuery( + "SELECT 1;", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); + + auto requests = GetCounter(registry, "ydb.query.requests", "ExecuteQuery"); + ASSERT_NE(requests, nullptr) << "ExecuteQuery request counter not created"; + EXPECT_GE(requests->Get(), 1); + + auto errors = GetCounter(registry, "ydb.query.errors", "ExecuteQuery"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), 0); + + auto latency = GetHistogram(registry, "ydb.query.latency_ms", "ExecuteQuery"); + ASSERT_NE(latency, nullptr) << "ExecuteQuery latency histogram not created"; + EXPECT_GE(latency->Count(), 1u); + for (double v : latency->GetValues()) { + EXPECT_GE(v, 0.0); + } + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, ExecuteQueryErrorRecordsErrorMetric) { + auto [driver, registry] = MakeRunArgs(); + TQueryClient client(driver); + + auto session = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); + + auto result = session.GetSession().ExecuteQuery( + "INVALID SQL QUERY !!!", 
+ TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + EXPECT_NE(result.GetStatus(), EStatus::SUCCESS); + + auto requests = GetCounter(registry, "ydb.query.requests", "ExecuteQuery"); + ASSERT_NE(requests, nullptr); + EXPECT_GE(requests->Get(), 1); + + auto errors = GetCounter(registry, "ydb.query.errors", "ExecuteQuery"); + ASSERT_NE(errors, nullptr); + EXPECT_GE(errors->Get(), 1); + + auto latency = GetHistogram(registry, "ydb.query.latency_ms", "ExecuteQuery"); + ASSERT_NE(latency, nullptr); + EXPECT_GE(latency->Count(), 1u); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, CreateSessionRecordsMetrics) { + auto [driver, registry] = MakeRunArgs(); + TQueryClient client(driver); + + auto session = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); + + auto requests = GetCounter(registry, "ydb.query.requests", "CreateSession"); + ASSERT_NE(requests, nullptr) << "CreateSession request counter not created"; + EXPECT_GE(requests->Get(), 1); + + auto latency = GetHistogram(registry, "ydb.query.latency_ms", "CreateSession"); + ASSERT_NE(latency, nullptr) << "CreateSession latency histogram not created"; + EXPECT_GE(latency->Count(), 1u); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, CommitTransactionRecordsMetrics) { + auto [driver, registry] = MakeRunArgs(); + TQueryClient client(driver); + + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()) << sessionResult.GetIssues().ToString(); + auto session = sessionResult.GetSession(); + + auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW()).ExtractValueSync(); + ASSERT_TRUE(beginResult.IsSuccess()) << beginResult.GetIssues().ToString(); + auto tx = beginResult.GetTransaction(); + + auto execResult = tx.GetSession().ExecuteQuery( + "SELECT 1;", + TTxControl::Tx(tx) + ).ExtractValueSync(); + ASSERT_EQ(execResult.GetStatus(), EStatus::SUCCESS) << 
execResult.GetIssues().ToString(); + + if (execResult.GetTransaction()) { + auto commitResult = execResult.GetTransaction()->Commit().ExtractValueSync(); + ASSERT_TRUE(commitResult.IsSuccess()) << commitResult.GetIssues().ToString(); + + auto commitRequests = GetCounter(registry, "ydb.query.requests", "Commit"); + ASSERT_NE(commitRequests, nullptr) << "Commit request counter not created"; + EXPECT_GE(commitRequests->Get(), 1); + + auto commitLatency = GetHistogram(registry, "ydb.query.latency_ms", "Commit"); + ASSERT_NE(commitLatency, nullptr); + EXPECT_GE(commitLatency->Count(), 1u); + } + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, RollbackTransactionRecordsMetrics) { + auto [driver, registry] = MakeRunArgs(); + TQueryClient client(driver); + + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()) << sessionResult.GetIssues().ToString(); + auto session = sessionResult.GetSession(); + + auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW()).ExtractValueSync(); + ASSERT_TRUE(beginResult.IsSuccess()) << beginResult.GetIssues().ToString(); + auto tx = beginResult.GetTransaction(); + + auto rollbackResult = tx.Rollback().ExtractValueSync(); + ASSERT_TRUE(rollbackResult.IsSuccess()) << rollbackResult.GetIssues().ToString(); + + auto rollbackRequests = GetCounter(registry, "ydb.query.requests", "Rollback"); + ASSERT_NE(rollbackRequests, nullptr) << "Rollback request counter not created"; + EXPECT_GE(rollbackRequests->Get(), 1); + + auto rollbackErrors = GetCounter(registry, "ydb.query.errors", "Rollback"); + ASSERT_NE(rollbackErrors, nullptr); + EXPECT_EQ(rollbackErrors->Get(), 0); + + auto rollbackLatency = GetHistogram(registry, "ydb.query.latency_ms", "Rollback"); + ASSERT_NE(rollbackLatency, nullptr); + EXPECT_GE(rollbackLatency->Count(), 1u); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, MultipleQueriesAccumulateMetrics) { + auto [driver, registry] = MakeRunArgs(); + 
TQueryClient client(driver); + + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()) << sessionResult.GetIssues().ToString(); + auto session = sessionResult.GetSession(); + + const int numQueries = 5; + for (int i = 0; i < numQueries; ++i) { + auto result = session.ExecuteQuery( + "SELECT 1;", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); + } + + auto requests = GetCounter(registry, "ydb.query.requests", "ExecuteQuery"); + ASSERT_NE(requests, nullptr); + EXPECT_EQ(requests->Get(), numQueries); + + auto errors = GetCounter(registry, "ydb.query.errors", "ExecuteQuery"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), 0); + + auto latency = GetHistogram(registry, "ydb.query.latency_ms", "ExecuteQuery"); + ASSERT_NE(latency, nullptr); + EXPECT_EQ(latency->Count(), static_cast(numQueries)); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, NoRegistryDoesNotBreakOperations) { + std::string endpoint = std::getenv("YDB_ENDPOINT"); + std::string database = std::getenv("YDB_DATABASE"); + + auto driverConfig = TDriverConfig() + .SetEndpoint(endpoint) + .SetDatabase(database) + .SetAuthToken(std::getenv("YDB_TOKEN") ? 
std::getenv("YDB_TOKEN") : ""); + + TDriver driver(driverConfig); + TQueryClient client(driver); + + auto session = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); + + auto result = session.GetSession().ExecuteQuery( + "SELECT 1;", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + EXPECT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, LatencyValuesAreRealistic) { + auto [driver, registry] = MakeRunArgs(); + TQueryClient client(driver); + + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()) << sessionResult.GetIssues().ToString(); + auto session = sessionResult.GetSession(); + + auto result = session.ExecuteQuery( + "SELECT 1;", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); + + auto latency = GetHistogram(registry, "ydb.query.latency_ms", "ExecuteQuery"); + ASSERT_NE(latency, nullptr); + ASSERT_GE(latency->Count(), 1u); + + for (double v : latency->GetValues()) { + EXPECT_GE(v, 0.0) << "Latency must be non-negative"; + EXPECT_LT(v, 30000.0) << "Latency > 30s is unrealistic for SELECT 1"; + } + + driver.Stop(true); +} diff --git a/tests/unit/client/CMakeLists.txt b/tests/unit/client/CMakeLists.txt index 03b0a17c386..6271bf730d9 100644 --- a/tests/unit/client/CMakeLists.txt +++ b/tests/unit/client/CMakeLists.txt @@ -100,3 +100,18 @@ add_ydb_test(NAME client-ydb_value_ut GTEST LABELS unit ) + +add_ydb_test(NAME client-ydb_query_metrics_ut GTEST + INCLUDE_DIRS + ${YDB_SDK_SOURCE_DIR} + SOURCES + query/query_metrics_ut.cpp + LINK_LIBRARIES + yutil + impl-observability + client-impl-ydb_stats + client-types + monlib-metrics + LABELS + unit +) diff --git a/tests/unit/client/query/query_metrics_ut.cpp b/tests/unit/client/query/query_metrics_ut.cpp new file mode 100644 
index 00000000000..faa46b5fa85
--- /dev/null
+++ b/tests/unit/client/query/query_metrics_ut.cpp
@@ -0,0 +1,114 @@
+#include  // NOTE(review): the three include targets are missing in this copy of the patch
+
+#include
+
+#include
+
+using namespace NYdb;
+using namespace NYdb::NObservability;
+
+using namespace NYdb::NSdkStats;
+// Fixture: every test gets a fresh metric registry and an operation stat collector built on top of it.
+class QueryMetricsTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        MonRegistry = std::make_unique<::NMonitoring::TMetricRegistry>();
+        Collector = TStatCollector::TClientOperationStatCollector(MonRegistry.get(), "test_db", "Query");
+    }
+    // Looks up the rate labelled sensor="Request/Count" for `op` (database="test_db", ydb_client="Query").
+    ::NMonitoring::TRate* RequestCounter(const std::string& op) {
+        return MonRegistry->Rate({
+            {"database", "test_db"}, {"ydb_client", "Query"},
+            {"operation", op}, {"sensor", "Request/Count"}
+        });
+    }
+    // Same lookup for sensor="Request/Errors" plus a `status` label. NOTE(review): no test below calls it.
+    ::NMonitoring::TRate* ErrorCounter(const std::string& op, const std::string& status) {
+        return MonRegistry->Rate({
+            {"database", "test_db"}, {"ydb_client", "Query"},
+            {"operation", op}, {"status", status}, {"sensor", "Request/Errors"}
+        });
+    }
+    // SetUp() hands MonRegistry to Collector as a raw pointer; presumably Collector keeps it — confirm.
+    std::unique_ptr<::NMonitoring::TMetricRegistry> MonRegistry;
+    TStatCollector::TClientOperationStatCollector Collector;
+};
+// Construction only, no End(): expects the "ExecuteQuery" request counter to read 1.
+TEST_F(QueryMetricsTest, RequestCounterIncrementedOnConstruction) {
+    TOperationMetrics metrics(Collector, "ExecuteQuery");
+    // NOTE(review): if Rate() creates a missing metric on lookup, ASSERT_NE(nullptr) can never fail — confirm.
+    auto counter = RequestCounter("ExecuteQuery");
+    ASSERT_NE(counter, nullptr);
+    EXPECT_EQ(counter->Get(), 1);
+}
+// End(SUCCESS) inside a nested scope: expects the "ExecuteQuery" request counter to read 1.
+TEST_F(QueryMetricsTest, SuccessDoesNotIncrementErrorCounter) {
+    {
+        TOperationMetrics metrics(Collector, "ExecuteQuery");
+        metrics.End(EStatus::SUCCESS);
+    }
+    // NOTE(review): only the request counter is asserted; no error counter is inspected despite the test name.
+    auto counter = RequestCounter("ExecuteQuery");
+    ASSERT_NE(counter, nullptr);
+    EXPECT_EQ(counter->Get(), 1);
+}
+// End(UNAVAILABLE) inside a nested scope: expects the "Commit" request counter to read 1.
+TEST_F(QueryMetricsTest, FailureIncrementsErrorCounter) {
+    {
+        TOperationMetrics metrics(Collector, "Commit");
+        metrics.End(EStatus::UNAVAILABLE);
+    }
+    // NOTE(review): the name promises an error-counter increment, but ErrorCounter() is not checked — confirm intent.
+    auto counter = RequestCounter("Commit");
+    ASSERT_NE(counter, nullptr);
+    EXPECT_EQ(counter->Get(), 1);
+}
+// Two End() calls with different statuses on one object: expects the request counter to read 1.
+TEST_F(QueryMetricsTest, DoubleEndIsIdempotent) {
+    TOperationMetrics metrics(Collector, "ExecuteQuery");
+    metrics.End(EStatus::SUCCESS);
+    metrics.End(EStatus::INTERNAL_ERROR);
+    // NOTE(review): the first test expects 1 before any End(), so this cannot tell one End() from two.
+    auto counter = RequestCounter("ExecuteQuery");
+    ASSERT_NE(counter, nullptr);
+    EXPECT_EQ(counter->Get(), 1);
+}
+// Object destroyed without an explicit End(): expects the "CreateSession" request counter to read 1.
+TEST_F(QueryMetricsTest, DestructorCallsEndWithClientInternalError) {
+    {
+        TOperationMetrics metrics(Collector, "CreateSession");
+    }
+    // NOTE(review): the status recorded by the destructor (see the test name) is not asserted anywhere.
+    auto counter = RequestCounter("CreateSession");
+    ASSERT_NE(counter, nullptr);
+    EXPECT_EQ(counter->Get(), 1);
+}
+// One "ExecuteQuery" ending in SUCCESS and one "Commit" ending in OVERLOADED: expects each request counter at 1.
+TEST_F(QueryMetricsTest, DifferentOperationsHaveSeparateCounters) {
+    {
+        TOperationMetrics m1(Collector, "ExecuteQuery");
+        m1.End(EStatus::SUCCESS);
+    }
+    {
+        TOperationMetrics m2(Collector, "Commit");
+        m2.End(EStatus::OVERLOADED);
+    }
+
+    auto execRequests = RequestCounter("ExecuteQuery");
+    auto commitRequests = RequestCounter("Commit");
+    ASSERT_NE(execRequests, nullptr);
+    ASSERT_NE(commitRequests, nullptr);
+    EXPECT_EQ(execRequests->Get(), 1);
+    EXPECT_EQ(commitRequests->Get(), 1);
+}
+// Five objects for "ExecuteQuery", alternating SUCCESS and TIMEOUT: expects the request counter to read 5.
+TEST_F(QueryMetricsTest, MultipleRequestsAccumulate) {
+    for (int i = 0; i < 5; ++i) {
+        TOperationMetrics metrics(Collector, "ExecuteQuery");
+        metrics.End(i % 2 == 0 ? EStatus::SUCCESS : EStatus::TIMEOUT);  // even i -> SUCCESS, odd i -> TIMEOUT
+    }
+
+    auto requests = RequestCounter("ExecuteQuery");
+    ASSERT_NE(requests, nullptr);
+    EXPECT_EQ(requests->Get(), 5);
+}