diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index c2ca41bd75e..9e5031041f2 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -270,6 +270,13 @@ struct flb_input_instance { /* flag to pause input when storage is full */ int storage_pause_on_chunks_overlimit; + /* + * Optional limit for total filesystem storage (bytes). When set, + * the input is paused once the sum of all its chunk sizes on disk + * reaches this threshold. -1 means unlimited (default). + */ + size_t storage_total_limit_size; + /* * Input network info: * diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c index 080a096a4f7..814686e45d3 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.c +++ b/plugins/filter_rewrite_tag/rewrite_tag.c @@ -73,6 +73,15 @@ static int emitter_create(struct flb_rewrite_tag *ctx) flb_plg_error(ctx->ins, "cannot set storage.type"); } + /* Set the storage total limit size (filesystem cap) */ + if (ctx->emitter_storage_total_limit_size) { + ret = flb_input_set_property(ins, "storage.total_limit_size", + ctx->emitter_storage_total_limit_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "cannot set storage.total_limit_size"); + } + } + /* Initialize emitter plugin */ ret = flb_input_instance_init(ins, ctx->config); if (ret == -1) { @@ -607,6 +616,12 @@ static struct flb_config_map config_map[] = { FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_mem_buf_limit), "set a memory buffer limit to restrict memory usage of emitter" }, + { + FLB_CONFIG_MAP_STR, "emitter_storage.total_limit_size", NULL, + FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_storage_total_limit_size), + "set the maximum disk space that the emitter can use for filesystem " + "buffered chunks. When this limit is reached the emitter is paused." + }, /* EOF */ {0} }; diff --git a/plugins/filter_rewrite_tag/rewrite_tag.h b/plugins/filter_rewrite_tag/rewrite_tag.h index 9064848def6..c08ea8e7c53 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.h +++ b/plugins/filter_rewrite_tag/rewrite_tag.h @@ -43,6 +43,7 @@ struct flb_rewrite_tag { flb_sds_t emitter_name; /* emitter input plugin name */ flb_sds_t emitter_storage_type; /* emitter storage type */ size_t emitter_mem_buf_limit; /* Emitter buffer limit */ + flb_sds_t emitter_storage_total_limit_size; /* Emitter FS total size cap */ struct mk_list rules; /* processed rules */ struct mk_list *cm_rules; /* config_map rules (only strings) */ struct flb_input_instance *ins_emitter; /* emitter input plugin instance */ diff --git a/src/flb_input.c b/src/flb_input.c index e990eee238c..bd19b61493b 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -409,6 +409,7 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, instance->data = data; instance->storage = NULL; instance->storage_type = -1; + instance->storage_total_limit_size = (size_t) -1; instance->log_level = -1; instance->log_suppress_interval = -1; instance->runs_in_coroutine = FLB_FALSE; @@ -918,6 +919,26 @@ int flb_input_set_property(struct flb_input_instance *ins, } ins->storage_pause_on_chunks_overlimit = ret; } + else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) { + int64_t limit; + + if (strcasecmp(tmp, "off") == 0 || + flb_utils_bool(tmp) == FLB_FALSE) { + limit = -1; + } + else { + limit = flb_utils_size_to_bytes(tmp); + if (limit == -1) { + flb_sds_destroy(tmp); + return -1; + } + if (limit == 0) { + limit = -1; + } + } + flb_sds_destroy(tmp); + ins->storage_total_limit_size = (size_t) limit; + } else { /* * Create the property, we don't pass the value since we will diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 3d82fe1c381..985ff49d352 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -2397,6 +2397,28 @@ static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i) return FLB_FALSE; } +/* + * Return the total bytes used by all chunks (up + down) belonging to + * an input instance's stream. Used to enforce storage.total_limit_size. + */ +static size_t flb_input_chunk_fs_total_size(struct flb_input_instance *i) +{ + ssize_t bytes; + size_t total = 0; + struct cio_chunk *ch; + struct mk_list *head; + struct flb_storage_input *storage = (struct flb_storage_input *) i->storage; + + mk_list_foreach(head, &storage->stream->chunks) { + ch = mk_list_entry(head, struct cio_chunk, _head); + bytes = cio_chunk_get_real_size(ch); + if (bytes > 0) { + total += bytes; + } + } + return total; +} + static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i) { struct flb_storage_input *storage = (struct flb_storage_input *)i->storage; @@ -2407,6 +2429,12 @@ static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance return FLB_TRUE; } } + + if (i->storage_total_limit_size != (size_t) -1) { + if (flb_input_chunk_fs_total_size(i) >= i->storage_total_limit_size) { + return FLB_TRUE; + } + } } return FLB_FALSE; @@ -2486,10 +2514,21 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t j struct flb_storage_input *storage = i->storage; if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) { - flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", - flb_input_name(i), - storage->cio->total_chunks_up, - storage->cio->max_chunks_up); + if (i->storage_total_limit_size != (size_t) -1) { + size_t fs_total = flb_input_chunk_fs_total_size(i); + if (fs_total >= i->storage_total_limit_size) { + flb_warn("[input] %s paused (storage total size overlimit " + "%zuB/%zuB)", + flb_input_name(i), fs_total, + i->storage_total_limit_size); + } + } + else { + flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)", + flb_input_name(i), + storage->cio->total_chunks_up, + storage->cio->max_chunks_up); + } flb_input_pause(i); i->storage_buf_status = FLB_INPUT_PAUSED; return FLB_TRUE;