From 016f629bc6216ca81908b127379df7ce2bcdc4d8 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Tue, 10 Mar 2026 08:12:34 +0100 Subject: [PATCH 1/3] flush S3 on IN_CLOSE_WRITE: immediate upload when containers terminate Patches fluent-bit's tail_fs_inotify.c to watch IN_CLOSE_WRITE events. When CRI closes a container log file, a sentinel record on tag._close triggers the S3 output to force-flush buffered data for that tag. Adds e2e tests for exit 0, exit 1, and kubectl delete pod scenarios. Co-Authored-By: Claude Opus 4.6 --- images/fluentbit/Dockerfile | 1 + images/fluentbit/close-write-flush.patch | 157 +++++++++++++++++++++++ test.sh | 99 ++++++++++++++ 3 files changed, 257 insertions(+) create mode 100644 images/fluentbit/close-write-flush.patch diff --git a/images/fluentbit/Dockerfile b/images/fluentbit/Dockerfile index f7927e0..edb7bb3 100644 --- a/images/fluentbit/Dockerfile +++ b/images/fluentbit/Dockerfile @@ -45,6 +45,7 @@ COPY *.patch /src/ COPY compact_columns.h compact_columns.c /src/fluent-bit/src/aws/compression/arrow/ COPY apply-compact-compression.sh /src/ RUN cd /src/fluent-bit && git apply /src/fix-gcs-header-lookup.patch +RUN cd /src/fluent-bit && git apply /src/close-write-flush.patch RUN cd /src/fluent-bit && bash /src/apply-compact-compression.sh WORKDIR /src/fluent-bit/build diff --git a/images/fluentbit/close-write-flush.patch b/images/fluentbit/close-write-flush.patch new file mode 100644 index 0000000..2c51027 --- /dev/null +++ b/images/fluentbit/close-write-flush.patch @@ -0,0 +1,157 @@ +diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify.c +--- a/plugins/in_tail/tail_fs_inotify.c ++++ b/plugins/in_tail/tail_fs_inotify.c +@@ -20,6 +20,10 @@ + #include + #include + #include ++#include ++#include ++#include ++#include + + #include "tail.h" + #include "tail_file.h" +@@ -90,6 +94,60 @@ static inline void debug_event_mask(struct flb_tail_config *ctx, + } + } + ++/* ++ * Emit a sentinel record on a derived tag (original_tag._close) to signal ++ * downstream S3 outputs to flush buffered data for this file's tag. ++ * Called when IN_CLOSE_WRITE is detected (container runtime closed the log file). ++ */ ++static int tail_emit_close_signal(struct flb_tail_file *file, ++ struct flb_config *config) ++{ ++ struct flb_tail_config *ctx = file->config; ++ struct flb_time t; ++ struct flb_log_event_encoder encoder; ++ flb_sds_t signal_tag; ++ int ret; ++ ++ signal_tag = flb_sds_create_size(file->tag_len + 7); ++ if (!signal_tag) { ++ return -1; ++ } ++ flb_sds_cat_safe(&signal_tag, file->tag_buf, file->tag_len); ++ flb_sds_cat_safe(&signal_tag, "._close", 7); ++ ++ flb_time_get(&t); ++ ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); ++ if (ret != FLB_EVENT_ENCODER_SUCCESS) { ++ flb_sds_destroy(signal_tag); ++ return -1; ++ } ++ ++ ret = flb_log_event_encoder_begin_record(&encoder); ++ if (ret == FLB_EVENT_ENCODER_SUCCESS) ++ ret = flb_log_event_encoder_set_timestamp(&encoder, &t); ++ if (ret == FLB_EVENT_ENCODER_SUCCESS) ++ ret = flb_log_event_encoder_append_body_cstring(&encoder, ++ "_flb_close_write"); ++ if (ret == FLB_EVENT_ENCODER_SUCCESS) ++ ret = flb_log_event_encoder_append_body_boolean(&encoder, FLB_TRUE); ++ if (ret == FLB_EVENT_ENCODER_SUCCESS) ++ ret = flb_log_event_encoder_append_body_cstring(&encoder, "file"); ++ if (ret == FLB_EVENT_ENCODER_SUCCESS) ++ ret = flb_log_event_encoder_append_body_cstring(&encoder, file->name); ++ if (ret == FLB_EVENT_ENCODER_SUCCESS) ++ ret = flb_log_event_encoder_commit_record(&encoder); ++ ++ if (ret == FLB_EVENT_ENCODER_SUCCESS) { ++ flb_input_log_append(ctx->ins, ++ signal_tag, flb_sds_len(signal_tag), ++ encoder.output_buffer, ++ encoder.output_length); ++ flb_plg_info(ctx->ins, ++ "inode=%"PRIu64" IN_CLOSE_WRITE signal emitted: %s (tag=%s)", ++ file->inode, file->name, signal_tag); ++ } ++ ++ flb_log_event_encoder_destroy(&encoder); ++ flb_sds_destroy(signal_tag); ++ return 0; ++} ++ + static int tail_fs_event(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) + { +@@ -142,6 +200,15 @@ static int tail_fs_event(struct flb_input_instance *ins, + } + } + ++ /* IN_CLOSE_WRITE: container runtime closed the log file. ++ * Read any remaining data first, then emit a close signal. */ ++ if (ev.mask & IN_CLOSE_WRITE) { ++ /* Collect any remaining data before signaling */ ++ in_tail_collect_event(file, config); ++ tail_emit_close_signal(file, config); ++ return 0; ++ } ++ + /* IN_MODIFY: handle truncation (size_delta < 0 => seek to 0) */ + if (ev.mask & IN_MODIFY) { + if (size_delta < 0) { +@@ -161,7 +228,7 @@ static int tail_fs_add(struct flb_tail_file *file, int check_rotated) + char *name; + struct flb_tail_config *ctx = file->config; + +- flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW; ++ flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW | IN_CLOSE_WRITE; + + if (check_rotated == FLB_TRUE) { + flags |= IN_MOVE_SELF; +diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c +--- a/plugins/out_s3/s3.c ++++ b/plugins/out_s3/s3.c +@@ -3766,6 +3766,46 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, + + flush_init(ctx); + ++ /* Detect close-write signal: tag ends with "._close". ++ * Force-flush buffered data for the base tag and discard this sentinel. */ ++ { ++ int cw_tag_len = flb_sds_len(event_chunk->tag); ++ if (cw_tag_len > 7 && ++ memcmp(event_chunk->tag + cw_tag_len - 7, "._close", 7) == 0) { ++ int base_tag_len = cw_tag_len - 7; ++ struct s3_file *close_target; ++ struct multipart_upload *close_m_upload; ++ char *close_buf = NULL; ++ size_t close_size = 0; ++ ++ flb_plg_info(ctx->ins, ++ "close-write signal for tag %.*s, forcing flush", ++ base_tag_len, event_chunk->tag); ++ ++ close_target = s3_store_file_get(ctx, event_chunk->tag, ++ base_tag_len); ++ if (close_target != NULL) { ++ close_m_upload = get_upload(ctx, event_chunk->tag, ++ base_tag_len); ++ ++ ret = construct_request_buffer(ctx, NULL, close_target, ++ &close_buf, &close_size); ++ if (ret == 0 && close_buf != NULL) { ++ ret = upload_data(ctx, close_target, close_m_upload, ++ close_buf, close_size, ++ event_chunk->tag, base_tag_len); ++ flb_free(close_buf); ++ if (ret != FLB_OK) { ++ flb_plg_warn(ctx->ins, ++ "close-write flush failed for tag %.*s", ++ base_tag_len, event_chunk->tag); ++ } ++ } ++ } ++ else { ++ flb_plg_debug(ctx->ins, ++ "close-write signal but no buffered data for " ++ "tag %.*s", base_tag_len, event_chunk->tag); ++ } ++ FLB_OUTPUT_RETURN(FLB_OK); ++ } ++ } ++ + if (ctx->log_key) { diff --git a/test.sh b/test.sh index 9a5203c..8b55292 100755 --- a/test.sh +++ b/test.sh @@ -445,6 +445,105 @@ fi $KUBECTL delete pod -l test=concurrent-flush --grace-period=0 --force 2>/dev/null || true +# 7j. IN_CLOSE_WRITE flush — container termination triggers immediate S3 upload +# without restarting fluent-bit. The close-write-flush.patch watches for IN_CLOSE_WRITE +# inotify events (CRI runtime closes the log fd) and emits a sentinel on tag._close +# which forces S3 output to flush the base tag's buffer. +# +# We verify that: +# 1. Logs are NOT flushed before container terminates (within upload_timeout=15s window) +# 2. After termination, logs appear in S3 within seconds (not waiting for upload_timeout) +# +# Helper: poll S3 for a marker, returns 0 if found within timeout +close_write_poll() { + local marker="$1" + local timeout="$2" + local interval="${3:-2}" + local elapsed=0 + while [ "$elapsed" -lt "$timeout" ]; do + local output + output=$(./y-logcli --context=dev query '{namespace="default"}' -f parquet -o raw 2>&1) || true + if grep -q "$marker" <<< "$output"; then + return 0 + fi + elapsed=$((elapsed + interval)) + echo " Polling... (${elapsed}/${timeout}s)" + sleep "$interval" + done + return 1 +} + +# 7j-1. Exit 0 — container exits normally +echo "==> Testing IN_CLOSE_WRITE flush: exit 0..." +CW_MARKER_0="cw-exit0-$(date +%s)" +$KUBECTL run cw-exit0 --restart=Never \ + --image=busybox:1.37 \ + --command -- sh -c "echo '${CW_MARKER_0}'; sleep 10" + +$KUBECTL wait pod cw-exit0 --for=condition=Ready --timeout=30s + +# Wait for fluent-bit to discover and tail the log file +sleep 8 + +# Pre-check: marker should NOT be in S3 yet (container still running, upload_timeout=15s hasn't fired) +if close_write_poll "$CW_MARKER_0" 0 1 2>/dev/null; then + echo " WARN: Marker already in S3 before container termination (upload_timeout may have fired)" +fi + +# Wait for container to exit (sleep 10 in the container) +echo " Waiting for container to exit 0..." +$KUBECTL wait pod cw-exit0 --for=jsonpath='{.status.phase}'=Succeeded --timeout=30s + +# After close-write, logs should flush quickly (well within upload_timeout=15s) +if close_write_poll "$CW_MARKER_0" 30; then + echo " PASS: IN_CLOSE_WRITE flush (exit 0) — marker '$CW_MARKER_0' found in S3" +else + echo " FAIL: IN_CLOSE_WRITE flush (exit 0) — marker '$CW_MARKER_0' not found in S3" >&2 + ERRORS=$((ERRORS + 1)) +fi +$KUBECTL delete pod cw-exit0 --grace-period=0 --force 2>/dev/null || true + +# 7j-2. Exit 1 — container crashes +echo "==> Testing IN_CLOSE_WRITE flush: exit 1..." +CW_MARKER_1="cw-exit1-$(date +%s)" +$KUBECTL run cw-exit1 --restart=Never \ + --image=busybox:1.37 \ + --command -- sh -c "echo '${CW_MARKER_1}'; sleep 5; exit 1" + +$KUBECTL wait pod cw-exit1 --for=condition=Ready --timeout=30s +sleep 8 + +echo " Waiting for container to exit 1..." +$KUBECTL wait pod cw-exit1 --for=jsonpath='{.status.phase}'=Failed --timeout=30s + +if close_write_poll "$CW_MARKER_1" 30; then + echo " PASS: IN_CLOSE_WRITE flush (exit 1) — marker '$CW_MARKER_1' found in S3" +else + echo " FAIL: IN_CLOSE_WRITE flush (exit 1) — marker '$CW_MARKER_1' not found in S3" >&2 + ERRORS=$((ERRORS + 1)) +fi +$KUBECTL delete pod cw-exit1 --grace-period=0 --force 2>/dev/null || true + +# 7j-3. kubectl delete pod — forced termination +echo "==> Testing IN_CLOSE_WRITE flush: kubectl delete pod..." +CW_MARKER_DEL="cw-delete-$(date +%s)" +$KUBECTL run cw-delete --restart=Never \ + --image=busybox:1.37 \ + --command -- sh -c "echo '${CW_MARKER_DEL}'; sleep 3600" + +$KUBECTL wait pod cw-delete --for=condition=Ready --timeout=30s +sleep 8 + +echo " Deleting pod..." +$KUBECTL delete pod cw-delete --grace-period=5 + +if close_write_poll "$CW_MARKER_DEL" 30; then + echo " PASS: IN_CLOSE_WRITE flush (delete pod) — marker '$CW_MARKER_DEL' found in S3" +else + echo " FAIL: IN_CLOSE_WRITE flush (delete pod) — marker '$CW_MARKER_DEL' not found in S3" >&2 + ERRORS=$((ERRORS + 1)) +fi + # --- 8. Result --- echo "" From e72295ffc45f289fb4be8000900fba398637c574 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Tue, 10 Mar 2026 09:19:47 +0100 Subject: [PATCH 2/3] e2e: assert all four flush mechanisms with upload_timeout=60s Increases upload_timeout to 60s to clearly separate flush triggers: - 7h: size-based (>1M burst via yes|head, appears in <45s) - 7i: timeout-based (small data, NOT in S3 at 20s, IS after ~88s) - 7j/7k: SIGTERM (shutdown handler flushes buffers) - 7l: close-write (container exits, appears in <30s without killing fluentbit) Adds Buffer_Chunk_Size=512K and Buffer_Max_Size=2M to tail input so size-based flush can accumulate >1M within a few engine cycles. Drops redundant exit-1 close-write test (same CRI behavior as exit-0). Fixes corrupt patch file (wrong context lines for v4.2.2 source). Co-Authored-By: Claude Opus 4.6 --- images/fluentbit/close-write-flush.patch | 58 +++++---- k3d-example/fluentbit/fluent-bit.conf | 6 +- test.sh | 150 +++++++++++++++++------ 3 files changed, 147 insertions(+), 67 deletions(-) diff --git a/images/fluentbit/close-write-flush.patch b/images/fluentbit/close-write-flush.patch index 2c51027..98f6267 100644 --- a/images/fluentbit/close-write-flush.patch +++ b/images/fluentbit/close-write-flush.patch @@ -1,21 +1,22 @@ diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify.c --- a/plugins/in_tail/tail_fs_inotify.c +++ b/plugins/in_tail/tail_fs_inotify.c -@@ -20,6 +20,10 @@ +@@ -22,6 +22,10 @@ + #include + #include #include - #include - #include +#include +#include +#include +#include - - #include "tail.h" - #include "tail_file.h" -@@ -90,6 +94,60 @@ static inline void debug_event_mask(struct flb_tail_config *ctx, - } + + #include + #include +@@ -94,6 +98,65 @@ static int debug_event_mask(struct flb_tail_config *ctx, + return 0; } - + ++ +/* + * Emit a sentinel record on a derived tag (original_tag._close) to signal + * downstream S3 outputs to flush buffered data for this file's tag. @@ -74,13 +75,22 @@ diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify + return 0; +} + - static int tail_fs_event(struct flb_input_instance *ins, - struct flb_config *config, void *in_context) + static int tail_fs_add(struct flb_tail_file *file, int check_rotated) { -@@ -142,6 +200,15 @@ static int tail_fs_event(struct flb_input_instance *ins, + int flags; +@@ -107,7 +170,7 @@ static int tail_fs_add(struct flb_tail_file *file, int check_rotated) + * lines from the file and once we reach EOF (and a watch_fd exists), + * we update the flags to receive notifications. + */ +- flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW; ++ flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW | IN_CLOSE_WRITE; + + if (check_rotated == FLB_TRUE) { + flags |= IN_MOVE_SELF; +@@ -250,6 +313,15 @@ static int tail_fs_event(struct flb_input_instance *ins, } } - + + /* IN_CLOSE_WRITE: container runtime closed the log file. + * Read any remaining data first, then emit a close signal. */ + if (ev.mask & IN_CLOSE_WRITE) { @@ -90,25 +100,16 @@ diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify + return 0; + } + - /* IN_MODIFY: handle truncation (size_delta < 0 => seek to 0) */ if (ev.mask & IN_MODIFY) { - if (size_delta < 0) { -@@ -161,7 +228,7 @@ static int tail_fs_add(struct flb_tail_file *file, int check_rotated) - char *name; - struct flb_tail_config *ctx = file->config; - -- flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW; -+ flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW | IN_CLOSE_WRITE; - - if (check_rotated == FLB_TRUE) { - flags |= IN_MOVE_SELF; + /* + * The file was modified, check how many new bytes do diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c -@@ -3766,6 +3766,46 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, - +@@ -3765,6 +3765,52 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, + /* Cleanup old buffers and initialize upload timer */ flush_init(ctx); - + + /* Detect close-write signal: tag ends with "._close". + * Force-flush buffered data for the base tag and discard this sentinel. */ + { @@ -154,4 +155,7 @@ diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c + } + } + ++ + /* Process chunk */ if (ctx->log_key) { + chunk = flb_pack_msgpack_extract_log_key(ctx, diff --git a/k3d-example/fluentbit/fluent-bit.conf b/k3d-example/fluentbit/fluent-bit.conf index 91b17c2..b913ad0 100644 --- a/k3d-example/fluentbit/fluent-bit.conf +++ b/k3d-example/fluentbit/fluent-bit.conf @@ -16,6 +16,8 @@ Exclude_Path /var/log/pods/*/fluent-bit/*.log Read_from_Head On Skip_Long_Lines On + Buffer_Chunk_Size 512K + Buffer_Max_Size 2M Refresh_Interval 5 # The CRI parser has Time_Keep On, producing a 'time' record field (nanosecond string). @@ -38,7 +40,7 @@ compression arrow-compact json_date_key false total_file_size 1M - upload_timeout 15s + upload_timeout 60s s3_key_format_tag_delimiters . s3_key_format /${CLUSTER_NAME}/$TAG[2]/%Y/%m/%d/${NODE_NAME}/$TAG[3]/$TAG[4]/%H/%M/$UUID.arrow @@ -55,7 +57,7 @@ compression parquet-compact json_date_key false total_file_size 1M - upload_timeout 15s + upload_timeout 60s s3_key_format_tag_delimiters . # Ideally s3_key_format would use hive-style paths (/namespace=$TAG[2]/pod=$TAG[3]/...) # but Fluent Bit's uri_encode() in src/flb_signv4.c excludes '=' from percent-encoding, diff --git a/test.sh b/test.sh index 8b55292..3e5da9d 100755 --- a/test.sh +++ b/test.sh @@ -107,7 +107,7 @@ wait_for_rollout deployment log-generator 60s # --- 6. Poll for data in both formats --- -POLL_TIMEOUT=120 +POLL_TIMEOUT=180 POLL_INTERVAL=5 poll_for_format() { @@ -329,7 +329,102 @@ else ERRORS=$((ERRORS + 1)) fi -# 7h. SIGTERM flush - verify buffered data is flushed to S3 on shutdown +# 7h. Size-based flush — verify data is flushed when buffer exceeds total_file_size (1M). +# Write >1M in one burst; it should appear in S3 well before upload_timeout (60s). +# Buffer_Chunk_Size=512K and Buffer_Max_Size=2M allow tail to ingest quickly. +echo "==> Testing size-based flush (total_file_size=1M)..." +SIZE_MARKER="size-flush-$(date +%s)" +# Generate >1.5M instantly using yes+head (busybox shell loops are too slow). +# 'yes' outputs ~80-char lines at full speed; 20000 lines ≈ 1.6M. +$KUBECTL run size-flush --restart=Never \ + --image=busybox:1.37 \ + --command -- sh -c "echo '${SIZE_MARKER}'; yes 'size-padding-data-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' | head -n 20000; sleep 3600" + +$KUBECTL wait pod size-flush --for=condition=Ready --timeout=30s + +# Wait for fluent-bit to discover and read the file (Refresh_Interval=5, plus ingestion time) +sleep 12 + +# Size-based flush should trigger within seconds of ingestion (buffer > 1M). +# Poll for 45s — well under the 60s upload_timeout, proving size triggered it. +SIZE_FLUSH_TIMEOUT=45 +SIZE_FLUSH_FOUND=false +size_elapsed=0 +while [ "$size_elapsed" -lt "$SIZE_FLUSH_TIMEOUT" ]; do + SIZE_OUTPUT=$(./y-logcli --context=dev query '{namespace="default"}' -f parquet -o raw 2>&1) || true + if grep -q "$SIZE_MARKER" <<< "$SIZE_OUTPUT"; then + SIZE_FLUSH_FOUND=true + break + fi + size_elapsed=$((size_elapsed + 3)) + echo " Polling... (${size_elapsed}/${SIZE_FLUSH_TIMEOUT}s)" + sleep 3 +done + +if [ "$SIZE_FLUSH_FOUND" = true ]; then + echo " PASS: Size-based flush — marker found in S3 within ${size_elapsed}s (before 60s upload_timeout)" +else + echo " FAIL: Size-based flush — marker '$SIZE_MARKER' not found in S3 within ${SIZE_FLUSH_TIMEOUT}s" >&2 + ERRORS=$((ERRORS + 1)) +fi +$KUBECTL delete pod size-flush --grace-period=0 --force 2>/dev/null || true + +# 7i. Timeout-based flush — verify data is flushed after upload_timeout (60s) +# when the buffer never reaches total_file_size. +# Write a small marker (well under 1M), container stays running. +# Data should NOT appear before ~60s (not size-triggered) but SHOULD appear after. +echo "==> Testing timeout-based flush (upload_timeout=60s)..." +TIMEOUT_MARKER="timeout-flush-$(date +%s)" +$KUBECTL run timeout-flush --restart=Never \ + --image=busybox:1.37 \ + --command -- sh -c "echo '${TIMEOUT_MARKER}'; sleep 3600" + +$KUBECTL wait pod timeout-flush --for=condition=Ready --timeout=30s + +# Wait for fluent-bit to discover the log file +sleep 8 + +# Pre-check: marker should NOT be in S3 within 20s (no size trigger, timeout is 60s) +echo " Verifying marker is NOT in S3 yet (expecting no flush before upload_timeout)..." +if close_write_poll "$TIMEOUT_MARKER" 20 5 2>/dev/null; then + echo " WARN: Marker appeared before upload_timeout — size or close-write may have triggered early" +else + echo " OK: Marker not in S3 after 20s (as expected — waiting for upload_timeout)" +fi + +# Now wait for upload_timeout to fire. From pod start: 8s discovery + 20s pre-check = ~28s elapsed. +# upload_timeout=60s starts when fluent-bit ingests (at ~8s), so it fires at ~68s from pod start. +# We've used ~28s, need to wait ~40s more, then poll. +TIMEOUT_FLUSH_DEADLINE=120 +TIMEOUT_FLUSH_INTERVAL=5 +timeout_elapsed=28 +TIMEOUT_FLUSH_FOUND=false + +# Wait for the remaining time until upload_timeout fires +echo " Waiting for upload_timeout to fire..." +sleep 40 +timeout_elapsed=68 + +while [ "$timeout_elapsed" -lt "$TIMEOUT_FLUSH_DEADLINE" ]; do + TIMEOUT_OUTPUT=$(./y-logcli --context=dev query '{namespace="default"}' -f parquet -o raw 2>&1) || true + if grep -q "$TIMEOUT_MARKER" <<< "$TIMEOUT_OUTPUT"; then + TIMEOUT_FLUSH_FOUND=true + break + fi + timeout_elapsed=$((timeout_elapsed + TIMEOUT_FLUSH_INTERVAL)) + echo " Polling... (${timeout_elapsed}/${TIMEOUT_FLUSH_DEADLINE}s)" + sleep "$TIMEOUT_FLUSH_INTERVAL" +done + +if [ "$TIMEOUT_FLUSH_FOUND" = true ]; then + echo " PASS: Timeout-based flush — marker found in S3 at ~${timeout_elapsed}s (after upload_timeout=60s)" +else + echo " FAIL: Timeout-based flush — marker '$TIMEOUT_MARKER' not found in S3 within ${TIMEOUT_FLUSH_DEADLINE}s" >&2 + ERRORS=$((ERRORS + 1)) +fi +$KUBECTL delete pod timeout-flush --grace-period=0 --force 2>/dev/null || true + +# 7j. SIGTERM flush - verify buffered data is flushed to S3 on shutdown echo "==> Testing SIGTERM flush..." MARKER="sigterm-test-$(date +%s)" @@ -350,10 +445,10 @@ echo " fluent-bit pod to be killed: $FB_POD" echo " fluent-bit watched files (last 5):" $KUBECTL logs "$FB_POD" | grep -E 'inotify_fs_add|Successfully uploaded' | tail -5 || true -# Kill fluent-bit before upload_timeout (15s) triggers a regular flush. +# Kill fluent-bit before upload_timeout (60s) triggers a regular flush. # grace-period gives time for SIGTERM handler to flush both s3-arrow and s3-parquet. -echo " Killing fluent-bit with grace-period=15..." -$KUBECTL delete pod "$FB_POD" --grace-period=15 +echo " Killing fluent-bit with grace-period=30..." +$KUBECTL delete pod "$FB_POD" --grace-period=30 wait_for_rollout daemonset fluent-bit 60s # Show the killed pod's last logs (termination output from previous instance) @@ -384,9 +479,9 @@ else ERRORS=$((ERRORS + 1)) fi -# 7i. Concurrent SIGTERM flush — verify buffers from 20+ pods all flush within grace period. +# 7k. Concurrent SIGTERM flush — verify buffers from 20+ pods all flush within grace period. # Each pod×container×output creates a separate S3 buffer, so 25 pods × 2 outputs = 50 buffers. -# Sequential flush would need ~50-100s; within 15s grace period proves concurrency. +# Sequential flush would need ~50-100s; within 30s grace period proves concurrency. echo "==> Testing concurrent SIGTERM flush (25 pods)..." CONCURRENT_MARKER="concurrent-$(date +%s)" FLUSH_POD_COUNT=25 @@ -407,8 +502,8 @@ echo " Waiting for fluent-bit to discover and tail $FLUSH_POD_COUNT log files.. sleep 12 FB_POD=$($KUBECTL get pod -l app=fluent-bit -o jsonpath='{.items[0].metadata.name}') -echo " Killing fluent-bit pod $FB_POD with grace-period=15..." -$KUBECTL delete pod "$FB_POD" --grace-period=15 +echo " Killing fluent-bit pod $FB_POD with grace-period=30..." +$KUBECTL delete pod "$FB_POD" --grace-period=30 wait_for_rollout daemonset fluent-bit 60s echo " Previous fluent-bit shutdown logs:" @@ -437,7 +532,7 @@ while [ "$flush_elapsed" -lt "$FLUSH_TIMEOUT" ]; do done if [ "$FOUND_COUNT" -ge "$FLUSH_POD_COUNT" ]; then - echo " PASS: Concurrent flush — all $FLUSH_POD_COUNT pod markers found in S3 (50 buffers flushed within 15s)" + echo " PASS: Concurrent flush — all $FLUSH_POD_COUNT pod markers found in S3 (50 buffers flushed within 30s)" else echo " FAIL: Concurrent flush — only $FOUND_COUNT/$FLUSH_POD_COUNT pod markers found (buffers lost on SIGTERM)" >&2 ERRORS=$((ERRORS + 1)) @@ -445,14 +540,14 @@ fi $KUBECTL delete pod -l test=concurrent-flush --grace-period=0 --force 2>/dev/null || true -# 7j. IN_CLOSE_WRITE flush — container termination triggers immediate S3 upload +# 7l. IN_CLOSE_WRITE flush — container termination triggers immediate S3 upload # without restarting fluent-bit. The close-write-flush.patch watches for IN_CLOSE_WRITE # inotify events (CRI runtime closes the log fd) and emits a sentinel on tag._close # which forces S3 output to flush the base tag's buffer. # # We verify that: -# 1. Logs are NOT flushed before container terminates (within upload_timeout=15s window) -# 2. After termination, logs appear in S3 within seconds (not waiting for upload_timeout) +# 1. Logs are NOT flushed before container terminates (upload_timeout=60s hasn't fired) +# 2. After termination, logs appear in S3 within 30s (well before 60s upload_timeout) # # Helper: poll S3 for a marker, returns 0 if found within timeout close_write_poll() { @@ -473,7 +568,7 @@ close_write_poll() { return 1 } -# 7j-1. Exit 0 — container exits normally +# 7l-1. Exit 0 — container exits normally echo "==> Testing IN_CLOSE_WRITE flush: exit 0..." CW_MARKER_0="cw-exit0-$(date +%s)" $KUBECTL run cw-exit0 --restart=Never \ @@ -485,7 +580,7 @@ $KUBECTL wait pod cw-exit0 --for=condition=Ready --timeout=30s # Wait for fluent-bit to discover and tail the log file sleep 8 -# Pre-check: marker should NOT be in S3 yet (container still running, upload_timeout=15s hasn't fired) +# Pre-check: marker should NOT be in S3 yet (container still running, upload_timeout=60s hasn't fired) if close_write_poll "$CW_MARKER_0" 0 1 2>/dev/null; then echo " WARN: Marker already in S3 before container termination (upload_timeout may have fired)" fi @@ -494,7 +589,7 @@ fi echo " Waiting for container to exit 0..." $KUBECTL wait pod cw-exit0 --for=jsonpath='{.status.phase}'=Succeeded --timeout=30s -# After close-write, logs should flush quickly (well within upload_timeout=15s) +# After close-write, logs should flush quickly (well within upload_timeout=60s) if close_write_poll "$CW_MARKER_0" 30; then echo " PASS: IN_CLOSE_WRITE flush (exit 0) — marker '$CW_MARKER_0' found in S3" else @@ -503,28 +598,7 @@ else fi $KUBECTL delete pod cw-exit0 --grace-period=0 --force 2>/dev/null || true -# 7j-2. Exit 1 — container crashes -echo "==> Testing IN_CLOSE_WRITE flush: exit 1..." -CW_MARKER_1="cw-exit1-$(date +%s)" -$KUBECTL run cw-exit1 --restart=Never \ - --image=busybox:1.37 \ - --command -- sh -c "echo '${CW_MARKER_1}'; sleep 5; exit 1" - -$KUBECTL wait pod cw-exit1 --for=condition=Ready --timeout=30s -sleep 8 - -echo " Waiting for container to exit 1..." -$KUBECTL wait pod cw-exit1 --for=jsonpath='{.status.phase}'=Failed --timeout=30s - -if close_write_poll "$CW_MARKER_1" 30; then - echo " PASS: IN_CLOSE_WRITE flush (exit 1) — marker '$CW_MARKER_1' found in S3" -else - echo " FAIL: IN_CLOSE_WRITE flush (exit 1) — marker '$CW_MARKER_1' not found in S3" >&2 - ERRORS=$((ERRORS + 1)) -fi -$KUBECTL delete pod cw-exit1 --grace-period=0 --force 2>/dev/null || true - -# 7j-3. kubectl delete pod — forced termination +# 7l-2. kubectl delete pod — forced termination echo "==> Testing IN_CLOSE_WRITE flush: kubectl delete pod..." CW_MARKER_DEL="cw-delete-$(date +%s)" $KUBECTL run cw-delete --restart=Never \ From 7d4cb31a7e3f09e71287f5395b01c47190605ec4 Mon Sep 17 00:00:00 2001 From: Staffan Olsson Date: Tue, 10 Mar 2026 12:30:42 +0100 Subject: [PATCH 3/3] ci: pin duckdb to v1.4.4 (nanoarrow extension not yet built for v1.5.0) Co-Authored-By: Claude Opus 4.6 --- .github/workflows/fluentbit.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/fluentbit.yaml b/.github/workflows/fluentbit.yaml index 2ea30db..18856e9 100644 --- a/.github/workflows/fluentbit.yaml +++ b/.github/workflows/fluentbit.yaml @@ -41,7 +41,7 @@ jobs: - name: Install duckdb run: | - curl -fsSL https://github.com/duckdb/duckdb/releases/latest/download/duckdb_cli-linux-amd64.gz | gunzip > /usr/local/bin/duckdb + curl -fsSL https://github.com/duckdb/duckdb/releases/download/v1.4.4/duckdb_cli-linux-amd64.gz | gunzip > /usr/local/bin/duckdb chmod +x /usr/local/bin/duckdb - uses: imjasonh/setup-crane@v0.5