diff --git a/AGENTS.md b/AGENTS.md index 8631c013f8..0b0abd7506 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -224,3 +224,21 @@ Created by server, connectors runtime, or tests. Gitignored. Safe to delete to r - Design: - Chat: - Docs: + +## Infra quick-ref (yucemonitoring vcluster) + +- iggy HTTP: `http://10.20.4.31:31920`. Auth: `POST /users/login {"username":"iggy","password":"iggy"}`. +- Node SSH: `maya@`, password `test123`. kernel 6.8.0-124. kubeconfig: `~/.kube/yucemonitoring.config`, ns `yucemonitoring`, `--insecure-skip-tls-verify`. +- Jenkins: `maya-anomaly-platform`, `type=iggy`. Trivy CVE scan always fails (false positive) -- check `BUILT_IMAGE=` in console log; if present, deploy anyway. +- QW REST: port-forward `svc/maya-quickwit-s3 7280:7280`. QW **0.8.2** -- only `unix_timestamp` (seconds) as datetime input_format; nanos/millis require 0.9. +- iggy->QW sink: `iggy-{metrics,flows,logs,apigw}` indices (mode:dynamic, no retention). Consumer groups: `qw_sink_{metrics,flows,logs,apigw}`. If connector loops on `InvalidOffset`, delete the CG via iggy HTTP API and restart. +- Segment cleaner: `IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true` -- already enabled in production. +- Connectors in image: `otlp_source` (JSON/proto modes), `otlp_sink` (PR #3529), `quickwit_sink`. + +## READY FOR HANDOVER (2026-06-22, session 15) + +**Current state:** Five PRs open on `apache/iggy` (all S-waiting-on-review): #3516 (OTLP source -- awaiting re-review from ryerraguntla), #3517 (COOP_TASKRUN), #3523 (quickwit_sink), #3525 (InvalidOffset SDK -- 3 inline threads answered, awaiting atharvalade re-review), #3529 (otlp_sink -- 22 inline threads answered, 9ba479047 committed, /ready posted). Working tree clean. + +**Next steps:** (1) Wait for reviews on all 5 PRs; (2) Investigate `PollingKind::First` TCP bug deeper (see TOBEDECIDED.md -- root cause narrowed to `validate_checksums_and_offsets` or segment offset mismatch; reproduce deterministically). 
+ +**Open decisions:** See TOBEDECIDED.md -- TCP first() bug (investigation findings recorded, not yet root-caused). diff --git a/Cargo.lock b/Cargo.lock index 12382acfca..71e6a52a9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6953,6 +6953,42 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_otlp_sink" +version = "0.4.1-edge.1" +dependencies = [ + "async-trait", + "bytes", + "flate2", + "http 1.4.1", + "iggy_connector_sdk", + "opentelemetry-proto", + "prost", + "reqwest 0.13.4", + "serde", + "serde_json", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "iggy_connector_otlp_source" +version = "0.4.1-edge.1" +dependencies = [ + "async-trait", + "dashmap", + "iggy_connector_sdk", + "once_cell", + "opentelemetry-proto", + "prost", + "serde", + "serde_json", + "tokio", + "tonic", + "tracing", +] + [[package]] name = "iggy_connector_postgres_sink" version = "0.4.1-edge.1" @@ -7001,10 +7037,9 @@ name = "iggy_connector_quickwit_sink" version = "0.4.1-edge.1" dependencies = [ "async-trait", - "dashmap", "iggy_connector_sdk", - "once_cell", "reqwest 0.13.4", + "reqwest-middleware", "serde", "serde_yaml_ng", "simd-json", @@ -10713,9 +10748,9 @@ dependencies = [ [[package]] name = "rmcp" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0810a9f717d9828f475fe1f629f4c305c8464b7f496c3a854b58d29e65f4058e" +checksum = "1d1f571c72940a19d9532fe52dbea8bc9912bf1d766c2970bb824056b86f3f59" dependencies = [ "async-trait", "base64", @@ -10745,9 +10780,9 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aefac48c364756e97f04c0401ba3231e8607882c7c1d92da0437dc16307904d" +checksum = "1aad0035b69380782d78ea95b508327e6deaa2235909053e596eea8f27b5e1d5" dependencies = [ "darling 0.23.0", "proc-macro2", @@ -13059,6 +13094,7 @@ dependencies = [ "axum", "base64", "bytes", + "flate2", 
"h2 0.4.15", "http 1.4.2", "http-body 1.0.1", diff --git a/Cargo.toml b/Cargo.toml index 4c6777fdea..d3bb0f93a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/influxdb_sink", "core/connectors/sinks/mongodb_sink", + "core/connectors/sinks/otlp_sink", "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", "core/connectors/sinks/stdout_sink", @@ -158,6 +159,7 @@ figlet-rs = "1.0.0" figment = { version = "0.10.19", features = ["toml", "env"] } file-operation = "0.8.25" flatbuffers = "25.12.19" +flate2 = "1.1" flume = "0.12.0" fs2 = "0.4.3" futures = "0.3.32" @@ -222,6 +224,7 @@ opentelemetry-otlp = { version = "0.32.0", features = [ "http-proto", "reqwest-client", ] } +opentelemetry-proto = { version = "0.32.0", default-features = false } opentelemetry-semantic-conventions = "0.32.0" opentelemetry_sdk = { version = "0.32.1", features = [ "logs", @@ -308,6 +311,7 @@ tokio-rustls = "0.26.4" tokio-tungstenite = { version = "0.29", features = ["rustls-tls-webpki-roots"] } tokio-util = { version = "0.7.18", features = ["compat"] } toml = "1.1.2" +tonic = { version = "0.14.6", default-features = false } tower-http = { version = "0.7.0", features = ["add-extension", "cors", "trace"] } tracing = "0.1.44" tracing-appender = "0.2.5" diff --git a/Dockerfile.connectors b/Dockerfile.connectors new file mode 100644 index 0000000000..1caca59414 --- /dev/null +++ b/Dockerfile.connectors @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# syntax=docker/dockerfile:1.7 +# +# iggy-connectors runtime image — ships the iggy-connectors binary and the +# OTLP/gRPC sink, quickwit sink plugins (.so). Config files are injected +# via K8s ConfigMaps. +# +# Builder: rust:1-slim-bookworm (mold linker, BuildKit cache mounts) +# Runtime: debian:bookworm-slim + libssl3 + +# ---- Build ---- +FROM rust:1-slim-bookworm AS build +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + pkg-config libssl-dev clang mold \ + && rm -rf /var/lib/apt/lists/* + +COPY . . + +ARG TARGETARCH +# RUSTFLAGS is exported so that all three cargo invocations link with mold and +# share one set of compiled dependencies. A bare `VAR=value cmd` prefix would +# apply to the first cargo command only and force a second full build. +RUN --mount=type=cache,id=iggy-connectors-registry,sharing=locked,target=/usr/local/cargo/registry \ + --mount=type=cache,id=iggy-connectors-git,sharing=locked,target=/usr/local/cargo/git \ + --mount=type=cache,id=iggy-connectors-target-${TARGETARCH},target=/app/target \ + export RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=mold" \ + && cargo build --release --bin iggy-connectors \ + && cargo build --release -p iggy_connector_quickwit_sink \ + && cargo build --release -p iggy_connector_otlp_sink \ + && cp target/release/iggy-connectors /usr/local/bin/iggy-connectors \ + && cp target/release/libiggy_connector_quickwit_sink.so /usr/local/lib/libiggy_connector_quickwit_sink.so \ + && cp target/release/libiggy_connector_otlp_sink.so /usr/local/lib/libiggy_connector_otlp_sink.so \ + && strip /usr/local/bin/iggy-connectors + +# ---- Runtime ---- +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates libssl3 \ + && rm -rf /var/lib/apt/lists/* \ + && useradd
--system --uid 10001 --home-dir /connectors \ + --shell /usr/sbin/nologin iggy \ + && mkdir -p /connectors/plugins /connectors/state \ + && chown -R iggy:iggy /connectors + +COPY --from=build /usr/local/bin/iggy-connectors /usr/local/bin/iggy-connectors +COPY --from=build /usr/local/lib/libiggy_connector_quickwit_sink.so \ + /connectors/plugins/libiggy_connector_quickwit_sink.so +COPY --from=build /usr/local/lib/libiggy_connector_otlp_sink.so \ + /connectors/plugins/libiggy_connector_otlp_sink.so + +USER iggy +WORKDIR /connectors + +# Runtime config: IGGY_CONNECTORS_CONFIG_PATH → /connectors/runtime.toml (ConfigMap) +# Plugin configs: /connectors/plugins/otlp_sink.toml (ConfigMap) +ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml + +EXPOSE 8081 + +ENTRYPOINT ["/usr/local/bin/iggy-connectors"] diff --git a/TOBEDECIDED.md b/TOBEDECIDED.md new file mode 100644 index 0000000000..a64aeb90c8 --- /dev/null +++ b/TOBEDECIDED.md @@ -0,0 +1,64 @@ +# To Be Decided + +## `PollingKind::First` TCP bug (open since 2026-06-22) + +### Observed behavior + +`PollingStrategy::first()` over the TCP binary protocol returns `InvalidOffset(0)` even +when the topic has messages and offset 0 is valid. The same poll over HTTP works fine. + +### Investigation findings (2026-06-22, session 14) + +**Both TCP and HTTP converge on the same server code path.** After topic/partition +resolution both routes arrive at `IggyShard::poll_messages()` → +`poll_messages_from_local_partition()` → `ops.rs::poll_messages()`. There is no +TCP-specific validation. The TCP handler (`binary/handlers/messages/poll_messages_handler.rs`) +does not call `validate_partition_offset`. + +**`PollingKind::First` resolution in `ops.rs:85-89`:** +```rust +PollingKind::First => partition.log.segments().first() + .map(|segment| segment.start_offset) + .unwrap_or(0), +``` +Returns the first segment's start offset -- correct even after retention removes old segments. 
+ +**All `InvalidOffset` sources in the server:** + +| File | Line | When | +|------|------|------| +| `server/src/shard/system/utils.rs` | 152 | `store_consumer_offset` only -- not reachable from poll | +| `server_common/src/messages_batch_mut.rs` | 455 | `validate_checksums_and_offsets` during disk load -- reachable from poll | +| `server_common/src/messages_batch_mut.rs` | 682 | send-path `validate()` -- not reachable from poll | + +**TCP response deserialization loses the inner value:** +`TcpClient::handle_response` calls `IggyError::from_code(4100)` → `IggyError::from_repr(4100)`. +`FromRepr` on a `#[repr(u32)]` enum with a tuple variant initializes the inner `u64` to zero. +Any `InvalidOffset(N)` from the server arrives as `InvalidOffset(0)` at the SDK. + +**Most likely server-side trigger:** `validate_checksums_and_offsets(start_offset)` in +`ops.rs:553` (called from `load_segment_messages` during disk poll). If a segment's on-disk +message headers have offsets that do not match the expected sequential order starting at +`start_offset`, this returns `InvalidOffset(actual_bad_offset)` -- which the TCP client +reconstructs as `InvalidOffset(0)`. HTTP could survive this differently depending on error +mapping, or the HTTP test hit a different partition/segment. + +### Current workaround + +PR #3525 uses `PollingStrategy::last()` (not `first()`) for the `InvalidOffset` recovery +fallback, because `last()` always succeeds (most recent message is always in an active +segment). The recovery poll commits the latest offset so subsequent polls continue normally. +Pre-arming: when a consumer group is newly created, `fallback_to_last` is set so the first +poll skips the guaranteed `InvalidOffset` that would occur if retention has run. + +### Remaining work + +- Reproduce deterministically (e.g. create topic, send 3 messages, delete first segment + manually, poll with `PollingStrategy::first()` over TCP vs HTTP). Compare. 
+- Check if `validate_checksums_and_offsets` is called when disk is healthy -- if yes, the + mismatch would mean a bug in how segment start offsets are written vs. the message header + offsets on disk. +- If root cause is confirmed as the `from_repr` u64 loss, add the actual offset to the TCP + error payload so diagnostics are not blind. +- Once fixed, PR #3525 can use `first()` instead of `last()` in the recovery path to avoid + skipping history. diff --git a/core/connectors/sinks/otlp_sink/Cargo.toml b/core/connectors/sinks/otlp_sink/Cargo.toml new file mode 100644 index 0000000000..5ce99ca49e --- /dev/null +++ b/core/connectors/sinks/otlp_sink/Cargo.toml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy_connector_otlp_sink" +version = "0.4.1-edge.1" +description = "Iggy OTLP/gRPC sink connector — forwards logs, metrics, and traces from Iggy streams to any OpenTelemetry-compatible backend via the OTLP protocol" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "opentelemetry", "otlp"] +categories = ["command-line-utilities", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +bytes = { workspace = true } +flate2 = { workspace = true } +http = { workspace = true } +iggy_connector_sdk = { workspace = true } +reqwest = { workspace = true } +opentelemetry-proto = { workspace = true, features = [ + "gen-tonic", + "logs", + "metrics", + "trace", +] } +prost = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true, features = ["transport", "gzip"] } +tracing = { workspace = true } diff --git a/core/connectors/sinks/otlp_sink/src/from_json.rs b/core/connectors/sinks/otlp_sink/src/from_json.rs new file mode 100644 index 0000000000..4bd77582c3 --- /dev/null +++ b/core/connectors/sinks/otlp_sink/src/from_json.rs @@ -0,0 +1,463 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reconstruct OTLP proto requests from the JSON produced by `otlp_source`'s +//! `convert.rs`. The JSON field names match what `convert.rs` emits; adding a +//! new field there requires a corresponding parse here. + +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::{ + AnyValue, ArrayValue, KeyValue, KeyValueList, any_value, +}; +use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; +use opentelemetry_proto::tonic::metrics::v1::{ + AggregationTemporality, Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, + metric, number_data_point, +}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status, span}; +use serde_json::Value; +use std::collections::HashMap; + +// --------------------------------------------------------------------------- +// Public entry points +// --------------------------------------------------------------------------- + +pub fn traces_from_json(messages: &[Value]) -> ExportTraceServiceRequest { + // Group spans by service_name so each service gets one ResourceSpans. 
+ let mut groups: HashMap)> = HashMap::new(); + + for msg in messages { + let service = msg + .get("service_name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + let resource = msg.get("resource").cloned().unwrap_or(Value::Null); + let span = json_to_span(msg); + groups + .entry(service) + .or_insert_with(|| (resource, Vec::new())) + .1 + .push(span); + } + + ExportTraceServiceRequest { + resource_spans: groups + .into_values() + .map(|(resource, spans)| ResourceSpans { + resource: Some(json_to_resource(&resource)), + scope_spans: vec![ScopeSpans { + scope: None, + spans, + schema_url: String::new(), + }], + schema_url: String::new(), + }) + .collect(), + } +} + +pub fn metrics_from_json(messages: &[Value]) -> ExportMetricsServiceRequest { + let mut groups: HashMap)> = HashMap::new(); + + for msg in messages { + let service = msg + .get("service_name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + let resource = msg.get("resource").cloned().unwrap_or(Value::Null); + if let Some(metric) = json_to_metric(msg) { + groups + .entry(service) + .or_insert_with(|| (resource, Vec::new())) + .1 + .push(metric); + } + } + + ExportMetricsServiceRequest { + resource_metrics: groups + .into_values() + .map(|(resource, metrics)| ResourceMetrics { + resource: Some(json_to_resource(&resource)), + scope_metrics: vec![ScopeMetrics { + scope: None, + metrics, + schema_url: String::new(), + }], + schema_url: String::new(), + }) + .collect(), + } +} + +pub fn logs_from_json(messages: &[Value]) -> ExportLogsServiceRequest { + let mut groups: HashMap)> = HashMap::new(); + + for msg in messages { + let service = msg + .get("service_name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + let resource = msg.get("resource").cloned().unwrap_or(Value::Null); + let record = json_to_log_record(msg); + groups + .entry(service) + .or_insert_with(|| (resource, Vec::new())) + .1 + .push(record); + } + + ExportLogsServiceRequest { + resource_logs: groups + 
.into_values() + .map(|(resource, log_records)| ResourceLogs { + resource: Some(json_to_resource(&resource)), + scope_logs: vec![ScopeLogs { + scope: None, + log_records, + schema_url: String::new(), + }], + schema_url: String::new(), + }) + .collect(), + } +} + +// --------------------------------------------------------------------------- +// Per-signal converters +// --------------------------------------------------------------------------- + +fn json_to_span(v: &Value) -> Span { + Span { + trace_id: hex_to_bytes(v.get("trace_id").and_then(Value::as_str).unwrap_or("")), + span_id: hex_to_bytes(v.get("span_id").and_then(Value::as_str).unwrap_or("")), + parent_span_id: hex_to_bytes( + v.get("parent_span_id") + .and_then(Value::as_str) + .unwrap_or(""), + ), + name: v + .get("name") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(), + kind: span_kind_from_text(v.get("kind").and_then(Value::as_str).unwrap_or("")), + start_time_unix_nano: v.get("start_time_ns").and_then(Value::as_u64).unwrap_or(0), + end_time_unix_nano: v.get("end_time_ns").and_then(Value::as_u64).unwrap_or(0), + attributes: json_obj_to_kv(v.get("attributes")), + dropped_attributes_count: 0, + events: vec![], + dropped_events_count: 0, + links: vec![], + dropped_links_count: 0, + status: Some(Status { + code: status_code_from_text(v.get("status").and_then(Value::as_str).unwrap_or("unset")), + message: v + .get("status_message") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(), + }), + flags: 0, + trace_state: String::new(), + } +} + +fn json_to_metric(v: &Value) -> Option { + let name = v.get("name")?.as_str()?.to_owned(); + let unit = v + .get("unit") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(); + let time_ns = v.get("timestamp_ns").and_then(Value::as_u64).unwrap_or(0); + let attrs = json_obj_to_kv(v.get("attributes")); + let metric_type = v.get("type").and_then(Value::as_str).unwrap_or("gauge"); + + let value_field = v.get("value"); + let as_double = 
value_field.and_then(Value::as_f64).unwrap_or(0.0); + + let data = match metric_type { + "sum" => { + let dp = NumberDataPoint { + attributes: attrs, + time_unix_nano: time_ns, + value: Some(number_data_point::Value::AsDouble(as_double)), + ..Default::default() + }; + Some(metric::Data::Sum(Sum { + data_points: vec![dp], + aggregation_temporality: AggregationTemporality::Cumulative as i32, + is_monotonic: v + .get("is_monotonic") + .and_then(Value::as_bool) + .unwrap_or(true), + })) + } + _ => { + // Default to gauge for gauge, histogram, summary etc. + // Histogram detail is lost in convert.rs (only count+sum stored); + // reconstruct as gauge using count as the value. + let dp_value = if metric_type == "histogram" || metric_type == "summary" { + value_field + .and_then(|v| v.get("count")) + .and_then(Value::as_f64) + .unwrap_or(0.0) + } else { + as_double + }; + Some(metric::Data::Gauge(Gauge { + data_points: vec![NumberDataPoint { + attributes: attrs, + time_unix_nano: time_ns, + value: Some(number_data_point::Value::AsDouble(dp_value)), + ..Default::default() + }], + })) + } + }; + + Some(Metric { + name, + description: String::new(), + unit, + data, + metadata: vec![], + }) +} + +fn json_to_log_record(v: &Value) -> LogRecord { + LogRecord { + time_unix_nano: v.get("timestamp_ns").and_then(Value::as_u64).unwrap_or(0), + observed_time_unix_nano: v + .get("observed_timestamp_ns") + .and_then(Value::as_u64) + .unwrap_or(0), + severity_number: severity_from_text( + v.get("severity").and_then(Value::as_str).unwrap_or(""), + ), + severity_text: v + .get("severity_text") + .and_then(Value::as_str) + .unwrap_or("") + .to_owned(), + body: v.get("body").map(json_to_any_value), + attributes: json_obj_to_kv(v.get("attributes")), + trace_id: hex_to_bytes(v.get("trace_id").and_then(Value::as_str).unwrap_or("")), + span_id: hex_to_bytes(v.get("span_id").and_then(Value::as_str).unwrap_or("")), + dropped_attributes_count: 0, + flags: 0, + ..Default::default() + } +} + +// 
--------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn json_to_resource(v: &Value) -> Resource { + Resource { + attributes: json_obj_to_kv(Some(v)), + dropped_attributes_count: 0, + ..Default::default() + } +} + +fn json_obj_to_kv(v: Option<&Value>) -> Vec { + let obj = match v.and_then(Value::as_object) { + Some(o) => o, + None => return vec![], + }; + obj.iter() + .map(|(k, val)| KeyValue { + key: k.clone(), + value: Some(json_to_any_value(val)), + ..Default::default() + }) + .collect() +} + +fn json_to_any_value(v: &Value) -> AnyValue { + let inner = match v { + Value::String(s) => any_value::Value::StringValue(s.clone()), + Value::Bool(b) => any_value::Value::BoolValue(*b), + Value::Number(n) => { + if let Some(i) = n.as_i64() { + any_value::Value::IntValue(i) + } else { + any_value::Value::DoubleValue(n.as_f64().unwrap_or(0.0)) + } + } + Value::Array(arr) => any_value::Value::ArrayValue(ArrayValue { + values: arr.iter().map(json_to_any_value).collect(), + }), + Value::Object(obj) => any_value::Value::KvlistValue(KeyValueList { + values: obj + .iter() + .map(|(k, val)| KeyValue { + key: k.clone(), + value: Some(json_to_any_value(val)), + ..Default::default() + }) + .collect(), + }), + Value::Null => any_value::Value::StringValue(String::new()), + }; + AnyValue { value: Some(inner) } +} + +fn hex_to_bytes(s: &str) -> Vec { + if !s.len().is_multiple_of(2) { + return vec![]; + } + (0..s.len()) + .step_by(2) + .filter_map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok()) + .collect() +} + +fn span_kind_from_text(s: &str) -> i32 { + match s { + "internal" => span::SpanKind::Internal as i32, + "server" => span::SpanKind::Server as i32, + "client" => span::SpanKind::Client as i32, + "producer" => span::SpanKind::Producer as i32, + "consumer" => span::SpanKind::Consumer as i32, + _ => span::SpanKind::Unspecified as i32, + } +} + +fn 
status_code_from_text(s: &str) -> i32 { + match s { + "ok" => 1, + "error" => 2, + _ => 0, // unset + } +} + +fn severity_from_text(s: &str) -> i32 { + match s.to_ascii_uppercase().as_str() { + "TRACE" => 1, + "DEBUG" => 5, + "INFO" => 9, + "WARN" | "WARNING" => 13, + "ERROR" => 17, + "FATAL" => 21, + _ => 0, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn given_empty_messages_should_return_empty_request() { + let req = traces_from_json(&[]); + assert!(req.resource_spans.is_empty()); + } + + #[test] + fn given_trace_json_should_reconstruct_span() { + let msg = json!({ + "signal": "trace", + "trace_id": "0102030405060708090a0b0c0d0e0f10", + "span_id": "0102030405060708", + "name": "test-span", + "kind": "server", + "start_time_ns": 1_000_000_u64, + "end_time_ns": 2_000_000_u64, + "status": "ok", + "service_name": "svc-a", + "resource": { "service.name": "svc-a" }, + "attributes": { "http.method": "GET" } + }); + let req = traces_from_json(&[msg]); + assert_eq!(req.resource_spans.len(), 1); + let spans = &req.resource_spans[0].scope_spans[0].spans; + assert_eq!(spans.len(), 1); + assert_eq!(spans[0].name, "test-span"); + assert_eq!(spans[0].kind, span::SpanKind::Server as i32); + assert_eq!(spans[0].start_time_unix_nano, 1_000_000); + assert_eq!(spans[0].status.as_ref().unwrap().code, 1); // ok + } + + #[test] + fn given_hex_trace_id_should_decode_correctly() { + let bytes = hex_to_bytes("deadbeef"); + assert_eq!(bytes, vec![0xde, 0xad, 0xbe, 0xef]); + } + + #[test] + fn given_empty_hex_should_return_empty_bytes() { + assert!(hex_to_bytes("").is_empty()); + } + + #[test] + fn given_odd_length_hex_should_return_empty_bytes() { + assert!(hex_to_bytes("abc").is_empty()); + } + + #[test] + fn given_log_json_should_reconstruct_log_record() { + let msg = json!({ + "signal": "log", + "timestamp_ns": 999_u64, + "severity": "WARN", + "severity_text": "WARNING", + "body": "something happened", + "service_name": "svc-b", + 
"resource": {}, + "attributes": {} + }); + let req = logs_from_json(&[msg]); + assert_eq!(req.resource_logs.len(), 1); + let records = &req.resource_logs[0].scope_logs[0].log_records; + assert_eq!(records[0].severity_number, 13); // WARN + } + + #[test] + fn given_lowercase_severity_should_map_correctly() { + assert_eq!(severity_from_text("warn"), 13); + assert_eq!(severity_from_text("info"), 9); + assert_eq!(severity_from_text("error"), 17); + assert_eq!(severity_from_text("debug"), 5); + } + + #[test] + fn given_string_log_body_should_not_double_encode() { + let msg = json!({ + "body": "hello world", + "service_name": "svc", + "resource": {} + }); + let req = logs_from_json(&[msg]); + let record = &req.resource_logs[0].scope_logs[0].log_records[0]; + let body = record.body.as_ref().unwrap(); + assert_eq!( + body.value, + Some(any_value::Value::StringValue("hello world".to_owned())) + ); + } +} diff --git a/core/connectors/sinks/otlp_sink/src/lib.rs b/core/connectors/sinks/otlp_sink/src/lib.rs new file mode 100644 index 0000000000..5ba8004719 --- /dev/null +++ b/core/connectors/sinks/otlp_sink/src/lib.rs @@ -0,0 +1,812 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use async_trait::async_trait; +use bytes::{Buf, BufMut, Bytes}; +use flate2::{Compression, write::GzEncoder}; +use http::uri::PathAndQuery; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, + owned_value_to_serde_json, sink_connector, +}; +use opentelemetry_proto::tonic::collector::logs::v1::{ + ExportLogsServiceRequest, logs_service_client::LogsServiceClient, +}; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsServiceRequest, metrics_service_client::MetricsServiceClient, +}; +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTraceServiceRequest, trace_service_client::TraceServiceClient, +}; +use prost::Message; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::io::Write as _; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tonic::client::Grpc as TonicGrpc; +use tonic::codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}; +use tonic::codec::CompressionEncoding; +use tonic::metadata::{MetadataKey, MetadataValue}; +use tonic::transport::Channel; +use tracing::{debug, info, warn}; + +mod from_json; + +sink_connector!(OtlpSink); + +/// Which OTLP signal this sink handles. Must match the Iggy topic content. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum OtlpSignal { + Traces, + Metrics, + Logs, +} + +/// How messages are stored in the Iggy topic. +/// +/// `Json` (default): messages are JSON produced by the otlp_source JSON path. +/// The sink reconstructs OTLP proto from the JSON before forwarding. +/// +/// `Proto`: messages are raw prost-encoded OTLP proto bytes (one +/// `Export*ServiceRequest` per message). The sink forwards them directly, +/// with zero deserialization overhead. 
+#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum StorageFormat { + #[default] + Json, + Proto, +} + +/// Wire transport used to forward OTLP data to the downstream receiver. +/// +/// `Grpc` (default): forwards via OTLP/gRPC (`opentelemetry.proto.collector.*` +/// service RPCs). Supports per-message gzip compression and custom gRPC metadata. +/// +/// `Http`: forwards via OTLP/HTTP (`POST /v1/{traces,metrics,logs}` with +/// `Content-Type: application/x-protobuf`). Useful when the downstream endpoint +/// does not expose a gRPC port. Custom headers and gzip body encoding are +/// supported through the same `headers` and `compression` config fields. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Transport { + #[default] + Grpc, + Http, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OtlpSinkConfig { + /// gRPC or HTTP endpoint, e.g. "http://quickwit:7281" + pub endpoint: String, + pub signal: OtlpSignal, + #[serde(default)] + pub transport: Transport, + #[serde(default)] + pub format: StorageFormat, + #[serde(default)] + pub compression: bool, + /// Extra headers sent with every export request. + /// For gRPC these become metadata entries; for HTTP they become request headers. 
+ /// QW uses these to route to a specific index: + /// qw-otel-traces-index = "flows3" + /// qw-otel-logs-index = "otel-logs-v0_7" + #[serde(default)] + pub headers: HashMap, +} + +enum GrpcClient { + Traces(TraceServiceClient), + Metrics(MetricsServiceClient), + Logs(LogsServiceClient), +} + +impl std::fmt::Debug for GrpcClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GrpcClient::Traces(_) => write!(f, "TraceServiceClient"), + GrpcClient::Metrics(_) => write!(f, "MetricsServiceClient"), + GrpcClient::Logs(_) => write!(f, "LogsServiceClient"), + } + } +} + +enum Client { + Grpc(GrpcClient), + Http(reqwest::Client), +} + +/// Cumulative export counters, logged at close() and periodically at debug level. +#[derive(Debug, Default)] +struct Counters { + messages_sent: AtomicU64, + batches_sent: AtomicU64, + batches_failed: AtomicU64, +} + +#[derive(Debug)] +pub struct OtlpSink { + id: u32, + config: OtlpSinkConfig, + client: Option, + // Retained for proto-mode raw passthrough: one Channel shared with typed client. + raw_channel: Option, + // Pre-parsed metadata entries derived from config.headers at open() time (gRPC only). 
+ metadata: Vec<( + MetadataKey, + MetadataValue, + )>, + counters: Arc, +} + +impl OtlpSink { + pub fn new(id: u32, config: OtlpSinkConfig) -> Self { + Self { + id, + config, + client: None, + raw_channel: None, + metadata: Vec::new(), + counters: Arc::new(Counters::default()), + } + } + + fn with_grpc_headers(&self, msg: T) -> tonic::Request { + let mut req = tonic::Request::new(msg); + for (k, v) in &self.metadata { + req.metadata_mut().insert(k.clone(), v.clone()); + } + req + } +} + +impl std::fmt::Debug for Client { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Client::Grpc(GrpcClient::Traces(_)) => write!(f, "GrpcTraceServiceClient"), + Client::Grpc(GrpcClient::Metrics(_)) => write!(f, "GrpcMetricsServiceClient"), + Client::Grpc(GrpcClient::Logs(_)) => write!(f, "GrpcLogsServiceClient"), + Client::Http(_) => write!(f, "HttpClient"), + } + } +} + +#[async_trait] +impl Sink for OtlpSink { + async fn open(&mut self) -> Result<(), Error> { + self.client = Some(match self.config.transport { + Transport::Http => Client::Http( + reqwest::Client::builder() + .build() + .map_err(|e| Error::InitError(format!("HTTP client: {e}")))?, + ), + Transport::Grpc => { + for (k, v) in &self.config.headers { + let key = MetadataKey::from_bytes(k.as_bytes()) + .map_err(|e| Error::InvalidConfigValue(format!("header key '{k}': {e}")))?; + let val = MetadataValue::try_from(v.as_str()).map_err(|e| { + Error::InvalidConfigValue(format!("header value for '{k}': {e}")) + })?; + self.metadata.push((key, val)); + } + + let channel = Channel::from_shared(self.config.endpoint.clone()) + .map_err(|e| Error::InvalidConfigValue(format!("endpoint: {e}")))? 
+ .connect() + .await + .map_err(|e| Error::InitError(format!("OTLP endpoint: {e}")))?; + + self.raw_channel = Some(channel.clone()); + + Client::Grpc(match self.config.signal { + OtlpSignal::Traces => { + let mut c = TraceServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + GrpcClient::Traces(c) + } + OtlpSignal::Metrics => { + let mut c = MetricsServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + GrpcClient::Metrics(c) + } + OtlpSignal::Logs => { + let mut c = LogsServiceClient::new(channel); + if self.config.compression { + c = c + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + } + GrpcClient::Logs(c) + } + }) + } + }); + + info!( + "Opened OTLP sink connector ID: {}, signal: {:?}, transport: {:?}, endpoint: {}", + self.id, self.config.signal, self.config.transport, self.config.endpoint + ); + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let total = messages.len(); + debug!( + "OTLP sink connector ID: {} received {total} messages, schema: {}", + self.id, messages_metadata.schema + ); + + match self.export(&messages_metadata, &messages, total).await { + Ok(exported) => { + self.counters + .messages_sent + .fetch_add(exported, Ordering::Relaxed); + self.counters.batches_sent.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + Err(e) => { + self.counters.batches_failed.fetch_add(1, Ordering::Relaxed); + Err(e) + } + } + } + + async fn close(&mut self) -> Result<(), Error> { + let _ = self.client.take(); + let _ = self.raw_channel.take(); + info!( + "Closed OTLP sink connector ID: {}. 
Counters: messages_sent={}, batches_sent={}, batches_failed={}", + self.id, + self.counters.messages_sent.load(Ordering::Relaxed), + self.counters.batches_sent.load(Ordering::Relaxed), + self.counters.batches_failed.load(Ordering::Relaxed), + ); + Ok(()) + } +} + +impl OtlpSink { + async fn export( + &self, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + total: usize, + ) -> Result { + let client = self + .client + .as_ref() + .ok_or_else(|| Error::InitError("OTLP sink client not initialized".into()))?; + + match client { + Client::Grpc(grpc) => { + if matches!(self.config.format, StorageFormat::Proto) { + self.export_grpc_raw_proto(messages, total).await + } else { + self.export_grpc(grpc, messages_metadata, messages, total) + .await + } + } + Client::Http(http) => { + if matches!(self.config.format, StorageFormat::Proto) { + self.export_http_raw_proto(http, messages, total).await + } else { + self.export_http(http, messages_metadata, messages, total) + .await + } + } + } + } + + async fn export_grpc( + &self, + client: &GrpcClient, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + total: usize, + ) -> Result { + match client { + GrpcClient::Traces(c) => { + let req = build_trace_request(&self.config.format, messages_metadata, messages)?; + if req.resource_spans.is_empty() { + return Ok(0); + } + let span_count: usize = req + .resource_spans + .iter() + .flat_map(|rs| &rs.scope_spans) + .map(|ss| ss.spans.len()) + .sum(); + c.clone() + .export(self.with_grpc_headers(req)) + .await + .map_err(|e| Error::CannotStoreData(format!("OTLP traces export: {e}")))?; + debug!( + "OTLP sink connector ID: {} exported {span_count} spans", + self.id + ); + Ok(total as u64) + } + GrpcClient::Metrics(c) => { + let req = build_metrics_request(&self.config.format, messages_metadata, messages)?; + if req.resource_metrics.is_empty() { + return Ok(0); + } + c.clone() + .export(self.with_grpc_headers(req)) + .await + .map_err(|e| 
Error::CannotStoreData(format!("OTLP metrics export: {e}")))?; + debug!( + "OTLP sink connector ID: {} exported metrics batch ({total} messages)", + self.id + ); + Ok(total as u64) + } + GrpcClient::Logs(c) => { + let req = build_logs_request(&self.config.format, messages_metadata, messages)?; + if req.resource_logs.is_empty() { + return Ok(0); + } + c.clone() + .export(self.with_grpc_headers(req)) + .await + .map_err(|e| Error::CannotStoreData(format!("OTLP logs export: {e}")))?; + debug!( + "OTLP sink connector ID: {} exported logs batch ({total} messages)", + self.id + ); + Ok(total as u64) + } + } + } + + async fn export_http( + &self, + client: &reqwest::Client, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + total: usize, + ) -> Result { + let (url_path, proto_bytes) = match &self.config.signal { + OtlpSignal::Traces => { + let req = build_trace_request(&self.config.format, messages_metadata, messages)?; + if req.resource_spans.is_empty() { + return Ok(0); + } + let mut buf = Vec::new(); + req.encode(&mut buf) + .map_err(|e| Error::WriteFailure(format!("proto encode traces: {e}")))?; + ("/v1/traces", buf) + } + OtlpSignal::Metrics => { + let req = build_metrics_request(&self.config.format, messages_metadata, messages)?; + if req.resource_metrics.is_empty() { + return Ok(0); + } + let mut buf = Vec::new(); + req.encode(&mut buf) + .map_err(|e| Error::WriteFailure(format!("proto encode metrics: {e}")))?; + ("/v1/metrics", buf) + } + OtlpSignal::Logs => { + let req = build_logs_request(&self.config.format, messages_metadata, messages)?; + if req.resource_logs.is_empty() { + return Ok(0); + } + let mut buf = Vec::new(); + req.encode(&mut buf) + .map_err(|e| Error::WriteFailure(format!("proto encode logs: {e}")))?; + ("/v1/logs", buf) + } + }; + + let base = self.config.endpoint.trim_end_matches('/'); + let url = format!("{base}{url_path}"); + + let mut builder = client + .post(&url) + .header("Content-Type", "application/x-protobuf"); + 
+ let body = if self.config.compression { + let mut enc = GzEncoder::new(Vec::new(), Compression::default()); + enc.write_all(&proto_bytes) + .map_err(|e| Error::HttpRequestFailed(format!("gzip encode: {e}")))?; + let compressed = enc + .finish() + .map_err(|e| Error::HttpRequestFailed(format!("gzip finish: {e}")))?; + builder = builder.header("Content-Encoding", "gzip"); + compressed + } else { + proto_bytes + }; + + for (k, v) in &self.config.headers { + builder = builder.header(k.as_str(), v.as_str()); + } + + let resp = builder + .body(body) + .send() + .await + .map_err(|e| Error::HttpRequestFailed(format!("OTLP HTTP {url_path}: {e}")))?; + + let status = resp.status(); + if !status.is_success() { + let body_text = resp.text().await.unwrap_or_default(); + if status.is_client_error() { + let id = self.id; + warn!( + "OTLP sink connector ID: {id} HTTP {url_path} returned {status}: {body_text}" + ); + } else { + return Err(Error::HttpRequestFailed(format!( + "OTLP HTTP {url_path} returned {status}: {body_text}" + ))); + } + } + + debug!( + "OTLP sink connector ID: {} HTTP {url_path} exported successfully", + self.id + ); + Ok(total as u64) + } + + async fn export_grpc_raw_proto( + &self, + messages: &[ConsumedMessage], + total: usize, + ) -> Result { + let channel = self + .raw_channel + .as_ref() + .ok_or_else(|| Error::InitError("raw gRPC channel not initialized".into()))?; + + let path = match &self.config.signal { + OtlpSignal::Traces => PathAndQuery::from_static( + "/opentelemetry.proto.collector.trace.v1.TraceService/Export", + ), + OtlpSignal::Metrics => PathAndQuery::from_static( + "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", + ), + OtlpSignal::Logs => PathAndQuery::from_static( + "/opentelemetry.proto.collector.logs.v1.LogsService/Export", + ), + }; + + let mut grpc_client = TonicGrpc::new(channel.clone()); + let mut sent = 0u64; + + for msg in messages { + let raw = match &msg.payload { + Payload::Raw(bytes) => bytes, + _ => { + 
warn!( + "OTLP sink connector ID: {} (proto mode): expected raw payload", + self.id + ); + continue; + } + }; + // tower::Buffer requires poll_ready before every call. + grpc_client.ready().await.map_err(|e| { + Error::CannotStoreData(format!("OTLP gRPC channel not ready: {e}")) + })?; + let mut request = tonic::Request::new(Bytes::copy_from_slice(raw)); + for (k, v) in &self.metadata { + request.metadata_mut().insert(k.clone(), v.clone()); + } + grpc_client + .unary(request, path.clone(), RawCodec) + .await + .map_err(|e| Error::CannotStoreData(format!("OTLP raw gRPC export: {e}")))?; + sent += 1; + } + + debug!( + "OTLP sink connector ID: {} raw proto gRPC exported {sent}/{total} messages", + self.id + ); + Ok(sent) + } + + async fn export_http_raw_proto( + &self, + client: &reqwest::Client, + messages: &[ConsumedMessage], + total: usize, + ) -> Result { + let url_path = match &self.config.signal { + OtlpSignal::Traces => "/v1/traces", + OtlpSignal::Metrics => "/v1/metrics", + OtlpSignal::Logs => "/v1/logs", + }; + let base = self.config.endpoint.trim_end_matches('/'); + let url = format!("{base}{url_path}"); + let mut sent = 0u64; + + for msg in messages { + let raw = match &msg.payload { + Payload::Raw(bytes) => bytes, + _ => { + warn!( + "OTLP sink connector ID: {} (proto mode): expected raw payload", + self.id + ); + continue; + } + }; + + let body = if self.config.compression { + let mut enc = GzEncoder::new(Vec::new(), Compression::default()); + enc.write_all(raw) + .map_err(|e| Error::HttpRequestFailed(format!("gzip encode: {e}")))?; + enc.finish() + .map_err(|e| Error::HttpRequestFailed(format!("gzip finish: {e}")))? 
+ } else { + raw.to_vec() + }; + + let mut builder = client + .post(&url) + .header("Content-Type", "application/x-protobuf"); + if self.config.compression { + builder = builder.header("Content-Encoding", "gzip"); + } + for (k, v) in &self.config.headers { + builder = builder.header(k.as_str(), v.as_str()); + } + + let resp = builder + .body(body) + .send() + .await + .map_err(|e| Error::HttpRequestFailed(format!("OTLP HTTP {url_path}: {e}")))?; + + let status = resp.status(); + if !status.is_success() { + let body_text = resp.text().await.unwrap_or_default(); + if status.is_client_error() { + let id = self.id; + warn!( + "OTLP sink connector ID: {id} HTTP {url_path} returned {status}: {body_text}" + ); + } else { + return Err(Error::HttpRequestFailed(format!( + "OTLP HTTP {url_path} returned {status}: {body_text}" + ))); + } + } + sent += 1; + } + + debug!( + "OTLP sink connector ID: {} HTTP {url_path} raw proto exported {sent}/{total} messages", + self.id + ); + Ok(sent) + } +} + +fn build_trace_request( + format: &StorageFormat, + meta: &MessagesMetadata, + messages: &[ConsumedMessage], +) -> Result { + match format { + StorageFormat::Proto => { + let bytes = collect_raw_bytes(meta, messages); + if bytes.is_empty() { + return Ok(ExportTraceServiceRequest::default()); + } + let mut merged = ExportTraceServiceRequest::default(); + for b in bytes { + match ExportTraceServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { + Ok(r) => merged.resource_spans.extend(r.resource_spans), + Err(e) => warn!("Failed to decode OTLP trace proto: {e}"), + } + } + Ok(merged) + } + StorageFormat::Json => { + let jsons = collect_json_values(meta, messages); + Ok(from_json::traces_from_json(&jsons)) + } + } +} + +fn build_metrics_request( + format: &StorageFormat, + meta: &MessagesMetadata, + messages: &[ConsumedMessage], +) -> Result { + match format { + StorageFormat::Proto => { + let bytes = collect_raw_bytes(meta, messages); + let mut merged = 
ExportMetricsServiceRequest::default(); + for b in bytes { + match ExportMetricsServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { + Ok(r) => merged.resource_metrics.extend(r.resource_metrics), + Err(e) => warn!("Failed to decode OTLP metrics proto: {e}"), + } + } + Ok(merged) + } + StorageFormat::Json => { + let jsons = collect_json_values(meta, messages); + Ok(from_json::metrics_from_json(&jsons)) + } + } +} + +fn build_logs_request( + format: &StorageFormat, + meta: &MessagesMetadata, + messages: &[ConsumedMessage], +) -> Result { + match format { + StorageFormat::Proto => { + let bytes = collect_raw_bytes(meta, messages); + let mut merged = ExportLogsServiceRequest::default(); + for b in bytes { + match ExportLogsServiceRequest::decode(bytes::Bytes::copy_from_slice(b)) { + Ok(r) => merged.resource_logs.extend(r.resource_logs), + Err(e) => warn!("Failed to decode OTLP logs proto: {e}"), + } + } + Ok(merged) + } + StorageFormat::Json => { + let jsons = collect_json_values(meta, messages); + Ok(from_json::logs_from_json(&jsons)) + } + } +} + +fn collect_raw_bytes<'a>( + meta: &MessagesMetadata, + messages: &'a [ConsumedMessage], +) -> Vec<&'a [u8]> { + let mut out = Vec::with_capacity(messages.len()); + for msg in messages { + match &msg.payload { + Payload::Raw(b) => out.push(b.as_slice()), + _ => warn!( + "OTLP sink (proto mode): expected raw payload, got schema: {}", + meta.schema + ), + } + } + out +} + +fn collect_json_values( + meta: &MessagesMetadata, + messages: &[ConsumedMessage], +) -> Vec { + let mut out = Vec::with_capacity(messages.len()); + for msg in messages { + match &msg.payload { + Payload::Json(v) => out.push(owned_value_to_serde_json(v)), + _ => warn!( + "OTLP sink (json mode): expected JSON payload, got schema: {}", + meta.schema + ), + } + } + out +} + +// Passthrough codec: encodes raw Bytes as-is into the gRPC frame, ignores the +// response body (OTLP Export responses carry no payload, only trailers). 
+#[derive(Default)] +struct RawCodec; +struct RawEncoder; +struct RawDecoder; + +impl Encoder for RawEncoder { + type Item = Bytes; + type Error = tonic::Status; + + fn encode(&mut self, item: Bytes, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + dst.put(item); + Ok(()) + } +} + +impl Decoder for RawDecoder { + type Item = Bytes; + type Error = tonic::Status; + + fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result, Self::Error> { + let len = src.remaining(); + Ok(Some(src.copy_to_bytes(len))) + } +} + +impl Codec for RawCodec { + type Encode = Bytes; + type Decode = Bytes; + type Encoder = RawEncoder; + type Decoder = RawDecoder; + + fn encoder(&mut self) -> RawEncoder { + RawEncoder + } + + fn decoder(&mut self) -> RawDecoder { + RawDecoder + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config(signal: OtlpSignal) -> OtlpSinkConfig { + OtlpSinkConfig { + endpoint: "http://localhost:7281".to_string(), + signal, + transport: Transport::Grpc, + format: StorageFormat::Json, + compression: false, + headers: HashMap::new(), + } + } + + #[test] + fn given_new_sink_client_should_not_be_initialized() { + let sink = OtlpSink::new(1, test_config(OtlpSignal::Traces)); + assert!(sink.client.is_none()); + } + + #[test] + fn given_traces_signal_config_should_store_correctly() { + let sink = OtlpSink::new(1, test_config(OtlpSignal::Traces)); + assert!(matches!(sink.config.signal, OtlpSignal::Traces)); + } + + #[test] + fn given_http_transport_config_should_store_correctly() { + let mut config = test_config(OtlpSignal::Metrics); + config.transport = Transport::Http; + let sink = OtlpSink::new(2, config); + assert!(matches!(sink.config.transport, Transport::Http)); + } + + #[test] + fn given_new_sink_counters_should_be_zero() { + let sink = OtlpSink::new(1, test_config(OtlpSignal::Logs)); + assert_eq!(sink.counters.messages_sent.load(Ordering::Relaxed), 0); + assert_eq!(sink.counters.batches_sent.load(Ordering::Relaxed), 0); + 
assert_eq!(sink.counters.batches_failed.load(Ordering::Relaxed), 0); + } +}