diff --git a/Cargo.lock b/Cargo.lock index 12382acfca..a18892db23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6953,6 +6953,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_otlp_source" +version = "0.4.1-edge.1" +dependencies = [ + "async-trait", + "dashmap", + "iggy_connector_sdk", + "once_cell", + "opentelemetry-proto", + "prost", + "serde", + "serde_json", + "tokio", + "tonic", + "tracing", +] + [[package]] name = "iggy_connector_postgres_sink" version = "0.4.1-edge.1" @@ -10713,9 +10730,9 @@ dependencies = [ [[package]] name = "rmcp" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0810a9f717d9828f475fe1f629f4c305c8464b7f496c3a854b58d29e65f4058e" +checksum = "1d1f571c72940a19d9532fe52dbea8bc9912bf1d766c2970bb824056b86f3f59" dependencies = [ "async-trait", "base64", @@ -10745,9 +10762,9 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aefac48c364756e97f04c0401ba3231e8607882c7c1d92da0437dc16307904d" +checksum = "1aad0035b69380782d78ea95b508327e6deaa2235909053e596eea8f27b5e1d5" dependencies = [ "darling 0.23.0", "proc-macro2", @@ -13059,6 +13076,7 @@ dependencies = [ "axum", "base64", "bytes", + "flate2", "h2 0.4.15", "http 1.4.2", "http-body 1.0.1", diff --git a/Cargo.toml b/Cargo.toml index 4c6777fdea..6d970e1267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/influxdb_source", + "core/connectors/sources/otlp_source", "core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", @@ -222,6 +223,7 @@ opentelemetry-otlp = { version = "0.32.0", features = [ "http-proto", "reqwest-client", ] } +opentelemetry-proto = { version = "0.32.0", default-features = false } 
opentelemetry-semantic-conventions = "0.32.0" opentelemetry_sdk = { version = "0.32.1", features = [ "logs", @@ -308,6 +310,7 @@ tokio-rustls = "0.26.4" tokio-tungstenite = { version = "0.29", features = ["rustls-tls-webpki-roots"] } tokio-util = { version = "0.7.18", features = ["compat"] } toml = "1.1.2" +tonic = { version = "0.14.6", default-features = false } tower-http = { version = "0.7.0", features = ["add-extension", "cors", "trace"] } tracing = "0.1.44" tracing-appender = "0.2.5" diff --git a/Dockerfile.connectors b/Dockerfile.connectors new file mode 100644 index 0000000000..51b60e8ae3 --- /dev/null +++ b/Dockerfile.connectors @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# syntax=docker/dockerfile:1.7 +# +# iggy-connectors runtime image — ships the iggy-connectors binary and the +# OTLP/gRPC source, OTLP/gRPC sink, quickwit sink plugins (.so). Config files are injected +# via K8s ConfigMaps. 
#
# Builder: rust:1-slim-bookworm (mold linker, BuildKit cache mounts)
# Runtime: debian:bookworm-slim + libssl3

# ---- Build ----
FROM rust:1-slim-bookworm AS build
WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
        pkg-config libssl-dev clang mold \
    && rm -rf /var/lib/apt/lists/*

COPY . .

ARG TARGETARCH
# RUSTFLAGS is exported instead of being written as a `VAR=value command`
# prefix: a prefix assignment only reaches the first command of an `&&` chain,
# so the plugin builds would run without mold and, because the flags differ,
# recompile every shared dependency.
#
# /app/target is a cache mount and is not part of the image layer, so the
# artifacts are copied out to /usr/local inside this same RUN step.
RUN --mount=type=cache,id=iggy-connectors-registry,sharing=locked,target=/usr/local/cargo/registry \
    --mount=type=cache,id=iggy-connectors-git,sharing=locked,target=/usr/local/cargo/git \
    --mount=type=cache,id=iggy-connectors-target-${TARGETARCH},target=/app/target \
    export RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=mold" \
    && cargo build --release --bin iggy-connectors \
    && cargo build --release -p iggy_connector_otlp_source \
    && cargo build --release -p iggy_connector_quickwit_sink \
    && cargo build --release -p iggy_connector_otlp_sink \
    && cp target/release/iggy-connectors /usr/local/bin/iggy-connectors \
    && cp target/release/libiggy_connector_otlp_source.so /usr/local/lib/libiggy_connector_otlp_source.so \
    && cp target/release/libiggy_connector_quickwit_sink.so /usr/local/lib/libiggy_connector_quickwit_sink.so \
    && cp target/release/libiggy_connector_otlp_sink.so /usr/local/lib/libiggy_connector_otlp_sink.so \
    && strip /usr/local/bin/iggy-connectors

# ---- Runtime ----
FROM debian:bookworm-slim
# Unprivileged `iggy` user (fixed uid 10001) owns /connectors, which holds the
# plugin libraries and the connector state directory.
RUN apt-get update && apt-get install -y --no-install-recommends \
        ca-certificates libssl3 \
    && rm -rf /var/lib/apt/lists/* \
    && useradd --system --uid 10001 --home-dir /connectors \
        --shell /usr/sbin/nologin iggy \
    && mkdir -p /connectors/plugins /connectors/state \
    && chown -R iggy:iggy /connectors

COPY --from=build /usr/local/bin/iggy-connectors /usr/local/bin/iggy-connectors
COPY --from=build /usr/local/lib/libiggy_connector_otlp_source.so \
    /connectors/plugins/libiggy_connector_otlp_source.so
COPY --from=build /usr/local/lib/libiggy_connector_quickwit_sink.so \
    /connectors/plugins/libiggy_connector_quickwit_sink.so
COPY --from=build /usr/local/lib/libiggy_connector_otlp_sink.so \
    /connectors/plugins/libiggy_connector_otlp_sink.so

USER iggy
WORKDIR /connectors

# Runtime config: IGGY_CONNECTORS_CONFIG_PATH → /connectors/runtime.toml (ConfigMap)
# Plugin configs: /connectors/plugins/otlp_source.toml (ConfigMap)
ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml

EXPOSE 8081

ENTRYPOINT ["/usr/local/bin/iggy-connectors"]
+ +[package] +name = "iggy_connector_otlp_source" +version = "0.4.1-edge.1" +description = "Iggy OTLP/gRPC source connector - receives logs, metrics, and traces from any OpenTelemetry SDK or Collector and writes them to Iggy streams as JSON" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "opentelemetry", "otlp"] +categories = ["command-line-utilities", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +# dashmap and once_cell are not imported directly in this crate's source, but +# the source_connector! macro (in iggy_connector_sdk::source) expands bare +# `use dashmap::DashMap` and `use once_cell::sync::Lazy` into this crate's +# namespace, so they must be listed here. Remove them only after the SDK macro +# is updated to use `$crate::connector_macro_support::{DashMap, Lazy}`. +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +prost = { workspace = true } +opentelemetry-proto = { workspace = true, features = [ + "gen-tonic", + "logs", + "metrics", + "trace", +] } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true, features = ["transport", "router", "gzip"] } +tracing = { workspace = true } diff --git a/core/connectors/sources/otlp_source/README.md b/core/connectors/sources/otlp_source/README.md new file mode 100644 index 0000000000..6e23a05931 --- /dev/null +++ b/core/connectors/sources/otlp_source/README.md @@ -0,0 +1,44 @@ +# OTLP Source + +Receives logs, metrics, and traces from any OpenTelemetry SDK or Collector +over gRPC (OTLP/gRPC protocol) and writes them to an Iggy stream 
as JSON +messages. + +## How it works + +The connector binds a gRPC server (default port 4317, the OTLP standard) and +implements all three collector services: `LogsService`, `MetricsService`, and +`TraceService`. Each incoming export request is deserialized from the +`opentelemetry-proto` wire format and serialized to JSON (or, with `format = "proto"`, stored as the raw request bytes). The messages are +buffered in an in-process channel and drained by the runtime via the `poll()` +call. + +Gzip compression is enabled on every service. The Collector's OTLP exporter +compresses payloads with gzip by default and OTel SDKs can be configured to do so, so the connector accepts and sends +gzip on all three services. + +## JSON schema + +Each message has a `signal` field (`"log"`, `"metric"`, or `"trace"`) plus +signal-specific fields (log and trace fields that are null or empty strings are omitted): + +**Logs** — `timestamp_ns`, `observed_timestamp_ns`, `severity`, +`severity_text`, `body`, `trace_id`, `span_id`, `service_name`, `resource`, `attributes` + +**Metrics** (one message per data point) — `name`, `type` (gauge/sum/histogram/…), `unit`, `timestamp_ns`, +`value`, `service_name`, `resource`, `attributes`, plus `is_monotonic` for sums + +**Traces** — `trace_id`, `span_id`, `parent_span_id`, `name`, `kind`, +`start_time_ns`, `end_time_ns`, `status`, `status_message`, `service_name`, `resource`, `attributes` +(span events and links are not emitted) + +## Configuration + +```toml +[plugin_config] +listen_addr = "0.0.0.0:4317" # gRPC bind address +channel_capacity = 50000 # in-process buffer (messages) +batch_size = 1000 # max messages returned per poll() +``` + +Point any OTel SDK or Collector at `grpc://host:4317` (no TLS by default). diff --git a/core/connectors/sources/otlp_source/config.toml b/core/connectors/sources/otlp_source/config.toml new file mode 100644 index 0000000000..24341c2a23 --- /dev/null +++ b/core/connectors/sources/otlp_source/config.toml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "otlp" +enabled = true +version = 0 +name = "OTLP source" +path = "../../target/release/libiggy_connector_otlp_source" +verbose = false + +[[streams]] +stream = "otel" +topic = "signals" +schema = "json" +batch_length = 1000 +linger_time = "5ms" + +[plugin_config] +listen_addr = "0.0.0.0:4317" +channel_capacity = 50000 +batch_size = 1000 diff --git a/core/connectors/sources/otlp_source/src/convert.rs b/core/connectors/sources/otlp_source/src/convert.rs new file mode 100644 index 0000000000..276a7136fb --- /dev/null +++ b/core/connectors/sources/otlp_source/src/convert.rs @@ -0,0 +1,331 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy_connector_sdk::ProducedMessage; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; +use opentelemetry_proto::tonic::metrics::v1::{Metric, metric, number_data_point}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use serde_json::{Map, Value, json}; +use std::fmt::Write as _; +use tracing::warn; + +pub fn export_logs_to_messages(req: ExportLogsServiceRequest) -> Vec { + let mut messages = Vec::new(); + for resource_logs in req.resource_logs { + let resource_attrs = extract_resource_attrs(resource_logs.resource.as_ref()); + let service_name = resource_attrs + .get("service.name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + + let resource_value = Value::Object(resource_attrs); + for scope_logs in resource_logs.scope_logs { + for record in scope_logs.log_records { + let body = record.body.as_ref().map(any_value_to_json); + let mut doc = json!({ + "signal": "log", + "timestamp_ns": record.time_unix_nano, + "observed_timestamp_ns": record.observed_time_unix_nano, + "severity": severity_number_to_text(record.severity_number), + "severity_text": record.severity_text, + "body": body, + "trace_id": bytes_to_hex(&record.trace_id), + "span_id": bytes_to_hex(&record.span_id), + "service_name": service_name, + "resource": resource_value.clone(), + "attributes": extract_attrs(&record.attributes), + }); + if let Some(obj) = doc.as_object_mut() { + obj.retain(|_, v| !v.is_null() && v != &Value::String(String::new())); + } + match serde_json::to_vec(&doc) { + Ok(payload) => messages.push(ProducedMessage { + id: None, + checksum: None, + timestamp: 
(record.time_unix_nano != 0).then_some(record.time_unix_nano), + origin_timestamp: None, + headers: None, + payload, + }), + Err(err) => warn!("Failed to serialize log record: {err}"), + } + } + } + } + messages +} + +pub fn export_metrics_to_messages(req: ExportMetricsServiceRequest) -> Vec { + let mut messages = Vec::new(); + for resource_metrics in req.resource_metrics { + let resource_attrs = extract_resource_attrs(resource_metrics.resource.as_ref()); + let service_name = resource_attrs + .get("service.name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + + for scope_metrics in resource_metrics.scope_metrics { + for metric in scope_metrics.metrics { + let data_points = metric_to_data_points(&metric, &resource_attrs, &service_name); + messages.extend(data_points); + } + } + } + messages +} + +pub fn export_traces_to_messages(req: ExportTraceServiceRequest) -> Vec { + let mut messages = Vec::new(); + for resource_spans in req.resource_spans { + let resource_attrs = extract_resource_attrs(resource_spans.resource.as_ref()); + let service_name = resource_attrs + .get("service.name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + + let resource_value = Value::Object(resource_attrs); + for scope_spans in resource_spans.scope_spans { + for span in scope_spans.spans { + let status_code = span + .status + .as_ref() + .map(|s| status_code_to_text(s.code)) + .unwrap_or("unset"); + let status_message = span + .status + .as_ref() + .map(|s| s.message.as_str()) + .unwrap_or_default(); + + let mut doc = json!({ + "signal": "trace", + "trace_id": bytes_to_hex(&span.trace_id), + "span_id": bytes_to_hex(&span.span_id), + "parent_span_id": bytes_to_hex(&span.parent_span_id), + "name": span.name, + "kind": span_kind_to_text(span.kind), + "start_time_ns": span.start_time_unix_nano, + "end_time_ns": span.end_time_unix_nano, + "status": status_code, + "status_message": status_message, + "service_name": service_name, + "resource": resource_value.clone(), + 
"attributes": extract_attrs(&span.attributes), + }); + if let Some(obj) = doc.as_object_mut() { + obj.retain(|_, v| !v.is_null() && v != &Value::String(String::new())); + } + match serde_json::to_vec(&doc) { + Ok(payload) => messages.push(ProducedMessage { + id: None, + checksum: None, + timestamp: (span.start_time_unix_nano != 0) + .then_some(span.start_time_unix_nano), + origin_timestamp: None, + headers: None, + payload, + }), + Err(err) => warn!("Failed to serialize span: {err}"), + } + } + } + } + messages +} + +fn metric_to_data_points( + metric: &Metric, + resource_attrs: &Map, + service_name: &str, +) -> Vec { + let mut messages = Vec::new(); + + let base = |time_ns: u64, value: Value, attrs: &[KeyValue], metric_type: &str| -> Value { + json!({ + "signal": "metric", + "name": metric.name, + "type": metric_type, + "unit": metric.unit, + "timestamp_ns": time_ns, + "value": value, + "service_name": service_name, + "resource": resource_attrs, + "attributes": extract_attrs(attrs), + }) + }; + + match &metric.data { + Some(metric::Data::Gauge(gauge)) => { + for dp in &gauge.data_points { + let value = number_dp_value(&dp.value); + let doc = base(dp.time_unix_nano, value, &dp.attributes, "gauge"); + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + Some(metric::Data::Sum(sum)) => { + for dp in &sum.data_points { + let value = number_dp_value(&dp.value); + let mut doc = base(dp.time_unix_nano, value, &dp.attributes, "sum"); + if let Some(obj) = doc.as_object_mut() { + obj.insert("is_monotonic".into(), Value::Bool(sum.is_monotonic)); + } + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + Some(metric::Data::Histogram(hist)) => { + for dp in &hist.data_points { + let doc = base( + dp.time_unix_nano, + json!({ "count": dp.count, "sum": dp.sum }), + &dp.attributes, + "histogram", + ); + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + Some(metric::Data::ExponentialHistogram(eh)) => { + for dp in &eh.data_points { + let 
doc = base( + dp.time_unix_nano, + json!({ "count": dp.count, "sum": dp.sum, "scale": dp.scale }), + &dp.attributes, + "exponential_histogram", + ); + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + Some(metric::Data::Summary(summary)) => { + for dp in &summary.data_points { + let doc = base( + dp.time_unix_nano, + json!({ "count": dp.count, "sum": dp.sum }), + &dp.attributes, + "summary", + ); + push_metric_doc(doc, dp.time_unix_nano, &mut messages); + } + } + None => {} + } + + messages +} + +fn push_metric_doc(doc: Value, time_ns: u64, messages: &mut Vec) { + match serde_json::to_vec(&doc) { + Ok(payload) => messages.push(ProducedMessage { + id: None, + checksum: None, + timestamp: Some(time_ns), + origin_timestamp: None, + headers: None, + payload, + }), + Err(err) => warn!("Failed to serialize metric data point: {err}"), + } +} + +fn number_dp_value(value: &Option) -> Value { + match value { + Some(number_data_point::Value::AsDouble(d)) => json!(d), + Some(number_data_point::Value::AsInt(i)) => json!(i), + None => Value::Null, + } +} + +pub fn extract_resource_attrs(resource: Option<&Resource>) -> Map { + resource + .map(|r| extract_attrs(&r.attributes)) + .unwrap_or_default() +} + +pub fn extract_attrs(attrs: &[KeyValue]) -> Map { + attrs + .iter() + .map(|kv| { + let value = match kv.value.as_ref() { + Some(av) if matches!(av.value, Some(any_value::Value::StringValueStrindex(_))) => { + warn!(key = %kv.key, "dropping attribute with unrecognized AnyValue variant"); + Value::Null + } + Some(av) => any_value_to_json(av), + None => Value::Null, + }; + (kv.key.clone(), value) + }) + .collect() +} + +pub fn any_value_to_json(value: &AnyValue) -> Value { + match &value.value { + Some(any_value::Value::StringValue(s)) => Value::String(s.clone()), + Some(any_value::Value::BoolValue(b)) => Value::Bool(*b), + Some(any_value::Value::IntValue(i)) => json!(i), + Some(any_value::Value::DoubleValue(d)) => json!(d), + Some(any_value::Value::ArrayValue(arr)) 
=> { + Value::Array(arr.values.iter().map(any_value_to_json).collect()) + } + Some(any_value::Value::KvlistValue(kvlist)) => Value::Object(extract_attrs(&kvlist.values)), + Some(any_value::Value::BytesValue(bytes)) => Value::String(bytes_to_hex(bytes)), + Some(any_value::Value::StringValueStrindex(_)) | None => Value::Null, + } +} + +pub fn bytes_to_hex(bytes: &[u8]) -> String { + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + let _ = write!(s, "{b:02x}"); + } + s +} + +fn severity_number_to_text(number: i32) -> &'static str { + match number { + 1..=4 => "TRACE", + 5..=8 => "DEBUG", + 9..=12 => "INFO", + 13..=16 => "WARN", + 17..=20 => "ERROR", + 21..=24 => "FATAL", + _ => "UNSPECIFIED", + } +} + +fn status_code_to_text(code: i32) -> &'static str { + match code { + 1 => "ok", + 2 => "error", + _ => "unset", + } +} + +fn span_kind_to_text(kind: i32) -> &'static str { + match kind { + 1 => "internal", + 2 => "server", + 3 => "client", + 4 => "producer", + 5 => "consumer", + _ => "unspecified", + } +} diff --git a/core/connectors/sources/otlp_source/src/lib.rs b/core/connectors/sources/otlp_source/src/lib.rs new file mode 100644 index 0000000000..bd6db95c15 --- /dev/null +++ b/core/connectors/sources/otlp_source/src/lib.rs @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use iggy_connector_sdk::{ + ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{Mutex, mpsc, oneshot}; +use tokio::task::JoinHandle; +use tonic::transport::server::TcpIncoming; +use tracing::info; + +pub(crate) mod convert; +pub(crate) mod server; + +source_connector!(OtlpSource); + +/// How messages are stored in the Iggy topic. +/// +/// `Json` (default): each OTLP signal item (span / data-point / log record) +/// is stored as a flat JSON document. Human-readable but verbose. +/// +/// `Proto`: the entire `Export*ServiceRequest` proto is stored as raw bytes +/// per gRPC call. Typically 4-5x smaller than JSON and zero-copy on the sink +/// side when paired with `otlp_sink`'s `format = "proto"`. +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum StorageFormat { + #[default] + Json, + Proto, +} + +#[derive(Debug)] +pub struct OtlpSource { + id: u32, + config: OtlpSourceConfig, + rx: Mutex>>, + shutdown_tx: Mutex>>, + server_task: Mutex>>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OtlpSourceConfig { + pub listen_addr: String, + #[serde(default = "default_channel_capacity")] + pub channel_capacity: usize, + #[serde(default = "default_batch_size")] + pub batch_size: usize, + #[serde(default)] + pub format: StorageFormat, +} + +fn default_channel_capacity() -> usize { + 50_000 +} + +fn default_batch_size() -> usize { + 1_000 +} + +impl OtlpSource { + pub fn new(id: u32, config: OtlpSourceConfig, _state: Option) -> Self { + OtlpSource { + id, + config, + rx: Mutex::new(None), + shutdown_tx: Mutex::new(None), + server_task: Mutex::new(None), + } + } +} + +#[async_trait] +impl Source for OtlpSource { + async fn open(&mut self) -> Result<(), Error> { + 
if self.rx.lock().await.is_some() { + return Err(Error::InitError( + "OTLP source connector is already open".to_string(), + )); + } + + let addr = self + .config + .listen_addr + .parse() + .map_err(|err| Error::InitError(format!("Invalid listen address: {err}")))?; + + let incoming = TcpIncoming::bind(addr).map_err(|err| { + Error::InitError(format!("Failed to bind {}: {err}", self.config.listen_addr)) + })?; + + info!( + "OTLP source connector with ID: {} listening on {}", + self.id, self.config.listen_addr + ); + + let (tx, rx) = mpsc::channel(self.config.channel_capacity); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let handle = tokio::spawn(server::run_grpc_server( + incoming, + tx, + shutdown_rx, + self.config.format, + )); + + *self.rx.lock().await = Some(rx); + *self.shutdown_tx.lock().await = Some(shutdown_tx); + *self.server_task.lock().await = Some(handle); + + Ok(()) + } + + async fn poll(&self) -> Result { + let mut rx_guard = self.rx.lock().await; + let rx = rx_guard.as_mut().ok_or_else(|| { + Error::InitError("OTLP source connector is not initialized".to_string()) + })?; + + let first = match rx.recv().await { + Some(msg) => msg, + None => { + return Err(Error::Connection( + "OTLP gRPC server terminated unexpectedly".to_string(), + )); + } + }; + + let mut messages = Vec::with_capacity(self.config.batch_size); + messages.push(first); + + while messages.len() < self.config.batch_size { + match rx.try_recv() { + Ok(msg) => messages.push(msg), + Err(_) => break, + } + } + + let schema = match self.config.format { + StorageFormat::Json => Schema::Json, + StorageFormat::Proto => Schema::Raw, + }; + + Ok(ProducedMessages { + schema, + messages, + state: None, + }) + } + + async fn close(&mut self) -> Result<(), Error> { + if let Some(tx) = self.shutdown_tx.lock().await.take() { + let _ = tx.send(()); + } + let task = self.server_task.lock().await.take(); + if let Some(task) = task { + task.abort(); + let _ = task.await; + } + 
*self.rx.lock().await = None; + info!("OTLP source connector with ID: {} closed.", self.id); + Ok(()) + } +} diff --git a/core/connectors/sources/otlp_source/src/server.rs b/core/connectors/sources/otlp_source/src/server.rs new file mode 100644 index 0000000000..2e78bed3c6 --- /dev/null +++ b/core/connectors/sources/otlp_source/src/server.rs @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::StorageFormat; +use crate::convert; +use iggy_connector_sdk::ProducedMessage; +use opentelemetry_proto::tonic::collector::logs::v1::{ + ExportLogsPartialSuccess, ExportLogsServiceRequest, ExportLogsServiceResponse, + logs_service_server::{LogsService, LogsServiceServer}, +}; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsPartialSuccess, ExportMetricsServiceRequest, ExportMetricsServiceResponse, + metrics_service_server::{MetricsService, MetricsServiceServer}, +}; +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTracePartialSuccess, ExportTraceServiceRequest, ExportTraceServiceResponse, + trace_service_server::{TraceService, TraceServiceServer}, +}; +use prost::Message as ProstMessage; +use tokio::sync::{mpsc, oneshot}; +use tonic::codec::CompressionEncoding; +use tonic::transport::server::TcpIncoming; +use tonic::{Request, Response, Status}; +use tracing::{error, info, warn}; + +pub async fn run_grpc_server( + incoming: TcpIncoming, + tx: mpsc::Sender, + shutdown: oneshot::Receiver<()>, + format: StorageFormat, +) { + let logs_svc = LogsServiceImpl { + tx: tx.clone(), + format, + }; + let metrics_svc = MetricsServiceImpl { + tx: tx.clone(), + format, + }; + let trace_svc = TraceServiceImpl { tx, format }; + + // OTel SDKs and the Collector's OTLP exporter gzip-compress payloads by + // default, so every service must accept gzip on the wire. Responses are tiny + // (empty partial_success), but advertising gzip on send is harmless and lets + // clients negotiate it. 
+ let logs_server = LogsServiceServer::new(logs_svc) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + let metrics_server = MetricsServiceServer::new(metrics_svc) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + let trace_server = TraceServiceServer::new(trace_svc) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + if let Err(err) = tonic::transport::Server::builder() + .add_service(logs_server) + .add_service(metrics_server) + .add_service(trace_server) + .serve_with_incoming_shutdown(incoming, async { + let _ = shutdown.await; + info!("OTLP gRPC server received shutdown signal"); + }) + .await + { + error!("OTLP gRPC server error: {err}"); + } +} + +struct LogsServiceImpl { + tx: mpsc::Sender, + format: StorageFormat, +} + +struct MetricsServiceImpl { + tx: mpsc::Sender, + format: StorageFormat, +} + +struct TraceServiceImpl { + tx: mpsc::Sender, + format: StorageFormat, +} + +#[tonic::async_trait] +impl LogsService for LogsServiceImpl { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let messages = encode_or_convert(request.into_inner(), self.format, "logs"); + let rejected = send_messages(&self.tx, messages, "logs"); + let partial_success = (rejected > 0).then(|| ExportLogsPartialSuccess { + rejected_log_records: rejected, + error_message: "channel full; records dropped".to_string(), + }); + Ok(Response::new(ExportLogsServiceResponse { partial_success })) + } +} + +#[tonic::async_trait] +impl MetricsService for MetricsServiceImpl { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let messages = encode_or_convert(request.into_inner(), self.format, "metrics"); + let rejected = send_messages(&self.tx, messages, "metrics"); + let partial_success = (rejected > 0).then(|| ExportMetricsPartialSuccess { + rejected_data_points: rejected, + error_message: "channel full; data 
points dropped".to_string(), + }); + Ok(Response::new(ExportMetricsServiceResponse { + partial_success, + })) + } +} + +#[tonic::async_trait] +impl TraceService for TraceServiceImpl { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let messages = encode_or_convert(request.into_inner(), self.format, "traces"); + let rejected = send_messages(&self.tx, messages, "traces"); + let partial_success = (rejected > 0).then(|| ExportTracePartialSuccess { + rejected_spans: rejected, + error_message: "channel full; spans dropped".to_string(), + }); + Ok(Response::new(ExportTraceServiceResponse { + partial_success, + })) + } +} + +trait IntoMessages { + fn into_json_messages(self) -> Vec; +} + +impl IntoMessages for ExportLogsServiceRequest { + fn into_json_messages(self) -> Vec { + convert::export_logs_to_messages(self) + } +} + +impl IntoMessages for ExportMetricsServiceRequest { + fn into_json_messages(self) -> Vec { + convert::export_metrics_to_messages(self) + } +} + +impl IntoMessages for ExportTraceServiceRequest { + fn into_json_messages(self) -> Vec { + convert::export_traces_to_messages(self) + } +} + +fn encode_or_convert(req: R, format: StorageFormat, signal: &str) -> Vec +where + R: ProstMessage + IntoMessages, +{ + match format { + StorageFormat::Json => req.into_json_messages(), + StorageFormat::Proto => { + let mut buf = Vec::new(); + if let Err(e) = req.encode(&mut buf) { + warn!("Failed to encode {signal} proto: {e}"); + return vec![]; + } + vec![ProducedMessage { + id: None, + checksum: None, + timestamp: None, + origin_timestamp: None, + headers: None, + payload: buf, + }] + } + } +} + +fn send_messages( + tx: &mpsc::Sender, + messages: Vec, + signal: &str, +) -> i64 { + let total = messages.len() as i64; + let mut dropped: i64 = 0; + for message in messages { + if tx.try_send(message).is_err() { + dropped += 1; + } + } + if dropped > 0 { + warn!("OTLP channel full, dropped {dropped}/{total} {signal} messages"); + } + dropped +}