Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/fluentbit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:

- name: Install duckdb
run: |
curl -fsSL https://github.com/duckdb/duckdb/releases/latest/download/duckdb_cli-linux-amd64.gz | gunzip > /usr/local/bin/duckdb
curl -fsSL https://github.com/duckdb/duckdb/releases/download/v1.4.4/duckdb_cli-linux-amd64.gz | gunzip > /usr/local/bin/duckdb
chmod +x /usr/local/bin/duckdb

- uses: imjasonh/setup-crane@v0.5
Expand Down
1 change: 1 addition & 0 deletions images/fluentbit/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ COPY *.patch /src/
COPY compact_columns.h compact_columns.c /src/fluent-bit/src/aws/compression/arrow/
COPY apply-compact-compression.sh /src/
RUN cd /src/fluent-bit && git apply /src/fix-gcs-header-lookup.patch
RUN cd /src/fluent-bit && git apply /src/close-write-flush.patch
RUN cd /src/fluent-bit && bash /src/apply-compact-compression.sh

WORKDIR /src/fluent-bit/build
Expand Down
161 changes: 161 additions & 0 deletions images/fluentbit/close-write-flush.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify.c
--- a/plugins/in_tail/tail_fs_inotify.c
+++ b/plugins/in_tail/tail_fs_inotify.c
@@ -22,6 +22,10 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_log_event.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <fluent-bit/flb_sds.h>

#include <stdio.h>
#include <stdlib.h>
@@ -94,6 +98,65 @@ static int debug_event_mask(struct flb_tail_config *ctx,
return 0;
}

