Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ struct flb_input_instance {
/* flag to pause input when storage is full */
int storage_pause_on_chunks_overlimit;

/*
* Optional limit for total filesystem storage (bytes). When set,
* the input is paused once the sum of all its chunk sizes on disk
* reaches this threshold. -1 means unlimited (default).
*/
size_t storage_total_limit_size;

/*
* Input network info:
*
Expand Down
15 changes: 15 additions & 0 deletions plugins/filter_rewrite_tag/rewrite_tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ static int emitter_create(struct flb_rewrite_tag *ctx)
flb_plg_error(ctx->ins, "cannot set storage.type");
}

/* Set the storage total limit size (filesystem cap) */
if (ctx->emitter_storage_total_limit_size) {
ret = flb_input_set_property(ins, "storage.total_limit_size",
ctx->emitter_storage_total_limit_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot set storage.total_limit_size");
}
Comment on lines +80 to +82
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject invalid emitter storage limit instead of continuing

When emitter_storage.total_limit_size is invalid (e.g., typo like 10XB), flb_input_set_property() returns -1 but emitter_create() only logs and continues startup. That leaves the emitter running with the default unlimited storage cap, so the configuration appears accepted while the safety limit is silently disabled.

Useful? React with 👍 / 👎.

}
Comment on lines +76 to +83
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fail emitter creation when the storage cap cannot be applied.

Line 80 logs the flb_input_set_property() failure but continues initializing the emitter, so an invalid or env-expanded emitter_storage.total_limit_size silently becomes unlimited.

🐛 Proposed fix
     if (ctx->emitter_storage_total_limit_size) {
         ret = flb_input_set_property(ins, "storage.total_limit_size",
                                      ctx->emitter_storage_total_limit_size);
         if (ret == -1) {
             flb_plg_error(ctx->ins, "cannot set storage.total_limit_size");
+            flb_input_instance_destroy(ins);
+            return -1;
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/* Set the storage total limit size (filesystem cap) */
if (ctx->emitter_storage_total_limit_size) {
ret = flb_input_set_property(ins, "storage.total_limit_size",
ctx->emitter_storage_total_limit_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot set storage.total_limit_size");
}
}
/* Set the storage total limit size (filesystem cap) */
if (ctx->emitter_storage_total_limit_size) {
ret = flb_input_set_property(ins, "storage.total_limit_size",
ctx->emitter_storage_total_limit_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot set storage.total_limit_size");
flb_input_instance_destroy(ins);
return -1;
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/filter_rewrite_tag/rewrite_tag.c` around lines 76 - 83, The code
currently logs a failure when flb_input_set_property(ins,
"storage.total_limit_size", ctx->emitter_storage_total_limit_size) returns -1
but continues emitter initialization; change this to treat the failure as fatal:
after detecting ret == -1, call flb_plg_error with the existing message
(including ctx->ins context) and return an error status to abort emitter
creation (e.g., return -1 or propagate the error from the enclosing init
function) so an invalid or unapplyable ctx->emitter_storage_total_limit_size no
longer results in an unintended unlimited storage configuration.


/* Initialize emitter plugin */
ret = flb_input_instance_init(ins, ctx->config);
if (ret == -1) {
Expand Down Expand Up @@ -607,6 +616,12 @@ static struct flb_config_map config_map[] = {
FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_mem_buf_limit),
"set a memory buffer limit to restrict memory usage of emitter"
},
{
FLB_CONFIG_MAP_STR, "emitter_storage.total_limit_size", NULL,
FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_storage_total_limit_size),
"set the maximum disk space that the emitter can use for filesystem "
"buffered chunks. When this limit is reached the emitter is paused."
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/filter_rewrite_tag/rewrite_tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct flb_rewrite_tag {
flb_sds_t emitter_name; /* emitter input plugin name */
flb_sds_t emitter_storage_type; /* emitter storage type */
size_t emitter_mem_buf_limit; /* Emitter buffer limit */
flb_sds_t emitter_storage_total_limit_size; /* Emitter FS total size cap */
struct mk_list rules; /* processed rules */
struct mk_list *cm_rules; /* config_map rules (only strings) */
struct flb_input_instance *ins_emitter; /* emitter input plugin instance */
Expand Down
21 changes: 21 additions & 0 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
instance->data = data;
instance->storage = NULL;
instance->storage_type = -1;
instance->storage_total_limit_size = (size_t) -1;
instance->log_level = -1;
instance->log_suppress_interval = -1;
instance->runs_in_coroutine = FLB_FALSE;
Expand Down Expand Up @@ -918,6 +919,26 @@ int flb_input_set_property(struct flb_input_instance *ins,
}
ins->storage_pause_on_chunks_overlimit = ret;
}
else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
int64_t limit;

if (strcasecmp(tmp, "off") == 0 ||
flb_utils_bool(tmp) == FLB_FALSE) {
limit = -1;
}
else {
limit = flb_utils_size_to_bytes(tmp);
if (limit == -1) {
flb_sds_destroy(tmp);
return -1;
}
if (limit == 0) {
limit = -1;
}
}
flb_sds_destroy(tmp);
ins->storage_total_limit_size = (size_t) limit;
Comment on lines +922 to +940
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject all negative parsed sizes before casting to size_t.

Line 931 only rejects -1; other negative parsed values would wrap at Line 940 into a huge limit, effectively disabling the cap. Use < 0 and keep the storage-specific variable with the function-scope declarations.

🐛 Proposed fix
@@
-    ssize_t limit;
+    ssize_t limit;
+    int64_t storage_limit;
@@
     else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
-        int64_t limit;
-
         if (strcasecmp(tmp, "off") == 0 ||
             flb_utils_bool(tmp) == FLB_FALSE) {
-            limit = -1;
+            storage_limit = -1;
         }
         else {
-            limit = flb_utils_size_to_bytes(tmp);
-            if (limit == -1) {
+            storage_limit = flb_utils_size_to_bytes(tmp);
+            if (storage_limit < 0) {
                 flb_sds_destroy(tmp);
                 return -1;
             }
-            if (limit == 0) {
-                limit = -1;
+            if (storage_limit == 0) {
+                storage_limit = -1;
             }
         }
         flb_sds_destroy(tmp);
-        ins->storage_total_limit_size = (size_t) limit;
+        ins->storage_total_limit_size = (size_t) storage_limit;
     }

As per coding guidelines, Declare variables at the start of functions, not mid-block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input.c` around lines 922 - 940, The parsed size handling in the
prop_key_check branch uses flb_utils_size_to_bytes (assigned to int64_t limit)
but only rejects == -1; any other negative value will wrap when cast to size_t
for ins->storage_total_limit_size. Change the negative check to if (limit < 0)
to reject all negative returns from flb_utils_size_to_bytes before casting, keep
limit declared as int64_t at the top of the function (per guidelines) rather
than mid-block, preserve the existing zero-to-off mapping (if limit == 0 set to
-1), ensure flb_sds_destroy(tmp) is still called on error paths, and finally
assign ins->storage_total_limit_size only after handling negatives so the cast
is safe.

}
else {
/*
* Create the property, we don't pass the value since we will
Expand Down
47 changes: 43 additions & 4 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,28 @@ static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i)
return FLB_FALSE;
}

/*
* Return the total bytes used by all chunks (up + down) belonging to
* an input instance's stream. Used to enforce storage.total_limit_size.
*/
static size_t flb_input_chunk_fs_total_size(struct flb_input_instance *i)
{
ssize_t bytes;
size_t total = 0;
struct cio_chunk *ch;
struct mk_list *head;
struct flb_storage_input *storage = (struct flb_storage_input *) i->storage;

mk_list_foreach(head, &storage->stream->chunks) {
ch = mk_list_entry(head, struct cio_chunk, _head);
bytes = cio_chunk_get_real_size(ch);
if (bytes > 0) {
total += bytes;
Comment on lines +2414 to +2416
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Count unsynced filesystem chunks when enforcing storage cap

flb_input_chunk_fs_total_size() sums chunk bytes via cio_chunk_get_real_size(), but this API can return 0 before the real size is synced (the same file already has flb_input_chunk_get_real_size() fallback logic for that case). Because 0-sized entries are skipped here, active FS chunks can be undercounted and storage.total_limit_size may not pause ingestion when it should, especially while chunks are still up/in-flight under backpressure.

Useful? React with 👍 / 👎.

}
}
return total;
}
Comment on lines +2400 to +2420
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use the input-chunk size wrapper to avoid undercounting active chunks.

Line 2414 bypasses flb_input_chunk_get_real_size(), which has the fallback for chunks whose real size is not synced yet. A non-empty active chunk can be counted as 0, delaying or skipping the new total-size cap.

🐛 Proposed fix
 static size_t flb_input_chunk_fs_total_size(struct flb_input_instance *i)
 {
     ssize_t bytes;
     size_t total = 0;
-    struct cio_chunk *ch;
     struct mk_list *head;
-    struct flb_storage_input *storage = (struct flb_storage_input *) i->storage;
+    struct flb_input_chunk *ic;
 
-    mk_list_foreach(head, &storage->stream->chunks) {
-        ch = mk_list_entry(head, struct cio_chunk, _head);
-        bytes = cio_chunk_get_real_size(ch);
+    mk_list_foreach(head, &i->chunks) {
+        ic = mk_list_entry(head, struct flb_input_chunk, _head);
+        bytes = flb_input_chunk_get_real_size(ic);
         if (bytes > 0) {
             total += bytes;
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input_chunk.c` around lines 2400 - 2420, The function
flb_input_chunk_fs_total_size currently calls cio_chunk_get_real_size and can
undercount active chunks; change the loop to use the input-chunk wrapper
flb_input_chunk_get_real_size instead: replace mk_list_entry(..., struct
cio_chunk, _head) with mk_list_entry(..., struct flb_input_chunk, _head) (e.g.,
struct flb_input_chunk *chunk), call flb_input_chunk_get_real_size(chunk) into
bytes, and keep the existing if (bytes > 0) total += bytes logic so the
wrapper’s fallback for unsynced sizes is used.


static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i)
{
struct flb_storage_input *storage = (struct flb_storage_input *)i->storage;
Expand All @@ -2407,6 +2429,12 @@ static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance
return FLB_TRUE;
}
}

if (i->storage_total_limit_size != (size_t) -1) {
if (flb_input_chunk_fs_total_size(i) >= i->storage_total_limit_size) {
return FLB_TRUE;
}
}
}

return FLB_FALSE;
Expand Down Expand Up @@ -2486,10 +2514,21 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t j
struct flb_storage_input *storage = i->storage;

if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
flb_input_name(i),
storage->cio->total_chunks_up,
storage->cio->max_chunks_up);
if (i->storage_total_limit_size != (size_t) -1) {
size_t fs_total = flb_input_chunk_fs_total_size(i);
if (fs_total >= i->storage_total_limit_size) {
flb_warn("[input] %s paused (storage total size overlimit "
"%zuB/%zuB)",
flb_input_name(i), fs_total,
i->storage_total_limit_size);
}
}
else {
flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
flb_input_name(i),
storage->cio->total_chunks_up,
storage->cio->max_chunks_up);
}
Comment on lines +2517 to +2531
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Preserve the chunk-count overlimit log when both limits are configured.

If storage.pause_on_chunks_overlimit trips first and storage_total_limit_size is set but not exceeded, this branch emits no pause reason. Fall back to the existing chunk-count warning unless the total-size cap is the actual cause.

🛠️ Proposed fix
 static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t just_written_size)
 {
+    int storage_total_limit_reached = FLB_FALSE;
+    size_t fs_total = 0;
     struct flb_storage_input *storage = i->storage;
 
     if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
         if (i->storage_total_limit_size != (size_t) -1) {
-            size_t fs_total = flb_input_chunk_fs_total_size(i);
+            fs_total = flb_input_chunk_fs_total_size(i);
             if (fs_total >= i->storage_total_limit_size) {
                 flb_warn("[input] %s paused (storage total size overlimit "
                          "%zuB/%zuB)",
                          flb_input_name(i), fs_total,
                          i->storage_total_limit_size);
+                storage_total_limit_reached = FLB_TRUE;
             }
         }
-        else {
+        if (storage_total_limit_reached == FLB_FALSE) {
             flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
                      flb_input_name(i),
                      storage->cio->total_chunks_up,

As per coding guidelines, Declare variables at the start of functions, not mid-block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input_chunk.c` around lines 2517 - 2531, When deciding which pause
reason to log in flb_input_chunk handling, preserve the chunk-count overlimit
message unless the storage total-size cap (i->storage_total_limit_size) is
actually exceeded: compute fs_total via flb_input_chunk_fs_total_size(i) and if
fs_total >= i->storage_total_limit_size log the size-overlimit message (using
flb_warn as shown), otherwise emit the existing chunk-count warning using
storage->cio->total_chunks_up and storage->cio->max_chunks_up; also move the
declaration of size_t fs_total to the start of the function (not mid-block) to
comply with the variable-declaration guideline.

flb_input_pause(i);
i->storage_buf_status = FLB_INPUT_PAUSED;
return FLB_TRUE;
Expand Down
Loading