From e6f1696cbcd7bc15fbcef06add6a3e06712fbc74 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 17:32:57 -0600 Subject: [PATCH 1/3] input: add struct field to know origin of being paused Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_input.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 9140b1dc71f..216ddef5592 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -73,6 +73,10 @@ #define FLB_INPUT_RUNNING 1 #define FLB_INPUT_PAUSED 0 +/* Paused by */ +#define FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT 0 +#define FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW 1 + struct flb_input_instance; struct flb_input_plugin { @@ -249,6 +253,9 @@ struct flb_input_instance { */ int storage_buf_status; + + int storage_buf_paused_by; + /* * Optional data passed to the plugin, this info is useful when * running Fluent Bit in library mode and the target plugin needs From 74cccffea00544564834c606946757a31e6e6e5c Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 17:33:49 -0600 Subject: [PATCH 2/3] output: add new config property called 'storage.overflow_action' When input plugins uses a filesystem based storage and the output plugin sets a 'storage.total_limit_size', as of now the default action when the buffer fills up is to drop the oldest chunk from the list, either from the backlog storage of from the plugin queue it self. The following patch, extends the functionality by allowing to register a new behavior throught the configuration option 'storage.overflow_action'. This new configuration can take two values: - drop_oldest_chunk: remove the oldest chunk to make room for new data. This is the default behavior. - pause_ingestion: once the buffer 'almost fills up', pause the ingestion for plugins that are sending data on that route. Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_output.h | 10 ++++++++++ src/flb_output.c | 18 ++++++++++++++++-- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 58433fcb0d5..808a7e575b3 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -77,6 +77,8 @@ int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_inst #define FLB_OUTPUT_PRIVATE 1024 #define FLB_OUTPUT_SYNCHRONOUS 2048 /* run one task at a time, no flush cycle limit */ +#define FLB_OUTPUT_STORAGE_OVERFLOW_DROP 0 /* storage.overflow_action: drop_oldest_chunk */ +#define FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION 1 /* storage.overlow_action : pause_ingestion */ /* * Event type handlers @@ -406,6 +408,14 @@ struct flb_output_instance { */ size_t total_limit_size; + /* when the instance has been configured with a limit for storage.total_limit_size, + * if the limit is reached the action to take is defined by storage_overflow_action: + * + * - FLB_OUTPUT_STORAGE_OVERFLOW_DROP (default) + * - FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION + */ + int storage_overflow_action; + /* Queue for singleplexed tasks */ struct flb_task_queue *singleplex_queue; diff --git a/src/flb_output.c b/src/flb_output.c index 1c7853f79f1..02b15a83475 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -279,7 +279,7 @@ int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue, if (is_empty) { return flb_output_task_queue_flush_one(out_ins->singleplex_queue); } - + return 0; } @@ -300,7 +300,7 @@ int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue) mk_list_del(&ended_task->_head); flb_free(ended_task); } - + /* Flush if there is a pending task queued */ is_empty = mk_list_is_empty(&queue->pending) == 0; if (!is_empty) { @@ -923,6 +923,20 @@ int flb_output_set_property(struct flb_output_instance *ins, flb_sds_destroy(tmp); ins->total_limit_size = (size_t) limit; } + else if (prop_key_check("storage.overflow_action", k, len) == 0 && tmp) { + if (strcasecmp(tmp, "drop_oldest_chunk") == 0) { + ins->storage_overflow_action = FLB_OUTPUT_STORAGE_OVERFLOW_DROP; + } + else if (strcasecmp(tmp, "pause_ingestion") == 0) { + ins->storage_overflow_action = FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION; + } + else { + flb_error("[config] invalid overflow_action '%s' for %s plugin", + tmp, (char *) flb_output_name(ins)); + flb_sds_destroy(tmp); + return -1; + } + } else if (prop_key_check("workers", k, len) == 0 && tmp) { /* Set the number of workers */ ins->tp_workers = atoi(tmp); From 8e33da45b23810b8da4cef8463b501456ec541e2 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 17:37:52 -0600 Subject: [PATCH 3/3] input_chunk: add support for the new 'pause_ingestion' overflow action If the output plugin has been configured with 'storage.overflow_action: pause_ingestion', the input plugin sending data on that route will be paused, if: - output buffer queue size is over 90% of the value set. - output buffer queue has less than 5MB of free space. note that to make this work, the service must have enabled filesystem storage and the input plugin be using 'storage.type: filesystem' Signed-off-by: Eduardo Silva --- src/flb_input_chunk.c | 85 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index ca02e6fca68..d81d606b970 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1203,11 +1203,15 @@ static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance { struct flb_storage_input *storage = (struct flb_storage_input *)i->storage; - if (storage->type == FLB_STORAGE_FS) { - if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) { - if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) { - return FLB_TRUE; - } + /* Not applicable for storage based on memory */ + if (storage->type == FLB_STORAGE_MEM) { + return FLB_FALSE; + } + + /* if input instance has enabled 'storage.pause_on_chunks_overlimit' */ + if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) { + if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) { + return FLB_TRUE; } } @@ -1260,10 +1264,12 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) in->name); } } + if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE && in->config->is_running == FLB_TRUE && in->config->is_ingestion_active == FLB_TRUE && - in->storage_buf_status == FLB_INPUT_PAUSED) { + in->storage_buf_status == FLB_INPUT_PAUSED && + in->storage_buf_paused_by == FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT) { in->storage_buf_status = FLB_INPUT_RUNNING; if (in->p->cb_resume) { flb_input_resume(in); @@ -1274,6 +1280,17 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) } } + if (in->config->is_running == FLB_TRUE && + in->config->is_ingestion_active == FLB_TRUE && + in->storage_buf_status == FLB_INPUT_PAUSED && + in->storage_buf_paused_by == FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW) { + in->storage_buf_status = FLB_INPUT_RUNNING; + if (in->p->cb_resume) { + flb_input_resume(in); + flb_info("[input] %s resume from storage.overflow_action", flb_input_name(in)); + } + } + return total; } @@ -1281,10 +1298,14 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) * If the number of bytes in use by the chunks are over the imposed limit * by configuration, pause the instance. */ -static inline int flb_input_chunk_protect(struct flb_input_instance *i) +static inline int flb_input_chunk_protect(struct flb_input_instance *i, struct flb_input_chunk *last_input_chunk) { + struct mk_list *head; + struct flb_config *config; + struct flb_output_instance *o_ins; struct flb_storage_input *storage = i->storage; + config = i->config; if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) { flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", i->name, @@ -1292,14 +1313,13 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i) storage->cio->max_chunks_up); flb_input_pause(i); i->storage_buf_status = FLB_INPUT_PAUSED; + i->storage_buf_paused_by = FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT; return FLB_TRUE; } - if (storage->type == FLB_STORAGE_FS) { - return FLB_FALSE; - } - if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) { + if ((storage->type == FLB_STORAGE_MEM || storage->type == FLB_STORAGE_MEM) && + flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) { /* * if the plugin is already overlimit and the strategy is based on * a memory-ring-buffer logic, do not pause the plugin, upon next @@ -1313,13 +1333,50 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i) * The plugin is using 'memory' buffering only and already reached * it limit, just pause the ingestion. */ - flb_warn("[input] %s paused (mem buf overlimit)", - i->name); + flb_warn("[input] %s paused (mem buf overlimit)", flb_input_name(i)); flb_input_pause(i); i->mem_buf_status = FLB_INPUT_PAUSED; return FLB_TRUE; } + /* + * if the output plugin that is a route for this input instance has enabled + * `storage.total_limit_size` and it capacity is at 90% or less than 5MB, we + * will pause the input instance. + */ + if (storage->type == FLB_STORAGE_FS) { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + + /* only count outputs with storage.total_limit_size set */ + if (o_ins->total_limit_size <= 0) { + continue; + } + + /* skip outputs with overflow action set to drop */ + if (o_ins->storage_overflow_action == FLB_OUTPUT_STORAGE_OVERFLOW_DROP) { + continue; + } + + /* check if this output plugin matches the route of the input */ + if (flb_routes_mask_get_bit(last_input_chunk->routes_mask, o_ins->id) == 0) { + continue; + } + + /* pause ingestion if capacity is at 90%, or less than 5MB available */ + if ((o_ins->fs_chunks_size >= (o_ins->total_limit_size * 0.9)) || + (o_ins->fs_chunks_size + (5 * 1024 * 1024)) >= o_ins->total_limit_size) { + flb_warn("[input] %s paused by storage.overflow_action - output %s usage is %zu/%zu)", + flb_input_name(i), flb_output_name(o_ins), + o_ins->fs_chunks_size, o_ins->total_limit_size); + flb_input_pause(i); + i->storage_buf_status = FLB_INPUT_PAUSED; + i->storage_buf_paused_by = FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW; + return FLB_TRUE; + } + } + } + return FLB_FALSE; } @@ -1750,7 +1807,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, } #endif /* FLB_HAVE_CHUNK_TRACE */ - flb_input_chunk_protect(in); + flb_input_chunk_protect(in, ic); return 0; }