diff --git a/images/arrow-tools/Dockerfile b/images/arrow-tools/Dockerfile new file mode 100644 index 0000000..ed12014 --- /dev/null +++ b/images/arrow-tools/Dockerfile @@ -0,0 +1,4 @@ +FROM python:3.13-slim +RUN pip install --no-cache-dir pyarrow boto3 +COPY inspect_arrow.py /usr/local/bin/ +ENTRYPOINT ["python", "/usr/local/bin/inspect_arrow.py"] diff --git a/images/arrow-tools/bin/images.sh b/images/arrow-tools/bin/images.sh new file mode 100755 index 0000000..894af54 --- /dev/null +++ b/images/arrow-tools/bin/images.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +set -eo pipefail + +PACKAGE_DIR="$(cd "$(dirname "$0")/.." && pwd)" +IMAGE_NAME="yolean/arrow-tools" +IMAGE_TAG="latest" +OUTPUT_DIR="$PACKAGE_DIR/target/images" + +mkdir -p "$OUTPUT_DIR" + +docker buildx build --load -t "$IMAGE_NAME:$IMAGE_TAG" "$PACKAGE_DIR" +docker save "$IMAGE_NAME:$IMAGE_TAG" -o "$OUTPUT_DIR/arrow-tools.tar" + +echo "Saved $OUTPUT_DIR/arrow-tools.tar" diff --git a/images/arrow-tools/inspect_arrow.py b/images/arrow-tools/inspect_arrow.py new file mode 100644 index 0000000..2ea8f14 --- /dev/null +++ b/images/arrow-tools/inspect_arrow.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +"""Inspect Arrow IPC files in S3 using pyarrow — independent format validator.""" + +import sys +import io +from datetime import datetime, timezone +import pyarrow as pa +import boto3 +import pyarrow.ipc as ipc + + +def format_timestamp_ns(ns_value): + """Format integer nanoseconds-since-epoch as ISO 8601 UTC with nine fractional digits.""" + secs = ns_value // 1_000_000_000 + nanos = ns_value % 1_000_000_000 + dt = datetime.fromtimestamp(secs, tz=timezone.utc) + return dt.strftime("%Y-%m-%dT%H:%M:%S") + f".{nanos:09d}Z" + + +def format_value(scalar, field): + """Format a scalar for display; only valid ns timestamps get the raw+ISO form, nulls and other units fall back to as_py().""" + if pa.types.is_timestamp(field.type) and field.type.unit == "ns" and scalar.is_valid: + ns = scalar.value + return f"{ns} ({format_timestamp_ns(ns)})" + return scalar.as_py() + + +def main(): + if len(sys.argv) < 3: + print(f"Usage: {sys.argv[0]} <bucket> <prefix> [extension]", file=sys.stderr) 
+ sys.exit(1) + + bucket = sys.argv[1] + prefix = sys.argv[2] + extension = sys.argv[3] if len(sys.argv) > 3 else ".arrow" + + endpoint_url = "http://versitygw:7070" + s3 = boto3.client( + "s3", + endpoint_url=endpoint_url, + aws_access_key_id="demoaccess", + aws_secret_access_key="demosecret", + region_name="us-east-1", + ) + + # List every object under the prefix; paginate so matches beyond the first response page are not missed + pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix) + contents = (obj for page in pages for obj in page.get("Contents", [])) + files = [obj["Key"] for obj in contents if obj["Key"].endswith(extension)] + + if not files: + print(f"No {extension} files found in s3://{bucket}/{prefix}", file=sys.stderr) + sys.exit(1) + + # Pick the earliest file (first chronologically by path) + files.sort() + key = files[0] + + obj = s3.get_object(Bucket=bucket, Key=key) + data = obj["Body"].read() + + print(f"File: s3://{bucket}/{key}") + print(f"Size: {len(data)} bytes") + + reader = ipc.open_file(io.BytesIO(data)) + schema = reader.schema + + total_rows = sum( + reader.get_batch(i).num_rows for i in range(reader.num_record_batches) + ) + print(f"Record batches: {reader.num_record_batches}") + print(f"Total rows: {total_rows}") + print(f"Schema ({len(schema)} fields):") + for field in schema: + print(f" {field.name}: {field.type}") + + if reader.num_record_batches > 0: + batch = reader.get_batch(0) + n_sample = min(batch.num_rows, 5) + if n_sample > 0: + print(f"Sample (first {n_sample} rows):") + for row in range(n_sample): + print(f" row {row}:") + for i, field in enumerate(schema): + val = format_value(batch.column(i)[row], field) + print(f" {field.name}: {val}") + + +if __name__ == "__main__": + main() diff --git a/images/arrow-tools/package.json b/images/arrow-tools/package.json new file mode 100644 index 0000000..531c1a5 --- /dev/null +++ b/images/arrow-tools/package.json @@ -0,0 +1,9 @@ +{ + "name": "arrow-tools", + "version": "0.0.0", + "private": true, + "type": "module", + "scripts": { + "images": "bin/images.sh" + } +} diff --git 
a/images/fluentbit/.dockerignore b/images/fluentbit/.dockerignore index ac26fc2..54170e2 100644 --- a/images/fluentbit/.dockerignore +++ b/images/fluentbit/.dockerignore @@ -1,2 +1,5 @@ * !*.patch +!*.c +!*.h +!*.sh diff --git a/images/fluentbit/Dockerfile b/images/fluentbit/Dockerfile index a321ce5..f7927e0 100644 --- a/images/fluentbit/Dockerfile +++ b/images/fluentbit/Dockerfile @@ -42,7 +42,10 @@ RUN set -ex; \ RUN git clone --depth 1 --branch $FLB_VERSION https://github.com/fluent/fluent-bit.git /src/fluent-bit COPY *.patch /src/ -RUN cd /src/fluent-bit && git apply /src/*.patch +COPY compact_columns.h compact_columns.c /src/fluent-bit/src/aws/compression/arrow/ +COPY apply-compact-compression.sh /src/ +RUN cd /src/fluent-bit && git apply /src/fix-gcs-header-lookup.patch +RUN cd /src/fluent-bit && bash /src/apply-compact-compression.sh WORKDIR /src/fluent-bit/build @@ -65,6 +68,14 @@ RUN cmake \ RUN make -j$(nproc) +# Unit test: compact_columns +COPY test_compact_columns.c /src/ +RUN gcc -o /src/test_compact_columns /src/test_compact_columns.c \ + /src/fluent-bit/src/aws/compression/arrow/compact_columns.c \ + $(pkg-config --cflags --libs arrow-glib parquet-glib) \ + -I/src/fluent-bit/src/aws/compression/arrow \ + && /src/test_compact_columns + RUN set -ex; \ /src/fluent-bit/build/bin/fluent-bit --version; \ mkdir -p /fluent-bit/bin /fluent-bit/etc; \ diff --git a/images/fluentbit/apply-compact-compression.sh b/images/fluentbit/apply-compact-compression.sh new file mode 100644 index 0000000..17d810d --- /dev/null +++ b/images/fluentbit/apply-compact-compression.sh @@ -0,0 +1,168 @@ +#!/bin/bash +set -ex + +# Apply compact-compression changes to fluent-bit source tree +# This script runs from the fluent-bit repo root + +# 1. flb_aws_compress.h — add new compression type constants +sed -i '/#define FLB_AWS_COMPRESS_ZSTD/a\ +#define FLB_AWS_COMPRESS_ARROW_COMPACT 5\ +#define FLB_AWS_COMPRESS_PARQUET_COMPACT 6' \ + include/fluent-bit/aws/flb_aws_compress.h + +# 2. 
compress.h — declare new functions at end of file +cat >> src/aws/compression/arrow/compress.h << 'EOF' + +#ifdef FLB_HAVE_ARROW_PARQUET +int out_s3_compress_arrow_compact(void *json, size_t size, void **out_buf, size_t *out_size); +int out_s3_compress_parquet_compact(void *json, size_t size, void **out_buf, size_t *out_size); +#endif +EOF + +# 3. compress.c — add #include for compact_columns.h after parquet-glib include +sed -i '/#include /a\ +#include "compact_columns.h"' \ + src/aws/compression/arrow/compress.c + +# 4. compress.c — add new functions before final #endif +# The last line of the file is "#endif" (closing FLB_HAVE_ARROW_PARQUET) +# Insert the new functions before it +sed -i '$ i\ +\ +int out_s3_compress_arrow_compact(void *json, size_t size, void **out_buf, size_t *out_size)\ +{\ + GArrowTable *table;\ + GArrowTable *compacted;\ + GArrowResizableBuffer *buffer;\ + GBytes *bytes;\ + gconstpointer ptr;\ + gsize len;\ + uint8_t *buf;\ +\ + table = parse_json((uint8_t *) json, size);\ + if (table == NULL) {\ + flb_error("[aws][compress] Failed to parse JSON for arrow-compact");\ + return -1;\ + }\ +\ + compacted = compact_parquet_columns(table);\ + g_object_unref(table);\ +\ + buffer = table_to_arrow_ipc_buffer(compacted);\ + g_object_unref(compacted);\ + if (buffer == NULL) {\ + flb_error("[aws][compress] Failed to convert compacted table to arrow IPC buffer (arrow-compact)");\ + return -1;\ + }\ +\ + bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));\ + if (bytes == NULL) {\ + g_object_unref(buffer);\ + return -1;\ + }\ +\ + ptr = g_bytes_get_data(bytes, &len);\ + if (ptr == NULL) {\ + g_object_unref(buffer);\ + g_bytes_unref(bytes);\ + return -1;\ + }\ +\ + buf = flb_malloc(len);\ + if (buf == NULL) {\ + flb_errno();\ + g_object_unref(buffer);\ + g_bytes_unref(bytes);\ + return -1;\ + }\ + memcpy(buf, ptr, len);\ + *out_buf = (void *) buf;\ + *out_size = len;\ +\ + g_object_unref(buffer);\ + g_bytes_unref(bytes);\ + return 0;\ +}\ +\ +int 
out_s3_compress_parquet_compact(void *json, size_t size, void **out_buf, size_t *out_size)\ +{\ + GArrowTable *table;\ + GArrowTable *compacted;\ + GArrowResizableBuffer *buffer;\ + GBytes *bytes;\ + gconstpointer ptr;\ + gsize len;\ + uint8_t *buf;\ +\ + table = parse_json((uint8_t *) json, size);\ + if (table == NULL) {\ + flb_error("[aws][compress] Failed to parse JSON for parquet-compact");\ + return -1;\ + }\ +\ + compacted = compact_parquet_columns(table);\ + g_object_unref(table);\ +\ + buffer = table_to_parquet_buffer(compacted);\ + g_object_unref(compacted);\ + if (buffer == NULL) {\ + flb_error("[aws][compress] Failed to convert compacted table to parquet buffer (parquet-compact)");\ + return -1;\ + }\ +\ + bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));\ + if (bytes == NULL) {\ + g_object_unref(buffer);\ + return -1;\ + }\ +\ + ptr = g_bytes_get_data(bytes, &len);\ + if (ptr == NULL) {\ + g_object_unref(buffer);\ + g_bytes_unref(bytes);\ + return -1;\ + }\ +\ + buf = flb_malloc(len);\ + if (buf == NULL) {\ + flb_errno();\ + g_object_unref(buffer);\ + g_bytes_unref(bytes);\ + return -1;\ + }\ + memcpy(buf, ptr, len);\ + *out_buf = (void *) buf;\ + *out_size = len;\ +\ + g_object_unref(buffer);\ + g_bytes_unref(bytes);\ + return 0;\ +}' \ + src/aws/compression/arrow/compress.c + +# 5. CMakeLists.txt — add compact_columns.c to source list +sed -i 's/ compress.c)/ compress.c\n compact_columns.c)/' \ + src/aws/compression/arrow/CMakeLists.txt + +# 6. flb_aws_compress.c — add new entries to compression_options array +# Insert after the parquet entry (before #endif) +sed -i '/&out_s3_compress_parquet/,/},/ { + /},/ a\ + {\ + FLB_AWS_COMPRESS_ARROW_COMPACT,\ + "arrow-compact",\ + \&out_s3_compress_arrow_compact\ + },\ + {\ + FLB_AWS_COMPRESS_PARQUET_COMPACT,\ + "parquet-compact",\ + \&out_s3_compress_parquet_compact\ + }, +}' \ + src/aws/flb_aws_compress.c + +# 7. 
s3.c — add new compression types to use_put_object validation +sed -i 's/ret == FLB_AWS_COMPRESS_PARQUET)) {/ret == FLB_AWS_COMPRESS_PARQUET ||\n ret == FLB_AWS_COMPRESS_ARROW_COMPACT ||\n ret == FLB_AWS_COMPRESS_PARQUET_COMPACT)) {/' \ + plugins/out_s3/s3.c + +echo "=== compact-compression changes applied ===" diff --git a/images/fluentbit/compact_columns.c b/images/fluentbit/compact_columns.c new file mode 100644 index 0000000..2190303 --- /dev/null +++ b/images/fluentbit/compact_columns.c @@ -0,0 +1,418 @@ +/* + * compact_columns.c - Transform Arrow table columns for compact storage + * + * - time: ISO 8601 CRI string -> Timestamp(ns), no timezone annotation + * - stream, logtag: string -> dictionary-encoded + */ + +#include "compact_columns.h" +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> + +/* + * Parse CRI timestamp "2024-01-15T10:30:45.123456789Z" into nanoseconds since Unix epoch (at most nine fractional digits are read; fewer are zero-padded on the right). + * The zone designator is not inspected: the fields are always interpreted as UTC. Returns 0 on success, -1 on unparsable or out-of-range fields. + */ +static int parse_cri_timestamp(const char *str, int64_t *nanos_out) +{ + int year, month, day, hour, min, sec; + long frac_nanos = 0; + int n; + struct tm tm_val; + time_t epoch; + const char *p; + + n = sscanf(str, "%d-%d-%dT%d:%d:%d", &year, &month, &day, &hour, &min, &sec); + if (n != 6 || month < 1 || month > 12 || day < 1 || day > 31 || hour < 0 || hour > 23 || min < 0 || min > 59 || sec < 0 || sec > 60) { /* reject instead of letting timegm() normalize bad fields */ + return -1; + } + + /* Parse fractional seconds after the '.' */ + p = strchr(str, '.'); + if (p) { + p++; /* skip '.' */ + char frac_buf[10] = "000000000"; + int i = 0; + while (i < 9 && p[i] >= '0' && p[i] <= '9') { + frac_buf[i] = p[i]; + i++; + } + frac_nanos = strtol(frac_buf, NULL, 10); + } + + memset(&tm_val, 0, sizeof(tm_val)); + tm_val.tm_year = year - 1900; + tm_val.tm_mon = month - 1; + tm_val.tm_mday = day; + tm_val.tm_hour = hour; + tm_val.tm_min = min; + tm_val.tm_sec = sec; + + epoch = timegm(&tm_val); + if (epoch == (time_t)-1) { + return -1; + } + + *nanos_out = (int64_t)epoch * 1000000000LL + frac_nanos; + return 0; +} + +/* + * Replace the "time" column (string) with a Timestamp(ns) column. 
+ * No timezone annotation — DuckDB reads Timestamp(ns) as TIMESTAMP_NS + * preserving nanosecond precision. (With isAdjustedToUTC=true, DuckDB maps + * to TIMESTAMP WITH TIME ZONE which is only microsecond precision.) + * Returns a new table with the column replaced, or NULL on error. + */ +static GArrowTable *compact_time_column(GArrowTable *table) +{ + GArrowSchema *schema; + int col_idx; + GArrowChunkedArray *chunked; + GArrowTimestampDataType *ts_type; + GArrowTable *result = NULL; + GError *error = NULL; + + schema = garrow_table_get_schema(table); + col_idx = garrow_schema_get_field_index(schema, "time"); + g_object_unref(schema); + + if (col_idx < 0) { + g_warning("[compact_columns] 'time' column not found, skipping"); + return NULL; + } + + chunked = garrow_table_get_column_data(table, col_idx); + if (!chunked) { + return NULL; + } + + /* Timestamp(ns) without timezone — both formats store UTC by convention */ + ts_type = garrow_timestamp_data_type_new(GARROW_TIME_UNIT_NANO, NULL); + + /* Process each chunk */ + guint n_chunks = garrow_chunked_array_get_n_chunks(chunked); + GList *new_chunks = NULL; + gboolean ok = TRUE; + + for (guint c = 0; c < n_chunks && ok; c++) { + GArrowArray *chunk = garrow_chunked_array_get_chunk(chunked, c); + gint64 len = garrow_array_get_length(chunk); + + GArrowTimestampArrayBuilder *builder = + garrow_timestamp_array_builder_new(ts_type); + if (!builder) { + g_warning("[compact_columns] failed to create timestamp builder"); + g_object_unref(chunk); + ok = FALSE; + break; + } + + for (gint64 i = 0; i < len; i++) { + if (garrow_array_is_null(chunk, i)) { + garrow_array_builder_append_null( + GARROW_ARRAY_BUILDER(builder), &error); + if (error) { g_error_free(error); error = NULL; } + continue; + } + + GArrowStringArray *str_arr = GARROW_STRING_ARRAY(chunk); + gchar *val = garrow_string_array_get_string(str_arr, i); + int64_t nanos; + + if (parse_cri_timestamp(val, &nanos) == 0) { + 
garrow_timestamp_array_builder_append_value(builder, nanos, &error); + if (error) { + g_warning("[compact_columns] failed to append timestamp: %s", + error->message); + g_error_free(error); + error = NULL; + } + } else { + g_warning("[compact_columns] failed to parse time: %s", val); + garrow_array_builder_append_null( + GARROW_ARRAY_BUILDER(builder), &error); + if (error) { g_error_free(error); error = NULL; } + } + g_free(val); + } + + GArrowArray *new_arr = garrow_array_builder_finish( + GARROW_ARRAY_BUILDER(builder), &error); + if (!new_arr) { + g_warning("[compact_columns] failed to finish timestamp array: %s", + error->message); + g_error_free(error); + error = NULL; + g_object_unref(builder); + g_object_unref(chunk); + ok = FALSE; + break; + } + + new_chunks = g_list_append(new_chunks, new_arr); + g_object_unref(builder); + g_object_unref(chunk); + } + + if (ok && new_chunks) { + GArrowChunkedArray *new_chunked = garrow_chunked_array_new( + new_chunks, &error); + if (new_chunked) { + GArrowField *new_field = garrow_field_new("time", + GARROW_DATA_TYPE(ts_type)); + result = garrow_table_replace_column(table, col_idx, + new_field, new_chunked, &error); + if (!result) { + g_warning("[compact_columns] failed to replace time column: %s", + error->message); + g_error_free(error); + error = NULL; + } + g_object_unref(new_field); + g_object_unref(new_chunked); + } else { + g_warning("[compact_columns] failed to create chunked array: %s", + error->message); + g_error_free(error); + error = NULL; + } + } + + /* Cleanup */ + g_list_free_full(new_chunks, g_object_unref); + g_object_unref(ts_type); + g_object_unref(chunked); + + return result; +} + +/* + * Re-index a dictionary-encoded array from int32 to int8 indices. + * stream/logtag have very low cardinality (2-3 values), so int8 is sufficient + * and saves 3 bytes per row vs the default int32. + * Returns a new array, or NULL on error. 
+ */ +static GArrowArray *dict_reindex_int8(GArrowDictionaryArray *dict_arr) +{ + GError *error = NULL; + GArrowArray *indices = garrow_dictionary_array_get_indices(dict_arr); + GArrowArray *dictionary = garrow_dictionary_array_get_dictionary(dict_arr); + + GArrowInt8DataType *int8_type = garrow_int8_data_type_new(); + GArrowArray *int8_indices = garrow_array_cast( + indices, GARROW_DATA_TYPE(int8_type), NULL, &error); + if (!int8_indices) { + g_warning("[compact_columns] failed to cast indices to int8: %s", + error->message); + g_error_free(error); + g_object_unref(int8_type); + g_object_unref(indices); + g_object_unref(dictionary); + return NULL; + } + + GArrowStringDataType *str_type = garrow_string_data_type_new(); + GArrowDictionaryDataType *new_dict_type = garrow_dictionary_data_type_new( + GARROW_DATA_TYPE(int8_type), GARROW_DATA_TYPE(str_type), FALSE); + + GArrowDictionaryArray *new_arr = garrow_dictionary_array_new( + GARROW_DATA_TYPE(new_dict_type), int8_indices, dictionary, &error); + + g_object_unref(new_dict_type); + g_object_unref(str_type); + g_object_unref(int8_type); + g_object_unref(int8_indices); + g_object_unref(indices); + g_object_unref(dictionary); + + if (!new_arr) { + g_warning("[compact_columns] failed to create int8 dictionary array: %s", + error->message); + g_error_free(error); + return NULL; + } + + return GARROW_ARRAY(new_arr); +} + +/* + * Dictionary-encode a string column by name, using int8 indices. + * Returns a new table with the column replaced, or NULL on error. 
+ */ +static GArrowTable *dict_encode_column(GArrowTable *table, const char *col_name) +{ + GArrowSchema *schema; + int col_idx; + GArrowChunkedArray *chunked; + GArrowTable *result = NULL; + GError *error = NULL; + + schema = garrow_table_get_schema(table); + col_idx = garrow_schema_get_field_index(schema, col_name); + g_object_unref(schema); + + if (col_idx < 0) { + g_warning("[compact_columns] '%s' column not found, skipping", col_name); + return NULL; + } + + chunked = garrow_table_get_column_data(table, col_idx); + if (!chunked) { + return NULL; + } + + /* Dictionary-encode each chunk, then re-index to int8 */ + guint n_chunks = garrow_chunked_array_get_n_chunks(chunked); + GList *new_chunks = NULL; + gboolean ok = TRUE; + + for (guint c = 0; c < n_chunks && ok; c++) { + GArrowArray *chunk = garrow_chunked_array_get_chunk(chunked, c); + GArrowDictionaryArray *dict_arr = + garrow_array_dictionary_encode(chunk, &error); + if (!dict_arr) { + g_warning("[compact_columns] failed to dict-encode '%s': %s", + col_name, error->message); + g_error_free(error); + error = NULL; + g_object_unref(chunk); + ok = FALSE; + break; + } + + GArrowArray *int8_arr = dict_reindex_int8(dict_arr); + g_object_unref(dict_arr); + if (!int8_arr) { + g_object_unref(chunk); + ok = FALSE; + break; + } + + new_chunks = g_list_append(new_chunks, int8_arr); + g_object_unref(chunk); + } + + if (ok && new_chunks) { + /* Get the data type from the first encoded chunk */ + GArrowArray *first = GARROW_ARRAY(new_chunks->data); + GArrowDataType *dict_type = garrow_array_get_value_data_type(first); + + GArrowChunkedArray *new_chunked = garrow_chunked_array_new( + new_chunks, &error); + if (new_chunked) { + GArrowField *new_field = garrow_field_new(col_name, dict_type); + result = garrow_table_replace_column(table, col_idx, + new_field, new_chunked, &error); + if (!result) { + g_warning("[compact_columns] failed to replace '%s' column: %s", + col_name, error->message); + g_error_free(error); + error = 
NULL; + } + g_object_unref(new_field); + g_object_unref(new_chunked); + } else { + g_warning("[compact_columns] failed to create dict chunked array: %s", + error->message); + g_error_free(error); + error = NULL; + } + g_object_unref(dict_type); + } + + g_list_free_full(new_chunks, g_object_unref); + g_object_unref(chunked); + + return result; +} + +GArrowTable *compact_parquet_columns(GArrowTable *table) +{ + GArrowTable *current = table; + GArrowTable *next; + gboolean owns_current = FALSE; + + /* 1. Compact time column */ + next = compact_time_column(current); + if (next) { + if (owns_current) { + g_object_unref(current); + } + current = next; + owns_current = TRUE; + } + + /* 2-3. Dictionary-encode stream and logtag. + * DuckDB/nanoarrow reads these back as plain VARCHAR, but tools using + * the official Arrow library (e.g. pyarrow) see dictionary type. */ + next = dict_encode_column(current, "stream"); + if (next) { + if (owns_current) { + g_object_unref(current); + } + current = next; + owns_current = TRUE; + } + + next = dict_encode_column(current, "logtag"); + if (next) { + if (owns_current) { + g_object_unref(current); + } + current = next; + owns_current = TRUE; + } + + /* If no transformations succeeded, ref the original so caller can unref */ + if (!owns_current) { + g_object_ref(current); + } + + return current; +} + +GArrowResizableBuffer *table_to_arrow_ipc_buffer(GArrowTable *table) +{ + GArrowResizableBuffer *buffer; + GArrowBufferOutputStream *sink; + GArrowFeatherWriteProperties *props; + GError *error = NULL; + gboolean success; + + buffer = garrow_resizable_buffer_new(0, &error); + if (!buffer) { + g_warning("[compact_columns] failed to create buffer: %s", + error->message); + g_error_free(error); + return NULL; + } + + sink = garrow_buffer_output_stream_new(buffer); + if (!sink) { + g_object_unref(buffer); + return NULL; + } + + /* ZSTD: nanoarrow/DuckDB supports ZSTD but not LZ4 for Arrow IPC bodies */ + props = 
garrow_feather_write_properties_new(); + g_object_set(props, "compression", + GARROW_COMPRESSION_TYPE_ZSTD, NULL); + + success = garrow_table_write_as_feather( + table, GARROW_OUTPUT_STREAM(sink), props, &error); + g_object_unref(props); + g_object_unref(sink); + + if (!success) { + g_warning("[compact_columns] failed to write feather: %s", + error->message); + g_error_free(error); + g_object_unref(buffer); + return NULL; + } + + return buffer; +} diff --git a/images/fluentbit/compact_columns.h b/images/fluentbit/compact_columns.h new file mode 100644 index 0000000..2c5caed --- /dev/null +++ b/images/fluentbit/compact_columns.h @@ -0,0 +1,11 @@ +#ifndef FLB_S3_COMPACT_COLUMNS_H +#define FLB_S3_COMPACT_COLUMNS_H +#include <arrow-glib/arrow-glib.h> + +/* Convert "time" to Timestamp(ns) without timezone and dictionary-encode "stream"/"logtag" with int8 indices; a column that is missing or fails to convert is left as-is. Returns a reference the caller must unref; the input table is not consumed. */ +GArrowTable *compact_parquet_columns(GArrowTable *table); + +/* Serialize table to Arrow IPC (Feather v2) with ZSTD body compression. + * ZSTD is chosen because nanoarrow/DuckDB supports it but not LZ4 for IPC bodies. Returns NULL on failure. */ +GArrowResizableBuffer *table_to_arrow_ipc_buffer(GArrowTable *table); +#endif diff --git a/images/fluentbit/test_compact_columns.c b/images/fluentbit/test_compact_columns.c new file mode 100644 index 0000000..15546c3 --- /dev/null +++ b/images/fluentbit/test_compact_columns.c @@ -0,0 +1,342 @@ +/* + * test_compact_columns.c - Unit test for compact_parquet_columns() + * + * Standalone C program compiled and run during Docker build. + * The arrow-compact and parquet-compact tests both check the same timezone-less Timestamp(ns) output. 
+ */ + +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <arrow-glib/arrow-glib.h> +#include <parquet-glib/parquet-glib.h> + +#include "compact_columns.h" + +#define TEST_JSON \ + "{\"time\":\"2024-01-15T10:30:45.123456789Z\",\"stream\":\"stdout\",\"logtag\":\"F\",\"message\":\"hello\",\"cluster\":\"test\"}\n" \ + "{\"time\":\"2024-01-15T10:30:46.000000000Z\",\"stream\":\"stderr\",\"logtag\":\"P\",\"message\":\"world\",\"cluster\":\"test\"}\n" \ + "{\"time\":\"2024-01-15T10:30:47.999999999Z\",\"stream\":\"stdout\",\"logtag\":\"F\",\"message\":\"again\",\"cluster\":\"test\"}\n" + +static int tests_passed = 0; +static int tests_failed = 0; + +#define ASSERT_MSG(cond, msg) do { \ + if (!(cond)) { \ + fprintf(stderr, "FAIL: %s (line %d)\n", msg, __LINE__); \ + tests_failed++; \ + } else { \ + printf(" PASS: %s\n", msg); \ + tests_passed++; \ + } \ +} while(0) + +static GArrowTable *parse_test_json(void) +{ + GArrowBuffer *buffer; + GArrowBufferInputStream *input; + GArrowJSONReadOptions *options; + GArrowJSONReader *reader; + GArrowTable *table; + GError *error = NULL; + + const char *json = TEST_JSON; + buffer = garrow_buffer_new((const guint8 *)json, strlen(json)); + assert(buffer); + input = garrow_buffer_input_stream_new(buffer); + assert(input); + options = garrow_json_read_options_new(); + assert(options); + reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); + if (!reader) { + fprintf(stderr, "Failed to create JSON reader: %s\n", error->message); + exit(1); + } + table = garrow_json_reader_read(reader, &error); + if (!table) { + fprintf(stderr, "Failed to read JSON: %s\n", error->message); + exit(1); + } + g_object_unref(reader); + g_object_unref(options); + g_object_unref(input); + g_object_unref(buffer); + return table; +} + +static GArrowDataType *get_column_type(GArrowTable *table, const char *name) +{ + GArrowSchema *schema = garrow_table_get_schema(table); + int idx = garrow_schema_get_field_index(schema, name); + if (idx < 0) { + g_object_unref(schema); + return NULL; + } + 
GArrowField *field = garrow_schema_get_field(schema, idx); + GArrowDataType *type = garrow_field_get_data_type(field); + /* garrow_field_get_data_type is transfer-none; ref before releasing field */ + if (type) g_object_ref(type); + g_object_unref(field); + g_object_unref(schema); + return type; +} + +static void assert_dict_int8(GArrowDataType *type, const char *col_name) +{ + if (!GARROW_IS_DICTIONARY_DATA_TYPE(type)) return; + GArrowDictionaryDataType *dict = GARROW_DICTIONARY_DATA_TYPE(type); + GArrowDataType *idx_type = garrow_dictionary_data_type_get_index_data_type(dict); + char msg[128]; + snprintf(msg, sizeof(msg), "%s dictionary uses int8 indices", col_name); + ASSERT_MSG(GARROW_IS_INT8_DATA_TYPE(idx_type), msg); + g_object_unref(idx_type); +} + +static gboolean write_and_read_parquet(GArrowTable *table, const char *path, + GArrowTable **out_table) +{ + GError *error = NULL; + GArrowSchema *schema; + GParquetArrowFileWriter *writer; + GParquetArrowFileReader *reader; + gboolean success; + + /* Write */ + GArrowFileOutputStream *file_out = + garrow_file_output_stream_new(path, FALSE, &error); + if (!file_out) { + fprintf(stderr, "Failed to open %s for writing: %s\n", path, error->message); + g_error_free(error); + return FALSE; + } + + schema = garrow_table_get_schema(table); + writer = gparquet_arrow_file_writer_new_arrow( + schema, GARROW_OUTPUT_STREAM(file_out), NULL, &error); + g_object_unref(schema); + if (!writer) { + fprintf(stderr, "Failed to create writer: %s\n", error->message); + g_error_free(error); + g_object_unref(file_out); + return FALSE; + } + + gint64 n_rows = garrow_table_get_n_rows(table); + success = gparquet_arrow_file_writer_write_table(writer, table, n_rows, &error); + if (!success) { + fprintf(stderr, "Failed to write table: %s\n", error->message); + g_error_free(error); + g_object_unref(writer); + g_object_unref(file_out); + return FALSE; + } + + success = gparquet_arrow_file_writer_close(writer, &error); + g_object_unref(writer); 
+ g_object_unref(file_out); + if (!success) { + fprintf(stderr, "Failed to close writer: %s\n", error->message); + g_error_free(error); + return FALSE; + } + + /* Read back */ + reader = gparquet_arrow_file_reader_new_path(path, &error); + if (!reader) { + fprintf(stderr, "Failed to open %s for reading: %s\n", path, error->message); + g_error_free(error); + return FALSE; + } + + *out_table = gparquet_arrow_file_reader_read_table(reader, &error); + g_object_unref(reader); + if (!*out_table) { + fprintf(stderr, "Failed to read table: %s\n", error->message); + g_error_free(error); + return FALSE; + } + + return TRUE; +} + +static void test_arrow_compact(void) +{ + printf("\n--- Test: arrow-compact ---\n"); + GArrowTable *table = parse_test_json(); + GArrowTable *compacted = compact_parquet_columns(table); + g_object_unref(table); + + ASSERT_MSG(compacted != NULL, "compact_parquet_columns returned non-NULL"); + + /* Check time column type */ + GArrowDataType *time_type = get_column_type(compacted, "time"); + ASSERT_MSG(time_type != NULL, "time column exists"); + ASSERT_MSG(GARROW_IS_TIMESTAMP_DATA_TYPE(time_type), + "time is GArrowTimestampDataType"); + + if (GARROW_IS_TIMESTAMP_DATA_TYPE(time_type)) { + GArrowTimestampDataType *ts = + GARROW_TIMESTAMP_DATA_TYPE(time_type); + GArrowTimeUnit unit = + garrow_timestamp_data_type_get_unit(ts); + ASSERT_MSG(unit == GARROW_TIME_UNIT_NANO, + "time unit is NANO"); + } + if (time_type) g_object_unref(time_type); + + /* stream and logtag are dictionary-encoded with int8 indices in both paths. + * DuckDB/nanoarrow reads these as VARCHAR; pyarrow sees dictionary type. 
*/ + GArrowDataType *stream_type = get_column_type(compacted, "stream"); + ASSERT_MSG(stream_type != NULL, "stream column exists"); + ASSERT_MSG(GARROW_IS_DICTIONARY_DATA_TYPE(stream_type), + "stream is dictionary-encoded"); + assert_dict_int8(stream_type, "stream"); + if (stream_type) g_object_unref(stream_type); + + GArrowDataType *logtag_type = get_column_type(compacted, "logtag"); + ASSERT_MSG(logtag_type != NULL, "logtag column exists"); + ASSERT_MSG(GARROW_IS_DICTIONARY_DATA_TYPE(logtag_type), + "logtag is dictionary-encoded"); + assert_dict_int8(logtag_type, "logtag"); + if (logtag_type) g_object_unref(logtag_type); + + /* Write to parquet and read back */ + GArrowTable *readback = NULL; + gboolean ok = write_and_read_parquet(compacted, "/tmp/test_arrow_compact.parquet", + &readback); + ASSERT_MSG(ok, "parquet write+read round-trip succeeded"); + + if (readback) { + ASSERT_MSG(garrow_table_get_n_rows(readback) == 3, + "round-trip preserved 3 rows"); + + /* Verify time type persists through parquet */ + GArrowDataType *rt_time = get_column_type(readback, "time"); + ASSERT_MSG(rt_time != NULL, "time column exists after round-trip"); + ASSERT_MSG(GARROW_IS_TIMESTAMP_DATA_TYPE(rt_time), + "time is still timestamp after round-trip"); + if (rt_time) g_object_unref(rt_time); + + g_object_unref(readback); + } + + g_object_unref(compacted); +} + +static void test_parquet_compact(void) +{ + printf("\n--- Test: parquet-compact ---\n"); + GArrowTable *table = parse_test_json(); + GArrowTable *compacted = compact_parquet_columns(table); + g_object_unref(table); + + ASSERT_MSG(compacted != NULL, "compact_parquet_columns returned non-NULL"); + + /* Check time column type */ + GArrowDataType *time_type = get_column_type(compacted, "time"); + ASSERT_MSG(time_type != NULL, "time column exists"); + ASSERT_MSG(GARROW_IS_TIMESTAMP_DATA_TYPE(time_type), + "time is GArrowTimestampDataType"); + + if (GARROW_IS_TIMESTAMP_DATA_TYPE(time_type)) { + GArrowTimestampDataType *ts = + 
GARROW_TIMESTAMP_DATA_TYPE(time_type); + GArrowTimeUnit unit = + garrow_timestamp_data_type_get_unit(ts); + ASSERT_MSG(unit == GARROW_TIME_UNIT_NANO, + "time unit is NANO"); + } + if (time_type) g_object_unref(time_type); + + /* Check stream is dictionary-encoded with int8 indices (NULL type was already reported above; skip the unref) */ + GArrowDataType *stream_type = get_column_type(compacted, "stream"); + ASSERT_MSG(stream_type != NULL, "stream column exists"); + ASSERT_MSG(GARROW_IS_DICTIONARY_DATA_TYPE(stream_type), + "stream is dictionary-encoded"); + assert_dict_int8(stream_type, "stream"); + if (stream_type) g_object_unref(stream_type); + + /* Check logtag is dictionary-encoded with int8 indices (NULL type was already reported above; skip the unref) */ + GArrowDataType *logtag_type = get_column_type(compacted, "logtag"); + ASSERT_MSG(logtag_type != NULL, "logtag column exists"); + ASSERT_MSG(GARROW_IS_DICTIONARY_DATA_TYPE(logtag_type), + "logtag is dictionary-encoded"); + assert_dict_int8(logtag_type, "logtag"); + if (logtag_type) g_object_unref(logtag_type); + + /* Write to parquet and read back */ + GArrowTable *readback = NULL; + gboolean ok = write_and_read_parquet(compacted, "/tmp/test_parquet_compact.parquet", + &readback); + ASSERT_MSG(ok, "parquet write+read round-trip succeeded"); + + if (readback) { + ASSERT_MSG(garrow_table_get_n_rows(readback) == 3, + "round-trip preserved 3 rows"); + + GArrowDataType *rt_time = get_column_type(readback, "time"); + ASSERT_MSG(rt_time != NULL, "time column exists after round-trip"); + ASSERT_MSG(GARROW_IS_TIMESTAMP_DATA_TYPE(rt_time), + "time is still timestamp after round-trip"); + if (rt_time) g_object_unref(rt_time); + + g_object_unref(readback); + } + + g_object_unref(compacted); +} + +static void test_timestamp_values(void) +{ + printf("\n--- Test: timestamp value correctness ---\n"); + GArrowTable *table = parse_test_json(); + GArrowTable *compacted = compact_parquet_columns(table); + g_object_unref(table); + + GArrowSchema *schema = garrow_table_get_schema(compacted); + int idx = garrow_schema_get_field_index(schema, "time"); + g_object_unref(schema); + 
ASSERT_MSG(idx >= 0, "time column found"); + + GArrowChunkedArray *chunked = garrow_table_get_column_data(compacted, idx); + GArrowArray *chunk = garrow_chunked_array_get_chunk(chunked, 0); + + /* First row: 2024-01-15T10:30:45.123456789Z */ + /* Expected: 1705312245 * 1e9 + 123456789 = 1705312245123456789 */ + GArrowTimestampArray *ts_arr = GARROW_TIMESTAMP_ARRAY(chunk); + gint64 val0 = garrow_timestamp_array_get_value(ts_arr, 0); + ASSERT_MSG(val0 == 1705314645123456789LL, + "first timestamp value correct (nanosecond precision)"); + + /* Second row: 2024-01-15T10:30:46.000000000Z */ + gint64 val1 = garrow_timestamp_array_get_value(ts_arr, 1); + ASSERT_MSG(val1 == 1705314646000000000LL, + "second timestamp value correct (whole seconds)"); + + /* Third row: 2024-01-15T10:30:47.999999999Z */ + gint64 val2 = garrow_timestamp_array_get_value(ts_arr, 2); + ASSERT_MSG(val2 == 1705314647999999999LL, + "third timestamp value correct (max nanoseconds)"); + + g_object_unref(chunk); + g_object_unref(chunked); + g_object_unref(compacted); +} + +int main(void) +{ + printf("=== compact_columns unit tests ===\n"); + + test_arrow_compact(); + test_parquet_compact(); + test_timestamp_values(); + + printf("\n=== Results: %d passed, %d failed ===\n", + tests_passed, tests_failed); + + if (tests_failed > 0) { + return 1; + } + return 0; +} diff --git a/k3d-example/fluentbit/daemonset.yaml b/k3d-example/fluentbit/daemonset.yaml index fe7a85e..c4217cd 100644 --- a/k3d-example/fluentbit/daemonset.yaml +++ b/k3d-example/fluentbit/daemonset.yaml @@ -26,6 +26,10 @@ spec: env: - name: CLUSTER_NAME value: dev + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName envFrom: - secretRef: name: fluentbit-s3-credentials diff --git a/k3d-example/fluentbit/fluent-bit.conf b/k3d-example/fluentbit/fluent-bit.conf index cbc580a..91b17c2 100644 --- a/k3d-example/fluentbit/fluent-bit.conf +++ b/k3d-example/fluentbit/fluent-bit.conf @@ -19,25 +19,41 @@ Refresh_Interval 5 # The CRI parser 
has Time_Keep On, producing a 'time' record field (nanosecond string). -# We drop it here because the OUTPUT section writes the internal timestamp as 'time' -# (json_date_key=time, json_date_format=epoch_ms) — a compact BIGINT in parquet. -# Removing the CRI 'time' avoids a conflict with the output's 'time' key. +# Both compact compressions convert 'time' to native Timestamp(ns) during Arrow/Parquet write. [FILTER] Name modify Match kube.* - Remove time Add cluster ${CLUSTER_NAME} +# Arrow-compact: CRI 'time' as Timestamp(ns), stream+logtag dictionary-encoded +# json_date_key=false suppresses the internal timestamp [OUTPUT] Name s3 + Alias s3-arrow Match kube.* bucket fluentbit-logs endpoint http://versitygw:7070 tls Off use_put_object On - compression parquet - json_date_key time - json_date_format epoch_ms + compression arrow-compact + json_date_key false + total_file_size 1M + upload_timeout 15s + s3_key_format_tag_delimiters . + s3_key_format /${CLUSTER_NAME}/$TAG[2]/%Y/%m/%d/${NODE_NAME}/$TAG[3]/$TAG[4]/%H/%M/$UUID.arrow + +# Parquet-compact: CRI 'time' as TIMESTAMP(isAdjustedToUTC, NANOS), stream+logtag dictionary-encoded +# json_date_key=false suppresses the internal timestamp +[OUTPUT] + Name s3 + Alias s3-parquet + Match kube.* + bucket fluentbit-logs + endpoint http://versitygw:7070 + tls Off + use_put_object On + compression parquet-compact + json_date_key false total_file_size 1M upload_timeout 15s s3_key_format_tag_delimiters . @@ -46,4 +62,4 @@ # while the SigV4 spec requires it (%3D). versitygw (fixed in #807) encodes correctly, # causing SignatureDoesNotMatch. Using positional dirs as a workaround; # Consumers must reconstruct namespace/pod/container columns from the file path at query time. 
- s3_key_format /${CLUSTER_NAME}/$TAG[2]/$TAG[3]/$TAG[4]/%Y/%m/%d/%H/%M/$UUID.parquet + s3_key_format /${CLUSTER_NAME}/$TAG[2]/%Y/%m/%d/${NODE_NAME}/$TAG[3]/$TAG[4]/%H/%M/$UUID.parquet diff --git a/k3d-example/workload/workload.yaml b/k3d-example/workload/workload.yaml index c9dc21e..1dbec99 100644 --- a/k3d-example/workload/workload.yaml +++ b/k3d-example/workload/workload.yaml @@ -34,7 +34,7 @@ spec: count=$((count + 1)) if [ $((count % 10)) -eq 0 ]; then printf '{"count":%d,"message":"stack trace example\\nError: something failed\\n at main (app.js:42)\\n at run (app.js:10)","ts":"%s"}\n' \ - "$count" "$(date -u +%Y-%m-%dT%H:%M:%SZ)" + "$count" "$(date -u +%Y-%m-%dT%H:%M:%SZ)" >&2 else echo "{\"count\":$count,\"message\":\"hello from log-generator\",\"ts\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"}" fi diff --git a/test.sh b/test.sh index 52d64bc..ae7c41e 100755 --- a/test.sh +++ b/test.sh @@ -7,6 +7,11 @@ K3D_DIR="$SCRIPT_DIR/k3d-example" KUBECONFIG="$K3D_DIR/kubeconfig" KUBECTL="kubectl --kubeconfig=$KUBECONFIG" +# DuckDB/nanoarrow cannot read dictionary-encoded Arrow IPC files +# (https://github.com/paleolimbot/duckdb-nanoarrow/issues/25). +# Set to "fail" to treat DuckDB arrow failures as errors. +ON_DUCKDB_ARROW_FAILURE="${ON_DUCKDB_ARROW_FAILURE:-continue}" + # --- Helpers --- fail() { @@ -16,12 +21,36 @@ fail() { exit 1 } +duckdb_arrow_assert() { + local msg="$1" + if [[ "$ON_DUCKDB_ARROW_FAILURE" == "fail" ]]; then + echo " FAIL: $msg (DuckDB nanoarrow)" >&2 + ERRORS=$((ERRORS + 1)) + else + echo " SKIP: $msg (DuckDB nanoarrow lacks dictionary support)" + fi +} + wait_for_rollout() { local kind="$1" name="$2" timeout="${3:-120s}" echo " Waiting for $kind/$name..." 
$KUBECTL rollout status "$kind/$name" --timeout="$timeout" || fail "$kind/$name rollout timed out" } +S3_SETUP_SQL="INSTALL httpfs; LOAD httpfs; +INSTALL nanoarrow FROM community; LOAD nanoarrow; +SET s3_region='us-east-1'; SET s3_endpoint='localhost:30070'; +SET s3_access_key_id='demoaccess'; SET s3_secret_access_key='demosecret'; +SET s3_use_ssl=false; SET s3_url_style='path';" + +duckdb_s3() { + duckdb -noheader -csv -c "${S3_SETUP_SQL} $1" +} + +duckdb_s3_show() { + duckdb -c "${S3_SETUP_SQL} $1" +} + # --- 1. Cluster --- echo "==> Ensuring k3d cluster '$CLUSTER_NAME'" @@ -44,7 +73,8 @@ echo "==> Building container images" echo "==> Importing images into k3d" k3d image import -c "$CLUSTER_NAME" \ "$SCRIPT_DIR/images/fluentbit/target/images/fluentbit.tar" \ - "$SCRIPT_DIR/images/versitygw/target/images/versitygw.tar" + "$SCRIPT_DIR/images/versitygw/target/images/versitygw.tar" \ + "$SCRIPT_DIR/images/arrow-tools/target/images/arrow-tools.tar" # --- 4. Apply manifests --- @@ -75,28 +105,96 @@ wait_for_rollout daemonset fluent-bit 60s wait_for_rollout deployment log-generator 60s -# --- 6. Poll for parquet data --- - -echo "==> Waiting for parquet data to appear (up to 120s)..." +# --- 6. Poll for data in both formats --- POLL_TIMEOUT=120 POLL_INTERVAL=5 -elapsed=0 - -LAST_ERROR="" -while true; do - OUTPUT=$(./y-logcli --context=dev query '{namespace="default"}' 2>&1) && break - LAST_ERROR="$OUTPUT" - elapsed=$((elapsed + POLL_INTERVAL)) - if [ "$elapsed" -ge "$POLL_TIMEOUT" ]; then - echo " Last y-logcli output: $LAST_ERROR" >&2 - fail "No parquet data appeared within ${POLL_TIMEOUT}s" - fi - echo " No data yet, retrying in ${POLL_INTERVAL}s... (${elapsed}/${POLL_TIMEOUT}s)" - sleep "$POLL_INTERVAL" -done -echo "==> Data found, running assertions..." +# poll_for_format FMT - run the y-logcli query for FMT every POLL_INTERVAL seconds until it succeeds. +# Prints the successful query output on stdout and returns 0; returns 1 once POLL_TIMEOUT is reached. +# Progress messages go to stderr so that callers capturing stdout receive only the query output. +poll_for_format() { + local fmt="$1" + local elapsed=0 last_error="" + echo "==> Waiting for $fmt data to appear (up to ${POLL_TIMEOUT}s)..." >&2
+ while true; do + local output + output=$(./y-logcli --context=dev query '{namespace="default"}' -f "$fmt" 2>&1) && { + echo "$output" + return 0 + } + last_error="$output" + elapsed=$((elapsed + POLL_INTERVAL)) + if [ "$elapsed" -ge "$POLL_TIMEOUT" ]; then + echo " Last y-logcli output ($fmt): $last_error" >&2 + return 1 + fi + echo " No $fmt data yet, retrying in ${POLL_INTERVAL}s... (${elapsed}/${POLL_TIMEOUT}s)" >&2 + sleep "$POLL_INTERVAL" + done +} + +PARQUET_OUTPUT=$(poll_for_format parquet) || fail "No parquet data appeared within ${POLL_TIMEOUT}s" +echo " Parquet data found" + +echo "==> Probing arrow IPC via DuckDB nanoarrow (may fail with dictionary-encoded columns)..." +ARROW_OUTPUT=$(poll_for_format arrow 2>&1) || { + echo " DuckDB nanoarrow cannot read dictionary-encoded Arrow IPC (ON_DUCKDB_ARROW_FAILURE=$ON_DUCKDB_ARROW_FAILURE)" + ARROW_OUTPUT="" +} + +# --- 6b. Print raw file metadata for one sample of each format --- + +print_arrow_metadata() { + local glob="$1" + local sample + sample=$(duckdb_s3 "SELECT file FROM glob('${glob}') ORDER BY file DESC LIMIT 1;") + echo " --- arrow IPC (.arrow): $(basename "$sample") [DuckDB nanoarrow] ---" + echo " Schema (DuckDB DESCRIBE read_arrow):" + duckdb_s3_show " + DESCRIBE SELECT * FROM read_arrow('${sample}'); + " +} + +print_parquet_metadata() { + local glob="$1" + local sample + sample=$(duckdb_s3 "SELECT DISTINCT file_name FROM parquet_schema('${glob}') LIMIT 1;") + echo " --- parquet (.parquet): $(basename "$sample") ---" + echo " Schema (parquet logical types):" + duckdb_s3_show " + SELECT name, type, logical_type + FROM parquet_schema('${sample}') + WHERE name <> 'schema'; + " + echo " Row group metadata (encodings, sizes):" + duckdb_s3_show " + SELECT path_in_schema AS col, encodings, compression, + total_compressed_size AS comp_bytes, total_uncompressed_size AS raw_bytes + FROM parquet_metadata('${sample}'); + " +} + +echo "==> File metadata (one sample per format)..." 
+print_arrow_metadata "s3://fluentbit-logs/dev/default/**/*.arrow" || { + echo " (DuckDB nanoarrow read_arrow failed — expected with dictionary-encoded Arrow IPC)" +} +print_parquet_metadata "s3://fluentbit-logs/dev/default/**/*.parquet" + +echo "==> Validating arrow IPC via pyarrow (official Apache Arrow library)..." +# Find the log-generator's earliest arrow file (contains the burst with close ns timestamps) +LOG_GEN_DIR=$(duckdb_s3 " + SELECT regexp_replace(file, '/[^/]+$', '') + FROM glob('s3://fluentbit-logs/dev/default/**/app/**/*.arrow') + ORDER BY file LIMIT 1; +" | sed 's|s3://fluentbit-logs/||') +echo " log-generator prefix: $LOG_GEN_DIR" + +PYARROW_OUTPUT=$($KUBECTL run arrow-inspect --rm -i --restart=Never \ + --image=yolean/arrow-tools:latest --image-pull-policy=Never \ + --command -- python /usr/local/bin/inspect_arrow.py \ + fluentbit-logs "$LOG_GEN_DIR" .arrow \ + 2>/dev/null) || true +echo "$PYARROW_OUTPUT" + +echo "==> Running assertions..." # --- 7. Assertions --- @@ -105,15 +203,27 @@ ERRORS=0 # Note: use here-strings (<<<) not pipes for grep -q, because # pipefail + grep -q causes SIGPIPE when grep exits early on match. -# 7a. log-generator messages exist -if grep -q "hello from log-generator" <<< "$OUTPUT"; then - echo " PASS: log-generator messages found" +# 7a. log-generator messages exist (check both formats) +if grep -q "hello from log-generator" <<< "$PARQUET_OUTPUT"; then + echo " PASS: log-generator messages found in parquet" else - echo " FAIL: log-generator messages not found" >&2 + echo " FAIL: log-generator messages not found in parquet" >&2 ERRORS=$((ERRORS + 1)) fi -# 7b. 
JSON structure - check that parquet has expected columns +if [[ -n "$ARROW_OUTPUT" ]]; then + if grep -q "hello from log-generator" <<< "$ARROW_OUTPUT"; then + echo " PASS: log-generator messages found in arrow (DuckDB nanoarrow)" + else + duckdb_arrow_assert "log-generator messages not found in arrow" + fi +else + duckdb_arrow_assert "arrow data not readable via DuckDB nanoarrow" +fi + +# 7b. Partition columns present +OUTPUT=$(./y-logcli --context=dev query '{namespace="default"}' -f parquet 2>&1) + if grep -q "namespace" <<< "$OUTPUT"; then echo " PASS: partition column 'namespace' present" else @@ -129,7 +239,7 @@ else fi # 7c. Cluster tag added by fluent-bit filter -LINES_OUTPUT=$(./y-logcli --context=dev query '{namespace="default"}' -o lines 2>&1) +LINES_OUTPUT=$(./y-logcli --context=dev query '{namespace="default"}' -f parquet -o lines 2>&1) if grep -q "cluster.*=.*dev" <<< "$LINES_OUTPUT"; then echo " PASS: cluster tag 'dev' present in records" else @@ -137,7 +247,89 @@ else ERRORS=$((ERRORS + 1)) fi -# 7d. SIGTERM flush - verify buffered data is flushed to S3 on shutdown +# 7d. Schema comparison — both formats should be Timestamp(ns) without timezone +echo "==> Checking schemas..." 
+ +# DuckDB nanoarrow read_arrow — may fail with dictionary-encoded columns +ARROW_TIME_TYPE=$(duckdb_s3 " + SELECT column_type FROM ( + DESCRIBE SELECT * FROM read_arrow('s3://fluentbit-logs/dev/default/**/*.arrow', filename=true) + ) WHERE column_name='time'; +" 2>/dev/null | tr -d '[:space:]') || true + +PARQUET_TIME_TYPE=$(duckdb_s3 " + SELECT column_type FROM ( + DESCRIBE SELECT * FROM read_parquet('s3://fluentbit-logs/dev/default/**/*.parquet', filename=true, hive_partitioning=false) + ) WHERE column_name='time'; +" | tr -d '[:space:]') + +echo " --- DuckDB default interpretation of persisted formats ---" +echo " Parquet (DESCRIBE read_parquet):" +duckdb_s3_show " + DESCRIBE SELECT * FROM read_parquet('s3://fluentbit-logs/dev/default/**/*.parquet', filename=true, hive_partitioning=false); +" +echo " Arrow IPC (DESCRIBE read_arrow):" +duckdb_s3_show " + DESCRIBE SELECT * FROM read_arrow('s3://fluentbit-logs/dev/default/**/*.arrow', filename=true); +" 2>&1 || echo " (DuckDB nanoarrow failed — expected with dictionary-encoded Arrow IPC)" + +if [[ -n "$ARROW_TIME_TYPE" && "$ARROW_TIME_TYPE" == "TIMESTAMP_NS" ]]; then + echo " PASS: arrow format has time as TIMESTAMP_NS (DuckDB nanoarrow)" +elif [[ -z "$ARROW_TIME_TYPE" ]]; then + duckdb_arrow_assert "arrow schema not readable via DuckDB nanoarrow" +else + echo " FAIL: arrow format has time as '$ARROW_TIME_TYPE', expected TIMESTAMP_NS" >&2 + ERRORS=$((ERRORS + 1)) +fi + +# Both formats use Timestamp(ns) without timezone — DuckDB reads as TIMESTAMP_NS +if [[ "$PARQUET_TIME_TYPE" == "TIMESTAMP_NS" ]]; then + echo " PASS: parquet format has time as TIMESTAMP_NS (nanosecond precision)" +else + echo " FAIL: parquet format has time as '$PARQUET_TIME_TYPE', expected TIMESTAMP_NS" >&2 + ERRORS=$((ERRORS + 1)) +fi + +# 7e. 
pyarrow independent validation — confirms Arrow IPC is readable by official Apache Arrow +if grep -q "timestamp\[ns\]" <<< "$PYARROW_OUTPUT"; then + echo " PASS: pyarrow confirms time field is timestamp[ns]" +else + echo " FAIL: pyarrow did not find timestamp[ns] type in Arrow IPC" >&2 + ERRORS=$((ERRORS + 1)) +fi + +# The pyarrow report must also mention a dictionary type; the here-string feeds the captured report to grep. +# NOTE(review): matches the bare word "dictionary" - tighten the pattern if inspect_arrow.py prints a more specific type string. +if grep -q "dictionary" <<< "$PYARROW_OUTPUT"; then + echo " PASS: pyarrow confirms dictionary-encoded fields" +else + echo " FAIL: pyarrow did not find dictionary in Arrow IPC" >&2 + ERRORS=$((ERRORS + 1)) +fi + +# 7f. Timestamp values are valid (parseable by DuckDB as timestamps) +ARROW_TIME_SAMPLE=$(duckdb_s3 " + SELECT time::VARCHAR FROM read_arrow('s3://fluentbit-logs/dev/default/**/*.arrow', filename=true) LIMIT 1; +" 2>/dev/null | tr -d '[:space:]') || true + +if [[ -n "$ARROW_TIME_SAMPLE" ]] && grep -qE '^[0-9]{4}-[0-9]{2}-[0-9]{2}' <<< "$ARROW_TIME_SAMPLE"; then + echo " PASS: arrow time is a valid timestamp: $ARROW_TIME_SAMPLE (DuckDB nanoarrow)" +elif [[ -z "$ARROW_TIME_SAMPLE" ]]; then + duckdb_arrow_assert "arrow timestamp not readable via DuckDB nanoarrow" +else + echo " FAIL: arrow time is not a valid timestamp: '$ARROW_TIME_SAMPLE'" >&2 + ERRORS=$((ERRORS + 1)) +fi + +# 7g. Parquet data queryable through y-logcli +PARQUET_COUNT=$(./y-logcli --context=dev query '{namespace="default"}' -f parquet -o raw 2>&1 | grep -c "hello from log-generator" || true) + +if [[ "$PARQUET_COUNT" -gt 0 ]]; then + echo " PASS: parquet contains data ($PARQUET_COUNT log-generator messages)" +else + echo " FAIL: no parquet data found via y-logcli" >&2 + ERRORS=$((ERRORS + 1)) +fi + +# 7h. SIGTERM flush - verify buffered data is flushed to S3 on shutdown echo "==> Testing SIGTERM flush..." 
MARKER="sigterm-test-$(date +%s)" @@ -151,7 +343,8 @@ $KUBECTL run "$MARKER" --rm -i --restart=Never \ sleep 8 # Kill fluent-bit before upload_timeout (15s) triggers a regular flush -$KUBECTL delete pod -l app=fluent-bit --grace-period=5 +# grace-period=10 allows time for both s3-arrow and s3-parquet outputs to flush +$KUBECTL delete pod -l app=fluent-bit --grace-period=10 wait_for_rollout daemonset fluent-bit 60s # Poll for the marker in S3 (up to 30s for the new pod to be ready) @@ -161,7 +354,7 @@ flush_elapsed=0 FLUSH_FOUND=false while [ "$flush_elapsed" -lt "$FLUSH_TIMEOUT" ]; do - FLUSH_OUTPUT=$(./y-logcli --context=dev query '{namespace="default"}' -o raw 2>&1) || true + FLUSH_OUTPUT=$(./y-logcli --context=dev query '{namespace="default"}' -f parquet -o raw 2>&1) || true if grep -q "$MARKER" <<< "$FLUSH_OUTPUT"; then FLUSH_FOUND=true break @@ -184,6 +377,6 @@ echo "" if [ "$ERRORS" -gt 0 ]; then fail "$ERRORS assertion(s) failed" else - echo "PASS: All assertions passed - state is shippable" + echo "PASS: All assertions passed" exit 0 fi diff --git a/y-logcli b/y-logcli index beb5bf7..433b4f0 100755 --- a/y-logcli +++ b/y-logcli @@ -2,7 +2,13 @@ set -euo pipefail # y-logcli - Query Kubernetes container logs stored as Parquet in S3 -# S3 path layout: /////YYYY/MM/DD/HH/MM/.parquet +# S3 path layout: ///YYYY/MM/DD////HH/MM/.{arrow,parquet} +# +# Two formats are written by fluent-bit: +# .arrow — Arrow IPC (Feather v2) with ZSTD body compression +# .parquet — Apache Parquet +# Both use Timestamp(ns) without timezone annotation (UTC by convention). +# DuckDB reads both as TIMESTAMP_NS preserving nanosecond precision. 
S3_ENDPOINT="localhost:30070" S3_BUCKET="fluentbit-logs" @@ -12,17 +18,21 @@ S3_SECRET_KEY="demosecret" usage() { cat <<'EOF' Usage: - y-logcli --context= query '{}' [--since=] + y-logcli --context= query '{}' [--since=] [-o ] [-f ] Examples: y-logcli --context=dev query '{pod="app-abc12345-abcdef"}' y-logcli --context=dev query '{namespace="qa"}' --since=5m - y-logcli --context=dev query '{container="app"}' --since=1h - y-logcli --context=dev query '{namespace="default"}' -o raw + y-logcli --context=dev query '{container="app"}' --since=1h -o raw + y-logcli --context=dev query '{namespace="default"}' -o table + y-logcli --context=dev query '{namespace="default"}' -f arrow + y-logcli --context=dev query '{namespace="default"}' -f parquet Selectors: namespace, pod, container (comma-separated inside braces) Duration: s (seconds), m (minutes), h (hours) -Output: -o table (default), -o raw (like kubectl logs), -o lines (key=value per field) +Output: -o table (default), -o raw (like kubectl logs), -o lines (key=value per field), + -o columns (compact: time pod container stream message) +Format: -f both (default), -f arrow (.arrow IPC), -f parquet (.parquet) EOF exit 1 } @@ -32,6 +42,7 @@ CONTEXT="" QUERY="" SINCE="" OUTPUT="table" +FORMAT="both" while [[ $# -gt 0 ]]; do case "$1" in @@ -53,6 +64,11 @@ while [[ $# -gt 0 ]]; do OUTPUT="${1:-table}" shift || true ;; + -f) + shift + FORMAT="${1:-both}" + shift || true + ;; *) echo "Unknown argument: $1" >&2 usage @@ -86,7 +102,9 @@ if [[ -n "$INNER" ]]; then fi # Build S3 glob path with partition pushdown +# Path layout: ///YYYY/MM/DD////HH/MM/.ext build_s3_path() { + local ext="$1" local ns="*" pod="*" container="*" for i in "${!SELECTOR_KEYS[@]}"; do case "${SELECTOR_KEYS[$i]}" in @@ -95,7 +113,13 @@ build_s3_path() { container) container="${SELECTOR_VALUES[$i]}" ;; esac done - echo "s3://${S3_BUCKET}/${CONTEXT}/${ns}/${pod}/${container}/**/*.parquet" + # With namespace pushdown, date/node/pod/container are deeper in the 
tree + # Path: cluster/namespace/YYYY/MM/DD/node/pod/container/HH/MM/file + if [[ "$pod" != "*" || "$container" != "*" ]]; then + echo "s3://${S3_BUCKET}/${CONTEXT}/${ns}/*/*/*/*/${pod}/${container}/**/*.${ext}" + else + echo "s3://${S3_BUCKET}/${CONTEXT}/${ns}/**/*.${ext}" + fi } # Convert --since duration to SQL interval @@ -111,60 +135,144 @@ since_to_interval() { esac } -S3_PATH="$(build_s3_path)" +# S3 path: s3://bucket/cluster/namespace/YYYY/MM/DD/node/pod/container/HH/MM/file.ext +# string_split('/'): [5]=namespace [9]=node [10]=pod [11]=container +PARTITION_COLS="string_split(filename, '/')[5] AS namespace, + string_split(filename, '/')[9] AS node, + string_split(filename, '/')[10] AS pod, + string_split(filename, '/')[11] AS container" + +S3_SETUP="INSTALL httpfs; LOAD httpfs; +INSTALL nanoarrow FROM community; LOAD nanoarrow; +SET s3_region = 'us-east-1'; +SET s3_endpoint = '${S3_ENDPOINT}'; +SET s3_access_key_id = '${S3_ACCESS_KEY}'; +SET s3_secret_access_key = '${S3_SECRET_KEY}'; +SET s3_use_ssl = false; +SET s3_url_style = 'path';" + +# Build per-format queries +# Both formats write Timestamp(ns) without timezone annotation. +# DuckDB reads these as TIMESTAMP_NS preserving nanosecond precision. +# (With isAdjustedToUTC=true, DuckDB would map to TIMESTAMPTZ which is only microsecond.) +TIME_EXPR="time" -# epoch_ms() returns TIMESTAMP (no tz); AT TIME ZONE 'UTC' marks it as UTC for correct comparison with now(). -TIME_EXPR="epoch_ms(time) AT TIME ZONE 'UTC'" +# build_where TIME_EXPR - print the SQL WHERE clause for the active filters, or nothing when none apply. +# The only filter today is --since, compared against TIME_EXPR; conditions are joined with " AND ". +build_where() { + local time_expr="$1" + local parts=() + if [[ -n "$SINCE" ]]; then + local interval + interval="$(since_to_interval "$SINCE")" + parts+=("${time_expr} >= now() - INTERVAL '${interval}'") + fi + if [[ ${#parts[@]} -gt 0 ]]; then + # "${parts[*]}" joins on the FIRST character of IFS only, so IFS=' AND ' would + # produce a space-joined (invalid) clause as soon as there are two conditions. + local joined + printf -v joined ' AND %s' "${parts[@]}" + echo "WHERE ${joined# AND }" + fi +} -# Build WHERE clause -WHERE_PARTS=() +# ISO 8601 with nanosecond precision in UTC. +# Both formats store UTC Timestamp(ns) without timezone annotation. +NS_TIME_FMT="strftime({t}, '%Y-%m-%dT%H:%M:%S') || '.' 
|| lpad((epoch_ns({t}) % 1000000000)::VARCHAR, 9, '0') || 'Z'" -if [[ -n "$SINCE" ]]; then - INTERVAL="$(since_to_interval "$SINCE")" - WHERE_PARTS+=("${TIME_EXPR} >= now() - INTERVAL '${INTERVAL}'") -fi +build_select_cols() { + local time_expr="$1" + local ns_time="${NS_TIME_FMT//\{t\}/$time_expr}" + case "$OUTPUT" in + raw) echo "message" ;; + columns) echo "${ns_time} AS time, string_split(filename, '/')[10] AS pod, string_split(filename, '/')[11] AS container, stream, left(message, 60) AS message" ;; + lines) echo "cluster, ${PARTITION_COLS}, ${time_expr} AS time, stream, logtag, message" ;; + table) echo "cluster, ${PARTITION_COLS}, ${time_expr} AS time, stream, logtag, message" ;; + *) echo "Unknown output mode: $OUTPUT" >&2; exit 1 ;; + esac +} -WHERE_CLAUSE="" -if [[ ${#WHERE_PARTS[@]} -gt 0 ]]; then - WHERE_CLAUSE="WHERE $(IFS=' AND '; echo "${WHERE_PARTS[*]}")" -fi +# For UNION ALL, all columns must be in the SELECT for ORDER BY to work. +# This builds a full-column sub-query for wrapping in UNION ALL. +build_union_select_cols() { + local time_expr="$1" + echo "cluster, ${PARTITION_COLS}, ${time_expr} AS time, stream, logtag, message" +} -# S3 path: s3://bucket/////YYYY/... 
-# split('/'): [1]=bucket [2]=cluster [3]=ns [4]=pod [5]=container -PARTITION_COLS="string_split(filename, '/')[5] AS namespace, - string_split(filename, '/')[6] AS pod, - string_split(filename, '/')[7] AS container" +build_query_for_format() { + local fmt="$1" use_union_cols="${2:-false}" + local s3_path select_cols where_clause + s3_path="$(build_s3_path "$fmt")" + if [[ "$use_union_cols" == "true" ]]; then + select_cols="$(build_union_select_cols "$TIME_EXPR")" + else + select_cols="$(build_select_cols "$TIME_EXPR")" + fi + where_clause="$(build_where "$TIME_EXPR")" + + local read_fn="read_parquet" + local read_opts="filename=true, hive_partitioning=false" + if [[ "$fmt" == "arrow" ]]; then + read_fn="read_arrow" + read_opts="filename=true" + fi + + echo "SELECT ${select_cols} +FROM ${read_fn}('${s3_path}', ${read_opts}) +${where_clause}" +} + +# Build the combined SQL case "$OUTPUT" in - raw) SELECT_COLS="message" ; DUCKDB_MODE=(-cmd ".mode list" -cmd ".headers off") ;; - lines) SELECT_COLS="cluster, ${PARTITION_COLS}, ${TIME_EXPR} AS time, stream, logtag, message" ; DUCKDB_MODE=(-cmd ".mode line") ;; - table) SELECT_COLS="cluster, ${PARTITION_COLS}, ${TIME_EXPR} AS time, stream, logtag, message" ; DUCKDB_MODE=() ;; - *) echo "Unknown output mode: $OUTPUT (use table, raw, or lines)" >&2; exit 1 ;; + raw) DUCKDB_MODE=(-cmd ".mode list" -cmd ".headers off") ;; + columns) DUCKDB_MODE=(-cmd ".mode list" -cmd ".separator ' '" -cmd ".headers off") ;; + lines) DUCKDB_MODE=(-cmd ".mode line") ;; + table) DUCKDB_MODE=() ;; + *) echo "Unknown output mode: $OUTPUT (use table, raw, columns, or lines)" >&2; exit 1 ;; esac -SQL=$(cat <&2 + exit 1 + ;; +esac -duckdb ${DUCKDB_MODE[@]+"${DUCKDB_MODE[@]}"} -c "$SQL" +SQL="${S3_SETUP} -QUERY_SQL="SELECT ${SELECT_COLS} -FROM read_parquet('${S3_PATH}', filename=true, hive_partitioning=false) -${WHERE_CLAUSE} -ORDER BY ${TIME_EXPR};" +${QUERY_SQL}" + +if [[ "$OUTPUT" == "table" ]]; then + # Print column metadata above the table 
(DuckDB's box mode truncates types) + DESCRIBE_SQL="${S3_SETUP} +SELECT column_name || ': ' || column_type FROM (DESCRIBE ${QUERY_SQL%%ORDER BY*});" + duckdb -noheader -list -c "$DESCRIBE_SQL" 2>/dev/null | sed 's/^/-- /' || true + echo "" +fi + +duckdb ${DUCKDB_MODE[@]+"${DUCKDB_MODE[@]}"} -c "$SQL" echo "" echo "-- query executed:"