-
Notifications
You must be signed in to change notification settings - Fork 1.9k
input: add storage.total_limit_size to cap filesystem buffer bytes #11734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
74fa043
6380512
8586351
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -73,6 +73,15 @@ static int emitter_create(struct flb_rewrite_tag *ctx) | |||||||||||||||||||||||||||||||||||||
| flb_plg_error(ctx->ins, "cannot set storage.type"); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| /* Set the storage total limit size (filesystem cap) */ | ||||||||||||||||||||||||||||||||||||||
| if (ctx->emitter_storage_total_limit_size) { | ||||||||||||||||||||||||||||||||||||||
| ret = flb_input_set_property(ins, "storage.total_limit_size", | ||||||||||||||||||||||||||||||||||||||
| ctx->emitter_storage_total_limit_size); | ||||||||||||||||||||||||||||||||||||||
| if (ret == -1) { | ||||||||||||||||||||||||||||||||||||||
| flb_plg_error(ctx->ins, "cannot set storage.total_limit_size"); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+76
to
+83
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fail emitter creation when the storage cap cannot be applied. Line 80 logs the 🐛 Proposed fix if (ctx->emitter_storage_total_limit_size) {
ret = flb_input_set_property(ins, "storage.total_limit_size",
ctx->emitter_storage_total_limit_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot set storage.total_limit_size");
+ flb_input_instance_destroy(ins);
+ return -1;
}
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| /* Initialize emitter plugin */ | ||||||||||||||||||||||||||||||||||||||
| ret = flb_input_instance_init(ins, ctx->config); | ||||||||||||||||||||||||||||||||||||||
| if (ret == -1) { | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -607,6 +616,12 @@ static struct flb_config_map config_map[] = { | |||||||||||||||||||||||||||||||||||||
| FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_mem_buf_limit), | ||||||||||||||||||||||||||||||||||||||
| "set a memory buffer limit to restrict memory usage of emitter" | ||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
| FLB_CONFIG_MAP_STR, "emitter_storage.total_limit_size", NULL, | ||||||||||||||||||||||||||||||||||||||
| FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_storage_total_limit_size), | ||||||||||||||||||||||||||||||||||||||
| "set the maximum disk space that the emitter can use for filesystem " | ||||||||||||||||||||||||||||||||||||||
| "buffered chunks. When this limit is reached the emitter is paused." | ||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||
| /* EOF */ | ||||||||||||||||||||||||||||||||||||||
| {0} | ||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -409,6 +409,7 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, | |
| instance->data = data; | ||
| instance->storage = NULL; | ||
| instance->storage_type = -1; | ||
| instance->storage_total_limit_size = (size_t) -1; | ||
| instance->log_level = -1; | ||
| instance->log_suppress_interval = -1; | ||
| instance->runs_in_coroutine = FLB_FALSE; | ||
|
|
@@ -918,6 +919,26 @@ int flb_input_set_property(struct flb_input_instance *ins, | |
| } | ||
| ins->storage_pause_on_chunks_overlimit = ret; | ||
| } | ||
| else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) { | ||
| int64_t limit; | ||
|
|
||
| if (strcasecmp(tmp, "off") == 0 || | ||
| flb_utils_bool(tmp) == FLB_FALSE) { | ||
| limit = -1; | ||
| } | ||
| else { | ||
| limit = flb_utils_size_to_bytes(tmp); | ||
| if (limit == -1) { | ||
| flb_sds_destroy(tmp); | ||
| return -1; | ||
| } | ||
| if (limit == 0) { | ||
| limit = -1; | ||
| } | ||
| } | ||
| flb_sds_destroy(tmp); | ||
| ins->storage_total_limit_size = (size_t) limit; | ||
|
Comment on lines
+922
to
+940
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reject all negative parsed sizes before casting to Line 931 only rejects 🐛 Proposed fix@@
- ssize_t limit;
+ ssize_t limit;
+ int64_t storage_limit;
@@
else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
- int64_t limit;
-
if (strcasecmp(tmp, "off") == 0 ||
flb_utils_bool(tmp) == FLB_FALSE) {
- limit = -1;
+ storage_limit = -1;
}
else {
- limit = flb_utils_size_to_bytes(tmp);
- if (limit == -1) {
+ storage_limit = flb_utils_size_to_bytes(tmp);
+ if (storage_limit < 0) {
flb_sds_destroy(tmp);
return -1;
}
- if (limit == 0) {
- limit = -1;
+ if (storage_limit == 0) {
+ storage_limit = -1;
}
}
flb_sds_destroy(tmp);
- ins->storage_total_limit_size = (size_t) limit;
+ ins->storage_total_limit_size = (size_t) storage_limit;
}As per coding guidelines, Declare variables at the start of functions, not mid-block. 🤖 Prompt for AI Agents |
||
| } | ||
| else { | ||
| /* | ||
| * Create the property, we don't pass the value since we will | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2397,6 +2397,28 @@ static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i) | |
| return FLB_FALSE; | ||
| } | ||
|
|
||
| /* | ||
| * Return the total bytes used by all chunks (up + down) belonging to | ||
| * an input instance's stream. Used to enforce storage.total_limit_size. | ||
| */ | ||
| static size_t flb_input_chunk_fs_total_size(struct flb_input_instance *i) | ||
| { | ||
| ssize_t bytes; | ||
| size_t total = 0; | ||
| struct cio_chunk *ch; | ||
| struct mk_list *head; | ||
| struct flb_storage_input *storage = (struct flb_storage_input *) i->storage; | ||
|
|
||
| mk_list_foreach(head, &storage->stream->chunks) { | ||
| ch = mk_list_entry(head, struct cio_chunk, _head); | ||
| bytes = cio_chunk_get_real_size(ch); | ||
| if (bytes > 0) { | ||
| total += bytes; | ||
|
Comment on lines
+2414
to
+2416
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
| } | ||
| } | ||
| return total; | ||
| } | ||
|
Comment on lines
+2400
to
+2420
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use the input-chunk size wrapper to avoid undercounting active chunks. Line 2414 bypasses 🐛 Proposed fix static size_t flb_input_chunk_fs_total_size(struct flb_input_instance *i)
{
ssize_t bytes;
size_t total = 0;
- struct cio_chunk *ch;
struct mk_list *head;
- struct flb_storage_input *storage = (struct flb_storage_input *) i->storage;
+ struct flb_input_chunk *ic;
- mk_list_foreach(head, &storage->stream->chunks) {
- ch = mk_list_entry(head, struct cio_chunk, _head);
- bytes = cio_chunk_get_real_size(ch);
+ mk_list_foreach(head, &i->chunks) {
+ ic = mk_list_entry(head, struct flb_input_chunk, _head);
+ bytes = flb_input_chunk_get_real_size(ic);
if (bytes > 0) {
total += bytes;
}
}🤖 Prompt for AI Agents |
||
|
|
||
| static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i) | ||
| { | ||
| struct flb_storage_input *storage = (struct flb_storage_input *)i->storage; | ||
|
|
@@ -2407,6 +2429,12 @@ static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance | |
| return FLB_TRUE; | ||
| } | ||
| } | ||
|
|
||
| if (i->storage_total_limit_size != (size_t) -1) { | ||
| if (flb_input_chunk_fs_total_size(i) >= i->storage_total_limit_size) { | ||
| return FLB_TRUE; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return FLB_FALSE; | ||
|
|
@@ -2486,10 +2514,21 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t j | |
| struct flb_storage_input *storage = i->storage; | ||
|
|
||
| if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) { | ||
| flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", | ||
| flb_input_name(i), | ||
| storage->cio->total_chunks_up, | ||
| storage->cio->max_chunks_up); | ||
| if (i->storage_total_limit_size != (size_t) -1) { | ||
| size_t fs_total = flb_input_chunk_fs_total_size(i); | ||
| if (fs_total >= i->storage_total_limit_size) { | ||
| flb_warn("[input] %s paused (storage total size overlimit " | ||
| "%zuB/%zuB)", | ||
| flb_input_name(i), fs_total, | ||
| i->storage_total_limit_size); | ||
| } | ||
| } | ||
| else { | ||
| flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", | ||
| flb_input_name(i), | ||
| storage->cio->total_chunks_up, | ||
| storage->cio->max_chunks_up); | ||
| } | ||
|
Comment on lines
+2517
to
+2531
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Preserve the chunk-count overlimit log when both limits are configured. If 🛠️ Proposed fix static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t just_written_size)
{
+ int storage_total_limit_reached = FLB_FALSE;
+ size_t fs_total = 0;
struct flb_storage_input *storage = i->storage;
if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
if (i->storage_total_limit_size != (size_t) -1) {
- size_t fs_total = flb_input_chunk_fs_total_size(i);
+ fs_total = flb_input_chunk_fs_total_size(i);
if (fs_total >= i->storage_total_limit_size) {
flb_warn("[input] %s paused (storage total size overlimit "
"%zuB/%zuB)",
flb_input_name(i), fs_total,
i->storage_total_limit_size);
+ storage_total_limit_reached = FLB_TRUE;
}
}
- else {
+ if (storage_total_limit_reached == FLB_FALSE) {
flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
flb_input_name(i),
storage->cio->total_chunks_up,As per coding guidelines, Declare variables at the start of functions, not mid-block. 🤖 Prompt for AI Agents |
||
| flb_input_pause(i); | ||
| i->storage_buf_status = FLB_INPUT_PAUSED; | ||
| return FLB_TRUE; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
emitter_storage.total_limit_sizeis invalid (e.g., typo like10XB),flb_input_set_property()returns-1butemitter_create()only logs and continues startup. That leaves the emitter running with the default unlimited storage cap, so the configuration appears accepted while the safety limit is silently disabled.Useful? React with 👍 / 👎.