25 changes: 25 additions & 0 deletions experiments/arrow-payload/.gitignore
@@ -0,0 +1,25 @@
# CMake
build/
cmake-build-*/
CMakeCache.txt
CMakeFiles/
cmake_install.cmake
Makefile

# Compiled binaries
*.o
*.out
*.exe
arrow_consumer

# Python
__pycache__/
*.pyc
.venv/
venv/

# IDE
.idea/
.vscode/
*.swp
*.swo
141 changes: 141 additions & 0 deletions experiments/arrow-payload/Dockerfile.consumer
@@ -0,0 +1,141 @@
# syntax=docker/dockerfile:1.7
#
# Arrow-IPC-on-Kafka consumer image.
#
# Build context: experiments/arrow-payload/ (see docker-compose.yaml)
#
# Pinned versions:
# - Base: debian:bookworm-slim (Debian 12)
# - librdkafka: 1.9.2-1+deb12u1 (from Debian 12 main; satisfies our use — we only need
# the high-level consumer API, which has been stable since 1.x)
# - Apache Arrow: 18.x (installed via the official apache-arrow-apt-source package;
# Debian default repos do not ship libarrow)
# - DuckDB: v1.4.0 (matches parent Dockerfile: `duckdb-cli>=1.4,<1.5`)
#
# Why Arrow 18 and not 19: 18.x is the latest release with a stable `libarrow1800` runtime
# package name in the Arrow APT repo at the time of writing. Bump to 19 once we've verified
# the runtime package naming on apt.apache.org/arrow.

# -----------------------------------------------------------------------------
# Stage 1: builder
# -----------------------------------------------------------------------------
FROM debian:bookworm-slim AS builder

# TARGETARCH is automatically populated by buildx (amd64 / arm64). We use it
# to pick the matching DuckDB precompiled release. Apple Silicon hosts MUST
# build arm64 here — building amd64 under Rosetta would invalidate any
# throughput numbers this PoC produces.
ARG TARGETARCH
ARG ARROW_VERSION=18.1.0
ARG ARROW_SO=1801
ARG DUCKDB_VERSION=1.4.0

ENV DEBIAN_FRONTEND=noninteractive

# Base build deps.
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
cmake \
pkg-config \
git \
ca-certificates \
wget \
unzip \
lsb-release \
gnupg \
librdkafka-dev \
&& rm -rf /var/lib/apt/lists/*

# Apache Arrow C++ via the official Arrow APT repo.
# See https://arrow.apache.org/install/ — the `apache-arrow-apt-source` .deb drops
# a signed sources.list entry into /etc/apt/sources.list.d/.
RUN wget -qO /tmp/apache-arrow-apt-source.deb \
"https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb" \
&& apt-get install -y --no-install-recommends /tmp/apache-arrow-apt-source.deb \
&& rm /tmp/apache-arrow-apt-source.deb \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
"libarrow-dev=${ARROW_VERSION}-1" \
&& rm -rf /var/lib/apt/lists/*

# DuckDB C/C++ headers + shared library (precompiled upstream release).
# Pick the right zip per host arch — DuckDB ships amd64 and aarch64 builds.
RUN case "${TARGETARCH}" in \
amd64) DUCKDB_ARCH=linux-amd64 ;; \
arm64) DUCKDB_ARCH=linux-arm64 ;; \
*) echo "unsupported TARGETARCH=${TARGETARCH}" >&2; exit 1 ;; \
esac \
&& wget -qO /tmp/libduckdb.zip \
"https://github.com/duckdb/duckdb/releases/download/v${DUCKDB_VERSION}/libduckdb-${DUCKDB_ARCH}.zip" \
&& unzip -o /tmp/libduckdb.zip -d /tmp/libduckdb \
&& install -m 0644 /tmp/libduckdb/duckdb.h /usr/local/include/duckdb.h \
&& install -m 0644 /tmp/libduckdb/duckdb.hpp /usr/local/include/duckdb.hpp \
&& install -m 0755 /tmp/libduckdb/libduckdb.so /usr/local/lib/libduckdb.so \
&& ldconfig \
&& rm -rf /tmp/libduckdb /tmp/libduckdb.zip

# Pre-install DuckDB extensions so the consumer's runtime LOAD has no network
# dependency. Mirrors parent project's Dockerfile pattern. We grab the matching
# DuckDB CLI release, run INSTALL into /root/.duckdb, and stage the extensions
# directory for the runtime stage to copy.
RUN case "${TARGETARCH}" in \
amd64) DUCKDB_ARCH=linux-amd64 ;; \
arm64) DUCKDB_ARCH=linux-arm64 ;; \
esac \
&& wget -qO /tmp/duckdb_cli.zip \
"https://github.com/duckdb/duckdb/releases/download/v${DUCKDB_VERSION}/duckdb_cli-${DUCKDB_ARCH}.zip" \
&& unzip -o /tmp/duckdb_cli.zip -d /tmp/duckdb_cli \
&& install -m 0755 /tmp/duckdb_cli/duckdb /usr/local/bin/duckdb \
&& /usr/local/bin/duckdb -c "INSTALL httpfs; INSTALL ducklake; INSTALL postgres;" \
&& rm -rf /tmp/duckdb_cli /tmp/duckdb_cli.zip

# Build the consumer.
# NOTE: build context is experiments/arrow-payload/, so this COPY grabs the consumer/ subdir.
WORKDIR /build
COPY consumer/ /build/
RUN cmake -B build -S . -DCMAKE_BUILD_TYPE=Release \
&& cmake --build build -j"$(nproc)"

# -----------------------------------------------------------------------------
# Stage 2: runtime
# -----------------------------------------------------------------------------
FROM debian:bookworm-slim AS runtime

ARG ARROW_VERSION=18.1.0
ARG ARROW_SO=1801

ENV DEBIAN_FRONTEND=noninteractive

# Install Arrow runtime via the same APT repo as the builder, then the small
# set of runtime libraries the consumer binary links against.
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
wget \
lsb-release \
gnupg \
&& wget -qO /tmp/apache-arrow-apt-source.deb \
"https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb" \
&& apt-get install -y --no-install-recommends /tmp/apache-arrow-apt-source.deb \
&& rm /tmp/apache-arrow-apt-source.deb \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
librdkafka1 \
"libarrow${ARROW_SO}=${ARROW_VERSION}-1" \
&& apt-get purge -y --auto-remove wget lsb-release gnupg \
&& rm -rf /var/lib/apt/lists/*

# DuckDB shared library (headers not needed at runtime).
COPY --from=builder /usr/local/lib/libduckdb.so /usr/local/lib/libduckdb.so
RUN ldconfig

# Consumer binary.
COPY --from=builder /build/build/arrow-consumer /usr/local/bin/arrow-consumer

# DuckDB extensions baked in at build time. The consumer process runs as root
# in this image (PoC; matches simplicity, not parent-stack non-root norms),
# so extensions land in /root/.duckdb where the consumer's LOAD statements
# expect them. The consumer issues `LOAD httpfs/ducklake/postgres` at startup
# and these pre-installed extensions are picked up without any network call.
COPY --from=builder /root/.duckdb /root/.duckdb

ENTRYPOINT ["/usr/local/bin/arrow-consumer"]
36 changes: 36 additions & 0 deletions experiments/arrow-payload/Dockerfile.producer
@@ -0,0 +1,36 @@
# Dockerfile for the arrow-payload experiment's Python producer.
#
# IMPORTANT: the build context MUST be the repository root
# (/Users/jakob/src/millpond). From experiments/arrow-payload/docker-compose.yaml
# this means:
#
# build:
# context: ../..
# dockerfile: experiments/arrow-payload/Dockerfile.producer
#
# We reuse test/producer.py verbatim for its PostHog event generators
# (pageview, autocapture, identify, etc.) and put it on PYTHONPATH so
# arrow_producer.py can `import producer`.

FROM python:3.12-slim

# Pinned to minor versions. confluent-kafka 2.6.x ships with librdkafka
# 2.6.x; pyarrow 18.1.x is the current stable at the time of writing.
RUN pip install --no-cache-dir \
"pyarrow==18.1.*" \
"confluent-kafka==2.6.*"

WORKDIR /app

# Reused generators (copied from repo root — build context is repo root).
COPY test/producer.py /app/posthog_gen/producer.py

# The Arrow IPC producer itself.
COPY experiments/arrow-payload/producer/arrow_producer.py /app/arrow_producer.py

# Put the reused generators on PYTHONPATH so `import producer` in
# arrow_producer.py resolves to /app/posthog_gen/producer.py.
ENV PYTHONPATH=/app/posthog_gen
ENV PYTHONUNBUFFERED=1

ENTRYPOINT ["python", "/app/arrow_producer.py"]
161 changes: 161 additions & 0 deletions experiments/arrow-payload/FINDINGS.md
@@ -0,0 +1,161 @@
# Arrow IPC on the Wire: Findings

Experiment branch: `experiment/arrow-message-payload`

## Question

Can a C++ consumer achieve near-zero-copy ingest from Kafka into DuckLake
when the producer ships Arrow IPC on the wire?

## Answer

Yes. The PoC demonstrates:

```
librdkafka recv buffer ──(zero-copy wrap)──> Arrow IPC reader ──(Arrow C Data Interface)──> DuckDB arrow_scan ──> INSERT into DuckLake
```

No `memcpy` occurs between librdkafka's receive buffer and DuckDB's scan.
The only copy is DuckDB materializing into its own column store during
`INSERT`, which is unavoidable regardless of source format.

## What the original doc got wrong

`WICKED_COOL_NEXT_STEPS.md` line 25 says:

> The pipeline is memcpy from librdkafka recv buffer -> Arrow buffer -> DuckDB zero-copy scan.

This implies two hops. In practice there is **one**, and it is the unavoidable
DuckDB materialization on `INSERT`: the librdkafka buffer **is** the Arrow
buffer (wrapped in place via `arrow::Buffer(ptr, len)`), so there is no
intermediate copy. The table at line 31, which lists `~memcpy` as the
per-record CPU cost, is likewise misleading: the per-record cost is closer to
zero (pointer arithmetic inside Arrow's IPC reader; the real cost is the
columnar materialization DuckDB performs once per batch on INSERT).

## Buffer ownership — the key invariant

| Question | Answer |
|---|---|
| Who owns `msg->payload`? | librdkafka, until `rd_kafka_message_destroy()` |
| Can we read it in place? | Yes, for the lifetime of the message |
| Can we hold the pointer past `destroy()`? | No |
| How does the consumer handle this? | Option A: non-owning `arrow::Buffer(ptr, len)`. All Arrow + DuckDB work runs synchronously in one loop iteration. `rd_kafka_message_destroy()` is the **last** call in the iteration, after DuckDB has fully materialized the batch. |
| Is there a use-after-free risk? | Only if `INSERT` were ever made async or deferred. It isn't — `duckdb_query` is synchronous, so by the time it returns, DuckDB owns a copy in its column store and the borrowed pointers are dead. |
| Buffer alignment? | librdkafka does not align to Arrow's preferred 64-byte boundary. Arrow IPC tolerates this. DuckDB copies into its own storage on INSERT, so alignment is moot. |

## DuckDB Arrow C API — the `duckdb_arrow_stream` ABI trap

The most significant finding from QE review. The `duckdb_arrow_stream`
typedef is defined as:

```c
typedef struct _duckdb_arrow_stream {
void *internal_ptr;
} *duckdb_arrow_stream;
```

This looks like it wants a wrapper struct with `internal_ptr` pointing at
your `ArrowArrayStream`. **It does not.** At the ABI boundary, DuckDB's
`duckdb_arrow_scan` implementation (`src/main/capi/arrow-c.cpp`) does:

```cpp
reinterpret_cast<ArrowArrayStream *>(arrow)
```

It interprets the argument directly as an `ArrowArrayStream*`. If you pass
a wrapper struct, DuckDB treats the wrapper's first word (`internal_ptr`,
which is the address of your `ArrowArrayStream`) as the `get_schema`
function pointer and calls it. **Guaranteed segfault.**

Correct usage:

```cpp
ArrowArrayStream c_stream{};
arrow::ExportRecordBatchReader(reader, &c_stream);
duckdb_arrow_scan(con, view, reinterpret_cast<duckdb_arrow_stream>(&c_stream));
```

This is underdocumented. The Python API (`conn.register('arrow_batch', table)`)
routes through a completely different code path and doesn't illustrate the
C ABI contract.

## Smoke-test results

| Metric | Value |
|---|---|
| Platform | Apple Silicon (arm64), Docker native |
| Producer rate (intentionally rate-limited) | 10 batches/sec x 1000 records = 10K rec/sec |
| Per-batch Arrow IPC size (uncompressed, on wire) | ~1.10-1.14 MB |
| Consumer behavior | Kept up trivially, 0 lag across 8 partitions |
| Records ingested (~30s run) | 660,000 |
| Parquet files written to MinIO | 660, ~345-352 KB each |
| Arrow IPC -> parquet compression ratio | ~3.2x |
| Consumer crashes / segfaults | 0 |
| Data lost | 0 |

These numbers do not represent peak consumer throughput. The producer was
rate-limited; the consumer was never saturated. Measuring the ceiling
requires removing the rate limiter or running multiple producer instances.

## Build issues encountered

| # | Symptom | Root cause | Fix |
|---|---|---|---|
| 1 | `wget` exit 8 fetching DuckDB lib | DuckDB names arm64 assets `linux-arm64`, not `linux-aarch64` | `TARGETARCH`-driven `case` in Dockerfile |
| 2 | `libarrow.so.1801 not found` at runtime | Builder pulled Arrow 18.1.0 (SONAME 1801), runtime installed `libarrow1800` (18.0.x) | Pin both stages to `18.1.0-1` |
| 3 | `MSG_SIZE_TOO_LARGE` from producer | 1000 PostHog events x ~1.1 KB avg > 1 MB Kafka default | Bumped broker, topic, and producer `message.max.bytes` to 16 MiB |
| 4 | DuckDB `Parser Error` on `ATTACH` | Inner single quotes in Postgres connstring (`dbname='ducklake'`) closed the outer SQL string literal | Dropped inner quotes (values contain no whitespace) |
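The fix for issue 4, sketched with hypothetical connection values; the point is
that the libpq-style `key=value` pairs carry no inner single quotes, so the
outer SQL string literal is never terminated early:

```cpp
#include <duckdb.h>

// Sketch of the issue-4 fix (connection values hypothetical). DuckLake's
// ATTACH takes one SQL string literal; unquoted key=value pairs keep it intact.
duckdb_state attach_lake(duckdb_connection con) {
  const char* sql =
      "ATTACH 'ducklake:postgres:host=postgres dbname=ducklake "
      "user=ducklake password=ducklake' AS lake "
      "(DATA_PATH 's3://ducklake/')";
  return duckdb_query(con, sql, /*out_result=*/nullptr);
}
```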

## Architecture of the PoC

```
experiments/arrow-payload/
├── producer/arrow_producer.py # 233 lines: generate events, batch to Arrow IPC, publish
├── consumer/main.cpp # 250 lines: poll, wrap buffer, arrow_scan, INSERT, commit
├── consumer/CMakeLists.txt # 35 lines
├── Dockerfile.producer # python:3.12-slim + pyarrow + confluent-kafka
├── Dockerfile.consumer # debian bookworm + Arrow 18.1 + DuckDB 1.4.0 + librdkafka
└── docker-compose.yaml # kafka + postgres + minio + producer + consumer
```

The consumer loop is ~50 lines of load-bearing code (a sketch follows the list):

1. `rd_kafka_consumer_poll` — get message
2. `arrow::Buffer(msg->payload, msg->len)` — zero-copy wrap
3. `arrow::ipc::RecordBatchStreamReader::Open` — parse IPC stream in place
4. `arrow::ExportRecordBatchReader` — export to Arrow C Data Interface
5. `duckdb_arrow_scan` — register as scannable view
6. `INSERT INTO lake.main.events_arrow SELECT *, NOW() FROM view` — materialize
7. `rd_kafka_commit` — commit offset (synchronous, after successful write)
8. `rd_kafka_message_destroy` — release librdkafka buffer (last call)
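A condensed sketch of that loop, reusing the hypothetical `OpenIpcOverMessage`
helper from above; `rk`, `con`, `running`, and the view name are stand-ins, and
error handling plus view cleanup are elided:

```cpp
// Condensed consumer loop (sketch). The ordering is the load-bearing part:
// rd_kafka_message_destroy() comes last, after DuckDB has copied the batch
// into its own column store, so the borrowed payload is never used after free.
while (running) {
  rd_kafka_message_t* msg = rd_kafka_consumer_poll(rk, /*timeout_ms=*/100);
  if (!msg) continue;                                       // 1: poll
  if (msg->err) { rd_kafka_message_destroy(msg); continue; }

  auto reader = OpenIpcOverMessage(msg).ValueOrDie();       // 2-3: wrap + parse

  ArrowArrayStream c_stream{};                              // 4: C Data export
  (void)arrow::ExportRecordBatchReader(reader, &c_stream);

  // 5: register as a view. Per the ABI trap above, pass the ArrowArrayStream*
  // itself (cast), never a wrapper struct.
  duckdb_arrow_scan(con, "arrow_view",
                    reinterpret_cast<duckdb_arrow_stream>(&c_stream));

  // 6: synchronous INSERT; DuckDB owns its own copy once this returns.
  duckdb_query(con,
               "INSERT INTO lake.main.events_arrow SELECT *, NOW() FROM arrow_view",
               nullptr);

  rd_kafka_commit(rk, nullptr, /*async=*/0);                // 7: commit offset
  rd_kafka_message_destroy(msg);                            // 8: release, last
}
```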

## What this does not cover

- Peak throughput (producer was rate-limited; consumer never saturated)
- Comparison vs the Python millpond consumer at equal load
- Schema evolution (fixed schema, no ALTER TABLE)
- Compression on the wire (Arrow IPC was sent uncompressed; LZ4 could reduce wire size but adds a decompression copy; a sketch follows this list)
- Pure-C implementation (would need Arrow GLib for IPC reading)
- Metrics, health checks, graceful shutdown under load
- Multi-consumer scaling (static `assign()` over all partitions; only one consumer instance ran)
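On the compression point, a sketch of what LZ4 frame compression would look
like on a C++ writer (the PoC's producer is Python, where
`pyarrow.ipc.IpcWriteOptions(compression='lz4')` is the equivalent); whatever
the producer language, a compressed payload forces the consumer to decompress
into a fresh buffer, which is precisely the copy the zero-copy read path avoids:

```cpp
#include <arrow/io/memory.h>
#include <arrow/ipc/writer.h>
#include <arrow/record_batch.h>
#include <arrow/util/compression.h>

// Sketch: serialize one batch to Arrow IPC with LZ4 frame body compression.
// The consumer would have to decompress each body buffer, giving up the
// in-place read described above.
arrow::Result<std::shared_ptr<arrow::Buffer>>
SerializeCompressed(const std::shared_ptr<arrow::RecordBatch>& batch) {
  auto options = arrow::ipc::IpcWriteOptions::Defaults();
  ARROW_ASSIGN_OR_RAISE(
      options.codec, arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME));
  ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
  ARROW_ASSIGN_OR_RAISE(
      auto writer, arrow::ipc::MakeStreamWriter(sink, batch->schema(), options));
  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  ARROW_RETURN_NOT_OK(writer->Close());
  return sink->Finish();
}
```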

## Implications for the production pipeline

The claim in `WICKED_COOL_NEXT_STEPS.md` holds: if the producer writes
Arrow IPC, the consumer reduces to "three pointers and a loop." The JSON
parse + `from_pylist()` + Python object creation that dominates the current
pipeline's CPU profile disappears entirely.

What changes on the producer side: PostHog's ingestion pipeline would need
to emit Arrow IPC instead of JSON. This is a non-trivial change — the
current pipeline is JSON-native end to end. The consumer savings are
meaningless without the producer change, so the decision is whether the
aggregate system benefit (fewer consumer pods, less Postgres catalog
contention, simpler consumer code) justifies the producer-side work.

The consumer-side complexity delta: 250 lines of C++ vs ~400 lines of
Python (millpond's consumer + arrow_converter + config). The C++ has no
runtime, no GC, no allocator pressure per record. The trade-off is
operational: C++ is harder to debug in production, has no Prometheus
metrics integration without additional work, and requires a separate
build toolchain.