Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,21 @@ Created by server, connectors runtime, or tests. Gitignored. Safe to delete to r
- Design: <https://github.com/apache/iggy/discussions>
- Chat: <https://discord.gg/apache-iggy>
- Docs: <https://iggy.apache.org/docs/>

## Infra quick-ref (yucemonitoring vcluster)

- iggy HTTP: `http://10.20.4.31:31920`. Auth: `POST /users/login {"username":"iggy","password":"iggy"}`.
- Node SSH: `maya@<ip>`, password `test123`. kernel 6.8.0-124. kubeconfig: `~/.kube/yucemonitoring.config`, ns `yucemonitoring`, `--insecure-skip-tls-verify`.
- Jenkins: `maya-anomaly-platform`, `type=iggy`. Trivy CVE scan always fails (false positive) -- check `BUILT_IMAGE=` in console log; if present, deploy anyway.
- QW REST: port-forward `svc/maya-quickwit-s3 7280:7280`. QW **0.8.2** -- only `unix_timestamp` (seconds) as datetime input_format; nanos/millis require 0.9.
- iggy->QW sink: `iggy-{metrics,flows,logs,apigw}` indices (mode:dynamic, no retention). Consumer groups: `qw_sink_{metrics,flows,logs,apigw}`. If connector loops on `InvalidOffset`, delete the CG via iggy HTTP API and restart.
- Segment cleaner: `IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true` -- already enabled in production.
- Connectors in image: `otlp_source` (JSON/proto modes), `otlp_sink` (PR #3529), `quickwit_sink`.

## READY FOR HANDOVER (2026-06-22, session 15)

**Current state:** Five PRs open on `apache/iggy` (all S-waiting-on-review): #3516 (OTLP source -- awaiting re-review from ryerraguntla), #3517 (COOP_TASKRUN), #3523 (quickwit_sink), #3525 (InvalidOffset SDK -- 3 inline threads answered, awaiting atharvalade re-review), #3529 (otlp_sink -- 22 inline threads answered, 9ba479047 committed, /ready posted). Working tree clean.

**Next steps:** (1) Wait for reviews on all 5 PRs; (2) Investigate `PollingKind::First` TCP bug deeper (see TOBEDECIDED.md -- root cause narrowed to `validate_checksums_and_offsets` or segment offset mismatch; reproduce deterministically).

**Open decisions:** See TOBEDECIDED.md -- TCP first() bug (investigation findings recorded, not yet root-caused).
48 changes: 42 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ members = [
"core/connectors/sinks/iceberg_sink",
"core/connectors/sinks/influxdb_sink",
"core/connectors/sinks/mongodb_sink",
"core/connectors/sinks/otlp_sink",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cargo sort will fail here. otlp_sink is inserted after postgres_sink, but o sorts before p, so it must sort between mongodb_sink and postgres_sink. cargo sort --no-format is a fail-fast gating job in CI.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9ba479047: otlp_sink moved between mongodb_sink and postgres_sink in the workspace members list (o sorts before p).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9ba479047: otlp_sink now appears between mongodb_sink and postgres_sink in the workspace members list, matching alphabetical order. cargo sort --no-format --workspace passes clean.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9ba479047: otlp_sink hoisted to the alphabetically correct position (before postgres_sink, after mongodb_sink). Also removed the duplicate entry that had slipped into the workspace members list.

"core/connectors/sinks/postgres_sink",
"core/connectors/sinks/quickwit_sink",
"core/connectors/sinks/stdout_sink",
Expand Down Expand Up @@ -158,6 +159,7 @@ figlet-rs = "1.0.0"
figment = { version = "0.10.19", features = ["toml", "env"] }
file-operation = "0.8.25"
flatbuffers = "25.12.19"
flate2 = "1.1"
flume = "0.12.0"
fs2 = "0.4.3"
futures = "0.3.32"
Expand Down Expand Up @@ -222,6 +224,7 @@ opentelemetry-otlp = { version = "0.32.0", features = [
"http-proto",
"reqwest-client",
] }
opentelemetry-proto = { version = "0.32.0", default-features = false }
opentelemetry-semantic-conventions = "0.32.0"
opentelemetry_sdk = { version = "0.32.1", features = [
"logs",
Expand Down Expand Up @@ -308,6 +311,7 @@ tokio-rustls = "0.26.4"
tokio-tungstenite = { version = "0.29", features = ["rustls-tls-webpki-roots"] }
tokio-util = { version = "0.7.18", features = ["compat"] }
toml = "1.1.2"
tonic = { version = "0.14.6", default-features = false }
tower-http = { version = "0.7.0", features = ["add-extension", "cors", "trace"] }
tracing = "0.1.44"
tracing-appender = "0.2.5"
Expand Down
75 changes: 75 additions & 0 deletions Dockerfile.connectors
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# syntax=docker/dockerfile:1.7

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Apache license header. scripts/ci/license-headers.sh runs addlicense -check across the repo (root-level Dockerfiles aren't in IGNORE_PATTERNS, and the existing root Dockerfile carries the ASF block), so this file will fail the license CI job. A # syntax= directive plus a descriptive comment doesn't satisfy addlicense.

More broadly, this whole file looks out of scope for a sink PR: the comment on line 3 describes shipping the source plugin (copied from #3516?), the build step on line 25 compiles a crate that isn't here, and EXPOSE 4317 on line 59 is a receiver port. The canonical connectors image is core/connectors/runtime/Dockerfile, and no publish workflow references a root Dockerfile.connectors. Consider dropping it from this PR.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9ba479047: Apache 2.0 license header added to Dockerfile.connectors.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late response -- I was running a local benchmark to make sure the setup is working end-to-end.

Fixed in 9ba479047: Apache 2.0 license header added to Dockerfile.connectors.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late response -- I was running a local benchmark to make sure the setup is working end-to-end.

Fixed in 9ba479047: Apache 2.0 license header added to Dockerfile.connectors.

#
# iggy-connectors runtime image — ships the iggy-connectors binary and the
# OTLP/gRPC sink, quickwit sink plugins (.so). Config files are injected
# via K8s ConfigMaps.
#
# Builder: rust:1-slim-bookworm (mold linker, BuildKit cache mounts)
# Runtime: debian:bookworm-slim + libssl3

# ---- Build ----
FROM rust:1-slim-bookworm AS build
WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
pkg-config libssl-dev clang mold \
&& rm -rf /var/lib/apt/lists/*

COPY . .

ARG TARGETARCH
RUN --mount=type=cache,id=iggy-connectors-registry,sharing=locked,target=/usr/local/cargo/registry \
--mount=type=cache,id=iggy-connectors-git,sharing=locked,target=/usr/local/cargo/git \
--mount=type=cache,id=iggy-connectors-target-${TARGETARCH},target=/app/target \
RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=mold" \
cargo build --release --bin iggy-connectors \
&& cargo build --release -p iggy_connector_quickwit_sink \
&& cargo build --release -p iggy_connector_otlp_sink \
&& cp target/release/iggy-connectors /usr/local/bin/iggy-connectors \
&& cp target/release/libiggy_connector_quickwit_sink.so /usr/local/lib/libiggy_connector_quickwit_sink.so \
&& cp target/release/libiggy_connector_otlp_sink.so /usr/local/lib/libiggy_connector_otlp_sink.so \
&& strip /usr/local/bin/iggy-connectors

# ---- Runtime ----
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates libssl3 \
&& rm -rf /var/lib/apt/lists/* \
&& useradd --system --uid 10001 --home-dir /connectors \
--shell /usr/sbin/nologin iggy \
&& mkdir -p /connectors/plugins /connectors/state \
&& chown -R iggy:iggy /connectors

COPY --from=build /usr/local/bin/iggy-connectors /usr/local/bin/iggy-connectors
COPY --from=build /usr/local/lib/libiggy_connector_quickwit_sink.so \
/connectors/plugins/libiggy_connector_quickwit_sink.so
COPY --from=build /usr/local/lib/libiggy_connector_otlp_sink.so \
/connectors/plugins/libiggy_connector_otlp_sink.so

USER iggy
WORKDIR /connectors

# Runtime config: IGGY_CONNECTORS_CONFIG_PATH → /connectors/runtime.toml (ConfigMap)
# Plugin configs: /connectors/plugins/otlp_sink.toml (ConfigMap)
ENV IGGY_CONNECTORS_CONFIG_PATH=/connectors/runtime.toml

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EXPOSE 4317. Port 4317 is the OTLP/gRPC inbound receiver port. This container is an egress sink — it connects out to a backend, binds no ingress port. Misleads operators into opening incorrect firewall rules. Fix: remove EXPOSE 4317.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9ba479047: EXPOSE 4317 removed from Dockerfile.connectors; EXPOSE 8081 kept (runtime metrics endpoint).

EXPOSE 8081

ENTRYPOINT ["/usr/local/bin/iggy-connectors"]
64 changes: 64 additions & 0 deletions TOBEDECIDED.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# To Be Decided

## `PollingKind::First` TCP bug (open since 2026-06-22)

### Observed behavior

`PollingStrategy::first()` over the TCP binary protocol returns `InvalidOffset(0)` even
when the topic has messages and offset 0 is valid. The same poll over HTTP works fine.

### Investigation findings (2026-06-22, session 14)

**Both TCP and HTTP converge on the same server code path.** After topic/partition
resolution both routes arrive at `IggyShard::poll_messages()` →
`poll_messages_from_local_partition()` → `ops.rs::poll_messages()`. There is no
TCP-specific validation. The TCP handler (`binary/handlers/messages/poll_messages_handler.rs`)
does not call `validate_partition_offset`.

**`PollingKind::First` resolution in `ops.rs:85-89`:**
```rust
PollingKind::First => partition.log.segments().first()
.map(|segment| segment.start_offset)
.unwrap_or(0),
```
Returns the first segment's start offset -- correct even after retention removes old segments.

**All `InvalidOffset` sources in the server:**

| File | Line | When |
|------|------|------|
| `server/src/shard/system/utils.rs` | 152 | `store_consumer_offset` only -- not reachable from poll |
| `server_common/src/messages_batch_mut.rs` | 455 | `validate_checksums_and_offsets` during disk load -- reachable from poll |
| `server_common/src/messages_batch_mut.rs` | 682 | send-path `validate()` -- not reachable from poll |

**TCP response deserialization loses the inner value:**
`TcpClient::handle_response` calls `IggyError::from_code(4100)` → `IggyError::from_repr(4100)`.
`FromRepr` on a `#[repr(u32)]` enum with a tuple variant initializes the inner `u64` to zero.
Any `InvalidOffset(N)` from the server arrives as `InvalidOffset(0)` at the SDK.

**Most likely server-side trigger:** `validate_checksums_and_offsets(start_offset)` in
`ops.rs:553` (called from `load_segment_messages` during disk poll). If a segment's on-disk
message headers have offsets that do not match the expected sequential order starting at
`start_offset`, this returns `InvalidOffset(actual_bad_offset)` -- which the TCP client
reconstructs as `InvalidOffset(0)`. HTTP could survive this differently depending on error
mapping, or the HTTP test hit a different partition/segment.

### Current workaround

PR #3525 uses `PollingStrategy::last()` (not `first()`) for the `InvalidOffset` recovery
fallback, because `last()` always succeeds (most recent message is always in an active
segment). The recovery poll commits the latest offset so subsequent polls continue normally.
Pre-arming: when a consumer group is newly created, `fallback_to_last` is set so the first
poll skips the guaranteed `InvalidOffset` that would occur if retention has run.

### Remaining work

- Reproduce deterministically (e.g. create topic, send 3 messages, delete first segment
manually, poll with `PollingStrategy::first()` over TCP vs HTTP). Compare.
- Check if `validate_checksums_and_offsets` is called when disk is healthy -- if yes, the
mismatch would mean a bug in how segment start offsets are written vs. the message header
offsets on disk.
- If root cause is confirmed as the `from_repr` u64 loss, add the actual offset to the TCP
error payload so diagnostics are not blind.
- Once fixed, PR #3525 can use `first()` instead of `last()` in the recovery path to avoid
skipping history.
53 changes: 53 additions & 0 deletions core/connectors/sinks/otlp_sink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iggy_connector_otlp_sink"
version = "0.4.1-edge.1"
description = "Iggy OTLP/gRPC sink connector — forwards logs, metrics, and traces from Iggy streams to any OpenTelemetry-compatible backend via the OTLP protocol"
edition = "2024"
license = "Apache-2.0"
keywords = ["iggy", "messaging", "streaming", "opentelemetry", "otlp"]
categories = ["command-line-utilities", "network-programming"]
homepage = "https://iggy.apache.org"
documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"
publish = false

[lib]
crate-type = ["cdylib", "lib"]

[dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
flate2 = { workspace = true }
http = { workspace = true }
iggy_connector_sdk = { workspace = true }
reqwest = { workspace = true }
opentelemetry-proto = { workspace = true, features = [
"gen-tonic",
"logs",
"metrics",
"trace",
] }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, features = ["transport", "gzip"] }
tracing = { workspace = true }
Loading