From 417de05d931a8b874e45445015b0801f855a91e2 Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Sat, 20 Jun 2026 23:00:18 +0300 Subject: [PATCH 1/8] feat(connectors): add OTLP gRPC source connector Adds iggy_connector_otlp_source, a cdylib connector plugin that binds port 4317 (OTLP standard) and receives logs, metrics, and traces from any OpenTelemetry SDK or Collector. Each incoming gRPC export call is deserialized via the opentelemetry-proto crate (already a transitive dep of opentelemetry-otlp) and serialized as JSON messages into the connector channel, eliminating any build-time proto compilation step. All three OTLP services (LogsService, MetricsService, TraceService) are served on the same listener, with gzip compression enabled on every service (OTel SDKs and the Collector compress payloads by default). The connector follows the pull-model Source trait: poll() blocks on the first message, then drains up to batch_size additional messages via try_recv() before returning. A tokio oneshot channel carries the shutdown signal from close() to the gRPC server task. --- Cargo.lock | 25 +- Cargo.toml | 2 + .../connectors/sources/otlp_source/Cargo.toml | 58 ++++ core/connectors/sources/otlp_source/README.md | 44 +++ .../sources/otlp_source/config.toml | 36 ++ .../sources/otlp_source/src/convert.rs | 323 ++++++++++++++++++ .../connectors/sources/otlp_source/src/lib.rs | 139 ++++++++ .../sources/otlp_source/src/server.rs | 141 ++++++++ 8 files changed, 764 insertions(+), 4 deletions(-) create mode 100644 core/connectors/sources/otlp_source/Cargo.toml create mode 100644 core/connectors/sources/otlp_source/README.md create mode 100644 core/connectors/sources/otlp_source/config.toml create mode 100644 core/connectors/sources/otlp_source/src/convert.rs create mode 100644 core/connectors/sources/otlp_source/src/lib.rs create mode 100644 core/connectors/sources/otlp_source/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index 12382acfca..e49627addb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6953,6 +6953,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_otlp_source" +version = "0.1.0" +dependencies = [ + "async-trait", + "dashmap", + "iggy_connector_sdk", + "once_cell", + "opentelemetry-proto", + "serde", + "serde_json", + "tokio", + "tonic", + "tracing", +] + [[package]] name = "iggy_connector_postgres_sink" version = "0.4.1-edge.1" @@ -10713,9 +10729,9 @@ dependencies = [ [[package]] name = "rmcp" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0810a9f717d9828f475fe1f629f4c305c8464b7f496c3a854b58d29e65f4058e" +checksum = "1d1f571c72940a19d9532fe52dbea8bc9912bf1d766c2970bb824056b86f3f59" dependencies = [ "async-trait", "base64", @@ -10745,9 +10761,9 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aefac48c364756e97f04c0401ba3231e8607882c7c1d92da0437dc16307904d" +checksum = "1aad0035b69380782d78ea95b508327e6deaa2235909053e596eea8f27b5e1d5" dependencies = [ "darling 0.23.0", "proc-macro2", @@ -13059,6 +13075,7 @@ dependencies = [ "axum", "base64", "bytes", + "flate2", "h2 0.4.15", "http 1.4.2", "http-body 1.0.1", diff --git a/Cargo.toml b/Cargo.toml index 4c6777fdea..b0c9db91eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/influxdb_source", + "core/connectors/sources/otlp_source", "core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", @@ -308,6 +309,7 @@ tokio-rustls = "0.26.4" tokio-tungstenite = { version = "0.29", features = ["rustls-tls-webpki-roots"] } tokio-util = { version = "0.7.18", features = ["compat"] } toml = "1.1.2" +tonic = { version = "0.14.6", default-features = false } tower-http = { version = "0.7.0", features = ["add-extension", "cors", "trace"] } tracing = "0.1.44" tracing-appender = "0.2.5" diff --git a/core/connectors/sources/otlp_source/Cargo.toml b/core/connectors/sources/otlp_source/Cargo.toml new file mode 100644 index 0000000000..37dfac9b90 --- /dev/null +++ b/core/connectors/sources/otlp_source/Cargo.toml @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_otlp_source" +version = "0.1.0" +description = "Iggy OTLP/gRPC source connector — receives logs, metrics, and traces from any OpenTelemetry SDK or Collector and writes them to Iggy streams as JSON" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "opentelemetry", "otlp"] +categories = ["command-line-utilities", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +# dashmap and once_cell are not imported directly in this crate's source, but +# the source_connector! macro (in iggy_connector_sdk::source) expands bare +# `use dashmap::DashMap` and `use once_cell::sync::Lazy` into this crate's +# namespace, so they must be listed here. Remove them only after the SDK macro +# is updated to use `$crate::connector_macro_support::{DashMap, Lazy}`. +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +opentelemetry-proto = { version = "0.32.0", default-features = false, features = [ + "gen-tonic", + "logs", + "metrics", + "trace", +] } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true, features = ["transport", "router", "gzip"] } +tracing = { workspace = true } diff --git a/core/connectors/sources/otlp_source/README.md b/core/connectors/sources/otlp_source/README.md new file mode 100644 index 0000000000..6e23a05931 --- /dev/null +++ b/core/connectors/sources/otlp_source/README.md @@ -0,0 +1,44 @@ +# OTLP Source + +Receives logs, metrics, and traces from any OpenTelemetry SDK or Collector +over gRPC (OTLP/gRPC protocol) and writes them to an Iggy stream as JSON +messages. + +## How it works + +The connector binds a gRPC server (default port 4317, the OTLP standard) and +implements all three collector services: `LogsService`, `MetricsService`, and +`TraceService`. Each incoming export request is deserialized from the +`opentelemetry-proto` wire format and serialized to JSON. The JSON messages are +buffered in an in-process channel and drained by the runtime via the `poll()` +call. + +Gzip compression is enabled on every service. OTel SDKs and the Collector's +OTLP exporter compress payloads by default, so the connector accepts and sends +gzip on all three services. + +## JSON schema + +Each message has a `signal` field (`"log"`, `"metric"`, or `"trace"`) plus +signal-specific fields: + +**Logs** — `timestamp_ns`, `observed_timestamp_ns`, `severity`, +`severity_text`, `body`, `trace_id`, `span_id`, `service_name`, `attributes` + +**Metrics** — `name`, `description`, `unit`, `kind` (gauge/sum/histogram/…), +`data_points`, `resource`, `attributes` + +**Traces** — `trace_id`, `span_id`, `parent_span_id`, `name`, `kind`, +`start_time_ns`, `end_time_ns`, `status`, `service_name`, `attributes`, +`events`, `links` + +## Configuration + +```toml +[plugin_config] +listen_addr = "0.0.0.0:4317" # gRPC bind address +channel_capacity = 50000 # in-process buffer (messages) +batch_size = 1000 # max messages returned per poll() +``` + +Point any OTel SDK or Collector at `grpc://host:4317` (no TLS by default). diff --git a/core/connectors/sources/otlp_source/config.toml b/core/connectors/sources/otlp_source/config.toml new file mode 100644 index 0000000000..24341c2a23 --- /dev/null +++ b/core/connectors/sources/otlp_source/config.toml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "otlp" +enabled = true +version = 0 +name = "OTLP source" +path = "../../target/release/libiggy_connector_otlp_source" +verbose = false + +[[streams]] +stream = "otel" +topic = "signals" +schema = "json" +batch_length = 1000 +linger_time = "5ms" + +[plugin_config] +listen_addr = "0.0.0.0:4317" +channel_capacity = 50000 +batch_size = 1000 diff --git a/core/connectors/sources/otlp_source/src/convert.rs b/core/connectors/sources/otlp_source/src/convert.rs new file mode 100644 index 0000000000..a93b705aad --- /dev/null +++ b/core/connectors/sources/otlp_source/src/convert.rs @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy_connector_sdk::ProducedMessage; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; +use opentelemetry_proto::tonic::metrics::v1::{Metric, metric, number_data_point}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use serde_json::{Map, Value, json}; +use tracing::warn; + +pub fn export_logs_to_messages(req: ExportLogsServiceRequest) -> Vec { + let mut messages = Vec::new(); + for resource_logs in req.resource_logs { + let resource_attrs = extract_resource_attrs(resource_logs.resource.as_ref()); + let service_name = resource_attrs + .get("service.name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + + for scope_logs in resource_logs.scope_logs { + for record in scope_logs.log_records { + let body = record.body.as_ref().map(any_value_to_json); + let mut doc = json!({ + "signal": "log", + "timestamp_ns": record.time_unix_nano, + "observed_timestamp_ns": record.observed_time_unix_nano, + "severity": severity_number_to_text(record.severity_number), + "severity_text": record.severity_text, + "body": body, + "trace_id": bytes_to_hex(&record.trace_id), + "span_id": bytes_to_hex(&record.span_id), + "service_name": service_name, + "resource": resource_attrs.clone(), + "attributes": extract_attrs(&record.attributes), + }); + if let Some(obj) = doc.as_object_mut() { + obj.retain(|_, v| !v.is_null() && v != &Value::String(String::new())); + } + match serde_json::to_vec(&doc) { + Ok(payload) => messages.push(ProducedMessage { + id: None, + checksum: None, + timestamp: Some(record.time_unix_nano), + origin_timestamp: None, + headers: None, + payload, + }), + Err(err) => warn!("Failed to serialize log record: {err}"), + } + } + } + } + messages +} + +pub fn export_metrics_to_messages(req: ExportMetricsServiceRequest) -> Vec { + let mut messages = Vec::new(); + for resource_metrics in req.resource_metrics { + let resource_attrs = extract_resource_attrs(resource_metrics.resource.as_ref()); + let service_name = resource_attrs + .get("service.name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + + for scope_metrics in resource_metrics.scope_metrics { + for metric in scope_metrics.metrics { + let data_points = metric_to_data_points(&metric, &resource_attrs, &service_name); + messages.extend(data_points); + } + } + } + messages +} + +pub fn export_traces_to_messages(req: ExportTraceServiceRequest) -> Vec { + let mut messages = Vec::new(); + for resource_spans in req.resource_spans { + let resource_attrs = extract_resource_attrs(resource_spans.resource.as_ref()); + let service_name = resource_attrs + .get("service.name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + + for scope_spans in resource_spans.scope_spans { + for span in scope_spans.spans { + let status_code = span + .status + .as_ref() + .map(|s| status_code_to_text(s.code)) + .unwrap_or("unset"); + let status_message = span + .status + .as_ref() + .map(|s| s.message.as_str()) + .unwrap_or_default(); + + let mut doc = json!({ + "signal": "trace", + "trace_id": bytes_to_hex(&span.trace_id), + "span_id": bytes_to_hex(&span.span_id), + "parent_span_id": bytes_to_hex(&span.parent_span_id), + "name": span.name, + "kind": span_kind_to_text(span.kind), + "start_time_ns": span.start_time_unix_nano, + "end_time_ns": span.end_time_unix_nano, + "status": status_code, + "status_message": status_message, + "service_name": service_name, + "resource": resource_attrs.clone(), + "attributes": extract_attrs(&span.attributes), + }); + if let Some(obj) = doc.as_object_mut() { + obj.retain(|_, v| !v.is_null() && v != &Value::String(String::new())); + } + match serde_json::to_vec(&doc) { + Ok(payload) => messages.push(ProducedMessage { + id: None, + checksum: None, + timestamp: Some(span.start_time_unix_nano), + origin_timestamp: None, + headers: None, + payload, + }), + Err(err) => warn!("Failed to serialize span: {err}"), + } + } + } + } + messages +} + +fn metric_to_data_points( + metric: &Metric, + resource_attrs: &Map, + service_name: &str, +) -> Vec { + let mut messages = Vec::new(); + + let base = |time_ns: u64, value: Value, attrs: &[KeyValue], metric_type: &str| -> Value { + json!({ + "signal": "metric", + "name": metric.name, + "type": metric_type, + "unit": metric.unit, + "timestamp_ns": time_ns, + "value": value, + "service_name": service_name, + "resource": resource_attrs, + "attributes": extract_attrs(attrs), + }) + }; + + match &metric.data { + Some(metric::Data::Gauge(gauge)) => { + for dp in &gauge.data_points { + let value = number_dp_value(&dp.value); + let doc = base(dp.time_unix_nano, value, &dp.attributes, "gauge"); + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + Some(metric::Data::Sum(sum)) => { + for dp in &sum.data_points { + let value = number_dp_value(&dp.value); + let mut doc = base(dp.time_unix_nano, value, &dp.attributes, "sum"); + if let Some(obj) = doc.as_object_mut() { + obj.insert("is_monotonic".into(), Value::Bool(sum.is_monotonic)); + } + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + Some(metric::Data::Histogram(hist)) => { + for dp in &hist.data_points { + let doc = base( + dp.time_unix_nano, + json!({ "count": dp.count, "sum": dp.sum }), + &dp.attributes, + "histogram", + ); + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + Some(metric::Data::ExponentialHistogram(eh)) => { + for dp in &eh.data_points { + let doc = base( + dp.time_unix_nano, + json!({ "count": dp.count, "sum": dp.sum, "scale": dp.scale }), + &dp.attributes, + "exponential_histogram", + ); + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + Some(metric::Data::Summary(summary)) => { + for dp in &summary.data_points { + let doc = base( + dp.time_unix_nano, + json!({ "count": dp.count, "sum": dp.sum }), + &dp.attributes, + "summary", + ); + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + None => {} + } + + messages +} + +fn push_metric_doc(doc: Value, time_ns: u64, messages: &mut Vec) { + match serde_json::to_vec(&doc) { + Ok(payload) => messages.push(ProducedMessage { + id: None, + checksum: None, + timestamp: Some(time_ns), + origin_timestamp: None, + headers: None, + payload, + }), + Err(err) => warn!("Failed to serialize metric data point: {err}"), + } +} + +fn number_dp_value(value: &Option) -> Value { + match value { + Some(number_data_point::Value::AsDouble(d)) => json!(d), + Some(number_data_point::Value::AsInt(i)) => json!(i), + None => Value::Null, + } +} + +pub fn extract_resource_attrs(resource: Option<&Resource>) -> Map { + resource + .map(|r| extract_attrs(&r.attributes)) + .unwrap_or_default() +} + +pub fn extract_attrs(attrs: &[KeyValue]) -> Map { + attrs + .iter() + .map(|kv| { + let value = kv + .value + .as_ref() + .map(any_value_to_json) + .unwrap_or(Value::Null); + (kv.key.clone(), value) + }) + .collect() +} + +pub fn any_value_to_json(value: &AnyValue) -> Value { + match &value.value { + Some(any_value::Value::StringValue(s)) => Value::String(s.clone()), + Some(any_value::Value::BoolValue(b)) => Value::Bool(*b), + Some(any_value::Value::IntValue(i)) => json!(i), + Some(any_value::Value::DoubleValue(d)) => json!(d), + Some(any_value::Value::ArrayValue(arr)) => { + Value::Array(arr.values.iter().map(any_value_to_json).collect()) + } + Some(any_value::Value::KvlistValue(kvlist)) => Value::Object(extract_attrs(&kvlist.values)), + Some(any_value::Value::BytesValue(bytes)) => Value::String(bytes_to_hex(bytes)), + Some(any_value::Value::StringValueStrindex(_)) | None => Value::Null, + } +} + +pub fn bytes_to_hex(bytes: &[u8]) -> String { + if bytes.is_empty() { + return String::new(); + } + bytes.iter().map(|b| format!("{b:02x}")).collect() +} + +fn severity_number_to_text(number: i32) -> &'static str { + match number { + 1..=4 => "TRACE", + 5..=8 => "DEBUG", + 9..=12 => "INFO", + 13..=16 => "WARN", + 17..=20 => "ERROR", + 21..=24 => "FATAL", + _ => "UNSPECIFIED", + } +} + +fn status_code_to_text(code: i32) -> &'static str { + match code { + 1 => "ok", + 2 => "error", + _ => "unset", + } +} + +fn span_kind_to_text(kind: i32) -> &'static str { + match kind { + 1 => "internal", + 2 => "server", + 3 => "client", + 4 => "producer", + 5 => "consumer", + _ => "unspecified", + } +} diff --git a/core/connectors/sources/otlp_source/src/lib.rs b/core/connectors/sources/otlp_source/src/lib.rs new file mode 100644 index 0000000000..b54e5f7fd3 --- /dev/null +++ b/core/connectors/sources/otlp_source/src/lib.rs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use iggy_connector_sdk::{ + ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{Mutex, mpsc, oneshot}; +use tokio::task::JoinHandle; +use tracing::info; + +pub mod convert; +pub mod server; + +source_connector!(OtlpSource); + +#[derive(Debug)] +pub struct OtlpSource { + id: u32, + config: OtlpSourceConfig, + rx: Mutex>>, + shutdown_tx: Mutex>>, + server_task: Mutex>>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OtlpSourceConfig { + pub listen_addr: String, + #[serde(default = "default_channel_capacity")] + pub channel_capacity: usize, + #[serde(default = "default_batch_size")] + pub batch_size: usize, +} + +fn default_channel_capacity() -> usize { + 50_000 +} + +fn default_batch_size() -> usize { + 1_000 +} + +impl OtlpSource { + pub fn new(id: u32, config: OtlpSourceConfig, _state: Option) -> Self { + OtlpSource { + id, + config, + rx: Mutex::new(None), + shutdown_tx: Mutex::new(None), + server_task: Mutex::new(None), + } + } +} + +#[async_trait] +impl Source for OtlpSource { + async fn open(&mut self) -> Result<(), Error> { + let addr = self + .config + .listen_addr + .parse() + .map_err(|err| Error::InitError(format!("Invalid listen address: {err}")))?; + + let (tx, rx) = mpsc::channel(self.config.channel_capacity); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let handle = tokio::spawn(server::run_grpc_server(addr, tx, shutdown_rx)); + + *self.rx.lock().await = Some(rx); + *self.shutdown_tx.lock().await = Some(shutdown_tx); + *self.server_task.lock().await = Some(handle); + + info!( + "OTLP source connector with ID: {} listening on {}", + self.id, self.config.listen_addr + ); + Ok(()) + } + + async fn poll(&self) -> Result { + let mut rx_guard = self.rx.lock().await; + let rx = rx_guard.as_mut().ok_or_else(|| { + Error::InitError("OTLP source connector is not initialized".to_string()) + })?; + + let first = match rx.recv().await { + Some(msg) => msg, + None => { + return Ok(ProducedMessages { + schema: Schema::Json, + messages: vec![], + state: None, + }); + } + }; + + let mut messages = Vec::with_capacity(self.config.batch_size); + messages.push(first); + + while messages.len() < self.config.batch_size { + match rx.try_recv() { + Ok(msg) => messages.push(msg), + Err(_) => break, + } + } + + Ok(ProducedMessages { + schema: Schema::Json, + messages, + state: None, + }) + } + + async fn close(&mut self) -> Result<(), Error> { + if let Some(tx) = self.shutdown_tx.lock().await.take() { + let _ = tx.send(()); + } + if let Some(task) = self.server_task.lock().await.take() { + task.abort(); + } + info!("OTLP source connector with ID: {} closed.", self.id); + Ok(()) + } +} diff --git a/core/connectors/sources/otlp_source/src/server.rs b/core/connectors/sources/otlp_source/src/server.rs new file mode 100644 index 0000000000..8d176bfd9f --- /dev/null +++ b/core/connectors/sources/otlp_source/src/server.rs @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::convert; +use iggy_connector_sdk::ProducedMessage; +use opentelemetry_proto::tonic::collector::logs::v1::{ + ExportLogsServiceRequest, ExportLogsServiceResponse, + logs_service_server::{LogsService, LogsServiceServer}, +}; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsServiceRequest, ExportMetricsServiceResponse, + metrics_service_server::{MetricsService, MetricsServiceServer}, +}; +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTraceServiceRequest, ExportTraceServiceResponse, + trace_service_server::{TraceService, TraceServiceServer}, +}; +use std::net::SocketAddr; +use tokio::sync::{mpsc, oneshot}; +use tonic::codec::CompressionEncoding; +use tonic::{Request, Response, Status}; +use tracing::{error, info, warn}; + +pub async fn run_grpc_server( + addr: SocketAddr, + tx: mpsc::Sender, + shutdown: oneshot::Receiver<()>, +) { + let logs_svc = LogsServiceImpl { tx: tx.clone() }; + let metrics_svc = MetricsServiceImpl { tx: tx.clone() }; + let trace_svc = TraceServiceImpl { tx }; + + info!("OTLP gRPC server starting on {addr}"); + + // OTel SDKs and the Collector's OTLP exporter gzip-compress payloads by + // default, so every service must accept gzip on the wire. Responses are tiny + // (empty partial_success), but advertising gzip on send is harmless and lets + // clients negotiate it. + let logs_server = LogsServiceServer::new(logs_svc) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + let metrics_server = MetricsServiceServer::new(metrics_svc) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + let trace_server = TraceServiceServer::new(trace_svc) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + if let Err(err) = tonic::transport::Server::builder() + .add_service(logs_server) + .add_service(metrics_server) + .add_service(trace_server) + .serve_with_shutdown(addr, async { + let _ = shutdown.await; + info!("OTLP gRPC server received shutdown signal"); + }) + .await + { + error!("OTLP gRPC server error: {err}"); + } +} + +struct LogsServiceImpl { + tx: mpsc::Sender, +} + +struct MetricsServiceImpl { + tx: mpsc::Sender, +} + +struct TraceServiceImpl { + tx: mpsc::Sender, +} + +#[tonic::async_trait] +impl LogsService for LogsServiceImpl { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let messages = convert::export_logs_to_messages(request.into_inner()); + send_messages(&self.tx, messages, "logs").await; + Ok(Response::new(ExportLogsServiceResponse { + partial_success: None, + })) + } +} + +#[tonic::async_trait] +impl MetricsService for MetricsServiceImpl { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let messages = convert::export_metrics_to_messages(request.into_inner()); + send_messages(&self.tx, messages, "metrics").await; + Ok(Response::new(ExportMetricsServiceResponse { + partial_success: None, + })) + } +} + +#[tonic::async_trait] +impl TraceService for TraceServiceImpl { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let messages = convert::export_traces_to_messages(request.into_inner()); + send_messages(&self.tx, messages, "traces").await; + Ok(Response::new(ExportTraceServiceResponse { + partial_success: None, + })) + } +} + +async fn send_messages( + tx: &mpsc::Sender, + messages: Vec, + signal: &str, +) { + for message in messages { + if let Err(err) = tx.try_send(message) { + warn!("OTLP channel full, dropping {signal} message: {err}"); + } + } +} From a648b62cda525d9671b7923e8af8a77cc412736c Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Sun, 21 Jun 2026 23:04:56 +0300 Subject: [PATCH 2/8] fix(connectors/otlp_source): bind TcpListener synchronously in open() Spawn raced with bind: if EADDRINUSE/EPERM, the server task exited silently, tx was dropped, and poll() returned Ok(empty) forever at CPU speed with the connector status stuck at Running. Bind with TcpIncoming::bind() before tokio::spawn so any port error surfaces as InitError in open() instead of a silent busy-loop. Switch run_grpc_server to serve_with_incoming_shutdown. --- core/connectors/sources/otlp_source/src/lib.rs | 16 +++++++++++----- .../connectors/sources/otlp_source/src/server.rs | 8 +++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/core/connectors/sources/otlp_source/src/lib.rs b/core/connectors/sources/otlp_source/src/lib.rs index b54e5f7fd3..f4b14161c5 100644 --- a/core/connectors/sources/otlp_source/src/lib.rs +++ b/core/connectors/sources/otlp_source/src/lib.rs @@ -22,6 +22,7 @@ use iggy_connector_sdk::{ use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, mpsc, oneshot}; use tokio::task::JoinHandle; +use tonic::transport::server::TcpIncoming; use tracing::info; pub mod convert; @@ -76,19 +77,24 @@ impl Source for OtlpSource { .parse() .map_err(|err| Error::InitError(format!("Invalid listen address: {err}")))?; + let incoming = TcpIncoming::bind(addr).map_err(|err| { + Error::InitError(format!("Failed to bind {}: {err}", self.config.listen_addr)) + })?; + + info!( + "OTLP source connector with ID: {} listening on {}", + self.id, self.config.listen_addr + ); + let (tx, rx) = mpsc::channel(self.config.channel_capacity); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let handle = tokio::spawn(server::run_grpc_server(addr, tx, shutdown_rx)); + let handle = tokio::spawn(server::run_grpc_server(incoming, tx, shutdown_rx)); *self.rx.lock().await = Some(rx); *self.shutdown_tx.lock().await = Some(shutdown_tx); *self.server_task.lock().await = Some(handle); - info!( - "OTLP source connector with ID: {} listening on {}", - self.id, self.config.listen_addr - ); Ok(()) } diff --git a/core/connectors/sources/otlp_source/src/server.rs b/core/connectors/sources/otlp_source/src/server.rs index 8d176bfd9f..b3aecdc5fd 100644 --- a/core/connectors/sources/otlp_source/src/server.rs +++ b/core/connectors/sources/otlp_source/src/server.rs @@ -29,14 +29,14 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, trace_service_server::{TraceService, TraceServiceServer}, }; -use std::net::SocketAddr; use tokio::sync::{mpsc, oneshot}; use tonic::codec::CompressionEncoding; +use tonic::transport::server::TcpIncoming; use tonic::{Request, Response, Status}; use tracing::{error, info, warn}; pub async fn run_grpc_server( - addr: SocketAddr, + incoming: TcpIncoming, tx: mpsc::Sender, shutdown: oneshot::Receiver<()>, ) { @@ -44,8 +44,6 @@ pub async fn run_grpc_server( let metrics_svc = MetricsServiceImpl { tx: tx.clone() }; let trace_svc = TraceServiceImpl { tx }; - info!("OTLP gRPC server starting on {addr}"); - // OTel SDKs and the Collector's OTLP exporter gzip-compress payloads by // default, so every service must accept gzip on the wire. Responses are tiny // (empty partial_success), but advertising gzip on send is harmless and lets @@ -64,7 +62,7 @@ pub async fn run_grpc_server( .add_service(logs_server) .add_service(metrics_server) .add_service(trace_server) - .serve_with_shutdown(addr, async { + .serve_with_incoming_shutdown(incoming, async { let _ = shutdown.await; info!("OTLP gRPC server received shutdown signal"); }) From f18bb2bd8ecd6604a77eac05620601c6e3d2909b Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Sun, 21 Jun 2026 23:08:09 +0300 Subject: [PATCH 3/8] fix(connectors/otlp_source): report partial_success on drop; error on closed channel Two correctness issues: 1. Channel-full drops were silent successes: try_send failure dropped the message but each service impl returned partial_success: None, so the OTel SDK saw full acceptance and did not retry. Now send_messages returns the drop count and each impl populates the OTLP-spec rejected_log_records / rejected_data_points / rejected_spans field. 2. rx.recv() == None returned Ok(empty), causing a CPU-speed busy-loop when the gRPC server task exited for any reason (panic, OS signal). Now returns Err(Error::Connection(...)) so the runtime propagates the failure and stops the connector. --- .../connectors/sources/otlp_source/src/lib.rs | 8 ++-- .../sources/otlp_source/src/server.rs | 37 +++++++++++++------ 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/core/connectors/sources/otlp_source/src/lib.rs b/core/connectors/sources/otlp_source/src/lib.rs index f4b14161c5..fa84e60007 100644 --- a/core/connectors/sources/otlp_source/src/lib.rs +++ b/core/connectors/sources/otlp_source/src/lib.rs @@ -107,11 +107,9 @@ impl Source for OtlpSource { let first = match rx.recv().await { Some(msg) => msg, None => { - return Ok(ProducedMessages { - schema: Schema::Json, - messages: vec![], - state: None, - }); + return Err(Error::Connection( + "OTLP gRPC server terminated unexpectedly".to_string(), + )); } }; diff --git a/core/connectors/sources/otlp_source/src/server.rs b/core/connectors/sources/otlp_source/src/server.rs index b3aecdc5fd..6e4295972e 100644 --- a/core/connectors/sources/otlp_source/src/server.rs +++ b/core/connectors/sources/otlp_source/src/server.rs @@ -18,15 +18,15 @@ use crate::convert; use iggy_connector_sdk::ProducedMessage; use opentelemetry_proto::tonic::collector::logs::v1::{ - ExportLogsServiceRequest, ExportLogsServiceResponse, + ExportLogsPartialSuccess, ExportLogsServiceRequest, ExportLogsServiceResponse, logs_service_server::{LogsService, LogsServiceServer}, }; use opentelemetry_proto::tonic::collector::metrics::v1::{ - ExportMetricsServiceRequest, ExportMetricsServiceResponse, + ExportMetricsPartialSuccess, ExportMetricsServiceRequest, ExportMetricsServiceResponse, metrics_service_server::{MetricsService, MetricsServiceServer}, }; use opentelemetry_proto::tonic::collector::trace::v1::{ - ExportTraceServiceRequest, ExportTraceServiceResponse, + ExportTracePartialSuccess, ExportTraceServiceRequest, ExportTraceServiceResponse, trace_service_server::{TraceService, TraceServiceServer}, }; use tokio::sync::{mpsc, oneshot}; @@ -91,10 +91,12 @@ impl LogsService for LogsServiceImpl { request: Request, ) -> Result, Status> { let messages = convert::export_logs_to_messages(request.into_inner()); - send_messages(&self.tx, messages, "logs").await; - Ok(Response::new(ExportLogsServiceResponse { - partial_success: None, - })) + let rejected = send_messages(&self.tx, messages, "logs").await; + let partial_success = (rejected > 0).then(|| ExportLogsPartialSuccess { + rejected_log_records: rejected, + error_message: "channel full; records dropped".to_string(), + }); + Ok(Response::new(ExportLogsServiceResponse { partial_success })) } } @@ -105,9 +107,13 @@ impl MetricsService for MetricsServiceImpl { request: Request, ) -> Result, Status> { let messages = convert::export_metrics_to_messages(request.into_inner()); - send_messages(&self.tx, messages, "metrics").await; + let rejected = send_messages(&self.tx, messages, "metrics").await; + let partial_success = (rejected > 0).then(|| ExportMetricsPartialSuccess { + rejected_data_points: rejected, + error_message: "channel full; data points dropped".to_string(), + }); Ok(Response::new(ExportMetricsServiceResponse { - partial_success: None, + partial_success, })) } } @@ -119,9 +125,13 @@ impl TraceService for TraceServiceImpl { request: Request, ) -> Result, Status> { let messages = convert::export_traces_to_messages(request.into_inner()); - send_messages(&self.tx, messages, "traces").await; + let rejected = send_messages(&self.tx, messages, "traces").await; + let partial_success = (rejected > 0).then(|| ExportTracePartialSuccess { + rejected_spans: rejected, + error_message: "channel full; spans dropped".to_string(), + }); Ok(Response::new(ExportTraceServiceResponse { - partial_success: None, + partial_success, })) } } @@ -130,10 +140,13 @@ async fn send_messages( tx: &mpsc::Sender, messages: Vec, signal: &str, -) { +) -> i64 { + let mut dropped: i64 = 0; for message in messages { if let Err(err) = tx.try_send(message) { warn!("OTLP channel full, dropping {signal} message: {err}"); + dropped += 1; } } + dropped } From 961bd1ab9f9c8f19124ed122934f44788f2cabd6 Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Sun, 21 Jun 2026 23:12:04 +0300 Subject: [PATCH 4/8] fix(connectors/otlp_source): guard double open(); single-alloc bytes_to_hex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Double open() overwrote Mutex fields without aborting the old server task, leaving a dangling task on the same port (EADDRINUSE on restart). Added an is_some() guard at the top of open() that returns InitError immediately. bytes_to_hex allocated one String per byte via format!("{b:02x}") — 16 heap allocs per trace_id/span_id call, 48 per span. Replaced with a single String::with_capacity(len*2) + write! loop. --- core/connectors/sources/otlp_source/src/convert.rs | 8 +++++--- core/connectors/sources/otlp_source/src/lib.rs | 6 ++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/connectors/sources/otlp_source/src/convert.rs b/core/connectors/sources/otlp_source/src/convert.rs index a93b705aad..a4fdf5a9d8 100644 --- a/core/connectors/sources/otlp_source/src/convert.rs +++ b/core/connectors/sources/otlp_source/src/convert.rs @@ -23,6 +23,7 @@ use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; use opentelemetry_proto::tonic::metrics::v1::{Metric, metric, number_data_point}; use opentelemetry_proto::tonic::resource::v1::Resource; use serde_json::{Map, Value, json}; +use std::fmt::Write as _; use tracing::warn; pub fn export_logs_to_messages(req: ExportLogsServiceRequest) -> Vec { @@ -285,10 +286,11 @@ pub fn any_value_to_json(value: &AnyValue) -> Value { } pub fn bytes_to_hex(bytes: &[u8]) -> String { - if bytes.is_empty() { - return String::new(); + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + let _ = write!(s, "{b:02x}"); } - bytes.iter().map(|b| format!("{b:02x}")).collect() + s } fn severity_number_to_text(number: i32) -> &'static str { diff --git a/core/connectors/sources/otlp_source/src/lib.rs b/core/connectors/sources/otlp_source/src/lib.rs index fa84e60007..8cde3b41fe 100644 --- a/core/connectors/sources/otlp_source/src/lib.rs +++ b/core/connectors/sources/otlp_source/src/lib.rs @@ -71,6 +71,12 @@ impl OtlpSource { #[async_trait] impl Source for OtlpSource { async fn open(&mut self) -> Result<(), Error> { + if self.rx.lock().await.is_some() { + return Err(Error::InitError( + "OTLP source connector is already open".to_string(), + )); + } + let addr = self .config .listen_addr From 54615c5eaee66729c492ae2dc98b1f7bed4f820a Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Sun, 21 Jun 2026 23:15:28 +0300 Subject: [PATCH 5/8] fix(connectors/otlp_source): pre-build resource Value; warn on unknown AnyValue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit resource_attrs (Map) was cloned inside the innermost loop over log records and spans — 1000 Map clones per batch for a shared resource. Convert to Value::Object once before the scope loops; the inner loop clones the Value instead. extract_attrs now matches StringValueStrindex before calling any_value_to_json, so the attribute key is included in the warn! when an unrecognized AnyValue variant silently drops data. --- .../sources/otlp_source/src/convert.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/connectors/sources/otlp_source/src/convert.rs b/core/connectors/sources/otlp_source/src/convert.rs index a4fdf5a9d8..1b7f5687f4 100644 --- a/core/connectors/sources/otlp_source/src/convert.rs +++ b/core/connectors/sources/otlp_source/src/convert.rs @@ -36,6 +36,7 @@ pub fn export_logs_to_messages(req: ExportLogsServiceRequest) -> Vec Vec Vec Vec Map { attrs .iter() .map(|kv| { - let value = kv - .value - .as_ref() - .map(any_value_to_json) - .unwrap_or(Value::Null); + let value = match kv.value.as_ref() { + Some(av) if matches!(av.value, Some(any_value::Value::StringValueStrindex(_))) => { + warn!(key = %kv.key, "dropping attribute with unrecognized AnyValue variant"); + Value::Null + } + Some(av) => any_value_to_json(av), + None => Value::Null, + }; (kv.key.clone(), value) }) .collect() From bc5b37b048ec67d118957235e78e3204999c9827 Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Mon, 22 Jun 2026 06:29:15 +0300 Subject: [PATCH 6/8] fix(connectors/otlp_source): task await, log flood, version, workspace dep, visibility, zero timestamp close() now awaits the aborted task so the TCP port is released before returning; rx is also cleared to allow a subsequent open() after restart. send_messages is non-async (it has no .await); drops are counted and emitted as a single warn! instead of one per message. Version aligned to workspace pattern 0.4.1-edge.1. opentelemetry-proto added to workspace deps; crate references it via workspace = true. convert and server modules narrowed to pub(crate); only FFI symbols generated by source_connector! remain part of the public surface. Proto scalar default 0 for time_unix_nano / start_time_unix_nano is now treated as unset and emits None instead of Some(0) / Unix epoch. --- Cargo.toml | 1 + core/connectors/sources/otlp_source/Cargo.toml | 6 +++--- .../connectors/sources/otlp_source/src/convert.rs | 5 +++-- core/connectors/sources/otlp_source/src/lib.rs | 9 ++++++--- core/connectors/sources/otlp_source/src/server.rs | 15 +++++++++------ 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b0c9db91eb..6d970e1267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -223,6 +223,7 @@ opentelemetry-otlp = { version = "0.32.0", features = [ "http-proto", "reqwest-client", ] } +opentelemetry-proto = { version = "0.32.0", default-features = false } opentelemetry-semantic-conventions = "0.32.0" opentelemetry_sdk = { version = "0.32.1", features = [ "logs", diff --git a/core/connectors/sources/otlp_source/Cargo.toml b/core/connectors/sources/otlp_source/Cargo.toml index 37dfac9b90..5cd3f52cd1 100644 --- a/core/connectors/sources/otlp_source/Cargo.toml +++ b/core/connectors/sources/otlp_source/Cargo.toml @@ -17,8 +17,8 @@ [package] name = "iggy_connector_otlp_source" -version = "0.1.0" -description = "Iggy OTLP/gRPC source connector — receives logs, metrics, and traces from any OpenTelemetry SDK or Collector and writes them to Iggy streams as JSON" +version = "0.4.1-edge.1" +description = "Iggy OTLP/gRPC source connector - receives logs, metrics, and traces from any OpenTelemetry SDK or Collector and writes them to Iggy streams as JSON" edition = "2024" license = "Apache-2.0" keywords = ["iggy", "messaging", "streaming", "opentelemetry", "otlp"] @@ -45,7 +45,7 @@ async-trait = { workspace = true } dashmap = { workspace = true } iggy_connector_sdk = { workspace = true } once_cell = { workspace = true } -opentelemetry-proto = { version = "0.32.0", default-features = false, features = [ +opentelemetry-proto = { workspace = true, features = [ "gen-tonic", "logs", "metrics", diff --git a/core/connectors/sources/otlp_source/src/convert.rs b/core/connectors/sources/otlp_source/src/convert.rs index 1b7f5687f4..276a7136fb 100644 --- a/core/connectors/sources/otlp_source/src/convert.rs +++ b/core/connectors/sources/otlp_source/src/convert.rs @@ -60,7 +60,7 @@ pub fn export_logs_to_messages(req: ExportLogsServiceRequest) -> Vec messages.push(ProducedMessage { id: None, checksum: None, - timestamp: Some(record.time_unix_nano), + timestamp: (record.time_unix_nano != 0).then_some(record.time_unix_nano), origin_timestamp: None, headers: None, payload, @@ -139,7 +139,8 @@ pub fn export_traces_to_messages(req: ExportTraceServiceRequest) -> Vec messages.push(ProducedMessage { id: None, checksum: None, - timestamp: Some(span.start_time_unix_nano), + timestamp: (span.start_time_unix_nano != 0) + .then_some(span.start_time_unix_nano), origin_timestamp: None, headers: None, payload, diff --git a/core/connectors/sources/otlp_source/src/lib.rs b/core/connectors/sources/otlp_source/src/lib.rs index 8cde3b41fe..d4ae15609d 100644 --- a/core/connectors/sources/otlp_source/src/lib.rs +++ b/core/connectors/sources/otlp_source/src/lib.rs @@ -25,8 +25,8 @@ use tokio::task::JoinHandle; use tonic::transport::server::TcpIncoming; use tracing::info; -pub mod convert; -pub mod server; +pub(crate) mod convert; +pub(crate) mod server; source_connector!(OtlpSource); @@ -140,9 +140,12 @@ impl Source for OtlpSource { if let Some(tx) = self.shutdown_tx.lock().await.take() { let _ = tx.send(()); } - if let Some(task) = self.server_task.lock().await.take() { + let task = self.server_task.lock().await.take(); + if let Some(task) = task { task.abort(); + let _ = task.await; } + *self.rx.lock().await = None; info!("OTLP source connector with ID: {} closed.", self.id); Ok(()) } diff --git a/core/connectors/sources/otlp_source/src/server.rs b/core/connectors/sources/otlp_source/src/server.rs index 6e4295972e..8b189ad456 100644 --- a/core/connectors/sources/otlp_source/src/server.rs +++ b/core/connectors/sources/otlp_source/src/server.rs @@ -91,7 +91,7 @@ impl LogsService for LogsServiceImpl { request: Request, ) -> Result, Status> { let messages = convert::export_logs_to_messages(request.into_inner()); - let rejected = send_messages(&self.tx, messages, "logs").await; + let rejected = send_messages(&self.tx, messages, "logs"); let partial_success = (rejected > 0).then(|| ExportLogsPartialSuccess { rejected_log_records: rejected, error_message: "channel full; records dropped".to_string(), @@ -107,7 +107,7 @@ impl MetricsService for MetricsServiceImpl { request: Request, ) -> Result, Status> { let messages = convert::export_metrics_to_messages(request.into_inner()); - let rejected = send_messages(&self.tx, messages, "metrics").await; + let rejected = send_messages(&self.tx, messages, "metrics"); let partial_success = (rejected > 0).then(|| ExportMetricsPartialSuccess { rejected_data_points: rejected, error_message: "channel full; data points dropped".to_string(), @@ -125,7 +125,7 @@ impl TraceService for TraceServiceImpl { request: Request, ) -> Result, Status> { let messages = convert::export_traces_to_messages(request.into_inner()); - let rejected = send_messages(&self.tx, messages, "traces").await; + let rejected = send_messages(&self.tx, messages, "traces"); let partial_success = (rejected > 0).then(|| ExportTracePartialSuccess { rejected_spans: rejected, error_message: "channel full; spans dropped".to_string(), @@ -136,17 +136,20 @@ impl TraceService for TraceServiceImpl { } } -async fn send_messages( +fn send_messages( tx: &mpsc::Sender, messages: Vec, signal: &str, ) -> i64 { + let total = messages.len() as i64; let mut dropped: i64 = 0; for message in messages { - if let Err(err) = tx.try_send(message) { - warn!("OTLP channel full, dropping {signal} message: {err}"); + if tx.try_send(message).is_err() { dropped += 1; } } + if dropped > 0 { + warn!("OTLP channel full, dropped {dropped}/{total} {signal} messages"); + } dropped } From 1ecae99cf35ff594573f906b70bba14bf96a763b Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Mon, 22 Jun 2026 17:48:56 +0300 Subject: [PATCH 7/8] feat(connectors/otlp_source): add proto storage format Adds `format = "proto"` (default: json) to OtlpSourceConfig. In proto mode the entire Export*ServiceRequest is prost-encoded as a single raw message instead of fanning out to per-item JSON documents. Paired with otlp_sink's proto mode this yields 4-5x storage reduction in Iggy with zero application-level parsing overhead on the sink side. The default remains "json" so existing deployments are unaffected. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 3 +- .../connectors/sources/otlp_source/Cargo.toml | 1 + .../connectors/sources/otlp_source/src/lib.rs | 32 ++++++++- .../sources/otlp_source/src/server.rs | 70 +++++++++++++++++-- 4 files changed, 97 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e49627addb..a18892db23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6955,13 +6955,14 @@ dependencies = [ [[package]] name = "iggy_connector_otlp_source" -version = "0.1.0" +version = "0.4.1-edge.1" dependencies = [ "async-trait", "dashmap", "iggy_connector_sdk", "once_cell", "opentelemetry-proto", + "prost", "serde", "serde_json", "tokio", diff --git a/core/connectors/sources/otlp_source/Cargo.toml b/core/connectors/sources/otlp_source/Cargo.toml index 5cd3f52cd1..eb9a35d3a1 100644 --- a/core/connectors/sources/otlp_source/Cargo.toml +++ b/core/connectors/sources/otlp_source/Cargo.toml @@ -45,6 +45,7 @@ async-trait = { workspace = true } dashmap = { workspace = true } iggy_connector_sdk = { workspace = true } once_cell = { workspace = true } +prost = { workspace = true } opentelemetry-proto = { workspace = true, features = [ "gen-tonic", "logs", diff --git a/core/connectors/sources/otlp_source/src/lib.rs b/core/connectors/sources/otlp_source/src/lib.rs index d4ae15609d..bd6db95c15 100644 --- a/core/connectors/sources/otlp_source/src/lib.rs +++ b/core/connectors/sources/otlp_source/src/lib.rs @@ -30,6 +30,22 @@ pub(crate) mod server; source_connector!(OtlpSource); +/// How messages are stored in the Iggy topic. +/// +/// `Json` (default): each OTLP signal item (span / data-point / log record) +/// is stored as a flat JSON document. Human-readable but verbose. +/// +/// `Proto`: the entire `Export*ServiceRequest` proto is stored as raw bytes +/// per gRPC call. Typically 4-5x smaller than JSON and zero-copy on the sink +/// side when paired with `otlp_sink`'s `format = "proto"`. +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum StorageFormat { + #[default] + Json, + Proto, +} + #[derive(Debug)] pub struct OtlpSource { id: u32, @@ -46,6 +62,8 @@ pub struct OtlpSourceConfig { pub channel_capacity: usize, #[serde(default = "default_batch_size")] pub batch_size: usize, + #[serde(default)] + pub format: StorageFormat, } fn default_channel_capacity() -> usize { @@ -95,7 +113,12 @@ impl Source for OtlpSource { let (tx, rx) = mpsc::channel(self.config.channel_capacity); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let handle = tokio::spawn(server::run_grpc_server(incoming, tx, shutdown_rx)); + let handle = tokio::spawn(server::run_grpc_server( + incoming, + tx, + shutdown_rx, + self.config.format, + )); *self.rx.lock().await = Some(rx); *self.shutdown_tx.lock().await = Some(shutdown_tx); @@ -129,8 +152,13 @@ impl Source for OtlpSource { } } + let schema = match self.config.format { + StorageFormat::Json => Schema::Json, + StorageFormat::Proto => Schema::Raw, + }; + Ok(ProducedMessages { - schema: Schema::Json, + schema, messages, state: None, }) diff --git a/core/connectors/sources/otlp_source/src/server.rs b/core/connectors/sources/otlp_source/src/server.rs index 8b189ad456..2e78bed3c6 100644 --- a/core/connectors/sources/otlp_source/src/server.rs +++ b/core/connectors/sources/otlp_source/src/server.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::StorageFormat; use crate::convert; use iggy_connector_sdk::ProducedMessage; use opentelemetry_proto::tonic::collector::logs::v1::{ @@ -29,6 +30,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTracePartialSuccess, ExportTraceServiceRequest, ExportTraceServiceResponse, trace_service_server::{TraceService, TraceServiceServer}, }; +use prost::Message as ProstMessage; use tokio::sync::{mpsc, oneshot}; use tonic::codec::CompressionEncoding; use tonic::transport::server::TcpIncoming; @@ -39,10 +41,17 @@ pub async fn run_grpc_server( incoming: TcpIncoming, tx: mpsc::Sender, shutdown: oneshot::Receiver<()>, + format: StorageFormat, ) { - let logs_svc = LogsServiceImpl { tx: tx.clone() }; - let metrics_svc = MetricsServiceImpl { tx: tx.clone() }; - let trace_svc = TraceServiceImpl { tx }; + let logs_svc = LogsServiceImpl { + tx: tx.clone(), + format, + }; + let metrics_svc = MetricsServiceImpl { + tx: tx.clone(), + format, + }; + let trace_svc = TraceServiceImpl { tx, format }; // OTel SDKs and the Collector's OTLP exporter gzip-compress payloads by // default, so every service must accept gzip on the wire. Responses are tiny @@ -74,14 +83,17 @@ pub async fn run_grpc_server( struct LogsServiceImpl { tx: mpsc::Sender, + format: StorageFormat, } struct MetricsServiceImpl { tx: mpsc::Sender, + format: StorageFormat, } struct TraceServiceImpl { tx: mpsc::Sender, + format: StorageFormat, } #[tonic::async_trait] @@ -90,7 +102,7 @@ impl LogsService for LogsServiceImpl { &self, request: Request, ) -> Result, Status> { - let messages = convert::export_logs_to_messages(request.into_inner()); + let messages = encode_or_convert(request.into_inner(), self.format, "logs"); let rejected = send_messages(&self.tx, messages, "logs"); let partial_success = (rejected > 0).then(|| ExportLogsPartialSuccess { rejected_log_records: rejected, @@ -106,7 +118,7 @@ impl MetricsService for MetricsServiceImpl { &self, request: Request, ) -> Result, Status> { - let messages = convert::export_metrics_to_messages(request.into_inner()); + let messages = encode_or_convert(request.into_inner(), self.format, "metrics"); let rejected = send_messages(&self.tx, messages, "metrics"); let partial_success = (rejected > 0).then(|| ExportMetricsPartialSuccess { rejected_data_points: rejected, @@ -124,7 +136,7 @@ impl TraceService for TraceServiceImpl { &self, request: Request, ) -> Result, Status> { - let messages = convert::export_traces_to_messages(request.into_inner()); + let messages = encode_or_convert(request.into_inner(), self.format, "traces"); let rejected = send_messages(&self.tx, messages, "traces"); let partial_success = (rejected > 0).then(|| ExportTracePartialSuccess { rejected_spans: rejected, @@ -136,6 +148,52 @@ impl TraceService for TraceServiceImpl { } } +trait IntoMessages { + fn into_json_messages(self) -> Vec; +} + +impl IntoMessages for ExportLogsServiceRequest { + fn into_json_messages(self) -> Vec { + convert::export_logs_to_messages(self) + } +} + +impl IntoMessages for ExportMetricsServiceRequest { + fn into_json_messages(self) -> Vec { + convert::export_metrics_to_messages(self) + } +} + +impl IntoMessages for ExportTraceServiceRequest { + fn into_json_messages(self) -> Vec { + convert::export_traces_to_messages(self) + } +} + +fn encode_or_convert(req: R, format: StorageFormat, signal: &str) -> Vec +where + R: ProstMessage + IntoMessages, +{ + match format { + StorageFormat::Json => req.into_json_messages(), + StorageFormat::Proto => { + let mut buf = Vec::new(); + if let Err(e) = req.encode(&mut buf) { + warn!("Failed to encode {signal} proto: {e}"); + return vec![]; + } + vec![ProducedMessage { + id: None, + checksum: None, + timestamp: None, + origin_timestamp: None, + headers: None, + payload: buf, + }] + } + } +} + fn send_messages( tx: &mpsc::Sender, messages: Vec, From c54eb7c480912c9fd23d8bf8dc9ee301ddbbba59 Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Tue, 23 Jun 2026 16:14:42 +0300 Subject: [PATCH 8/8] fix(connectors): add otlp_source plugin to Dockerfile.connectors `iggy_connector_otlp_source` was not being built or shipped in the runtime image. Add it to the build stage and the runtime COPY step. --- Dockerfile.connectors | 79 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 Dockerfile.connectors diff --git a/Dockerfile.connectors b/Dockerfile.connectors new file mode 100644 index 0000000000..51b60e8ae3 --- /dev/null +++ b/Dockerfile.connectors @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# syntax=docker/dockerfile:1.7 +# +# iggy-connectors runtime image — ships the iggy-connectors binary and the +# OTLP/gRPC source, OTLP/gRPC sink, quickwit sink plugins (.so). Config files are injected +# via K8s ConfigMaps. +# +# Builder: rust:1-slim-bookworm (mold linker, BuildKit cache mounts) +# Runtime: debian:bookworm-slim + libssl3 + +# ---- Build ---- +FROM rust:1-slim-bookworm AS build +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + pkg-config libssl-dev clang mold \ + && rm -rf /var/lib/apt/lists/* + +COPY . . + +ARG TARGETARCH +RUN --mount=type=cache,id=iggy-connectors-registry,sharing=locked,target=/usr/local/cargo/registry \ + --mount=type=cache,id=iggy-connectors-git,sharing=locked,target=/usr/local/cargo/git \ + --mount=type=cache,id=iggy-connectors-target-${TARGETARCH},target=/app/target \ + RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=mold" \ + cargo build --release --bin iggy-connectors \ + && cargo build --release -p iggy_connector_otlp_source \ + && cargo build --release -p iggy_connector_quickwit_sink \ + && cargo build --release -p iggy_connector_otlp_sink \ + && cp target/release/iggy-connectors /usr/local/bin/iggy-connectors \ + && cp target/release/libiggy_connector_otlp_source.so /usr/local/lib/libiggy_connector_otlp_source.so \ + && cp target/release/libiggy_connector_quickwit_sink.so /usr/local/lib/libiggy_connector_quickwit_sink.so \ + && cp target/release/libiggy_connector_otlp_sink.so /usr/local/lib/libiggy_connector_otlp_sink.so \ + && strip /usr/local/bin/iggy-connectors + +# ---- Runtime ---- +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates libssl3 \ + && rm -rf /var/lib/apt/lists/* \ + && useradd --system --uid 10001 --home-dir /connectors \ + --shell /usr/sbin/nologin iggy \ + && mkdir -p /connectors/plugins /connectors/state \ + && chown -R iggy:iggy /connectors + +COPY --from=build /usr/local/bin/iggy-connectors /usr/local/bin/iggy-connectors +COPY --from=build /usr/local/lib/libiggy_connector_otlp_source.so \ + /connectors/plugins/libiggy_connector_otlp_source.so +COPY --from=build /usr/local/lib/libiggy_connector_quickwit_sink.so \ + /connectors/plugins/libiggy_connector_quickwit_sink.so +COPY --from=build /usr/local/lib/libiggy_connector_otlp_sink.so \ + /connectors/plugins/libiggy_connector_otlp_sink.so + +USER iggy +WORKDIR /connectors + +# Runtime config: IGGY_CONNECTORS_CONFIG_PATH → /connectors/runtime.toml (ConfigMap) +# Plugin configs: /connectors/plugins/otlp_source.toml (ConfigMap) +ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml + +EXPOSE 8081 + +ENTRYPOINT ["/usr/local/bin/iggy-connectors"]