From 74fa043318e4aa8d6496f74c39a09c0e606f7cf8 Mon Sep 17 00:00:00 2001 From: Amit Shinde Date: Wed, 22 Apr 2026 09:17:45 +0530 Subject: [PATCH 1/3] input: add storage.total_limit_size property for filesystem buffer cap When an input plugin uses filesystem-backed storage, chunks can accumulate on disk without any byte-level limit. The existing controls are either memory-only (mem_buf_limit, which is skipped for filesystem storage in flb_input_chunk_protect) or count-based (storage.pause_on_chunks_overlimit + max_chunks_up). Add a new storage.total_limit_size field to flb_input_instance, mirroring the same property already present on output instances. Parse the property in flb_input_set_property, accepting human- readable sizes (e.g. "2G") and defaulting to unlimited (-1). Signed-off-by: Amit Shinde Made-with: Cursor --- include/fluent-bit/flb_input.h | 7 +++++++ src/flb_input.c | 21 +++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index c2ca41bd75e..9e5031041f2 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -270,6 +270,13 @@ struct flb_input_instance { /* flag to pause input when storage is full */ int storage_pause_on_chunks_overlimit; + /* + * Optional limit for total filesystem storage (bytes). When set, + * the input is paused once the sum of all its chunk sizes on disk + * reaches this threshold. -1 means unlimited (default). + */ + size_t storage_total_limit_size; + /* * Input network info: * diff --git a/src/flb_input.c b/src/flb_input.c index e990eee238c..bd19b61493b 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -409,6 +409,7 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, instance->data = data; instance->storage = NULL; instance->storage_type = -1; + instance->storage_total_limit_size = (size_t) -1; instance->log_level = -1; instance->log_suppress_interval = -1; instance->runs_in_coroutine = FLB_FALSE; @@ -918,6 +919,26 @@ int flb_input_set_property(struct flb_input_instance *ins, } ins->storage_pause_on_chunks_overlimit = ret; } + else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) { + int64_t limit; + + if (strcasecmp(tmp, "off") == 0 || + flb_utils_bool(tmp) == FLB_FALSE) { + limit = -1; + } + else { + limit = flb_utils_size_to_bytes(tmp); + if (limit == -1) { + flb_sds_destroy(tmp); + return -1; + } + if (limit == 0) { + limit = -1; + } + } + flb_sds_destroy(tmp); + ins->storage_total_limit_size = (size_t) limit; + } else { /* * Create the property, we don't pass the value since we will From 638051225395a0a1e9d464f55b21d17ec7a3f46e Mon Sep 17 00:00:00 2001 From: Amit Shinde Date: Wed, 22 Apr 2026 09:17:54 +0530 Subject: [PATCH 2/3] input_chunk: enforce storage.total_limit_size on filesystem inputs Add flb_input_chunk_fs_total_size() to compute the total bytes used by all chunks (both up and down) in an input's chunkio stream. Extend flb_input_chunk_is_storage_overlimit() to check the total filesystem size against the input's storage_total_limit_size when set. When the limit is reached the input is paused via the existing backpressure mechanism, and resumes once chunks are flushed below the threshold. Update flb_input_chunk_protect() to log a distinct warning when the pause is triggered by the total size limit versus the existing chunk-count overlimit. Signed-off-by: Amit Shinde Made-with: Cursor --- src/flb_input_chunk.c | 47 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 3d82fe1c381..985ff49d352 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -2397,6 +2397,28 @@ static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i) return FLB_FALSE; } +/* + * Return the total bytes used by all chunks (up + down) belonging to + * an input instance's stream. Used to enforce storage.total_limit_size. + */ +static size_t flb_input_chunk_fs_total_size(struct flb_input_instance *i) +{ + ssize_t bytes; + size_t total = 0; + struct cio_chunk *ch; + struct mk_list *head; + struct flb_storage_input *storage = (struct flb_storage_input *) i->storage; + + mk_list_foreach(head, &storage->stream->chunks) { + ch = mk_list_entry(head, struct cio_chunk, _head); + bytes = cio_chunk_get_real_size(ch); + if (bytes > 0) { + total += bytes; + } + } + return total; +} + static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i) { struct flb_storage_input *storage = (struct flb_storage_input *)i->storage; @@ -2407,6 +2429,12 @@ static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance return FLB_TRUE; } } + + if (i->storage_total_limit_size != (size_t) -1) { + if (flb_input_chunk_fs_total_size(i) >= i->storage_total_limit_size) { + return FLB_TRUE; + } + } } return FLB_FALSE; @@ -2486,10 +2514,21 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t j struct flb_storage_input *storage = i->storage; if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) { - flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", - flb_input_name(i), - storage->cio->total_chunks_up, - storage->cio->max_chunks_up); + if (i->storage_total_limit_size != (size_t) -1) { + size_t fs_total = flb_input_chunk_fs_total_size(i); + if (fs_total >= i->storage_total_limit_size) { + flb_warn("[input] %s paused (storage total size overlimit " + "%zuB/%zuB)", + flb_input_name(i), fs_total, + i->storage_total_limit_size); + } + } + else { + flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", + flb_input_name(i), + storage->cio->total_chunks_up, + storage->cio->max_chunks_up); + } flb_input_pause(i); i->storage_buf_status = FLB_INPUT_PAUSED; return FLB_TRUE; From 8586351d5e69c246184e67facd465403e397d613 Mon Sep 17 00:00:00 2001 From: Amit Shinde Date: Wed, 22 Apr 2026 09:18:05 +0530 Subject: [PATCH 3/3] filter_rewrite_tag: expose emitter_storage.total_limit_size option Add emitter_storage.total_limit_size to the rewrite_tag filter config map. When set, the value is forwarded to the internal emitter input instance as storage.total_limit_size, capping the total filesystem buffer the emitter can consume. This allows users to prevent unbounded disk growth when using emitter_storage.type = filesystem under sustained backpressure. Example configuration: [FILTER] Name rewrite_tag Emitter_Storage.type filesystem Emitter_Mem_Buf_Limit 500M Emitter_Storage.total_limit_size 2G Signed-off-by: Amit Shinde Made-with: Cursor --- plugins/filter_rewrite_tag/rewrite_tag.c | 15 +++++++++++++++ plugins/filter_rewrite_tag/rewrite_tag.h | 1 + 2 files changed, 16 insertions(+) diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c index 080a096a4f7..814686e45d3 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.c +++ b/plugins/filter_rewrite_tag/rewrite_tag.c @@ -73,6 +73,15 @@ static int emitter_create(struct flb_rewrite_tag *ctx) flb_plg_error(ctx->ins, "cannot set storage.type"); } + /* Set the storage total limit size (filesystem cap) */ + if (ctx->emitter_storage_total_limit_size) { + ret = flb_input_set_property(ins, "storage.total_limit_size", + ctx->emitter_storage_total_limit_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "cannot set storage.total_limit_size"); + } + } + /* Initialize emitter plugin */ ret = flb_input_instance_init(ins, ctx->config); if (ret == -1) { @@ -607,6 +616,12 @@ static struct flb_config_map config_map[] = { FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_mem_buf_limit), "set a memory buffer limit to restrict memory usage of emitter" }, + { + FLB_CONFIG_MAP_STR, "emitter_storage.total_limit_size", NULL, + FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_storage_total_limit_size), + "set the maximum disk space that the emitter can use for filesystem " + "buffered chunks. When this limit is reached the emitter is paused." + }, /* EOF */ {0} }; diff --git a/plugins/filter_rewrite_tag/rewrite_tag.h b/plugins/filter_rewrite_tag/rewrite_tag.h index 9064848def6..c08ea8e7c53 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.h +++ b/plugins/filter_rewrite_tag/rewrite_tag.h @@ -43,6 +43,7 @@ struct flb_rewrite_tag { flb_sds_t emitter_name; /* emitter input plugin name */ flb_sds_t emitter_storage_type; /* emitter storage type */ size_t emitter_mem_buf_limit; /* Emitter buffer limit */ + flb_sds_t emitter_storage_total_limit_size; /* Emitter FS total size cap */ struct mk_list rules; /* processed rules */ struct mk_list *cm_rules; /* config_map rules (only strings) */ struct flb_input_instance *ins_emitter; /* emitter input plugin instance */