feat(connectors/otlp_sink): add OTLP/gRPC sink connector #3529
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| # syntax=docker/dockerfile:1.7 | ||
|
Member
Missing Apache license header. More broadly, this whole file looks out of scope for a sink PR: the comment on line 3 describes shipping the source plugin (copied from #3516?), the build step on line 25 compiles a crate that isn't here, and
Author
Fixed in
Author
Sorry for the late response -- I was running a local benchmark to make sure the setup is working end-to-end. Fixed in |
||
| # | ||
| # iggy-connectors runtime image — ships the iggy-connectors binary and the | ||
| # OTLP/gRPC sink, quickwit sink plugins (.so). Config files are injected | ||
| # via K8s ConfigMaps. | ||
| # | ||
| # Builder: rust:1-slim-bookworm (mold linker, BuildKit cache mounts) | ||
| # Runtime: debian:bookworm-slim + libssl3 | ||
|
|
||
| # ---- Build ---- | ||
| FROM rust:1-slim-bookworm AS build | ||
| WORKDIR /app | ||
|
|
||
| RUN apt-get update && apt-get install -y --no-install-recommends \ | ||
| pkg-config libssl-dev clang mold \ | ||
| && rm -rf /var/lib/apt/lists/* | ||
|
|
||
| COPY . . | ||
|
|
||
| ARG TARGETARCH | ||
| RUN --mount=type=cache,id=iggy-connectors-registry,sharing=locked,target=/usr/local/cargo/registry \ | ||
| --mount=type=cache,id=iggy-connectors-git,sharing=locked,target=/usr/local/cargo/git \ | ||
| --mount=type=cache,id=iggy-connectors-target-${TARGETARCH},target=/app/target \ | ||
| export RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=mold" \ | ||
| && cargo build --release --bin iggy-connectors \ | ||
| && cargo build --release -p iggy_connector_quickwit_sink \ | ||
| && cargo build --release -p iggy_connector_otlp_sink \ | ||
| && cp target/release/iggy-connectors /usr/local/bin/iggy-connectors \ | ||
| && cp target/release/libiggy_connector_quickwit_sink.so /usr/local/lib/libiggy_connector_quickwit_sink.so \ | ||
| && cp target/release/libiggy_connector_otlp_sink.so /usr/local/lib/libiggy_connector_otlp_sink.so \ | ||
| && strip /usr/local/bin/iggy-connectors | ||
|
|
||
| # ---- Runtime ---- | ||
| FROM debian:bookworm-slim | ||
| RUN apt-get update && apt-get install -y --no-install-recommends \ | ||
| ca-certificates libssl3 \ | ||
| && rm -rf /var/lib/apt/lists/* \ | ||
| && useradd --system --uid 10001 --home-dir /connectors \ | ||
| --shell /usr/sbin/nologin iggy \ | ||
| && mkdir -p /connectors/plugins /connectors/state \ | ||
| && chown -R iggy:iggy /connectors | ||
|
|
||
| COPY --from=build /usr/local/bin/iggy-connectors /usr/local/bin/iggy-connectors | ||
| COPY --from=build /usr/local/lib/libiggy_connector_quickwit_sink.so \ | ||
| /connectors/plugins/libiggy_connector_quickwit_sink.so | ||
| COPY --from=build /usr/local/lib/libiggy_connector_otlp_sink.so \ | ||
| /connectors/plugins/libiggy_connector_otlp_sink.so | ||
|
|
||
| USER iggy | ||
| WORKDIR /connectors | ||
|
|
||
| # Runtime config: IGGY_CONNECTORS_CONFIG_PATH → /connectors/runtime.toml (ConfigMap) | ||
| # Plugin configs: /connectors/plugins/otlp_sink.toml (ConfigMap) | ||
| ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml | ||
|
|
||
|
Contributor
EXPOSE 4317: port 4317 is the OTLP/gRPC inbound receiver port. This container is an egress sink: it connects out to a backend and binds no ingress port. Declaring the port misleads operators into opening incorrect firewall rules. Fix: remove EXPOSE 4317.
Author
Fixed in |
||
| EXPOSE 8081 | ||
|
|
||
| ENTRYPOINT ["/usr/local/bin/iggy-connectors"] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| # To Be Decided | ||
|
|
||
| ## `PollingKind::First` TCP bug (open since 2026-06-22) | ||
|
|
||
| ### Observed behavior | ||
|
|
||
| `PollingStrategy::first()` over the TCP binary protocol returns `InvalidOffset(0)` even | ||
| when the topic has messages and offset 0 is valid. The same poll over HTTP works fine. | ||
|
|
||
| ### Investigation findings (2026-06-22, session 14) | ||
|
|
||
| **Both TCP and HTTP converge on the same server code path.** After topic/partition | ||
| resolution both routes arrive at `IggyShard::poll_messages()` → | ||
| `poll_messages_from_local_partition()` → `ops.rs::poll_messages()`. There is no | ||
| TCP-specific validation. The TCP handler (`binary/handlers/messages/poll_messages_handler.rs`) | ||
| does not call `validate_partition_offset`. | ||
|
|
||
| **`PollingKind::First` resolution in `ops.rs:85-89`:** | ||
| ```rust | ||
| PollingKind::First => partition.log.segments().first() | ||
| .map(|segment| segment.start_offset) | ||
| .unwrap_or(0), | ||
| ``` | ||
| Returns the first segment's start offset -- correct even after retention removes old segments. | ||
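The resolution rule above can be exercised in isolation. The following is a minimal sketch, assuming a stand-in `Segment` type and a hypothetical `apply_retention` helper (neither is the server's real type); it only illustrates why the first retained segment's start offset stays a valid starting point after older segments are dropped.

```rust
/// Hypothetical stand-in for a partition log segment; only the field the
/// quoted snippet reads is modelled.
#[derive(Debug, Clone, Copy)]
struct Segment {
    start_offset: u64,
}

/// Mirrors the `PollingKind::First` arm quoted above: the first retained
/// segment's start offset, or 0 for an empty log.
fn first_offset(segments: &[Segment]) -> u64 {
    segments
        .first()
        .map(|segment| segment.start_offset)
        .unwrap_or(0)
}

/// Simulates retention dropping the `count` oldest segments (illustration only).
fn apply_retention(segments: &mut Vec<Segment>, count: usize) {
    let count = count.min(segments.len());
    segments.drain(..count);
}
```

With segments starting at 0, 1000, and 2000, `first_offset` yields 0; after `apply_retention(&mut segments, 1)` it yields 1000, which is the behavior the note describes as correct.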
|
|
||
| **All `InvalidOffset` sources in the server:** | ||
|
|
||
| | File | Line | When | | ||
| |------|------|------| | ||
| | `server/src/shard/system/utils.rs` | 152 | `store_consumer_offset` only -- not reachable from poll | | ||
| | `server_common/src/messages_batch_mut.rs` | 455 | `validate_checksums_and_offsets` during disk load -- reachable from poll | | ||
| | `server_common/src/messages_batch_mut.rs` | 682 | send-path `validate()` -- not reachable from poll | | ||
|
|
||
| **TCP response deserialization loses the inner value:** | ||
| `TcpClient::handle_response` calls `IggyError::from_code(4100)` → `IggyError::from_repr(4100)`. | ||
| `FromRepr` on a `#[repr(u32)]` enum with a tuple variant initializes the inner `u64` to zero. | ||
| Any `InvalidOffset(N)` from the server arrives as `InvalidOffset(0)` at the SDK. | ||
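The payload loss described above can be shown without the SDK. This is a hand-rolled sketch, not the real `IggyError` or its derive: `SketchError`, `code`, `from_code`, and `round_trip` are illustrative names, and only the code 4100 comes from the notes. It assumes, as the notes state, that only the numeric code crosses the wire.

```rust
/// Hypothetical two-variant error enum; the real `IggyError` is much larger.
#[derive(Debug, PartialEq, Eq)]
enum SketchError {
    InvalidOffset(u64),
    Unknown(u32),
}

impl SketchError {
    /// Numeric code sent on the wire. 4100 is the code quoted in the notes.
    fn code(&self) -> u32 {
        match self {
            SketchError::InvalidOffset(_) => 4100,
            SketchError::Unknown(code) => *code,
        }
    }

    /// Rebuilds an error from the code alone. The payload was never sent, so
    /// the only value available for the inner field is its default (0).
    fn from_code(code: u32) -> Self {
        match code {
            4100 => SketchError::InvalidOffset(u64::default()),
            other => SketchError::Unknown(other),
        }
    }
}

/// Simulates a code-only round trip between server and client.
fn round_trip(server_error: &SketchError) -> SketchError {
    SketchError::from_code(server_error.code())
}
```

In this model, `round_trip(&SketchError::InvalidOffset(1234))` comes back as `InvalidOffset(0)`, so the offset seen at the client says nothing about which offset the server rejected.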
|
|
||
| **Most likely server-side trigger:** the `validate_checksums_and_offsets(start_offset)` call at | ||
| `ops.rs:553` (made from `load_segment_messages` during a disk poll). If a segment's on-disk | ||
| message headers have offsets that do not match the expected sequential order starting at | ||
| `start_offset`, this returns `InvalidOffset(actual_bad_offset)` -- which the TCP client | ||
| reconstructs as `InvalidOffset(0)`. HTTP may handle this differently depending on its error | ||
| mapping, or the HTTP test may have hit a different partition or segment. | ||
|
|
||
| ### Current workaround | ||
|
|
||
| PR #3525 uses `PollingStrategy::last()` (not `first()`) for the `InvalidOffset` recovery | ||
| fallback, because `last()` always succeeds (most recent message is always in an active | ||
| segment). The recovery poll commits the latest offset so subsequent polls continue normally. | ||
| Pre-arming: when a consumer group is newly created, `fallback_to_last` is set so the first | ||
| poll skips the guaranteed `InvalidOffset` that would occur if retention has run. | ||
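The fallback and pre-arming logic can be sketched as a small state machine. Everything below is an assumption made for illustration: `Strategy`, `PollError`, and `Recovery` are hypothetical types, not the connector's code from PR #3525; only the `fallback_to_last` flag name comes from the notes. Committing the latest offset after the recovery poll is not modelled.

```rust
/// Hypothetical polling strategies; the real SDK type is `PollingStrategy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Strategy {
    Next,
    Last,
}

/// Hypothetical error type carrying only the variant discussed above.
#[derive(Debug, PartialEq, Eq)]
enum PollError {
    InvalidOffset(u64),
}

/// Tracks whether the next poll should use the `last()` fallback.
struct Recovery {
    fallback_to_last: bool,
}

impl Recovery {
    /// Pre-arms the fallback when the consumer group was just created.
    fn new(group_newly_created: bool) -> Self {
        Self { fallback_to_last: group_newly_created }
    }

    /// Strategy to use for the next poll.
    fn next_strategy(&self) -> Strategy {
        if self.fallback_to_last {
            Strategy::Last
        } else {
            Strategy::Next
        }
    }

    /// Records a poll result: arm on `InvalidOffset`, disarm on success.
    fn observe<T>(&mut self, result: &Result<T, PollError>) {
        self.fallback_to_last = matches!(result, Err(PollError::InvalidOffset(_)));
    }
}
```

A `Recovery::new(true)` starts with `Strategy::Last`, returns to `Strategy::Next` after one successful poll, and re-arms whenever a later poll reports `InvalidOffset`.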
|
|
||
| ### Remaining work | ||
|
|
||
| - Reproduce deterministically (e.g. create topic, send 3 messages, delete first segment | ||
| manually, poll with `PollingStrategy::first()` over TCP vs HTTP). Compare. | ||
| - Check if `validate_checksums_and_offsets` is called when disk is healthy -- if yes, the | ||
| mismatch would mean a bug in how segment start offsets are written vs. the message header | ||
| offsets on disk. | ||
| - If root cause is confirmed as the `from_repr` u64 loss, add the actual offset to the TCP | ||
| error payload so diagnostics are not blind. | ||
| - Once fixed, PR #3525 can use `first()` instead of `last()` in the recovery path to avoid | ||
| skipping history. |
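One way to picture the third item, carrying the offset in the error payload, is an error frame with an optional trailing value. The layout below is an assumption made for illustration, not the actual Iggy wire format, and `encode_error` and `decode_error` are hypothetical helpers.

```rust
/// Hypothetical error frame: a 4-byte little-endian status code followed by
/// an optional 8-byte little-endian offset. NOT the actual Iggy wire format.
fn encode_error(code: u32, offset: Option<u64>) -> Vec<u8> {
    let mut frame = code.to_le_bytes().to_vec();
    if let Some(offset) = offset {
        frame.extend_from_slice(&offset.to_le_bytes());
    }
    frame
}

/// Decodes the frame, returning the code and the offset when one was sent.
/// Returns `None` for frames too short to hold a status code.
fn decode_error(frame: &[u8]) -> Option<(u32, Option<u64>)> {
    let head = frame.get(..4)?;
    let mut code_buf = [0u8; 4];
    code_buf.copy_from_slice(head);
    let code = u32::from_le_bytes(code_buf);

    // The offset is optional so that code-only frames still decode.
    let offset = match frame.get(4..12) {
        Some(bytes) => {
            let mut offset_buf = [0u8; 8];
            offset_buf.copy_from_slice(bytes);
            Some(u64::from_le_bytes(offset_buf))
        }
        None => None,
    };
    Some((code, offset))
}
```

Keeping the offset optional means a client could still accept code-only frames, which matters if older servers and newer clients have to coexist; whether that compatibility is required here is left open in the notes.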
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| [package] | ||
| name = "iggy_connector_otlp_sink" | ||
| version = "0.4.1-edge.1" | ||
| description = "Iggy OTLP/gRPC sink connector — forwards logs, metrics, and traces from Iggy streams to any OpenTelemetry-compatible backend via the OTLP protocol" | ||
| edition = "2024" | ||
| license = "Apache-2.0" | ||
| keywords = ["iggy", "messaging", "streaming", "opentelemetry", "otlp"] | ||
| categories = ["command-line-utilities", "network-programming"] | ||
| homepage = "https://iggy.apache.org" | ||
| documentation = "https://iggy.apache.org/docs" | ||
| repository = "https://github.com/apache/iggy" | ||
| readme = "../../README.md" | ||
| publish = false | ||
|
|
||
| [lib] | ||
| crate-type = ["cdylib", "lib"] | ||
|
|
||
| [dependencies] | ||
| async-trait = { workspace = true } | ||
| bytes = { workspace = true } | ||
| flate2 = { workspace = true } | ||
| http = { workspace = true } | ||
| iggy_connector_sdk = { workspace = true } | ||
| reqwest = { workspace = true } | ||
| opentelemetry-proto = { workspace = true, features = [ | ||
| "gen-tonic", | ||
| "logs", | ||
| "metrics", | ||
| "trace", | ||
| ] } | ||
| prost = { workspace = true } | ||
| serde = { workspace = true } | ||
| serde_json = { workspace = true } | ||
| tokio = { workspace = true } | ||
| tonic = { workspace = true, features = ["transport", "gzip"] } | ||
| tracing = { workspace = true } |
`cargo sort` will fail here. `otlp_sink` is inserted after `postgres_sink`, but `o` sorts before `p`, so it must sort between `mongodb_sink` and `postgres_sink`. `cargo sort --no-format` is a fail-fast gating job in CI.

Fixed in 9ba479047: `otlp_sink` moved between `mongodb_sink` and `postgres_sink` in the workspace members list (`o` sorts before `p`).

Fixed in 9ba479047: `otlp_sink` now appears between `mongodb_sink` and `postgres_sink` in the workspace members list, matching alphabetical order. `cargo sort --no-format --workspace` passes clean.

Fixed in 9ba479047: `otlp_sink` hoisted to the alphabetically correct position (before `postgres_sink`, after `mongodb_sink`). Also removed the duplicate entry that had slipped into the workspace members list.