From b5d2d71b13500a32afe1425812fdab23ed25567f Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Mon, 22 Jun 2026 15:22:38 +0300 Subject: [PATCH 1/6] feat(connectors/otlp_sink): add OTLP/gRPC sink connector Reads JSON or raw proto messages from Iggy and forwards them to any OTLP-compatible backend (Quickwit, Jaeger, Tempo, etc.) via gRPC. - JSON mode (default): reconstructs OTLP proto from otlp_source JSON output - Proto mode: forwards raw prost-encoded bytes with zero conversion overhead - Per-request gRPC metadata headers for backend index routing (e.g. qw-otel-traces-index, qw-otel-logs-index for Quickwit) - Optional gzip compression Co-Authored-By: Claude Sonnet 4.6 Claude-Session: https://claude.ai/code/session_01Qb1ctTeXahLw5EWWHP69gK --- Cargo.toml | 1 + Dockerfile.connectors | 61 +++ core/connectors/sinks/otlp_sink/Cargo.toml | 51 ++ .../sinks/otlp_sink/src/from_json.rs | 447 ++++++++++++++++++ core/connectors/sinks/otlp_sink/src/lib.rs | 401 ++++++++++++++++ 5 files changed, 961 insertions(+) create mode 100644 Dockerfile.connectors create mode 100644 core/connectors/sinks/otlp_sink/Cargo.toml create mode 100644 core/connectors/sinks/otlp_sink/src/from_json.rs create mode 100644 core/connectors/sinks/otlp_sink/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 4c6777fdea..66001ed848 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ members = [ "core/connectors/sinks/influxdb_sink", "core/connectors/sinks/mongodb_sink", "core/connectors/sinks/postgres_sink", + "core/connectors/sinks/otlp_sink", "core/connectors/sinks/quickwit_sink", "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", diff --git a/Dockerfile.connectors b/Dockerfile.connectors new file mode 100644 index 0000000000..007a58e015 --- /dev/null +++ b/Dockerfile.connectors @@ -0,0 +1,61 @@ +# syntax=docker/dockerfile:1.7 +# +# iggy-connectors runtime image — ships the iggy-connectors binary and the +# OTLP/gRPC source plugin (.so). Config files are injected via K8s ConfigMaps. +# +# Builder: rust:1-slim-bookworm (mold linker, BuildKit cache mounts) +# Runtime: debian:bookworm-slim + libssl3 + +# ---- Build ---- +FROM rust:1-slim-bookworm AS build +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + pkg-config libssl-dev clang mold \ + && rm -rf /var/lib/apt/lists/* + +COPY . . + +ARG TARGETARCH +RUN --mount=type=cache,id=iggy-connectors-registry,sharing=locked,target=/usr/local/cargo/registry \ + --mount=type=cache,id=iggy-connectors-git,sharing=locked,target=/usr/local/cargo/git \ + --mount=type=cache,id=iggy-connectors-target-${TARGETARCH},target=/app/target \ + RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=mold" \ + cargo build --release --bin iggy-connectors \ + && cargo build --release -p iggy_connector_otlp_source \ + && cargo build --release -p iggy_connector_quickwit_sink \ + && cargo build --release -p iggy_connector_otlp_sink \ + && cp target/release/iggy-connectors /usr/local/bin/iggy-connectors \ + && cp target/release/libiggy_connector_otlp_source.so /usr/local/lib/libiggy_connector_otlp_source.so \ + && cp target/release/libiggy_connector_quickwit_sink.so /usr/local/lib/libiggy_connector_quickwit_sink.so \ + && cp target/release/libiggy_connector_otlp_sink.so /usr/local/lib/libiggy_connector_otlp_sink.so \ + && strip /usr/local/bin/iggy-connectors + +# ---- Runtime ---- +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates libssl3 \ + && rm -rf /var/lib/apt/lists/* \ + && useradd --system --uid 10001 --home-dir /connectors \ + --shell /usr/sbin/nologin iggy \ + && mkdir -p /connectors/plugins /connectors/state \ + && chown -R iggy:iggy /connectors + +COPY --from=build /usr/local/bin/iggy-connectors /usr/local/bin/iggy-connectors +COPY --from=build /usr/local/lib/libiggy_connector_otlp_source.so \ + /connectors/plugins/libiggy_connector_otlp_source.so +COPY --from=build /usr/local/lib/libiggy_connector_quickwit_sink.so \ + /connectors/plugins/libiggy_connector_quickwit_sink.so +COPY --from=build /usr/local/lib/libiggy_connector_otlp_sink.so \ + /connectors/plugins/libiggy_connector_otlp_sink.so + +USER iggy +WORKDIR /connectors + +# Runtime config: IGGY_CONNECTORS_CONFIG_PATH → /connectors/runtime.toml (ConfigMap) +# Plugin configs: /connectors/plugins/otlp_source.toml (ConfigMap) +ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml + +EXPOSE 4317 8081 + +ENTRYPOINT ["/usr/local/bin/iggy-connectors"] diff --git a/core/connectors/sinks/otlp_sink/Cargo.toml b/core/connectors/sinks/otlp_sink/Cargo.toml new file mode 100644 index 0000000000..8ed56a216f --- /dev/null +++ b/core/connectors/sinks/otlp_sink/Cargo.toml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_otlp_sink" +version = "0.1.0" +description = "Iggy OTLP/gRPC sink connector — forwards logs, metrics, and traces from Iggy streams to any OpenTelemetry-compatible backend via the OTLP protocol" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "opentelemetry", "otlp"] +categories = ["command-line-utilities", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +bytes = { workspace = true } +iggy_connector_sdk = { workspace = true } +opentelemetry-proto = { version = "0.32.0", default-features = false, features = [ + "gen-tonic", + "logs", + "metrics", + "trace", +] } +prost = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true, features = ["transport", "gzip"] } +tracing = { workspace = true } diff --git a/core/connectors/sinks/otlp_sink/src/from_json.rs b/core/connectors/sinks/otlp_sink/src/from_json.rs new file mode 100644 index 0000000000..78e9038a26 --- /dev/null +++ b/core/connectors/sinks/otlp_sink/src/from_json.rs @@ -0,0 +1,447 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reconstruct OTLP proto requests from the JSON produced by `otlp_source`'s +//! `convert.rs`. The JSON field names match what `convert.rs` emits; adding a +//! new field there requires a corresponding parse here. + +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; +use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; +use opentelemetry_proto::tonic::metrics::v1::{ + Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, metric, number_data_point, +}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status, span}; +use serde_json::Value; +use std::collections::HashMap; + +// --------------------------------------------------------------------------- +// Public entry points +// --------------------------------------------------------------------------- + +pub fn traces_from_json(messages: &[Value]) -> ExportTraceServiceRequest { + // Group spans by service_name so each service gets one ResourceSpans. + let mut groups: HashMap)> = HashMap::new(); + + for msg in messages { + let service = msg + .get("service_name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + let resource = msg.get("resource").cloned().unwrap_or(Value::Null); + let span = json_to_span(msg); + groups + .entry(service) + .or_insert_with(|| (resource, Vec::new())) + .1 + .push(span); + } + + ExportTraceServiceRequest { + resource_spans: groups + .into_values() + .map(|(resource, spans)| ResourceSpans { + resource: Some(json_to_resource(&resource)), + scope_spans: vec![ScopeSpans { + scope: None, + spans, + schema_url: String::new(), + }], + schema_url: String::new(), + }) + .collect(), + } +} + +pub fn metrics_from_json(messages: &[Value]) -> ExportMetricsServiceRequest { + let mut groups: HashMap)> = HashMap::new(); + + for msg in messages { + let service = msg + .get("service_name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + let resource = msg.get("resource").cloned().unwrap_or(Value::Null); + if let Some(metric) = json_to_metric(msg) { + groups + .entry(service) + .or_insert_with(|| (resource, Vec::new())) + .1 + .push(metric); + } + } + + ExportMetricsServiceRequest { + resource_metrics: groups + .into_values() + .map(|(resource, metrics)| ResourceMetrics { + resource: Some(json_to_resource(&resource)), + scope_metrics: vec![ScopeMetrics { + scope: None, + metrics, + schema_url: String::new(), + }], + schema_url: String::new(), + }) + .collect(), + } +} + +pub fn logs_from_json(messages: &[Value]) -> ExportLogsServiceRequest { + let mut groups: HashMap)> = HashMap::new(); + + for msg in messages { + let service = msg + .get("service_name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + let resource = msg.get("resource").cloned().unwrap_or(Value::Null); + let record = json_to_log_record(msg); + groups + .entry(service) + .or_insert_with(|| (resource, Vec::new())) + .1 + .push(record); + } + + ExportLogsServiceRequest { + resource_logs: groups + .into_values() + .map(|(resource, log_records)| ResourceLogs { + resource: Some(json_to_resource(&resource)), + scope_logs: vec![ScopeLogs { + scope: None, + log_records, + schema_url: String::new(), + }], + schema_url: String::new(), + }) + .collect(), + } +} + +// --------------------------------------------------------------------------- +// Per-signal converters +// --------------------------------------------------------------------------- + +fn json_to_span(v: &Value) -> Span { + Span { + trace_id: hex_to_bytes(v.get("trace_id").and_then(Value::as_str).unwrap_or("")), + span_id: hex_to_bytes(v.get("span_id").and_then(Value::as_str).unwrap_or("")), + parent_span_id: hex_to_bytes( + v.get("parent_span_id").and_then(Value::as_str).unwrap_or(""), + ), + name: v + .get("name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(), + kind: span_kind_from_text(v.get("kind").and_then(Value::as_str).unwrap_or("")), + start_time_unix_nano: v + .get("start_time_ns") + .and_then(Value::as_u64) + .unwrap_or(0), + end_time_unix_nano: v.get("end_time_ns").and_then(Value::as_u64).unwrap_or(0), + attributes: json_obj_to_kv(v.get("attributes")), + dropped_attributes_count: 0, + events: vec![], + dropped_events_count: 0, + links: vec![], + dropped_links_count: 0, + status: Some(Status { + code: status_code_from_text( + v.get("status").and_then(Value::as_str).unwrap_or("unset"), + ), + message: v + .get("status_message") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(), + }), + flags: 0, + trace_state: String::new(), + } +} + +fn json_to_metric(v: &Value) -> Option { + let name = v.get("name")?.as_str()?.to_owned(); + let unit = v + .get("unit") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + let time_ns = v.get("timestamp_ns").and_then(Value::as_u64).unwrap_or(0); + let attrs = json_obj_to_kv(v.get("attributes")); + let metric_type = v + .get("type") + .and_then(Value::as_str) + .unwrap_or("gauge"); + + let value_field = v.get("value"); + let as_double = value_field.and_then(Value::as_f64).unwrap_or(0.0); + + let data = match metric_type { + "sum" => { + let dp = NumberDataPoint { + attributes: attrs, + time_unix_nano: time_ns, + value: Some(number_data_point::Value::AsDouble(as_double)), + ..Default::default() + }; + Some(metric::Data::Sum(Sum { + data_points: vec![dp], + aggregation_temporality: 2, // CUMULATIVE + is_monotonic: false, + })) + } + _ => { + // Default to gauge for gauge, histogram summary etc. + // Histogram detail is lost in convert.rs (only count+sum stored); + // reconstruct as gauge using count as the value. + let dp_value = if metric_type == "histogram" || metric_type == "summary" { + value_field + .and_then(|v| v.get("count")) + .and_then(Value::as_f64) + .unwrap_or(0.0) + } else { + as_double + }; + Some(metric::Data::Gauge(Gauge { + data_points: vec![NumberDataPoint { + attributes: attrs, + time_unix_nano: time_ns, + value: Some(number_data_point::Value::AsDouble(dp_value)), + ..Default::default() + }], + })) + } + }; + + Some(Metric { + name, + description: String::new(), + unit, + data, + metadata: vec![], + }) +} + +fn json_to_log_record(v: &Value) -> LogRecord { + LogRecord { + time_unix_nano: v + .get("timestamp_ns") + .and_then(Value::as_u64) + .unwrap_or(0), + observed_time_unix_nano: v + .get("observed_timestamp_ns") + .and_then(Value::as_u64) + .unwrap_or(0), + severity_number: severity_from_text( + v.get("severity").and_then(Value::as_str).unwrap_or(""), + ), + severity_text: v + .get("severity_text") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(), + body: v + .get("body") + .map(|b| AnyValue { + value: Some(any_value::Value::StringValue(b.to_string())), + }), + attributes: json_obj_to_kv(v.get("attributes")), + trace_id: hex_to_bytes(v.get("trace_id").and_then(Value::as_str).unwrap_or("")), + span_id: hex_to_bytes(v.get("span_id").and_then(Value::as_str).unwrap_or("")), + dropped_attributes_count: 0, + flags: 0, + ..Default::default() + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn json_to_resource(v: &Value) -> Resource { + Resource { + attributes: json_obj_to_kv(Some(v)), + dropped_attributes_count: 0, + ..Default::default() + } +} + +fn json_obj_to_kv(v: Option<&Value>) -> Vec { + let obj = match v.and_then(Value::as_object) { + Some(o) => o, + None => return vec![], + }; + obj.iter() + .map(|(k, val)| KeyValue { + key: k.clone(), + value: Some(json_to_any_value(val)), + ..Default::default() + }) + .collect() +} + +fn json_to_any_value(v: &Value) -> AnyValue { + let inner = match v { + Value::String(s) => any_value::Value::StringValue(s.clone()), + Value::Bool(b) => any_value::Value::BoolValue(*b), + Value::Number(n) => { + if let Some(i) = n.as_i64() { + any_value::Value::IntValue(i) + } else { + any_value::Value::DoubleValue(n.as_f64().unwrap_or(0.0)) + } + } + Value::Array(arr) => { + use opentelemetry_proto::tonic::common::v1::ArrayValue; + any_value::Value::ArrayValue(ArrayValue { + values: arr.iter().map(json_to_any_value).collect(), + }) + } + Value::Object(obj) => { + use opentelemetry_proto::tonic::common::v1::KeyValueList; + any_value::Value::KvlistValue(KeyValueList { + values: obj + .iter() + .map(|(k, val)| KeyValue { + key: k.clone(), + value: Some(json_to_any_value(val)), + ..Default::default() + }) + .collect(), + }) + } + Value::Null => any_value::Value::StringValue(String::new()), + }; + AnyValue { value: Some(inner) } +} + +fn hex_to_bytes(s: &str) -> Vec { + if s.is_empty() { + return vec![]; + } + (0..s.len()) + .step_by(2) + .filter_map(|i| u8::from_str_radix(s.get(i..i + 2)?, 16).ok()) + .collect() +} + +fn span_kind_from_text(s: &str) -> i32 { + match s { + "internal" => span::SpanKind::Internal as i32, + "server" => span::SpanKind::Server as i32, + "client" => span::SpanKind::Client as i32, + "producer" => span::SpanKind::Producer as i32, + "consumer" => span::SpanKind::Consumer as i32, + _ => span::SpanKind::Unspecified as i32, + } +} + +fn status_code_from_text(s: &str) -> i32 { + match s { + "ok" => 1, + "error" => 2, + _ => 0, // unset + } +} + +fn severity_from_text(s: &str) -> i32 { + match s { + "TRACE" => 1, + "DEBUG" => 5, + "INFO" => 9, + "WARN" => 13, + "ERROR" => 17, + "FATAL" => 21, + _ => 0, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn given_empty_messages_should_return_empty_request() { + let req = traces_from_json(&[]); + assert!(req.resource_spans.is_empty()); + } + + #[test] + fn given_trace_json_should_reconstruct_span() { + let msg = json!({ + "signal": "trace", + "trace_id": "0102030405060708090a0b0c0d0e0f10", + "span_id": "0102030405060708", + "name": "test-span", + "kind": "server", + "start_time_ns": 1_000_000_u64, + "end_time_ns": 2_000_000_u64, + "status": "ok", + "service_name": "svc-a", + "resource": { "service.name": "svc-a" }, + "attributes": { "http.method": "GET" } + }); + let req = traces_from_json(&[msg]); + assert_eq!(req.resource_spans.len(), 1); + let spans = &req.resource_spans[0].scope_spans[0].spans; + assert_eq!(spans.len(), 1); + assert_eq!(spans[0].name, "test-span"); + assert_eq!(spans[0].kind, span::SpanKind::Server as i32); + assert_eq!(spans[0].start_time_unix_nano, 1_000_000); + assert_eq!(spans[0].status.as_ref().unwrap().code, 1); // ok + } + + #[test] + fn given_hex_trace_id_should_decode_correctly() { + let bytes = hex_to_bytes("deadbeef"); + assert_eq!(bytes, vec![0xde, 0xad, 0xbe, 0xef]); + } + + #[test] + fn given_empty_hex_should_return_empty_bytes() { + assert!(hex_to_bytes("").is_empty()); + } + + #[test] + fn given_log_json_should_reconstruct_log_record() { + let msg = json!({ + "signal": "log", + "timestamp_ns": 999_u64, + "severity": "WARN", + "severity_text": "WARNING", + "body": "something happened", + "service_name": "svc-b", + "resource": {}, + "attributes": {} + }); + let req = logs_from_json(&[msg]); + assert_eq!(req.resource_logs.len(), 1); + let records = &req.resource_logs[0].scope_logs[0].log_records; + assert_eq!(records[0].severity_number, 13); // WARN + } +} diff --git a/core/connectors/sinks/otlp_sink/src/lib.rs b/core/connectors/sinks/otlp_sink/src/lib.rs new file mode 100644 index 0000000000..e0a228003c --- /dev/null +++ b/core/connectors/sinks/otlp_sink/src/lib.rs @@ -0,0 +1,401 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use opentelemetry_proto::tonic::collector::logs::v1::{ + ExportLogsServiceRequest, logs_service_client::LogsServiceClient, +}; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsServiceRequest, metrics_service_client::MetricsServiceClient, +}; +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTraceServiceRequest, trace_service_client::TraceServiceClient, +}; +use prost::Message; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tonic::codec::CompressionEncoding; +use tonic::metadata::{MetadataKey, MetadataValue}; +use tonic::transport::Channel; +use tracing::{debug, info, warn}; + +mod from_json; + +sink_connector!(OtlpSink); + +/// Which OTLP signal this sink handles. Must match the Iggy topic content. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum OtlpSignal { + Traces, + Metrics, + Logs, +} + +/// How messages are stored in the Iggy topic. +/// +/// `Json` (default): messages are JSON produced by the otlp_source JSON path. +/// The sink reconstructs OTLP proto from the JSON before forwarding. +/// +/// `Proto`: messages are raw prost-encoded OTLP proto bytes (one +/// `Export*ServiceRequest` per message). The sink forwards them directly, +/// with zero deserialization overhead. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum StorageFormat { + #[default] + Json, + Proto, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OtlpSinkConfig { + /// gRPC endpoint, e.g. "http://quickwit:7281" + pub endpoint: String, + pub signal: OtlpSignal, + #[serde(default)] + pub format: StorageFormat, + #[serde(default)] + pub compression: bool, + /// Extra gRPC metadata headers sent with every export request. + /// QW uses these to route to a specific index: + /// qw-otel-traces-index = "flows3" + /// qw-otel-logs-index = "otel-logs-v0_7" + #[serde(default)] + pub headers: HashMap, +} + +enum Client { + Traces(TraceServiceClient), + Metrics(MetricsServiceClient), + Logs(LogsServiceClient), +} + +#[derive(Debug)] +pub struct OtlpSink { + id: u32, + config: OtlpSinkConfig, + client: Option, + // Pre-parsed metadata entries derived from config.headers at open() time. + metadata: Vec<(MetadataKey, MetadataValue)>, +} + +impl OtlpSink { + pub fn new(id: u32, config: OtlpSinkConfig) -> Self { + Self { + id, + config, + client: None, + metadata: Vec::new(), + } + } + + fn with_headers(&self, msg: T) -> tonic::Request { + let mut req = tonic::Request::new(msg); + for (k, v) in &self.metadata { + req.metadata_mut().insert(k.clone(), v.clone()); + } + req + } +} + +impl std::fmt::Debug for Client { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Client::Traces(_) => write!(f, "TraceServiceClient"), + Client::Metrics(_) => write!(f, "MetricsServiceClient"), + Client::Logs(_) => write!(f, "LogsServiceClient"), + } + } +} + +#[async_trait] +impl Sink for OtlpSink { + async fn open(&mut self) -> Result<(), Error> { + for (k, v) in &self.config.headers { + let key = MetadataKey::from_bytes(k.as_bytes()) + .map_err(|e| Error::InvalidConfigValue(format!("header key '{k}': {e}")))?; + let val = MetadataValue::try_from(v.as_str()) + .map_err(|e| Error::InvalidConfigValue(format!("header value for '{k}': {e}")))?; + self.metadata.push((key, val)); + } + + let channel = Channel::from_shared(self.config.endpoint.clone()) + .map_err(|e| Error::InvalidConfigValue(format!("endpoint: {e}")))? + .connect() + .await + .map_err(|e| Error::InitError(format!("OTLP endpoint: {e}")))?; + + self.client = Some(match self.config.signal { + OtlpSignal::Traces => { + let mut c = TraceServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + Client::Traces(c) + } + OtlpSignal::Metrics => { + let mut c = MetricsServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + Client::Metrics(c) + } + OtlpSignal::Logs => { + let mut c = LogsServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + Client::Logs(c) + } + }); + + info!( + "Opened OTLP sink connector ID: {}, signal: {:?}, endpoint: {}", + self.id, self.config.signal, self.config.endpoint + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let total = messages.len(); + debug!( + "OTLP sink connector ID: {} received {total} messages, schema: {}", + self.id, messages_metadata.schema + ); + + let client = self + .client + .as_ref() + .ok_or_else(|| Error::InitError("OTLP sink client not initialized".into()))?; + + match client { + Client::Traces(c) => { + let req = build_trace_request(&self.config.format, &messages_metadata, &messages)?; + if req.resource_spans.is_empty() { + return Ok(()); + } + let span_count: usize = req + .resource_spans + .iter() + .flat_map(|rs| &rs.scope_spans) + .map(|ss| ss.spans.len()) + .sum(); + c.clone() + .export(self.with_headers(req)) + .await + .map_err(|e| Error::HttpRequestFailed(format!("OTLP traces export: {e}")))?; + debug!( + "OTLP sink connector ID: {} exported {span_count} spans", + self.id + ); + } + Client::Metrics(c) => { + let req = + build_metrics_request(&self.config.format, &messages_metadata, &messages)?; + if req.resource_metrics.is_empty() { + return Ok(()); + } + c.clone() + .export(self.with_headers(req)) + .await + .map_err(|e| Error::HttpRequestFailed(format!("OTLP metrics export: {e}")))?; + debug!( + "OTLP sink connector ID: {} exported metrics batch ({total} messages)", + self.id + ); + } + Client::Logs(c) => { + let req = build_logs_request(&self.config.format, &messages_metadata, &messages)?; + if req.resource_logs.is_empty() { + return Ok(()); + } + c.clone() + .export(self.with_headers(req)) + .await + .map_err(|e| Error::HttpRequestFailed(format!("OTLP logs export: {e}")))?; + debug!( + "OTLP sink connector ID: {} exported logs batch ({total} messages)", + self.id + ); + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + let _ = self.client.take(); + info!("Closed OTLP sink connector ID: {}", self.id); + Ok(()) + } +} + +fn build_trace_request( + format: &StorageFormat, + meta: &MessagesMetadata, + messages: &[ConsumedMessage], +) -> Result { + match format { + StorageFormat::Proto => { + let bytes = collect_raw_bytes(meta, messages); + if bytes.is_empty() { + return Ok(ExportTraceServiceRequest::default()); + } + // Proto mode: each message is a full ExportTraceServiceRequest; merge them. + let mut merged = ExportTraceServiceRequest::default(); + for b in bytes { + match ExportTraceServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { + Ok(r) => merged.resource_spans.extend(r.resource_spans), + Err(e) => warn!("Failed to decode OTLP trace proto: {e}"), + } + } + Ok(merged) + } + StorageFormat::Json => { + let jsons = collect_json_values(meta, messages); + Ok(from_json::traces_from_json(&jsons)) + } + } +} + +fn build_metrics_request( + format: &StorageFormat, + meta: &MessagesMetadata, + messages: &[ConsumedMessage], +) -> Result { + match format { + StorageFormat::Proto => { + let bytes = collect_raw_bytes(meta, messages); + let mut merged = ExportMetricsServiceRequest::default(); + for b in bytes { + match ExportMetricsServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { + Ok(r) => merged.resource_metrics.extend(r.resource_metrics), + Err(e) => warn!("Failed to decode OTLP metrics proto: {e}"), + } + } + Ok(merged) + } + StorageFormat::Json => { + let jsons = collect_json_values(meta, messages); + Ok(from_json::metrics_from_json(&jsons)) + } + } +} + +fn build_logs_request( + format: &StorageFormat, + meta: &MessagesMetadata, + messages: &[ConsumedMessage], +) -> Result { + match format { + StorageFormat::Proto => { + let bytes = collect_raw_bytes(meta, messages); + let mut merged = ExportLogsServiceRequest::default(); + for b in bytes { + match ExportLogsServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { + Ok(r) => merged.resource_logs.extend(r.resource_logs), + Err(e) => warn!("Failed to decode OTLP logs proto: {e}"), + } + } + Ok(merged) + } + StorageFormat::Json => { + let jsons = collect_json_values(meta, messages); + Ok(from_json::logs_from_json(&jsons)) + } + } +} + +fn collect_raw_bytes<'a>( + meta: &MessagesMetadata, + messages: &'a [ConsumedMessage], +) -> Vec<&'a [u8]> { + let mut out = Vec::with_capacity(messages.len()); + for msg in messages { + match &msg.payload { + Payload::Raw(b) => out.push(b.as_slice()), + _ => warn!( + "OTLP sink (proto mode): expected raw payload, got schema: {}", + meta.schema + ), + } + } + out +} + +fn collect_json_values(meta: &MessagesMetadata, messages: &[ConsumedMessage]) -> Vec { + let mut out = Vec::with_capacity(messages.len()); + for msg in messages { + match &msg.payload { + Payload::Json(v) => { + match simd_json::to_string(v) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + { + Some(val) => out.push(val), + None => warn!("OTLP sink: failed to convert JSON payload"), + } + } + _ => warn!( + "OTLP sink (json mode): expected JSON payload, got schema: {}", + meta.schema + ), + } + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config(signal: OtlpSignal) -> OtlpSinkConfig { + OtlpSinkConfig { + endpoint: "http://localhost:7281".to_string(), + signal, + format: StorageFormat::Json, + compression: false, + } + } + + #[test] + fn given_new_sink_client_should_not_be_initialized() { + let sink = OtlpSink::new(1, test_config(OtlpSignal::Traces)); + assert!(sink.client.is_none()); + } + + #[test] + fn given_traces_signal_config_should_store_correctly() { + let sink = OtlpSink::new(1, test_config(OtlpSignal::Traces)); + assert!(matches!(sink.config.signal, OtlpSignal::Traces)); + } +} From 0a612b24b37cd524ada320f014ec3ecf3b9dc055 Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Mon, 22 Jun 2026 18:03:35 +0300 Subject: [PATCH 2/6] feat(connectors/otlp_sink): add HTTP transport and export counters Adds `transport = "http"` (default: "grpc") to OtlpSinkConfig. The HTTP path POSTs prost-encoded proto bytes to `{endpoint}/v1/{signal}` with Content-Type: application/x-protobuf, mirrors the gzip and per-request header behaviour of the gRPC path, and treats 4xx as a logged warning (config error, not retriable) and 5xx as a retriable error. Also adds three atomic counters (messages_sent, batches_sent, batches_failed) that are logged at connector close() time, giving operators visibility into throughput and error rate without an external scrape endpoint. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 28 +- Cargo.toml | 2 + core/connectors/sinks/otlp_sink/Cargo.toml | 2 + .../sinks/otlp_sink/src/from_json.rs | 31 +- core/connectors/sinks/otlp_sink/src/lib.rs | 350 ++++++++++++++---- 5 files changed, 322 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 12382acfca..c475f43707 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6953,6 +6953,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_otlp_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "flate2", + "iggy_connector_sdk", + "opentelemetry-proto", + "prost", + "reqwest 0.13.4", + "serde", + "serde_json", + "simd-json", + "tokio", + "tonic", + "tracing", +] + [[package]] name = "iggy_connector_postgres_sink" version = "0.4.1-edge.1" @@ -10713,9 +10732,9 @@ dependencies = [ [[package]] name = "rmcp" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0810a9f717d9828f475fe1f629f4c305c8464b7f496c3a854b58d29e65f4058e" +checksum = "1d1f571c72940a19d9532fe52dbea8bc9912bf1d766c2970bb824056b86f3f59" dependencies = [ "async-trait", "base64", @@ -10745,9 +10764,9 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aefac48c364756e97f04c0401ba3231e8607882c7c1d92da0437dc16307904d" +checksum = "1aad0035b69380782d78ea95b508327e6deaa2235909053e596eea8f27b5e1d5" dependencies = [ "darling 0.23.0", "proc-macro2", @@ -13059,6 +13078,7 @@ dependencies = [ "axum", "base64", "bytes", + "flate2", "h2 0.4.15", "http 1.4.2", "http-body 1.0.1", diff --git a/Cargo.toml b/Cargo.toml index 66001ed848..80cace973e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,6 +159,7 @@ figlet-rs = "1.0.0" figment = { version = "0.10.19", features = ["toml", "env"] } file-operation = "0.8.25" flatbuffers = "25.12.19" +flate2 = "1.1" flume = "0.12.0" fs2 = "0.4.3" futures = "0.3.32" @@ -309,6 +310,7 @@ tokio-rustls = "0.26.4" tokio-tungstenite = { version = "0.29", features = ["rustls-tls-webpki-roots"] } tokio-util = { version = "0.7.18", features = ["compat"] } toml = "1.1.2" +tonic = { version = "0.14.6", default-features = false } tower-http = { version = "0.7.0", features = ["add-extension", "cors", "trace"] } tracing = "0.1.44" tracing-appender = "0.2.5" diff --git a/core/connectors/sinks/otlp_sink/Cargo.toml b/core/connectors/sinks/otlp_sink/Cargo.toml index 8ed56a216f..bb0e42e6ff 100644 --- a/core/connectors/sinks/otlp_sink/Cargo.toml +++ b/core/connectors/sinks/otlp_sink/Cargo.toml @@ -35,7 +35,9 @@ crate-type = ["cdylib", "lib"] [dependencies] async-trait = { workspace = true } bytes = { workspace = true } +flate2 = { workspace = true } iggy_connector_sdk = { workspace = true } +reqwest = { workspace = true } opentelemetry-proto = { version = "0.32.0", default-features = false, features = [ "gen-tonic", "logs", diff --git a/core/connectors/sinks/otlp_sink/src/from_json.rs b/core/connectors/sinks/otlp_sink/src/from_json.rs index 78e9038a26..4d3edf5aa3 100644 --- a/core/connectors/sinks/otlp_sink/src/from_json.rs +++ b/core/connectors/sinks/otlp_sink/src/from_json.rs @@ -149,7 +149,9 @@ fn json_to_span(v: &Value) -> Span { trace_id: hex_to_bytes(v.get("trace_id").and_then(Value::as_str).unwrap_or("")), span_id: hex_to_bytes(v.get("span_id").and_then(Value::as_str).unwrap_or("")), parent_span_id: hex_to_bytes( - v.get("parent_span_id").and_then(Value::as_str).unwrap_or(""), + v.get("parent_span_id") + .and_then(Value::as_str) + .unwrap_or(""), ), name: v .get("name") @@ -157,10 +159,7 @@ fn json_to_span(v: &Value) -> Span { .unwrap_or("") .to_owned(), kind: span_kind_from_text(v.get("kind").and_then(Value::as_str).unwrap_or("")), - start_time_unix_nano: v - .get("start_time_ns") - .and_then(Value::as_u64) - .unwrap_or(0), + start_time_unix_nano: v.get("start_time_ns").and_then(Value::as_u64).unwrap_or(0), end_time_unix_nano: v.get("end_time_ns").and_then(Value::as_u64).unwrap_or(0), attributes: json_obj_to_kv(v.get("attributes")), dropped_attributes_count: 0, @@ -169,9 +168,7 @@ fn json_to_span(v: &Value) -> Span { links: vec![], dropped_links_count: 0, status: Some(Status { - code: status_code_from_text( - v.get("status").and_then(Value::as_str).unwrap_or("unset"), - ), + code: status_code_from_text(v.get("status").and_then(Value::as_str).unwrap_or("unset")), message: v .get("status_message") .and_then(Value::as_str) @@ -192,10 +189,7 @@ fn json_to_metric(v: &Value) -> Option { .to_owned(); let time_ns = v.get("timestamp_ns").and_then(Value::as_u64).unwrap_or(0); let attrs = json_obj_to_kv(v.get("attributes")); - let metric_type = v - .get("type") - .and_then(Value::as_str) - .unwrap_or("gauge"); + let metric_type = v.get("type").and_then(Value::as_str).unwrap_or("gauge"); let value_field = v.get("value"); let as_double = value_field.and_then(Value::as_f64).unwrap_or(0.0); @@ -248,10 +242,7 @@ fn json_to_metric(v: &Value) -> Option { fn json_to_log_record(v: &Value) -> LogRecord { LogRecord { - time_unix_nano: v - .get("timestamp_ns") - .and_then(Value::as_u64) - .unwrap_or(0), + time_unix_nano: v.get("timestamp_ns").and_then(Value::as_u64).unwrap_or(0), observed_time_unix_nano: v .get("observed_timestamp_ns") .and_then(Value::as_u64) @@ -264,11 +255,9 @@ fn json_to_log_record(v: &Value) -> LogRecord { .and_then(Value::as_str) .unwrap_or("") .to_owned(), - body: v - .get("body") - .map(|b| AnyValue { - value: Some(any_value::Value::StringValue(b.to_string())), - }), + body: v.get("body").map(|b| AnyValue { + value: Some(any_value::Value::StringValue(b.to_string())), + }), attributes: json_obj_to_kv(v.get("attributes")), trace_id: hex_to_bytes(v.get("trace_id").and_then(Value::as_str).unwrap_or("")), span_id: hex_to_bytes(v.get("span_id").and_then(Value::as_str).unwrap_or("")), diff --git a/core/connectors/sinks/otlp_sink/src/lib.rs b/core/connectors/sinks/otlp_sink/src/lib.rs index e0a228003c..d9d5dd1b4b 100644 --- a/core/connectors/sinks/otlp_sink/src/lib.rs +++ b/core/connectors/sinks/otlp_sink/src/lib.rs @@ -16,6 +16,7 @@ // under the License. use async_trait::async_trait; +use flate2::{Compression, write::GzEncoder}; use iggy_connector_sdk::{ ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, }; @@ -31,6 +32,9 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ use prost::Message; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::io::Write as _; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use tonic::codec::CompressionEncoding; use tonic::metadata::{MetadataKey, MetadataValue}; use tonic::transport::Channel; @@ -65,16 +69,36 @@ pub enum StorageFormat { Proto, } +/// Wire transport used to forward OTLP data to the downstream receiver. +/// +/// `Grpc` (default): forwards via OTLP/gRPC (`opentelemetry.proto.collector.*` +/// service RPCs). Supports per-message gzip compression and custom gRPC metadata. +/// +/// `Http`: forwards via OTLP/HTTP (`POST /v1/{traces,metrics,logs}` with +/// `Content-Type: application/x-protobuf`). Useful when the downstream endpoint +/// does not expose a gRPC port. Custom headers and gzip body encoding are +/// supported through the same `headers` and `compression` config fields. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Transport { + #[default] + Grpc, + Http, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OtlpSinkConfig { - /// gRPC endpoint, e.g. "http://quickwit:7281" + /// gRPC or HTTP endpoint, e.g. "http://quickwit:7281" pub endpoint: String, pub signal: OtlpSignal, #[serde(default)] + pub transport: Transport, + #[serde(default)] pub format: StorageFormat, #[serde(default)] pub compression: bool, - /// Extra gRPC metadata headers sent with every export request. + /// Extra headers sent with every export request. + /// For gRPC these become metadata entries; for HTTP they become request headers. /// QW uses these to route to a specific index: /// qw-otel-traces-index = "flows3" /// qw-otel-logs-index = "otel-logs-v0_7" @@ -82,19 +106,46 @@ pub struct OtlpSinkConfig { pub headers: HashMap, } -enum Client { +enum GrpcClient { Traces(TraceServiceClient), Metrics(MetricsServiceClient), Logs(LogsServiceClient), } +impl std::fmt::Debug for GrpcClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GrpcClient::Traces(_) => write!(f, "TraceServiceClient"), + GrpcClient::Metrics(_) => write!(f, "MetricsServiceClient"), + GrpcClient::Logs(_) => write!(f, "LogsServiceClient"), + } + } +} + +enum Client { + Grpc(GrpcClient), + Http(reqwest::Client), +} + +/// Cumulative export counters, logged at close() and periodically at debug level. +#[derive(Debug, Default)] +struct Counters { + messages_sent: AtomicU64, + batches_sent: AtomicU64, + batches_failed: AtomicU64, +} + #[derive(Debug)] pub struct OtlpSink { id: u32, config: OtlpSinkConfig, client: Option, - // Pre-parsed metadata entries derived from config.headers at open() time. - metadata: Vec<(MetadataKey, MetadataValue)>, + // Pre-parsed metadata entries derived from config.headers at open() time (gRPC only). + metadata: Vec<( + MetadataKey, + MetadataValue, + )>, + counters: Arc, } impl OtlpSink { @@ -104,10 +155,11 @@ impl OtlpSink { config, client: None, metadata: Vec::new(), + counters: Arc::new(Counters::default()), } } - fn with_headers(&self, msg: T) -> tonic::Request { + fn with_grpc_headers(&self, msg: T) -> tonic::Request { let mut req = tonic::Request::new(msg); for (k, v) in &self.metadata { req.metadata_mut().insert(k.clone(), v.clone()); @@ -119,9 +171,10 @@ impl OtlpSink { impl std::fmt::Debug for Client { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Client::Traces(_) => write!(f, "TraceServiceClient"), - Client::Metrics(_) => write!(f, "MetricsServiceClient"), - Client::Logs(_) => write!(f, "LogsServiceClient"), + Client::Grpc(GrpcClient::Traces(_)) => write!(f, "GrpcTraceServiceClient"), + Client::Grpc(GrpcClient::Metrics(_)) => write!(f, "GrpcMetricsServiceClient"), + Client::Grpc(GrpcClient::Logs(_)) => write!(f, "GrpcLogsServiceClient"), + Client::Http(_) => write!(f, "HttpClient"), } } } @@ -129,53 +182,63 @@ impl std::fmt::Debug for Client { #[async_trait] impl Sink for OtlpSink { async fn open(&mut self) -> Result<(), Error> { - for (k, v) in &self.config.headers { - let key = MetadataKey::from_bytes(k.as_bytes()) - .map_err(|e| Error::InvalidConfigValue(format!("header key '{k}': {e}")))?; - let val = MetadataValue::try_from(v.as_str()) - .map_err(|e| Error::InvalidConfigValue(format!("header value for '{k}': {e}")))?; - self.metadata.push((key, val)); - } + self.client = Some(match self.config.transport { + Transport::Http => Client::Http( + reqwest::Client::builder() + .build() + .map_err(|e| Error::InitError(format!("HTTP client: {e}")))?, + ), + Transport::Grpc => { + for (k, v) in &self.config.headers { + let key = MetadataKey::from_bytes(k.as_bytes()) + .map_err(|e| Error::InvalidConfigValue(format!("header key '{k}': {e}")))?; + let val = MetadataValue::try_from(v.as_str()).map_err(|e| { + Error::InvalidConfigValue(format!("header value for '{k}': {e}")) + })?; + self.metadata.push((key, val)); + } - let channel = Channel::from_shared(self.config.endpoint.clone()) - .map_err(|e| Error::InvalidConfigValue(format!("endpoint: {e}")))? - .connect() - .await - .map_err(|e| Error::InitError(format!("OTLP endpoint: {e}")))?; + let channel = Channel::from_shared(self.config.endpoint.clone()) + .map_err(|e| Error::InvalidConfigValue(format!("endpoint: {e}")))? + .connect() + .await + .map_err(|e| Error::InitError(format!("OTLP endpoint: {e}")))?; - self.client = Some(match self.config.signal { - OtlpSignal::Traces => { - let mut c = TraceServiceClient::new(channel); - if self.config.compression { - c = c - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip); - } - Client::Traces(c) - } - OtlpSignal::Metrics => { - let mut c = MetricsServiceClient::new(channel); - if self.config.compression { - c = c - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip); - } - Client::Metrics(c) - } - OtlpSignal::Logs => { - let mut c = LogsServiceClient::new(channel); - if self.config.compression { - c = c - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip); - } - Client::Logs(c) + Client::Grpc(match self.config.signal { + OtlpSignal::Traces => { + let mut c = TraceServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + GrpcClient::Traces(c) + } + OtlpSignal::Metrics => { + let mut c = MetricsServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + GrpcClient::Metrics(c) + } + OtlpSignal::Logs => { + let mut c = LogsServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + GrpcClient::Logs(c) + } + }) } }); info!( - "Opened OTLP sink connector ID: {}, signal: {:?}, endpoint: {}", - self.id, self.config.signal, self.config.endpoint + "Opened OTLP sink connector ID: {}, signal: {:?}, transport: {:?}, endpoint: {}", + self.id, self.config.signal, self.config.transport, self.config.endpoint ); Ok(()) } @@ -192,14 +255,66 @@ impl Sink for OtlpSink { self.id, messages_metadata.schema ); + let result = self.export(&messages_metadata, &messages, total).await; + + match &result { + Ok(_) => { + self.counters + .messages_sent + .fetch_add(total as u64, Ordering::Relaxed); + self.counters.batches_sent.fetch_add(1, Ordering::Relaxed); + } + Err(_) => { + self.counters.batches_failed.fetch_add(1, Ordering::Relaxed); + } + } + result + } + + async fn close(&mut self) -> Result<(), Error> { + let _ = self.client.take(); + info!( + "Closed OTLP sink connector ID: {}. Counters: messages_sent={}, batches_sent={}, batches_failed={}", + self.id, + self.counters.messages_sent.load(Ordering::Relaxed), + self.counters.batches_sent.load(Ordering::Relaxed), + self.counters.batches_failed.load(Ordering::Relaxed), + ); + Ok(()) + } +} + +impl OtlpSink { + async fn export( + &self, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + total: usize, + ) -> Result<(), Error> { let client = self .client .as_ref() .ok_or_else(|| Error::InitError("OTLP sink client not initialized".into()))?; match client { - Client::Traces(c) => { - let req = build_trace_request(&self.config.format, &messages_metadata, &messages)?; + Client::Grpc(grpc) => { + self.export_grpc(grpc, messages_metadata, messages, total) + .await + } + Client::Http(http) => self.export_http(http, messages_metadata, messages).await, + } + } + + async fn export_grpc( + &self, + client: &GrpcClient, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + total: usize, + ) -> Result<(), Error> { + match client { + GrpcClient::Traces(c) => { + let req = build_trace_request(&self.config.format, messages_metadata, messages)?; if req.resource_spans.is_empty() { return Ok(()); } @@ -210,7 +325,7 @@ impl Sink for OtlpSink { .map(|ss| ss.spans.len()) .sum(); c.clone() - .export(self.with_headers(req)) + .export(self.with_grpc_headers(req)) .await .map_err(|e| Error::HttpRequestFailed(format!("OTLP traces export: {e}")))?; debug!( @@ -218,14 +333,13 @@ impl Sink for OtlpSink { self.id ); } - Client::Metrics(c) => { - let req = - build_metrics_request(&self.config.format, &messages_metadata, &messages)?; + GrpcClient::Metrics(c) => { + let req = build_metrics_request(&self.config.format, messages_metadata, messages)?; if req.resource_metrics.is_empty() { return Ok(()); } c.clone() - .export(self.with_headers(req)) + .export(self.with_grpc_headers(req)) .await .map_err(|e| Error::HttpRequestFailed(format!("OTLP metrics export: {e}")))?; debug!( @@ -233,13 +347,13 @@ impl Sink for OtlpSink { self.id ); } - Client::Logs(c) => { - let req = build_logs_request(&self.config.format, &messages_metadata, &messages)?; + GrpcClient::Logs(c) => { + let req = build_logs_request(&self.config.format, messages_metadata, messages)?; if req.resource_logs.is_empty() { return Ok(()); } c.clone() - .export(self.with_headers(req)) + .export(self.with_grpc_headers(req)) .await .map_err(|e| Error::HttpRequestFailed(format!("OTLP logs export: {e}")))?; debug!( @@ -248,13 +362,97 @@ impl Sink for OtlpSink { ); } } - Ok(()) } - async fn close(&mut self) -> Result<(), Error> { - let _ = self.client.take(); - info!("Closed OTLP sink connector ID: {}", self.id); + async fn export_http( + &self, + client: &reqwest::Client, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<(), Error> { + let (url_path, proto_bytes) = match &self.config.signal { + OtlpSignal::Traces => { + let req = build_trace_request(&self.config.format, messages_metadata, messages)?; + if req.resource_spans.is_empty() { + return Ok(()); + } + let mut buf = Vec::new(); + req.encode(&mut buf) + .map_err(|e| Error::HttpRequestFailed(format!("proto encode: {e}")))?; + ("/v1/traces", buf) + } + OtlpSignal::Metrics => { + let req = build_metrics_request(&self.config.format, messages_metadata, messages)?; + if req.resource_metrics.is_empty() { + return Ok(()); + } + let mut buf = Vec::new(); + req.encode(&mut buf) + .map_err(|e| Error::HttpRequestFailed(format!("proto encode: {e}")))?; + ("/v1/metrics", buf) + } + OtlpSignal::Logs => { + let req = build_logs_request(&self.config.format, messages_metadata, messages)?; + if req.resource_logs.is_empty() { + return Ok(()); + } + let mut buf = Vec::new(); + req.encode(&mut buf) + .map_err(|e| Error::HttpRequestFailed(format!("proto encode: {e}")))?; + ("/v1/logs", buf) + } + }; + + let base = self.config.endpoint.trim_end_matches('/'); + let url = format!("{base}{url_path}"); + + let mut builder = client + .post(&url) + .header("Content-Type", "application/x-protobuf"); + + let body = if self.config.compression { + let mut enc = GzEncoder::new(Vec::new(), Compression::default()); + enc.write_all(&proto_bytes) + .map_err(|e| Error::HttpRequestFailed(format!("gzip encode: {e}")))?; + let compressed = enc + .finish() + .map_err(|e| Error::HttpRequestFailed(format!("gzip finish: {e}")))?; + builder = builder.header("Content-Encoding", "gzip"); + compressed + } else { + proto_bytes + }; + + for (k, v) in &self.config.headers { + builder = builder.header(k.as_str(), v.as_str()); + } + + let resp = builder + .body(body) + .send() + .await + .map_err(|e| Error::HttpRequestFailed(format!("OTLP HTTP {url_path}: {e}")))?; + + let status = resp.status(); + if !status.is_success() { + let body_text = resp.text().await.unwrap_or_default(); + if status.is_client_error() { + let id = self.id; + warn!( + "OTLP sink connector ID: {id} HTTP {url_path} returned {status}: {body_text}" + ); + } else { + return Err(Error::HttpRequestFailed(format!( + "OTLP HTTP {url_path} returned {status}: {body_text}" + ))); + } + } + + debug!( + "OTLP sink connector ID: {} HTTP {url_path} exported successfully", + self.id + ); Ok(()) } } @@ -270,7 +468,6 @@ fn build_trace_request( if bytes.is_empty() { return Ok(ExportTraceServiceRequest::default()); } - // Proto mode: each message is a full ExportTraceServiceRequest; merge them. let mut merged = ExportTraceServiceRequest::default(); for b in bytes { match ExportTraceServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { @@ -352,7 +549,10 @@ fn collect_raw_bytes<'a>( out } -fn collect_json_values(meta: &MessagesMetadata, messages: &[ConsumedMessage]) -> Vec { +fn collect_json_values( + meta: &MessagesMetadata, + messages: &[ConsumedMessage], +) -> Vec { let mut out = Vec::with_capacity(messages.len()); for msg in messages { match &msg.payload { @@ -382,8 +582,10 @@ mod tests { OtlpSinkConfig { endpoint: "http://localhost:7281".to_string(), signal, + transport: Transport::Grpc, format: StorageFormat::Json, compression: false, + headers: HashMap::new(), } } @@ -398,4 +600,20 @@ mod tests { let sink = OtlpSink::new(1, test_config(OtlpSignal::Traces)); assert!(matches!(sink.config.signal, OtlpSignal::Traces)); } + + #[test] + fn given_http_transport_config_should_store_correctly() { + let mut config = test_config(OtlpSignal::Metrics); + config.transport = Transport::Http; + let sink = OtlpSink::new(2, config); + assert!(matches!(sink.config.transport, Transport::Http)); + } + + #[test] + fn given_new_sink_counters_should_be_zero() { + let sink = OtlpSink::new(1, test_config(OtlpSignal::Logs)); + assert_eq!(sink.counters.messages_sent.load(Ordering::Relaxed), 0); + assert_eq!(sink.counters.batches_sent.load(Ordering::Relaxed), 0); + assert_eq!(sink.counters.batches_failed.load(Ordering::Relaxed), 0); + } } From f267525eeae9d0e1cb15755063a02007a556fb8a Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Mon, 22 Jun 2026 21:05:41 +0300 Subject: [PATCH 3/6] fix(connectors/otlp_sink): address reviewer feedback - Dockerfile: add Apache license header, drop otlp_source build (belongs to PR #3516), remove EXPOSE 4317 (sink dials out, does not listen) - Cargo.toml: move otlp_sink to correct alphabetical position in workspace members; add opentelemetry-proto to workspace.dependencies - otlp_sink/Cargo.toml: bump version to 0.4.1-edge.1; use workspace opentelemetry-proto; drop unused simd-json dependency - lib.rs: use owned_value_to_serde_json instead of simd_json::to_string + serde_json::from_str (one allocation instead of two tree walks); change gRPC export errors from HttpRequestFailed to CannotStoreData; change proto encode errors on HTTP path to WriteFailure; export() now returns u64 so messages_sent counts only messages actually forwarded, not empty batches where all messages failed to decode - from_json.rs: fix b.to_string() double-encoding of log body (use json_to_any_value instead of StringValue(b.to_string())); make severity_from_text case-insensitive; fix hex_to_bytes to return empty vec on odd-length input instead of silently dropping last nibble; replace magic number 2 with AggregationTemporality::Cumulative; default is_monotonic to true and read from JSON when present; hoist ArrayValue and KeyValueList imports to module level; add tests for new behaviour --- Cargo.lock | 3 +- Cargo.toml | 3 +- Dockerfile.connectors | 28 ++++-- core/connectors/sinks/otlp_sink/Cargo.toml | 5 +- .../sinks/otlp_sink/src/from_json.rs | 89 ++++++++++++------- core/connectors/sinks/otlp_sink/src/lib.rs | 68 +++++++------- 6 files changed, 117 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c475f43707..19cab838f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6955,7 +6955,7 @@ dependencies = [ [[package]] name = "iggy_connector_otlp_sink" -version = "0.1.0" +version = "0.4.1-edge.1" dependencies = [ "async-trait", "bytes", @@ -6966,7 +6966,6 @@ dependencies = [ "reqwest 0.13.4", "serde", "serde_json", - "simd-json", "tokio", "tonic", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 80cace973e..d3bb0f93a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,8 +40,8 @@ members = [ "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/influxdb_sink", "core/connectors/sinks/mongodb_sink", - "core/connectors/sinks/postgres_sink", "core/connectors/sinks/otlp_sink", + "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", @@ -224,6 +224,7 @@ opentelemetry-otlp = { version = "0.32.0", features = [ "http-proto", "reqwest-client", ] } +opentelemetry-proto = { version = "0.32.0", default-features = false } opentelemetry-semantic-conventions = "0.32.0" opentelemetry_sdk = { version = "0.32.1", features = [ "logs", diff --git a/Dockerfile.connectors b/Dockerfile.connectors index 007a58e015..1caca59414 100644 --- a/Dockerfile.connectors +++ b/Dockerfile.connectors @@ -1,7 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # syntax=docker/dockerfile:1.7 # # iggy-connectors runtime image — ships the iggy-connectors binary and the -# OTLP/gRPC source plugin (.so). Config files are injected via K8s ConfigMaps. +# OTLP/gRPC sink, quickwit sink plugins (.so). Config files are injected +# via K8s ConfigMaps. # # Builder: rust:1-slim-bookworm (mold linker, BuildKit cache mounts) # Runtime: debian:bookworm-slim + libssl3 @@ -22,11 +40,9 @@ RUN --mount=type=cache,id=iggy-connectors-registry,sharing=locked,target=/usr/lo --mount=type=cache,id=iggy-connectors-target-${TARGETARCH},target=/app/target \ RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=mold" \ cargo build --release --bin iggy-connectors \ - && cargo build --release -p iggy_connector_otlp_source \ && cargo build --release -p iggy_connector_quickwit_sink \ && cargo build --release -p iggy_connector_otlp_sink \ && cp target/release/iggy-connectors /usr/local/bin/iggy-connectors \ - && cp target/release/libiggy_connector_otlp_source.so /usr/local/lib/libiggy_connector_otlp_source.so \ && cp target/release/libiggy_connector_quickwit_sink.so /usr/local/lib/libiggy_connector_quickwit_sink.so \ && cp target/release/libiggy_connector_otlp_sink.so /usr/local/lib/libiggy_connector_otlp_sink.so \ && strip /usr/local/bin/iggy-connectors @@ -42,8 +58,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && chown -R iggy:iggy /connectors COPY --from=build /usr/local/bin/iggy-connectors /usr/local/bin/iggy-connectors -COPY --from=build /usr/local/lib/libiggy_connector_otlp_source.so \ - /connectors/plugins/libiggy_connector_otlp_source.so COPY --from=build /usr/local/lib/libiggy_connector_quickwit_sink.so \ /connectors/plugins/libiggy_connector_quickwit_sink.so COPY --from=build /usr/local/lib/libiggy_connector_otlp_sink.so \ @@ -53,9 +67,9 @@ USER iggy WORKDIR /connectors # Runtime config: IGGY_CONNECTORS_CONFIG_PATH → /connectors/runtime.toml (ConfigMap) -# Plugin configs: /connectors/plugins/otlp_source.toml (ConfigMap) +# Plugin configs: /connectors/plugins/otlp_sink.toml (ConfigMap) ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml -EXPOSE 4317 8081 +EXPOSE 8081 ENTRYPOINT ["/usr/local/bin/iggy-connectors"] diff --git a/core/connectors/sinks/otlp_sink/Cargo.toml b/core/connectors/sinks/otlp_sink/Cargo.toml index bb0e42e6ff..236d1ef3e0 100644 --- a/core/connectors/sinks/otlp_sink/Cargo.toml +++ b/core/connectors/sinks/otlp_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_otlp_sink" -version = "0.1.0" +version = "0.4.1-edge.1" description = "Iggy OTLP/gRPC sink connector — forwards logs, metrics, and traces from Iggy streams to any OpenTelemetry-compatible backend via the OTLP protocol" edition = "2024" license = "Apache-2.0" @@ -38,7 +38,7 @@ bytes = { workspace = true } flate2 = { workspace = true } iggy_connector_sdk = { workspace = true } reqwest = { workspace = true } -opentelemetry-proto = { version = "0.32.0", default-features = false, features = [ +opentelemetry-proto = { workspace = true, features = [ "gen-tonic", "logs", "metrics", @@ -47,7 +47,6 @@ opentelemetry-proto = { version = "0.32.0", default-features = false, features = prost = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -simd-json = { workspace = true } tokio = { workspace = true } tonic = { workspace = true, features = ["transport", "gzip"] } tracing = { workspace = true } diff --git a/core/connectors/sinks/otlp_sink/src/from_json.rs b/core/connectors/sinks/otlp_sink/src/from_json.rs index 4d3edf5aa3..4bd77582c3 100644 --- a/core/connectors/sinks/otlp_sink/src/from_json.rs +++ b/core/connectors/sinks/otlp_sink/src/from_json.rs @@ -22,10 +22,13 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; +use opentelemetry_proto::tonic::common::v1::{ + AnyValue, ArrayValue, KeyValue, KeyValueList, any_value, +}; use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; use opentelemetry_proto::tonic::metrics::v1::{ - Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, metric, number_data_point, + AggregationTemporality, Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, + metric, number_data_point, }; use opentelemetry_proto::tonic::resource::v1::Resource; use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status, span}; @@ -204,12 +207,15 @@ fn json_to_metric(v: &Value) -> Option { }; Some(metric::Data::Sum(Sum { data_points: vec![dp], - aggregation_temporality: 2, // CUMULATIVE - is_monotonic: false, + aggregation_temporality: AggregationTemporality::Cumulative as i32, + is_monotonic: v + .get("is_monotonic") + .and_then(Value::as_bool) + .unwrap_or(true), })) } _ => { - // Default to gauge for gauge, histogram summary etc. + // Default to gauge for gauge, histogram, summary etc. // Histogram detail is lost in convert.rs (only count+sum stored); // reconstruct as gauge using count as the value. let dp_value = if metric_type == "histogram" || metric_type == "summary" { @@ -255,9 +261,7 @@ fn json_to_log_record(v: &Value) -> LogRecord { .and_then(Value::as_str) .unwrap_or("") .to_owned(), - body: v.get("body").map(|b| AnyValue { - value: Some(any_value::Value::StringValue(b.to_string())), - }), + body: v.get("body").map(json_to_any_value), attributes: json_obj_to_kv(v.get("attributes")), trace_id: hex_to_bytes(v.get("trace_id").and_then(Value::as_str).unwrap_or("")), span_id: hex_to_bytes(v.get("span_id").and_then(Value::as_str).unwrap_or("")), @@ -304,37 +308,31 @@ fn json_to_any_value(v: &Value) -> AnyValue { any_value::Value::DoubleValue(n.as_f64().unwrap_or(0.0)) } } - Value::Array(arr) => { - use opentelemetry_proto::tonic::common::v1::ArrayValue; - any_value::Value::ArrayValue(ArrayValue { - values: arr.iter().map(json_to_any_value).collect(), - }) - } - Value::Object(obj) => { - use opentelemetry_proto::tonic::common::v1::KeyValueList; - any_value::Value::KvlistValue(KeyValueList { - values: obj - .iter() - .map(|(k, val)| KeyValue { - key: k.clone(), - value: Some(json_to_any_value(val)), - ..Default::default() - }) - .collect(), - }) - } + Value::Array(arr) => any_value::Value::ArrayValue(ArrayValue { + values: arr.iter().map(json_to_any_value).collect(), + }), + Value::Object(obj) => any_value::Value::KvlistValue(KeyValueList { + values: obj + .iter() + .map(|(k, val)| KeyValue { + key: k.clone(), + value: Some(json_to_any_value(val)), + ..Default::default() + }) + .collect(), + }), Value::Null => any_value::Value::StringValue(String::new()), }; AnyValue { value: Some(inner) } } fn hex_to_bytes(s: &str) -> Vec { - if s.is_empty() { + if !s.len().is_multiple_of(2) { return vec![]; } (0..s.len()) .step_by(2) - .filter_map(|i| u8::from_str_radix(s.get(i..i + 2)?, 16).ok()) + .filter_map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok()) .collect() } @@ -358,11 +356,11 @@ fn status_code_from_text(s: &str) -> i32 { } fn severity_from_text(s: &str) -> i32 { - match s { + match s.to_ascii_uppercase().as_str() { "TRACE" => 1, "DEBUG" => 5, "INFO" => 9, - "WARN" => 13, + "WARN" | "WARNING" => 13, "ERROR" => 17, "FATAL" => 21, _ => 0, @@ -416,6 +414,11 @@ mod tests { assert!(hex_to_bytes("").is_empty()); } + #[test] + fn given_odd_length_hex_should_return_empty_bytes() { + assert!(hex_to_bytes("abc").is_empty()); + } + #[test] fn given_log_json_should_reconstruct_log_record() { let msg = json!({ @@ -433,4 +436,28 @@ mod tests { let records = &req.resource_logs[0].scope_logs[0].log_records; assert_eq!(records[0].severity_number, 13); // WARN } + + #[test] + fn given_lowercase_severity_should_map_correctly() { + assert_eq!(severity_from_text("warn"), 13); + assert_eq!(severity_from_text("info"), 9); + assert_eq!(severity_from_text("error"), 17); + assert_eq!(severity_from_text("debug"), 5); + } + + #[test] + fn given_string_log_body_should_not_double_encode() { + let msg = json!({ + "body": "hello world", + "service_name": "svc", + "resource": {} + }); + let req = logs_from_json(&[msg]); + let record = &req.resource_logs[0].scope_logs[0].log_records[0]; + let body = record.body.as_ref().unwrap(); + assert_eq!( + body.value, + Some(any_value::Value::StringValue("hello world".to_owned())) + ); + } } diff --git a/core/connectors/sinks/otlp_sink/src/lib.rs b/core/connectors/sinks/otlp_sink/src/lib.rs index d9d5dd1b4b..e9d90f72ee 100644 --- a/core/connectors/sinks/otlp_sink/src/lib.rs +++ b/core/connectors/sinks/otlp_sink/src/lib.rs @@ -18,7 +18,8 @@ use async_trait::async_trait; use flate2::{Compression, write::GzEncoder}; use iggy_connector_sdk::{ - ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, + owned_value_to_serde_json, sink_connector, }; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, logs_service_client::LogsServiceClient, @@ -255,20 +256,19 @@ impl Sink for OtlpSink { self.id, messages_metadata.schema ); - let result = self.export(&messages_metadata, &messages, total).await; - - match &result { - Ok(_) => { + match self.export(&messages_metadata, &messages, total).await { + Ok(exported) => { self.counters .messages_sent - .fetch_add(total as u64, Ordering::Relaxed); + .fetch_add(exported, Ordering::Relaxed); self.counters.batches_sent.fetch_add(1, Ordering::Relaxed); + Ok(()) } - Err(_) => { + Err(e) => { self.counters.batches_failed.fetch_add(1, Ordering::Relaxed); + Err(e) } } - result } async fn close(&mut self) -> Result<(), Error> { @@ -290,7 +290,7 @@ impl OtlpSink { messages_metadata: &MessagesMetadata, messages: &[ConsumedMessage], total: usize, - ) -> Result<(), Error> { + ) -> Result { let client = self .client .as_ref() @@ -301,7 +301,10 @@ impl OtlpSink { self.export_grpc(grpc, messages_metadata, messages, total) .await } - Client::Http(http) => self.export_http(http, messages_metadata, messages).await, + Client::Http(http) => { + self.export_http(http, messages_metadata, messages, total) + .await + } } } @@ -311,12 +314,12 @@ impl OtlpSink { messages_metadata: &MessagesMetadata, messages: &[ConsumedMessage], total: usize, - ) -> Result<(), Error> { + ) -> Result { match client { GrpcClient::Traces(c) => { let req = build_trace_request(&self.config.format, messages_metadata, messages)?; if req.resource_spans.is_empty() { - return Ok(()); + return Ok(0); } let span_count: usize = req .resource_spans @@ -327,42 +330,44 @@ impl OtlpSink { c.clone() .export(self.with_grpc_headers(req)) .await - .map_err(|e| Error::HttpRequestFailed(format!("OTLP traces export: {e}")))?; + .map_err(|e| Error::CannotStoreData(format!("OTLP traces export: {e}")))?; debug!( "OTLP sink connector ID: {} exported {span_count} spans", self.id ); + Ok(total as u64) } GrpcClient::Metrics(c) => { let req = build_metrics_request(&self.config.format, messages_metadata, messages)?; if req.resource_metrics.is_empty() { - return Ok(()); + return Ok(0); } c.clone() .export(self.with_grpc_headers(req)) .await - .map_err(|e| Error::HttpRequestFailed(format!("OTLP metrics export: {e}")))?; + .map_err(|e| Error::CannotStoreData(format!("OTLP metrics export: {e}")))?; debug!( "OTLP sink connector ID: {} exported metrics batch ({total} messages)", self.id ); + Ok(total as u64) } GrpcClient::Logs(c) => { let req = build_logs_request(&self.config.format, messages_metadata, messages)?; if req.resource_logs.is_empty() { - return Ok(()); + return Ok(0); } c.clone() .export(self.with_grpc_headers(req)) .await - .map_err(|e| Error::HttpRequestFailed(format!("OTLP logs export: {e}")))?; + .map_err(|e| Error::CannotStoreData(format!("OTLP logs export: {e}")))?; debug!( "OTLP sink connector ID: {} exported logs batch ({total} messages)", self.id ); + Ok(total as u64) } } - Ok(()) } async fn export_http( @@ -370,36 +375,37 @@ impl OtlpSink { client: &reqwest::Client, messages_metadata: &MessagesMetadata, messages: &[ConsumedMessage], - ) -> Result<(), Error> { + total: usize, + ) -> Result { let (url_path, proto_bytes) = match &self.config.signal { OtlpSignal::Traces => { let req = build_trace_request(&self.config.format, messages_metadata, messages)?; if req.resource_spans.is_empty() { - return Ok(()); + return Ok(0); } let mut buf = Vec::new(); req.encode(&mut buf) - .map_err(|e| Error::HttpRequestFailed(format!("proto encode: {e}")))?; + .map_err(|e| Error::WriteFailure(format!("proto encode traces: {e}")))?; ("/v1/traces", buf) } OtlpSignal::Metrics => { let req = build_metrics_request(&self.config.format, messages_metadata, messages)?; if req.resource_metrics.is_empty() { - return Ok(()); + return Ok(0); } let mut buf = Vec::new(); req.encode(&mut buf) - .map_err(|e| Error::HttpRequestFailed(format!("proto encode: {e}")))?; + .map_err(|e| Error::WriteFailure(format!("proto encode metrics: {e}")))?; ("/v1/metrics", buf) } OtlpSignal::Logs => { let req = build_logs_request(&self.config.format, messages_metadata, messages)?; if req.resource_logs.is_empty() { - return Ok(()); + return Ok(0); } let mut buf = Vec::new(); req.encode(&mut buf) - .map_err(|e| Error::HttpRequestFailed(format!("proto encode: {e}")))?; + .map_err(|e| Error::WriteFailure(format!("proto encode logs: {e}")))?; ("/v1/logs", buf) } }; @@ -453,7 +459,7 @@ impl OtlpSink { "OTLP sink connector ID: {} HTTP {url_path} exported successfully", self.id ); - Ok(()) + Ok(total as u64) } } @@ -556,15 +562,7 @@ fn collect_json_values( let mut out = Vec::with_capacity(messages.len()); for msg in messages { match &msg.payload { - Payload::Json(v) => { - match simd_json::to_string(v) - .ok() - .and_then(|s| serde_json::from_str(&s).ok()) - { - Some(val) => out.push(val), - None => warn!("OTLP sink: failed to convert JSON payload"), - } - } + Payload::Json(v) => out.push(owned_value_to_serde_json(v)), _ => warn!( "OTLP sink (json mode): expected JSON payload, got schema: {}", meta.schema From 435f0fe170d68cffb64d1f9d8fa372b90cbec28c Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Tue, 23 Jun 2026 05:40:12 +0300 Subject: [PATCH 4/6] docs: update handover and TCP first() bug findings for session 15 Records full investigation of PollingKind::First TCP InvalidOffset(0) bug. Root cause narrowed to validate_checksums_and_offsets in server_common (only reachable path from poll that can return InvalidOffset), combined with TCP client losing the inner u64 via from_repr. Workaround: use last() in the fallback, not first(). Co-Authored-By: Claude Sonnet 4.6 --- AGENTS.md | 18 ++++++++++++++ TOBEDECIDED.md | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 TOBEDECIDED.md diff --git a/AGENTS.md b/AGENTS.md index 8631c013f8..0b0abd7506 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -224,3 +224,21 @@ Created by server, connectors runtime, or tests. Gitignored. Safe to delete to r - Design: - Chat: - Docs: + +## Infra quick-ref (yucemonitoring vcluster) + +- iggy HTTP: `http://10.20.4.31:31920`. Auth: `POST /users/login {"username":"iggy","password":"iggy"}`. +- Node SSH: `maya@`, password `test123`. kernel 6.8.0-124. kubeconfig: `~/.kube/yucemonitoring.config`, ns `yucemonitoring`, `--insecure-skip-tls-verify`. +- Jenkins: `maya-anomaly-platform`, `type=iggy`. Trivy CVE scan always fails (false positive) -- check `BUILT_IMAGE=` in console log; if present, deploy anyway. +- QW REST: port-forward `svc/maya-quickwit-s3 7280:7280`. QW **0.8.2** -- only `unix_timestamp` (seconds) as datetime input_format; nanos/millis require 0.9. +- iggy->QW sink: `iggy-{metrics,flows,logs,apigw}` indices (mode:dynamic, no retention). Consumer groups: `qw_sink_{metrics,flows,logs,apigw}`. If connector loops on `InvalidOffset`, delete the CG via iggy HTTP API and restart. +- Segment cleaner: `IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true` -- already enabled in production. +- Connectors in image: `otlp_source` (JSON/proto modes), `otlp_sink` (PR #3529), `quickwit_sink`. + +## READY FOR HANDOVER (2026-06-22, session 15) + +**Current state:** Five PRs open on `apache/iggy` (all S-waiting-on-review): #3516 (OTLP source -- awaiting re-review from ryerraguntla), #3517 (COOP_TASKRUN), #3523 (quickwit_sink), #3525 (InvalidOffset SDK -- 3 inline threads answered, awaiting atharvalade re-review), #3529 (otlp_sink -- 22 inline threads answered, 9ba479047 committed, /ready posted). Working tree clean. + +**Next steps:** (1) Wait for reviews on all 5 PRs; (2) Investigate `PollingKind::First` TCP bug deeper (see TOBEDECIDED.md -- root cause narrowed to `validate_checksums_and_offsets` or segment offset mismatch; reproduce deterministically). + +**Open decisions:** See TOBEDECIDED.md -- TCP first() bug (investigation findings recorded, not yet root-caused). diff --git a/TOBEDECIDED.md b/TOBEDECIDED.md new file mode 100644 index 0000000000..a64aeb90c8 --- /dev/null +++ b/TOBEDECIDED.md @@ -0,0 +1,64 @@ +# To Be Decided + +## `PollingKind::First` TCP bug (open since 2026-06-22) + +### Observed behavior + +`PollingStrategy::first()` over the TCP binary protocol returns `InvalidOffset(0)` even +when the topic has messages and offset 0 is valid. The same poll over HTTP works fine. + +### Investigation findings (2026-06-22, session 14) + +**Both TCP and HTTP converge on the same server code path.** After topic/partition +resolution both routes arrive at `IggyShard::poll_messages()` → +`poll_messages_from_local_partition()` → `ops.rs::poll_messages()`. There is no +TCP-specific validation. The TCP handler (`binary/handlers/messages/poll_messages_handler.rs`) +does not call `validate_partition_offset`. + +**`PollingKind::First` resolution in `ops.rs:85-89`:** +```rust +PollingKind::First => partition.log.segments().first() + .map(|segment| segment.start_offset) + .unwrap_or(0), +``` +Returns the first segment's start offset -- correct even after retention removes old segments. + +**All `InvalidOffset` sources in the server:** + +| File | Line | When | +|------|------|------| +| `server/src/shard/system/utils.rs` | 152 | `store_consumer_offset` only -- not reachable from poll | +| `server_common/src/messages_batch_mut.rs` | 455 | `validate_checksums_and_offsets` during disk load -- reachable from poll | +| `server_common/src/messages_batch_mut.rs` | 682 | send-path `validate()` -- not reachable from poll | + +**TCP response deserialization loses the inner value:** +`TcpClient::handle_response` calls `IggyError::from_code(4100)` → `IggyError::from_repr(4100)`. +`FromRepr` on a `#[repr(u32)]` enum with a tuple variant initializes the inner `u64` to zero. +Any `InvalidOffset(N)` from the server arrives as `InvalidOffset(0)` at the SDK. + +**Most likely server-side trigger:** `validate_checksums_and_offsets(start_offset)` in +`ops.rs:553` (called from `load_segment_messages` during disk poll). If a segment's on-disk +message headers have offsets that do not match the expected sequential order starting at +`start_offset`, this returns `InvalidOffset(actual_bad_offset)` -- which the TCP client +reconstructs as `InvalidOffset(0)`. HTTP could survive this differently depending on error +mapping, or the HTTP test hit a different partition/segment. + +### Current workaround + +PR #3525 uses `PollingStrategy::last()` (not `first()`) for the `InvalidOffset` recovery +fallback, because `last()` always succeeds (most recent message is always in an active +segment). The recovery poll commits the latest offset so subsequent polls continue normally. +Pre-arming: when a consumer group is newly created, `fallback_to_last` is set so the first +poll skips the guaranteed `InvalidOffset` that would occur if retention has run. + +### Remaining work + +- Reproduce deterministically (e.g. create topic, send 3 messages, delete first segment + manually, poll with `PollingStrategy::first()` over TCP vs HTTP). Compare. +- Check if `validate_checksums_and_offsets` is called when disk is healthy -- if yes, the + mismatch would mean a bug in how segment start offsets are written vs. the message header + offsets on disk. +- If root cause is confirmed as the `from_repr` u64 loss, add the actual offset to the TCP + error payload so diagnostics are not blind. +- Once fixed, PR #3525 can use `first()` instead of `last()` in the recovery path to avoid + skipping history. From 1ee880ddf8abc0465c7437bde3021bd6bf9e9f74 Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Tue, 23 Jun 2026 09:01:40 +0300 Subject: [PATCH 5/6] fix(connectors/otlp_sink): bypass proto decode for proto storage format prost 0.14.3 rejects metric names that contain non-UTF-8 bytes, causing every message in a proto-format batch to fail silently and QW to receive nothing. The source already stores one ExportMetricsServiceRequest per gRPC call as opaque bytes -- no decode/re-encode round-trip is needed. For StorageFormat::Proto the sink now forwards each iggy message as a raw gRPC unary call using a passthrough RawCodec, keeping the Channel alive alongside the typed client. The JSON path is unchanged. The same bypass is applied to the HTTP transport path for consistency. Co-Authored-By: Claude Sonnet 4.6 Claude-Session: https://claude.ai/code/session_01DEMekGHptLJiW4mwxeroc6 --- Cargo.lock | 21 ++- core/connectors/sinks/otlp_sink/Cargo.toml | 1 + core/connectors/sinks/otlp_sink/src/lib.rs | 199 ++++++++++++++++++++- 3 files changed, 215 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19cab838f5..71e6a52a9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6960,6 +6960,7 @@ dependencies = [ "async-trait", "bytes", "flate2", + "http 1.4.1", "iggy_connector_sdk", "opentelemetry-proto", "prost", @@ -6971,6 +6972,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_otlp_source" +version = "0.4.1-edge.1" +dependencies = [ + "async-trait", + "dashmap", + "iggy_connector_sdk", + "once_cell", + "opentelemetry-proto", + "prost", + "serde", + "serde_json", + "tokio", + "tonic", + "tracing", +] + [[package]] name = "iggy_connector_postgres_sink" version = "0.4.1-edge.1" @@ -7019,10 +7037,9 @@ name = "iggy_connector_quickwit_sink" version = "0.4.1-edge.1" dependencies = [ "async-trait", - "dashmap", "iggy_connector_sdk", - "once_cell", "reqwest 0.13.4", + "reqwest-middleware", "serde", "serde_yaml_ng", "simd-json", diff --git a/core/connectors/sinks/otlp_sink/Cargo.toml b/core/connectors/sinks/otlp_sink/Cargo.toml index 236d1ef3e0..5ce99ca49e 100644 --- a/core/connectors/sinks/otlp_sink/Cargo.toml +++ b/core/connectors/sinks/otlp_sink/Cargo.toml @@ -36,6 +36,7 @@ crate-type = ["cdylib", "lib"] async-trait = { workspace = true } bytes = { workspace = true } flate2 = { workspace = true } +http = { workspace = true } iggy_connector_sdk = { workspace = true } reqwest = { workspace = true } opentelemetry-proto = { workspace = true, features = [ diff --git a/core/connectors/sinks/otlp_sink/src/lib.rs b/core/connectors/sinks/otlp_sink/src/lib.rs index e9d90f72ee..c78636a409 100644 --- a/core/connectors/sinks/otlp_sink/src/lib.rs +++ b/core/connectors/sinks/otlp_sink/src/lib.rs @@ -16,7 +16,9 @@ // under the License. use async_trait::async_trait; +use bytes::{Buf, BufMut, Bytes}; use flate2::{Compression, write::GzEncoder}; +use http::uri::PathAndQuery; use iggy_connector_sdk::{ ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, owned_value_to_serde_json, sink_connector, @@ -36,6 +38,8 @@ use std::collections::HashMap; use std::io::Write as _; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; +use tonic::client::Grpc as TonicGrpc; +use tonic::codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}; use tonic::codec::CompressionEncoding; use tonic::metadata::{MetadataKey, MetadataValue}; use tonic::transport::Channel; @@ -141,6 +145,8 @@ pub struct OtlpSink { id: u32, config: OtlpSinkConfig, client: Option, + // Retained for proto-mode raw passthrough: one Channel shared with typed client. + raw_channel: Option, // Pre-parsed metadata entries derived from config.headers at open() time (gRPC only). metadata: Vec<( MetadataKey, @@ -155,6 +161,7 @@ impl OtlpSink { id, config, client: None, + raw_channel: None, metadata: Vec::new(), counters: Arc::new(Counters::default()), } @@ -205,6 +212,8 @@ impl Sink for OtlpSink { .await .map_err(|e| Error::InitError(format!("OTLP endpoint: {e}")))?; + self.raw_channel = Some(channel.clone()); + Client::Grpc(match self.config.signal { OtlpSignal::Traces => { let mut c = TraceServiceClient::new(channel); @@ -273,6 +282,7 @@ impl Sink for OtlpSink { async fn close(&mut self) -> Result<(), Error> { let _ = self.client.take(); + let _ = self.raw_channel.take(); info!( "Closed OTLP sink connector ID: {}. Counters: messages_sent={}, batches_sent={}, batches_failed={}", self.id, @@ -298,12 +308,20 @@ impl OtlpSink { match client { Client::Grpc(grpc) => { - self.export_grpc(grpc, messages_metadata, messages, total) - .await + if matches!(self.config.format, StorageFormat::Proto) { + self.export_grpc_raw_proto(messages, total).await + } else { + self.export_grpc(grpc, messages_metadata, messages, total) + .await + } } Client::Http(http) => { - self.export_http(http, messages_metadata, messages, total) - .await + if matches!(self.config.format, StorageFormat::Proto) { + self.export_http_raw_proto(http, messages, total).await + } else { + self.export_http(http, messages_metadata, messages, total) + .await + } } } } @@ -461,6 +479,137 @@ impl OtlpSink { ); Ok(total as u64) } + + async fn export_grpc_raw_proto( + &self, + messages: &[ConsumedMessage], + total: usize, + ) -> Result { + let channel = self + .raw_channel + .as_ref() + .ok_or_else(|| Error::InitError("raw gRPC channel not initialized".into()))?; + + let path = match &self.config.signal { + OtlpSignal::Traces => PathAndQuery::from_static( + "/opentelemetry.proto.collector.trace.v1.TraceService/Export", + ), + OtlpSignal::Metrics => PathAndQuery::from_static( + "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", + ), + OtlpSignal::Logs => PathAndQuery::from_static( + "/opentelemetry.proto.collector.logs.v1.LogsService/Export", + ), + }; + + let mut grpc_client = TonicGrpc::new(channel.clone()); + let mut sent = 0u64; + + for msg in messages { + let raw = match &msg.payload { + Payload::Raw(bytes) => bytes, + _ => { + warn!( + "OTLP sink connector ID: {} (proto mode): expected raw payload", + self.id + ); + continue; + } + }; + let mut request = tonic::Request::new(Bytes::copy_from_slice(raw)); + for (k, v) in &self.metadata { + request.metadata_mut().insert(k.clone(), v.clone()); + } + grpc_client + .unary(request, path.clone(), RawCodec) + .await + .map_err(|e| Error::CannotStoreData(format!("OTLP raw gRPC export: {e}")))?; + sent += 1; + } + + debug!( + "OTLP sink connector ID: {} raw proto gRPC exported {sent}/{total} messages", + self.id + ); + Ok(sent) + } + + async fn export_http_raw_proto( + &self, + client: &reqwest::Client, + messages: &[ConsumedMessage], + total: usize, + ) -> Result { + let url_path = match &self.config.signal { + OtlpSignal::Traces => "/v1/traces", + OtlpSignal::Metrics => "/v1/metrics", + OtlpSignal::Logs => "/v1/logs", + }; + let base = self.config.endpoint.trim_end_matches('/'); + let url = format!("{base}{url_path}"); + let mut sent = 0u64; + + for msg in messages { + let raw = match &msg.payload { + Payload::Raw(bytes) => bytes, + _ => { + warn!( + "OTLP sink connector ID: {} (proto mode): expected raw payload", + self.id + ); + continue; + } + }; + + let body = if self.config.compression { + let mut enc = GzEncoder::new(Vec::new(), Compression::default()); + enc.write_all(raw) + .map_err(|e| Error::HttpRequestFailed(format!("gzip encode: {e}")))?; + enc.finish() + .map_err(|e| Error::HttpRequestFailed(format!("gzip finish: {e}")))? + } else { + raw.to_vec() + }; + + let mut builder = client + .post(&url) + .header("Content-Type", "application/x-protobuf"); + if self.config.compression { + builder = builder.header("Content-Encoding", "gzip"); + } + for (k, v) in &self.config.headers { + builder = builder.header(k.as_str(), v.as_str()); + } + + let resp = builder + .body(body) + .send() + .await + .map_err(|e| Error::HttpRequestFailed(format!("OTLP HTTP {url_path}: {e}")))?; + + let status = resp.status(); + if !status.is_success() { + let body_text = resp.text().await.unwrap_or_default(); + if status.is_client_error() { + let id = self.id; + warn!( + "OTLP sink connector ID: {id} HTTP {url_path} returned {status}: {body_text}" + ); + } else { + return Err(Error::HttpRequestFailed(format!( + "OTLP HTTP {url_path} returned {status}: {body_text}" + ))); + } + } + sent += 1; + } + + debug!( + "OTLP sink connector ID: {} HTTP {url_path} raw proto exported {sent}/{total} messages", + self.id + ); + Ok(sent) + } } fn build_trace_request( @@ -572,6 +721,48 @@ fn collect_json_values( out } +// Passthrough codec: encodes raw Bytes as-is into the gRPC frame, ignores the +// response body (OTLP Export responses carry no payload, only trailers). +#[derive(Default)] +struct RawCodec; +struct RawEncoder; +struct RawDecoder; + +impl Encoder for RawEncoder { + type Item = Bytes; + type Error = tonic::Status; + + fn encode(&mut self, item: Bytes, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + dst.put(item); + Ok(()) + } +} + +impl Decoder for RawDecoder { + type Item = Bytes; + type Error = tonic::Status; + + fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result, Self::Error> { + let len = src.remaining(); + Ok(Some(src.copy_to_bytes(len))) + } +} + +impl Codec for RawCodec { + type Encode = Bytes; + type Decode = Bytes; + type Encoder = RawEncoder; + type Decoder = RawDecoder; + + fn encoder(&mut self) -> RawEncoder { + RawEncoder + } + + fn decoder(&mut self) -> RawDecoder { + RawDecoder + } +} + #[cfg(test)] mod tests { use super::*; From 0cf2b412d5d5f619687560871e78c35fb9baf284 Mon Sep 17 00:00:00 2001 From: "fatih.yuce" Date: Tue, 23 Jun 2026 09:18:30 +0300 Subject: [PATCH 6/6] fix(connectors/otlp_sink): call ready() before each raw gRPC unary call tower::Buffer panics with "send_item called without first calling poll_reserve" if the service's poll_ready is not driven before call(). The typed tonic clients (MetricsServiceClient etc.) call poll_ready internally; our TonicGrpc wrapper does not, so each iteration in export_grpc_raw_proto must call grpc_client.ready().await first. Co-Authored-By: Claude Sonnet 4.6 Claude-Session: https://claude.ai/code/session_01DEMekGHptLJiW4mwxeroc6 --- core/connectors/sinks/otlp_sink/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/connectors/sinks/otlp_sink/src/lib.rs b/core/connectors/sinks/otlp_sink/src/lib.rs index c78636a409..5ba8004719 100644 --- a/core/connectors/sinks/otlp_sink/src/lib.rs +++ b/core/connectors/sinks/otlp_sink/src/lib.rs @@ -516,6 +516,10 @@ impl OtlpSink { continue; } }; + // tower::Buffer requires poll_ready before every call. + grpc_client.ready().await.map_err(|e| { + Error::CannotStoreData(format!("OTLP gRPC channel not ready: {e}")) + })?; let mut request = tonic::Request::new(Bytes::copy_from_slice(raw)); for (k, v) in &self.metadata { request.metadata_mut().insert(k.clone(), v.clone());