+
+/*
+ * Emit a sentinel record on a derived tag (original_tag._close) to signal
+ * downstream S3 outputs to flush buffered data for this file's tag.
+ * Called when IN_CLOSE_WRITE is reported for the file.
+ *
+ * The record is stamped with the current time and its body carries two
+ * key/value pairs: "_flb_close_write" => true and "file" => file->name.
+ * It is appended to the tail input instance (ctx->ins) under the derived tag.
+ *
+ * file:   tailed file; its tag_buf/tag_len, name and inode are read.
+ * config: not used by this function.
+ *
+ * Returns 0 once the record has been appended, or -1 if building the tag,
+ * encoding the record, or appending it failed. The temporary tag and the
+ * encoder are released on every path.
+ */
+static int tail_emit_close_signal(struct flb_tail_file *file,
+                                  struct flb_config *config)
+{
+    static const char suffix[] = "._close";
+    const size_t suffix_len = sizeof(suffix) - 1;
+    struct flb_tail_config *ctx = file->config;
+    struct flb_time t;
+    struct flb_log_event_encoder encoder;
+    flb_sds_t signal_tag;
+    int ret;
+    int result = -1;
+
+    (void) config;
+
+    /* Size the buffer up front for the original tag plus the suffix */
+    signal_tag = flb_sds_create_size(file->tag_len + suffix_len);
+    if (!signal_tag) {
+        return -1;
+    }
+
+    /* Never emit a partially built tag: a failed append aborts the signal */
+    if (flb_sds_cat_safe(&signal_tag, file->tag_buf, file->tag_len) != 0 ||
+        flb_sds_cat_safe(&signal_tag, suffix, suffix_len) != 0) {
+        flb_sds_destroy(signal_tag);
+        return -1;
+    }
+
+    flb_time_get(&t);
+    ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT);
+    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+        flb_sds_destroy(signal_tag);
+        return -1;
+    }
+
+    /* Each encoding step runs only if every previous step succeeded */
+    ret = flb_log_event_encoder_begin_record(&encoder);
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_set_timestamp(&encoder, &t);
+    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_cstring(&encoder,
+                                                        "_flb_close_write");
+    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_boolean(&encoder, FLB_TRUE);
+    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_cstring(&encoder, "file");
+    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_cstring(&encoder, file->name);
+    }
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_commit_record(&encoder);
+    }
+
+    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+        flb_plg_error(ctx->ins,
+                      "inode=%"PRIu64" could not encode IN_CLOSE_WRITE signal: %s",
+                      file->inode, file->name);
+    }
+    else if (flb_input_log_append(ctx->ins,
+                                  signal_tag, flb_sds_len(signal_tag),
+                                  encoder.output_buffer,
+                                  encoder.output_length) != 0) {
+        /* Report the failure instead of claiming the signal was emitted */
+        flb_plg_error(ctx->ins,
+                      "inode=%"PRIu64" could not append IN_CLOSE_WRITE signal: %s",
+                      file->inode, file->name);
+    }
+    else {
+        flb_plg_info(ctx->ins,
+                     "inode=%"PRIu64" IN_CLOSE_WRITE signal emitted: %s (tag=%s)",
+                     file->inode, file->name, signal_tag);
+        result = 0;
+    }
+
+    flb_log_event_encoder_destroy(&encoder);
+    flb_sds_destroy(signal_tag);
+    return result;
+}
+
static int tail_fs_add(struct flb_tail_file *file, int check_rotated)
{
int flags;
@@ -107,7 +170,7 @@ static int tail_fs_add(struct flb_tail_file *file, int check_rotated)
* lines from the file and once we reach EOF (and a watch_fd exists),
* we update the flags to receive notifications.
*/
- flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW;
+ flags = IN_ATTRIB | IN_IGNORED | IN_MODIFY | IN_Q_OVERFLOW | IN_CLOSE_WRITE;

if (check_rotated == FLB_TRUE) {
flags |= IN_MOVE_SELF;
@@ -250,6 +313,15 @@ static int tail_fs_event(struct flb_input_instance *ins,
}
}

+ /* IN_CLOSE_WRITE: container runtime closed the log file.
+ * Read any remaining data first, then emit a close signal. */
+ if (ev.mask & IN_CLOSE_WRITE) {
+ /* Collect any remaining data before signaling */
+ in_tail_collect_event(file, config);
+ tail_emit_close_signal(file, config);
+ return 0;
+ }
+
if (ev.mask & IN_MODIFY) {
/*
* The file was modified, check how many new bytes do
diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -3765,6 +3765,52 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
/* Cleanup old buffers and initialize upload timer */
flush_init(ctx);

+ /* Detect close-write signal: tag ends with "._close".
+ * Force-flush buffered data for the base tag and discard this sentinel. */
+ {
+ int cw_tag_len = flb_sds_len(event_chunk->tag);
+ if (cw_tag_len > 7 &&
+ memcmp(event_chunk->tag + cw_tag_len - 7, "._close", 7) == 0) {
+ int base_tag_len = cw_tag_len - 7;
+ struct s3_file *close_target;
+ struct multipart_upload *close_m_upload;
+ char *close_buf = NULL;
+ size_t close_size = 0;
+
+ flb_plg_info(ctx->ins,
+ "close-write signal for tag %.*s, forcing flush",
+ base_tag_len, event_chunk->tag);
+
+ close_target = s3_store_file_get(ctx, event_chunk->tag,
+ base_tag_len);
+ if (close_target != NULL) {
+ close_m_upload = get_upload(ctx, event_chunk->tag,
+ base_tag_len);
+
+ ret = construct_request_buffer(ctx, NULL, close_target,
+ &close_buf, &close_size);
+ if (ret == 0 && close_buf != NULL) {
+ ret = upload_data(ctx, close_target, close_m_upload,
+ close_buf, close_size,
+ event_chunk->tag, base_tag_len);
+ flb_free(close_buf);
+ if (ret != FLB_OK) {
+ flb_plg_warn(ctx->ins,
+ "close-write flush failed for tag %.*s",
+ base_tag_len, event_chunk->tag);
+ }
+ }
+ }
+ else {
+ flb_plg_debug(ctx->ins,
+ "close-write signal but no buffered data for "
+ "tag %.*s", base_tag_len, event_chunk->tag);
+ }
+ FLB_OUTPUT_RETURN(FLB_OK);
+ }
+ }
+
+
/* Process chunk */
if (ctx->log_key) {
chunk = flb_pack_msgpack_extract_log_key(ctx,
6 changes: 4 additions & 2 deletions k3d-example/fluentbit/fluent-bit.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
Exclude_Path /var/log/pods/*/fluent-bit/*.log
Read_from_Head On
Skip_Long_Lines On
Buffer_Chunk_Size 512K
Buffer_Max_Size 2M
Refresh_Interval 5

# The CRI parser has Time_Keep On, producing a 'time' record field (nanosecond string).
Expand All @@ -38,7 +40,7 @@
compression arrow-compact
json_date_key false
total_file_size 1M
upload_timeout 15s
upload_timeout 60s
s3_key_format_tag_delimiters .
s3_key_format /${CLUSTER_NAME}/$TAG[2]/%Y/%m/%d/${NODE_NAME}/$TAG[3]/$TAG[4]/%H/%M/$UUID.arrow

Expand All @@ -55,7 +57,7 @@
compression parquet-compact
json_date_key false
total_file_size 1M
upload_timeout 15s
upload_timeout 60s
s3_key_format_tag_delimiters .
# Ideally s3_key_format would use hive-style paths (/namespace=$TAG[2]/pod=$TAG[3]/...)
# but Fluent Bit's uri_encode() in src/flb_signv4.c excludes '=' from percent-encoding,
Expand Down
Loading