diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 9140b1dc71f..216ddef5592 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -73,6 +73,10 @@ #define FLB_INPUT_RUNNING 1 #define FLB_INPUT_PAUSED 0 +/* Paused by */ +#define FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT 0 +#define FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW 1 + struct flb_input_instance; struct flb_input_plugin { @@ -249,6 +253,9 @@ struct flb_input_instance { */ int storage_buf_status; + + int storage_buf_paused_by; + /* * Optional data passed to the plugin, this info is useful when * running Fluent Bit in library mode and the target plugin needs diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 58433fcb0d5..808a7e575b3 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -77,6 +77,8 @@ int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_inst #define FLB_OUTPUT_PRIVATE 1024 #define FLB_OUTPUT_SYNCHRONOUS 2048 /* run one task at a time, no flush cycle limit */ +#define FLB_OUTPUT_STORAGE_OVERFLOW_DROP 0 /* storage.overflow_action: drop_oldest_chunk */ +#define FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION 1 /* storage.overlow_action : pause_ingestion */ /* * Event type handlers @@ -406,6 +408,14 @@ struct flb_output_instance { */ size_t total_limit_size; + /* when the instance has been configured with a limit for storage.total_limit_size, + * if the limit is reached the action to take is defined by storage_overflow_action: + * + * - FLB_OUTPUT_STORAGE_OVERFLOW_DROP (default) + * - FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION + */ + int storage_overflow_action; + /* Queue for singleplexed tasks */ struct flb_task_queue *singleplex_queue; diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index ca02e6fca68..d81d606b970 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1203,11 +1203,15 @@ static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance { struct flb_storage_input *storage = (struct flb_storage_input *)i->storage; - if (storage->type == FLB_STORAGE_FS) { - if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) { - if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) { - return FLB_TRUE; - } + /* Not applicable for storage based on memory */ + if (storage->type == FLB_STORAGE_MEM) { + return FLB_FALSE; + } + + /* if input instance has enabled 'storage.pause_on_chunks_overlimit' */ + if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) { + if (storage->cio->total_chunks_up >= storage->cio->max_chunks_up) { + return FLB_TRUE; } } @@ -1260,10 +1264,12 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) in->name); } } + if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE && in->config->is_running == FLB_TRUE && in->config->is_ingestion_active == FLB_TRUE && - in->storage_buf_status == FLB_INPUT_PAUSED) { + in->storage_buf_status == FLB_INPUT_PAUSED && + in->storage_buf_paused_by == FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT) { in->storage_buf_status = FLB_INPUT_RUNNING; if (in->p->cb_resume) { flb_input_resume(in); @@ -1274,6 +1280,17 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) } } + if (in->config->is_running == FLB_TRUE && + in->config->is_ingestion_active == FLB_TRUE && + in->storage_buf_status == FLB_INPUT_PAUSED && + in->storage_buf_paused_by == FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW) { + in->storage_buf_status = FLB_INPUT_RUNNING; + if (in->p->cb_resume) { + flb_input_resume(in); + flb_info("[input] %s resume from storage.overflow_action", flb_input_name(in)); + } + } + return total; } @@ -1281,10 +1298,14 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) * If the number of bytes in use by the chunks are over the imposed limit * by configuration, pause the instance. */ -static inline int flb_input_chunk_protect(struct flb_input_instance *i) +static inline int flb_input_chunk_protect(struct flb_input_instance *i, struct flb_input_chunk *last_input_chunk) { + struct mk_list *head; + struct flb_config *config; + struct flb_output_instance *o_ins; struct flb_storage_input *storage = i->storage; + config = i->config; if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) { flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", i->name, @@ -1292,14 +1313,13 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i) storage->cio->max_chunks_up); flb_input_pause(i); i->storage_buf_status = FLB_INPUT_PAUSED; + i->storage_buf_paused_by = FLB_INPUT_PAUSED_BY_CHUNKS_OVERLIMIT; return FLB_TRUE; } - if (storage->type == FLB_STORAGE_FS) { - return FLB_FALSE; - } - if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) { + if ((storage->type == FLB_STORAGE_MEM || storage->type == FLB_STORAGE_MEM) && + flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) { /* * if the plugin is already overlimit and the strategy is based on * a memory-ring-buffer logic, do not pause the plugin, upon next @@ -1313,13 +1333,50 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i) * The plugin is using 'memory' buffering only and already reached * it limit, just pause the ingestion. */ - flb_warn("[input] %s paused (mem buf overlimit)", - i->name); + flb_warn("[input] %s paused (mem buf overlimit)", flb_input_name(i)); flb_input_pause(i); i->mem_buf_status = FLB_INPUT_PAUSED; return FLB_TRUE; } + /* + * if the output plugin that is a route for this input instance has enabled + * `storage.total_limit_size` and it capacity is at 90% or less than 5MB, we + * will pause the input instance. + */ + if (storage->type == FLB_STORAGE_FS) { + mk_list_foreach(head, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + + /* only count outputs with storage.total_limit_size set */ + if (o_ins->total_limit_size <= 0) { + continue; + } + + /* skip outputs with overflow action set to drop */ + if (o_ins->storage_overflow_action == FLB_OUTPUT_STORAGE_OVERFLOW_DROP) { + continue; + } + + /* check if this output plugin matches the route of the input */ + if (flb_routes_mask_get_bit(last_input_chunk->routes_mask, o_ins->id) == 0) { + continue; + } + + /* pause ingestion if capacity is at 90%, or less than 5MB available */ + if ((o_ins->fs_chunks_size >= (o_ins->total_limit_size * 0.9)) || + (o_ins->fs_chunks_size + (5 * 1024 * 1024)) >= o_ins->total_limit_size) { + flb_warn("[input] %s paused by storage.overflow_action - output %s usage is %zu/%zu)", + flb_input_name(i), flb_output_name(o_ins), + o_ins->fs_chunks_size, o_ins->total_limit_size); + flb_input_pause(i); + i->storage_buf_status = FLB_INPUT_PAUSED; + i->storage_buf_paused_by = FLB_INPUT_PAUSED_BY_STORAGE_OVERFLOW; + return FLB_TRUE; + } + } + } + return FLB_FALSE; } @@ -1750,7 +1807,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, } #endif /* FLB_HAVE_CHUNK_TRACE */ - flb_input_chunk_protect(in); + flb_input_chunk_protect(in, ic); return 0; } diff --git a/src/flb_output.c b/src/flb_output.c index 1c7853f79f1..02b15a83475 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -279,7 +279,7 @@ int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue, if (is_empty) { return flb_output_task_queue_flush_one(out_ins->singleplex_queue); } - + return 0; } @@ -300,7 +300,7 @@ int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue) mk_list_del(&ended_task->_head); flb_free(ended_task); } - + /* Flush if there is a pending task queued */ is_empty = mk_list_is_empty(&queue->pending) == 0; if (!is_empty) { @@ -923,6 +923,20 @@ int flb_output_set_property(struct flb_output_instance *ins, flb_sds_destroy(tmp); ins->total_limit_size = (size_t) limit; } + else if (prop_key_check("storage.overflow_action", k, len) == 0 && tmp) { + if (strcasecmp(tmp, "drop_oldest_chunk") == 0) { + ins->storage_overflow_action = FLB_OUTPUT_STORAGE_OVERFLOW_DROP; + } + else if (strcasecmp(tmp, "pause_ingestion") == 0) { + ins->storage_overflow_action = FLB_OUTPUT_STORAGE_OVERFLOW_PAUSE_INGESTION; + } + else { + flb_error("[config] invalid overflow_action '%s' for %s plugin", + tmp, (char *) flb_output_name(ins)); + flb_sds_destroy(tmp); + return -1; + } + } else if (prop_key_check("workers", k, len) == 0 && tmp) { /* Set the number of workers */ ins->tp_workers = atoi(tmp);