Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions images/arrow-tools/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Minimal image that wraps inspect_arrow.py as its entrypoint.
FROM python:3.13-slim

# Third-party modules imported by the script; --no-cache-dir keeps pip's
# download cache out of the image layer.
RUN pip install --no-cache-dir \
    pyarrow \
    boto3

COPY inspect_arrow.py /usr/local/bin/inspect_arrow.py

# Exec form: container arguments are appended as the script's argv.
ENTRYPOINT ["python", "/usr/local/bin/inspect_arrow.py"]
14 changes: 14 additions & 0 deletions images/arrow-tools/bin/images.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash
# Build the arrow-tools image and export it as a tarball under target/images.
set -eo pipefail

# The package root is the parent of the directory containing this script.
PACKAGE_DIR="$(cd "$(dirname "$0")/.." && pwd)"
IMAGE_REF="yolean/arrow-tools:latest"
TARBALL="$PACKAGE_DIR/target/images/arrow-tools.tar"

mkdir -p "$(dirname "$TARBALL")"

# --load makes the build result available to the local docker daemon so the
# subsequent `docker save` can export it.
docker buildx build --load --tag "$IMAGE_REF" "$PACKAGE_DIR"
docker save --output "$TARBALL" "$IMAGE_REF"

echo "Saved $TARBALL"
90 changes: 90 additions & 0 deletions images/arrow-tools/inspect_arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""Inspect Arrow IPC files in S3 using pyarrow — independent format validator."""

import io
import sys
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.ipc as ipc


def format_timestamp_ns(ns_value):
    """Format nanoseconds-since-epoch as ISO 8601 with nanosecond precision.

    Args:
        ns_value: Integer number of nanoseconds relative to
            1970-01-01T00:00:00Z. Negative values denote pre-epoch instants.

    Returns:
        A UTC timestamp string with nine fractional digits, e.g.
        ``2023-11-14T22:13:20.123456789Z``.
    """
    # divmod floors, so the nanosecond remainder stays in [0, 1e9) even for
    # negative inputs and pre-epoch values format correctly.
    secs, nanos = divmod(ns_value, 1_000_000_000)
    # Epoch + timedelta rather than datetime.fromtimestamp(): the latter goes
    # through the platform C library and can raise OSError for negative
    # timestamps on some platforms.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    dt = epoch + timedelta(seconds=secs)
    return dt.strftime("%Y-%m-%dT%H:%M:%S") + f".{nanos:09d}Z"


def format_value(scalar, field):
    """Format a scalar value for display.

    Args:
        scalar: A pyarrow scalar taken from a record-batch column.
        field: The schema field describing that column.

    Returns:
        ``None`` for a null scalar. For timestamp columns, a string holding
        the raw stored integer followed by its ISO 8601 rendering in
        parentheses. For every other type, the result of ``scalar.as_py()``.
    """
    # A null timestamp scalar has no integer value, so handle nulls before
    # any arithmetic; as_py() also yields None for nulls of other types.
    if not scalar.is_valid:
        return None
    if not pa.types.is_timestamp(field.type):
        return scalar.as_py()

    raw = scalar.value
    # The stored integer is expressed in the column's own unit; scale it to
    # nanoseconds so second/milli/microsecond columns are rendered correctly.
    ns_per_unit = {
        "s": 1_000_000_000,
        "ms": 1_000_000,
        "us": 1_000,
        "ns": 1,
    }[field.type.unit]
    return f"{raw} ({format_timestamp_ns(raw * ns_per_unit)})"


def main():
    """Inspect the first matching Arrow IPC file under an S3 prefix.

    Command line: ``<bucket> <prefix> [extension]``; the extension defaults
    to ``.arrow``. Prints the object's location and size, the record-batch
    count, total row count and schema, then up to five sample rows taken
    from the first record batch.

    Exits with status 1 when required arguments are missing or when no key
    under the prefix ends with the extension.
    """
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <bucket> <prefix> [extension]", file=sys.stderr)
        sys.exit(1)

    bucket = sys.argv[1]
    prefix = sys.argv[2]
    extension = sys.argv[3] if len(sys.argv) > 3 else ".arrow"

    # NOTE(review): endpoint and credentials are fixed demo values; confirm
    # this tool is only meant to run against that demo gateway.
    endpoint_url = "http://versitygw:7070"
    s3 = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id="demoaccess",
        aws_secret_access_key="demosecret",
        region_name="us-east-1",
    )

    # A single list_objects_v2 call returns only the first page of keys, so
    # walk every page; otherwise matching files beyond it are never seen.
    paginator = s3.get_paginator("list_objects_v2")
    files = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(extension):
                files.append(obj["Key"])

    if not files:
        print(f"No {extension} files found in s3://{bucket}/{prefix}", file=sys.stderr)
        sys.exit(1)

    # Pick the earliest file (first chronologically by path)
    key = min(files)

    obj = s3.get_object(Bucket=bucket, Key=key)
    data = obj["Body"].read()

    print(f"File: s3://{bucket}/{key}")
    print(f"Size: {len(data)} bytes")

    reader = ipc.open_file(io.BytesIO(data))
    schema = reader.schema
    num_batches = reader.num_record_batches

    total_rows = sum(reader.get_batch(i).num_rows for i in range(num_batches))
    print(f"Record batches: {num_batches}")
    print(f"Total rows: {total_rows}")
    print(f"Schema ({len(schema)} fields):")
    for field in schema:
        print(f" {field.name}: {field.type}")

    if num_batches == 0:
        return

    # Samples come from the first record batch only.
    batch = reader.get_batch(0)
    n_sample = min(batch.num_rows, 5)
    if n_sample > 0:
        print(f"Sample (first {n_sample} rows):")
        for row in range(n_sample):
            print(f" row {row}:")
            for i, field in enumerate(schema):
                val = format_value(batch.column(i)[row], field)
                print(f" {field.name}: {val}")


# Run the inspector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
9 changes: 9 additions & 0 deletions images/arrow-tools/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "arrow-tools",
"version": "0.0.0",
"private": true,
"type": "module",
"scripts": {
"images": "bin/images.sh"
}
}
3 changes: 3 additions & 0 deletions images/fluentbit/.dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# Exclude everything from the build context by default...
*
# ...then re-include only patches, C sources/headers and shell scripts.
!*.patch
!*.c
!*.h
!*.sh
13 changes: 12 additions & 1 deletion images/fluentbit/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ RUN set -ex; \
# Shallow-clone the upstream source at the ref named by FLB_VERSION.
RUN git clone --depth 1 --branch $FLB_VERSION https://github.com/fluent/fluent-bit.git /src/fluent-bit

# Bring in the local patches, the compact_columns sources and the script
# that wires them into the upstream tree.
COPY *.patch /src/
# NOTE(review): this wildcard apply and the explicit fix-gcs-header-lookup.patch
# apply below look like the removed and added sides of a diff shown together;
# confirm only one of them exists in the real Dockerfile.
RUN cd /src/fluent-bit && git apply /src/*.patch
COPY compact_columns.h compact_columns.c /src/fluent-bit/src/aws/compression/arrow/
COPY apply-compact-compression.sh /src/
RUN cd /src/fluent-bit && git apply /src/fix-gcs-header-lookup.patch
RUN cd /src/fluent-bit && bash /src/apply-compact-compression.sh

WORKDIR /src/fluent-bit/build

Expand All @@ -65,6 +68,14 @@ RUN cmake \

# Build with one make job per available processor.
RUN make -j$(nproc)

# Unit test: compact_columns
# Compiles the test together with compact_columns.c against arrow-glib and
# parquet-glib, then runs it in the same RUN step, so a compile error or a
# non-zero exit from the test binary fails the image build.
COPY test_compact_columns.c /src/
RUN gcc -o /src/test_compact_columns /src/test_compact_columns.c \
/src/fluent-bit/src/aws/compression/arrow/compact_columns.c \
$(pkg-config --cflags --libs arrow-glib parquet-glib) \
-I/src/fluent-bit/src/aws/compression/arrow \
&& /src/test_compact_columns

RUN set -ex; \
/src/fluent-bit/build/bin/fluent-bit --version; \
mkdir -p /fluent-bit/bin /fluent-bit/etc; \
Expand Down
168 changes: 168 additions & 0 deletions images/fluentbit/apply-compact-compression.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
#!/bin/bash
# -e: stop at the first failing command; -x: echo each command as it runs.
set -ex

# Apply compact-compression changes to fluent-bit source tree
# This script runs from the fluent-bit repo root
# (all paths below are relative to that directory).

# 1. flb_aws_compress.h — add new compression type constants
# Appends the two #define lines after each line matching
# "#define FLB_AWS_COMPRESS_ZSTD". sed exits 0 even when nothing matches, so a
# changed upstream header would be left unpatched without an error here.
# NOTE(review): the ids 5 and 6 are assumed to be unused upstream — confirm
# against the header at the pinned FLB_VERSION.
sed -i '/#define FLB_AWS_COMPRESS_ZSTD/a\
#define FLB_AWS_COMPRESS_ARROW_COMPACT 5\
#define FLB_AWS_COMPRESS_PARQUET_COMPACT 6' \
include/fluent-bit/aws/flb_aws_compress.h

# 2. compress.h — declare new functions at end of file
# The quoted 'EOF' delimiter disables shell expansion, so the text is appended
# verbatim after the file's existing last line.
# NOTE(review): if compress.h closes an include guard on its last line, these
# declarations land after it — confirm that is acceptable.
cat >> src/aws/compression/arrow/compress.h << 'EOF'

#ifdef FLB_HAVE_ARROW_PARQUET
int out_s3_compress_arrow_compact(void *json, size_t size, void **out_buf, size_t *out_size);
int out_s3_compress_parquet_compact(void *json, size_t size, void **out_buf, size_t *out_size);
#endif
EOF

# 3. compress.c — add #include for compact_columns.h after parquet-glib include
# The include is appended after every line matching the parquet-glib include;
# if no line matches, sed changes nothing and still exits 0.
sed -i '/#include <parquet-glib\/parquet-glib.h>/a\
#include "compact_columns.h"' \
src/aws/compression/arrow/compress.c

# 4. compress.c — add new functions before final #endif
# The last line of the file is "#endif" (closing FLB_HAVE_ARROW_PARQUET)
# Insert the new functions before it
#
# '$ i\' inserts the text before the file's last line, whatever that line is.
# The two inserted functions are identical except for the serializer called
# (table_to_arrow_ipc_buffer vs table_to_parquet_buffer) and their log
# messages: parse JSON into a table, compact its columns, serialize, then copy
# the bytes into an flb_malloc'd buffer returned through out_buf/out_size.
# They return 0 on success and -1 on any failure.
# NOTE(review): parse_json and the table_to_* helpers are presumably defined
# earlier in compress.c — confirm they are visible at the insertion point.
# NOTE(review): the result of compact_parquet_columns() is passed on and
# unref'd without a NULL check — confirm it cannot return NULL.
sed -i '$ i\
\
int out_s3_compress_arrow_compact(void *json, size_t size, void **out_buf, size_t *out_size)\
{\
GArrowTable *table;\
GArrowTable *compacted;\
GArrowResizableBuffer *buffer;\
GBytes *bytes;\
gconstpointer ptr;\
gsize len;\
uint8_t *buf;\
\
table = parse_json((uint8_t *) json, size);\
if (table == NULL) {\
flb_error("[aws][compress] Failed to parse JSON for arrow-compact");\
return -1;\
}\
\
compacted = compact_parquet_columns(table);\
g_object_unref(table);\
\
buffer = table_to_arrow_ipc_buffer(compacted);\
g_object_unref(compacted);\
if (buffer == NULL) {\
flb_error("[aws][compress] Failed to convert compacted table to arrow IPC buffer (arrow-compact)");\
return -1;\
}\
\
bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));\
if (bytes == NULL) {\
g_object_unref(buffer);\
return -1;\
}\
\
ptr = g_bytes_get_data(bytes, &len);\
if (ptr == NULL) {\
g_object_unref(buffer);\
g_bytes_unref(bytes);\
return -1;\
}\
\
buf = flb_malloc(len);\
if (buf == NULL) {\
flb_errno();\
g_object_unref(buffer);\
g_bytes_unref(bytes);\
return -1;\
}\
memcpy(buf, ptr, len);\
*out_buf = (void *) buf;\
*out_size = len;\
\
g_object_unref(buffer);\
g_bytes_unref(bytes);\
return 0;\
}\
\
int out_s3_compress_parquet_compact(void *json, size_t size, void **out_buf, size_t *out_size)\
{\
GArrowTable *table;\
GArrowTable *compacted;\
GArrowResizableBuffer *buffer;\
GBytes *bytes;\
gconstpointer ptr;\
gsize len;\
uint8_t *buf;\
\
table = parse_json((uint8_t *) json, size);\
if (table == NULL) {\
flb_error("[aws][compress] Failed to parse JSON for parquet-compact");\
return -1;\
}\
\
compacted = compact_parquet_columns(table);\
g_object_unref(table);\
\
buffer = table_to_parquet_buffer(compacted);\
g_object_unref(compacted);\
if (buffer == NULL) {\
flb_error("[aws][compress] Failed to convert compacted table to parquet buffer (parquet-compact)");\
return -1;\
}\
\
bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));\
if (bytes == NULL) {\
g_object_unref(buffer);\
return -1;\
}\
\
ptr = g_bytes_get_data(bytes, &len);\
if (ptr == NULL) {\
g_object_unref(buffer);\
g_bytes_unref(bytes);\
return -1;\
}\
\
buf = flb_malloc(len);\
if (buf == NULL) {\
flb_errno();\
g_object_unref(buffer);\
g_bytes_unref(bytes);\
return -1;\
}\
memcpy(buf, ptr, len);\
*out_buf = (void *) buf;\
*out_size = len;\
\
g_object_unref(buffer);\
g_bytes_unref(bytes);\
return 0;\
}' \
src/aws/compression/arrow/compress.c

# 5. CMakeLists.txt — add compact_columns.c to source list
# Rewrites the closing "compress.c)" entry so compact_columns.c follows it on
# a new line. The unescaped "." in the pattern matches any character.
# NOTE(review): "\n" in the replacement relies on GNU sed behaviour — confirm
# the build image uses GNU sed.
sed -i 's/ compress.c)/ compress.c\n compact_columns.c)/' \
src/aws/compression/arrow/CMakeLists.txt

# 6. flb_aws_compress.c — add new entries to compression_options array
# Insert after the parquet entry (before #endif)
# The address range runs from a line matching "&out_s3_compress_parquet" to
# the next line matching "},"; the two new entries are appended after that
# closing line.
# NOTE(review): the start pattern would also match the *_parquet_compact entry
# added here, so running the script twice on the same tree would append the
# entries again — confirm it only ever runs once per checkout.
sed -i '/&out_s3_compress_parquet/,/},/ {
/},/ a\
{\
FLB_AWS_COMPRESS_ARROW_COMPACT,\
"arrow-compact",\
\&out_s3_compress_arrow_compact\
},\
{\
FLB_AWS_COMPRESS_PARQUET_COMPACT,\
"parquet-compact",\
\&out_s3_compress_parquet_compact\
},
}' \
src/aws/flb_aws_compress.c

# 7. s3.c — add new compression types to use_put_object validation
# Extends the condition ending in "ret == FLB_AWS_COMPRESS_PARQUET)) {" with
# two further "||" alternatives for the compact variants.
# NOTE(review): if the upstream condition text differs, sed substitutes
# nothing and still exits 0 — confirm the pattern matches at the pinned version.
sed -i 's/ret == FLB_AWS_COMPRESS_PARQUET)) {/ret == FLB_AWS_COMPRESS_PARQUET ||\n ret == FLB_AWS_COMPRESS_ARROW_COMPACT ||\n ret == FLB_AWS_COMPRESS_PARQUET_COMPACT)) {/' \
plugins/out_s3/s3.c

# Marker line for the build log; reached only if every command above exited 0
# (set -e).
echo "=== compact-compression changes applied ==="
Loading