diff --git a/lib/saluki-components/src/common/otlp/mod.rs b/lib/saluki-components/src/common/otlp/mod.rs index d21ba96420..b27399c8d2 100644 --- a/lib/saluki-components/src/common/otlp/mod.rs +++ b/lib/saluki-components/src/common/otlp/mod.rs @@ -140,19 +140,26 @@ impl OtlpServerBuilder { let metrics = Arc::new(metrics); // Create and spawn the gRPC server. - // - // TODO: Properly update the `bytes_received` metric for gRPC payloads. - let grpc_metrics_server = - MetricsServiceServer::new(GrpcServiceImpl::new(otlp_handler.clone(), memory_limiter.clone())) - .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes); - - let grpc_logs_server = - LogsServiceServer::new(GrpcServiceImpl::new(otlp_handler.clone(), memory_limiter.clone())) - .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes); - - let grpc_traces_server = - TraceServiceServer::new(GrpcServiceImpl::new(otlp_handler.clone(), memory_limiter.clone())) - .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes); + let grpc_metrics_server = MetricsServiceServer::new(GrpcServiceImpl::new( + otlp_handler.clone(), + memory_limiter.clone(), + metrics.clone(), + )) + .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes); + + let grpc_logs_server = LogsServiceServer::new(GrpcServiceImpl::new( + otlp_handler.clone(), + memory_limiter.clone(), + metrics.clone(), + )) + .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes); + + let grpc_traces_server = TraceServiceServer::new(GrpcServiceImpl::new( + otlp_handler.clone(), + memory_limiter.clone(), + metrics.clone(), + )) + .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes); let grpc_server = Server::builder() .add_service(grpc_metrics_server) @@ -241,13 +248,15 @@ async fn http_traces_handler( struct GrpcServiceImpl { handler: Arc, memory_limiter: MemoryLimiter, + metrics: Arc, } impl GrpcServiceImpl { - fn new(handler: Arc, memory_limiter: MemoryLimiter) -> Self { + fn new(handler: Arc, memory_limiter: MemoryLimiter, metrics: Arc) -> Self { Self { handler, memory_limiter, + metrics, } } } @@ -257,6 +266,7 @@ impl Clone for GrpcServiceImpl { Self { handler: self.handler.clone(), memory_limiter: self.memory_limiter.clone(), + metrics: self.metrics.clone(), } } } @@ -269,6 +279,7 @@ impl MetricsService for GrpcServiceImpl { self.memory_limiter.wait_for_capacity().await; let raw_bytes = request.into_inner().encode_to_vec(); + self.metrics.bytes_received().increment(raw_bytes.len() as u64); match self.handler.handle_metrics(Bytes::from(raw_bytes)).await { Ok(()) => Ok(Response::new(ExportMetricsServiceResponse { partial_success: None })), @@ -288,6 +299,7 @@ impl LogsService for GrpcServiceImpl { self.memory_limiter.wait_for_capacity().await; let raw_bytes = request.into_inner().encode_to_vec(); + self.metrics.bytes_received().increment(raw_bytes.len() as u64); match self.handler.handle_logs(Bytes::from(raw_bytes)).await { Ok(()) => Ok(Response::new(ExportLogsServiceResponse { partial_success: None })), @@ -307,6 +319,7 @@ impl TraceService for GrpcServiceImpl { self.memory_limiter.wait_for_capacity().await; let raw_bytes = request.into_inner().encode_to_vec(); + self.metrics.bytes_received().increment(raw_bytes.len() as u64); match self.handler.handle_traces(Bytes::from(raw_bytes)).await { Ok(()) => Ok(Response::new(ExportTraceServiceResponse { partial_success: None })), @@ -317,3 +330,104 @@ impl TraceService for GrpcServiceImpl { } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use memory_accounting::MemoryLimiter; + use saluki_core::{components::ComponentContext, topology::ComponentId}; + use saluki_metrics::test::TestRecorder; + + use super::*; + + struct NoopHandler; + + #[async_trait] + impl OtlpHandler for NoopHandler { + async fn handle_metrics(&self, _body: Bytes) -> Result<(), GenericError> { + Ok(()) + } + + async fn handle_logs(&self, _body: Bytes) -> Result<(), GenericError> { + Ok(()) + } + + async fn handle_traces(&self, _body: Bytes) -> Result<(), GenericError> { + Ok(()) + } + } + + fn assert_bytes_received(recorder: &TestRecorder, expected_size: u64) { + assert_eq!( + recorder.counter(( + "component_bytes_received_total", + &[ + ("component_id", "otlp_test"), + ("component_type", "source"), + ("source", "otlp"), + ] + )), + Some(expected_size) + ); + } + + fn test_component_context() -> ComponentContext { + ComponentContext::source(ComponentId::try_from("otlp_test").unwrap()) + } + + #[tokio::test] + async fn grpc_metrics_export_updates_bytes_received() { + let recorder = TestRecorder::default(); + let _local = metrics::set_default_local_recorder(&recorder); + + let metrics = Arc::new(build_metrics(&test_component_context())); + let service = GrpcServiceImpl::new(Arc::new(NoopHandler), MemoryLimiter::noop(), metrics); + let request = ExportMetricsServiceRequest { + resource_metrics: vec![otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics::default()], + }; + let expected_size = request.encode_to_vec().len() as u64; + + MetricsService::export(&service, TonicRequest::new(request)) + .await + .unwrap(); + + assert_bytes_received(&recorder, expected_size); + } + + #[tokio::test] + async fn grpc_logs_export_updates_bytes_received() { + let recorder = TestRecorder::default(); + let _local = metrics::set_default_local_recorder(&recorder); + + let metrics = Arc::new(build_metrics(&test_component_context())); + let service = GrpcServiceImpl::new(Arc::new(NoopHandler), MemoryLimiter::noop(), metrics); + let request = ExportLogsServiceRequest { + resource_logs: vec![otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs::default()], + }; + let expected_size = request.encode_to_vec().len() as u64; + + LogsService::export(&service, TonicRequest::new(request)).await.unwrap(); + + assert_bytes_received(&recorder, expected_size); + } + + #[tokio::test] + async fn grpc_traces_export_updates_bytes_received() { + let recorder = TestRecorder::default(); + let _local = metrics::set_default_local_recorder(&recorder); + + let metrics = Arc::new(build_metrics(&test_component_context())); + let service = GrpcServiceImpl::new(Arc::new(NoopHandler), MemoryLimiter::noop(), metrics); + let request = ExportTraceServiceRequest { + resource_spans: vec![otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans::default()], + }; + let expected_size = request.encode_to_vec().len() as u64; + + TraceService::export(&service, TonicRequest::new(request)) + .await + .unwrap(); + + assert_bytes_received(&recorder, expected_size); + } +}