Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 128 additions & 14 deletions lib/saluki-components/src/common/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,26 @@ impl OtlpServerBuilder {
let metrics = Arc::new(metrics);

// Create and spawn the gRPC server.
//
// TODO: Properly update the `bytes_received` metric for gRPC payloads.
let grpc_metrics_server =
MetricsServiceServer::new(GrpcServiceImpl::new(otlp_handler.clone(), memory_limiter.clone()))
.max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);

let grpc_logs_server =
LogsServiceServer::new(GrpcServiceImpl::new(otlp_handler.clone(), memory_limiter.clone()))
.max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);

let grpc_traces_server =
TraceServiceServer::new(GrpcServiceImpl::new(otlp_handler.clone(), memory_limiter.clone()))
.max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);
let grpc_metrics_server = MetricsServiceServer::new(GrpcServiceImpl::new(
otlp_handler.clone(),
memory_limiter.clone(),
metrics.clone(),
))
.max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);

let grpc_logs_server = LogsServiceServer::new(GrpcServiceImpl::new(
otlp_handler.clone(),
memory_limiter.clone(),
metrics.clone(),
))
.max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);

let grpc_traces_server = TraceServiceServer::new(GrpcServiceImpl::new(
otlp_handler.clone(),
memory_limiter.clone(),
metrics.clone(),
))
.max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);

let grpc_server = Server::builder()
.add_service(grpc_metrics_server)
Expand Down Expand Up @@ -241,13 +248,15 @@ async fn http_traces_handler<H: OtlpHandler>(
struct GrpcServiceImpl<H> {
handler: Arc<H>,
memory_limiter: MemoryLimiter,
metrics: Arc<Metrics>,
}

impl<H> GrpcServiceImpl<H> {
fn new(handler: Arc<H>, memory_limiter: MemoryLimiter) -> Self {
fn new(handler: Arc<H>, memory_limiter: MemoryLimiter, metrics: Arc<Metrics>) -> Self {
Self {
handler,
memory_limiter,
metrics,
}
}
}
Expand All @@ -257,6 +266,7 @@ impl<H> Clone for GrpcServiceImpl<H> {
Self {
handler: self.handler.clone(),
memory_limiter: self.memory_limiter.clone(),
metrics: self.metrics.clone(),
}
}
}
Expand All @@ -269,6 +279,7 @@ impl<H: OtlpHandler> MetricsService for GrpcServiceImpl<H> {
self.memory_limiter.wait_for_capacity().await;

let raw_bytes = request.into_inner().encode_to_vec();
self.metrics.bytes_received().increment(raw_bytes.len() as u64);

match self.handler.handle_metrics(Bytes::from(raw_bytes)).await {
Ok(()) => Ok(Response::new(ExportMetricsServiceResponse { partial_success: None })),
Expand All @@ -288,6 +299,7 @@ impl<H: OtlpHandler> LogsService for GrpcServiceImpl<H> {
self.memory_limiter.wait_for_capacity().await;

let raw_bytes = request.into_inner().encode_to_vec();
self.metrics.bytes_received().increment(raw_bytes.len() as u64);

match self.handler.handle_logs(Bytes::from(raw_bytes)).await {
Ok(()) => Ok(Response::new(ExportLogsServiceResponse { partial_success: None })),
Expand All @@ -307,6 +319,7 @@ impl<H: OtlpHandler> TraceService for GrpcServiceImpl<H> {
self.memory_limiter.wait_for_capacity().await;

let raw_bytes = request.into_inner().encode_to_vec();
self.metrics.bytes_received().increment(raw_bytes.len() as u64);

match self.handler.handle_traces(Bytes::from(raw_bytes)).await {
Ok(()) => Ok(Response::new(ExportTraceServiceResponse { partial_success: None })),
Expand All @@ -317,3 +330,104 @@ impl<H: OtlpHandler> TraceService for GrpcServiceImpl<H> {
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use memory_accounting::MemoryLimiter;
use saluki_core::{components::ComponentContext, topology::ComponentId};
use saluki_metrics::test::TestRecorder;

use super::*;

struct NoopHandler;

#[async_trait]
impl OtlpHandler for NoopHandler {
async fn handle_metrics(&self, _body: Bytes) -> Result<(), GenericError> {
Ok(())
}

async fn handle_logs(&self, _body: Bytes) -> Result<(), GenericError> {
Ok(())
}

async fn handle_traces(&self, _body: Bytes) -> Result<(), GenericError> {
Ok(())
}
}

fn assert_bytes_received(recorder: &TestRecorder, expected_size: u64) {
assert_eq!(
recorder.counter((
"component_bytes_received_total",
&[
("component_id", "otlp_test"),
("component_type", "source"),
("source", "otlp"),
]
)),
Some(expected_size)
);
}

fn test_component_context() -> ComponentContext {
ComponentContext::source(ComponentId::try_from("otlp_test").unwrap())
}

#[tokio::test]
async fn grpc_metrics_export_updates_bytes_received() {
let recorder = TestRecorder::default();
let _local = metrics::set_default_local_recorder(&recorder);

let metrics = Arc::new(build_metrics(&test_component_context()));
let service = GrpcServiceImpl::new(Arc::new(NoopHandler), MemoryLimiter::noop(), metrics);
let request = ExportMetricsServiceRequest {
resource_metrics: vec![otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics::default()],
};
let expected_size = request.encode_to_vec().len() as u64;

MetricsService::export(&service, TonicRequest::new(request))
.await
.unwrap();

assert_bytes_received(&recorder, expected_size);
}

#[tokio::test]
async fn grpc_logs_export_updates_bytes_received() {
let recorder = TestRecorder::default();
let _local = metrics::set_default_local_recorder(&recorder);

let metrics = Arc::new(build_metrics(&test_component_context()));
let service = GrpcServiceImpl::new(Arc::new(NoopHandler), MemoryLimiter::noop(), metrics);
let request = ExportLogsServiceRequest {
resource_logs: vec![otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs::default()],
};
let expected_size = request.encode_to_vec().len() as u64;

LogsService::export(&service, TonicRequest::new(request)).await.unwrap();

assert_bytes_received(&recorder, expected_size);
}

#[tokio::test]
async fn grpc_traces_export_updates_bytes_received() {
let recorder = TestRecorder::default();
let _local = metrics::set_default_local_recorder(&recorder);

let metrics = Arc::new(build_metrics(&test_component_context()));
let service = GrpcServiceImpl::new(Arc::new(NoopHandler), MemoryLimiter::noop(), metrics);
let request = ExportTraceServiceRequest {
resource_spans: vec![otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans::default()],
};
let expected_size = request.encode_to_vec().len() as u64;

TraceService::export(&service, TonicRequest::new(request))
.await
.unwrap();

assert_bytes_received(&recorder, expected_size);
}
}
Loading