diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index d171ef08167..4ce6ad48880 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -246,6 +246,7 @@ struct flb_config { int storage_metrics; /* enable/disable storage metrics */ int storage_checksum; /* checksum enabled */ int storage_max_chunks_up; /* max number of chunks 'up' in memory */ + char *storage_max_chunk_size; /* maximum chunk size in buffer */ int storage_del_bad_chunks; /* delete irrecoverable chunks */ char *storage_bl_mem_limit; /* storage backlog memory limit */ int storage_bl_flush_on_shutdown; /* enable/disable backlog chunks flush on shutdown */ @@ -412,6 +413,8 @@ enum conf_type { #define FLB_CONF_STORAGE_METRICS "storage.metrics" #define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" #define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" +#define FLB_CONF_STORAGE_MAX_CHUNK_SIZE \ + "storage.max_chunk_size" #define FLB_CONF_STORAGE_BL_FLUSH_ON_SHUTDOWN \ "storage.backlog.flush_on_shutdown" #define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 552bcfdc1c0..47a6957ba21 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -40,7 +40,9 @@ struct cio_chunk; * Defines a maximum size for a Chunk in the file system: note that despite * this is considered a limit, a Chunk size might get greater than this. */ -#define FLB_INPUT_CHUNK_FS_MAX_SIZE 2048000 /* 2MB */ +#define FLB_INPUT_CHUNK_FS_MAX_SIZE (size_t) 2048000 /* 2MB */ + +size_t flb_input_chunk_get_max_size(struct flb_config *config); /* Number of bytes reserved for Metadata Header on Chunks */ #define FLB_INPUT_CHUNK_META_HEADER 4 diff --git a/plugins/in_winevtlog/in_winevtlog.c b/plugins/in_winevtlog/in_winevtlog.c index 3a57e46e2df..9f159949e47 100644 --- a/plugins/in_winevtlog/in_winevtlog.c +++ b/plugins/in_winevtlog/in_winevtlog.c @@ -23,6 +23,7 @@ #include #include #include +#include #include "winevtlog.h" #define DEFAULT_INTERVAL_SEC 1 @@ -30,7 +31,7 @@ #define DEFAULT_THRESHOLD_SIZE 0x7ffff /* Default reading buffer size */ /* (512kib = 524287bytes) */ #define MINIMUM_THRESHOLD_SIZE 0x0400 /* 1024 bytes */ -#define MAXIMUM_THRESHOLD_SIZE (FLB_INPUT_CHUNK_FS_MAX_SIZE - (1024 * 200)) +#define MAXIMUM_THRESHOLD_PERCENT 90 static int in_winevtlog_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context); @@ -163,6 +164,7 @@ static int in_winevtlog_init(struct flb_input_instance *in, int status = WINEVTLOG_SESSION_CREATE_OK; double mult = 2.0; DWORD tmp_ms = 0; + size_t maximum_threshold_size; /* Initialize context */ ctx = flb_calloc(1, sizeof(struct winevtlog_config)); @@ -273,32 +275,33 @@ static int in_winevtlog_init(struct flb_input_instance *in, ctx->session = session; /* Set up total reading size threshold */ + maximum_threshold_size = flb_input_chunk_get_max_size(config) / 100 * MAXIMUM_THRESHOLD_PERCENT; if (ctx->total_size_threshold >= MINIMUM_THRESHOLD_SIZE && - ctx->total_size_threshold <= MAXIMUM_THRESHOLD_SIZE) { - flb_utils_bytes_to_human_readable_size((size_t) ctx->total_size_threshold, + ctx->total_size_threshold <= maximum_threshold_size) { + flb_utils_bytes_to_human_readable_size(ctx->total_size_threshold, human_readable_size, sizeof(human_readable_size) - 1); flb_plg_debug(ctx->ins, "read limit per cycle is set up as %s", human_readable_size); } - else if (ctx->total_size_threshold > MAXIMUM_THRESHOLD_SIZE) { - flb_utils_bytes_to_human_readable_size((size_t) MAXIMUM_THRESHOLD_SIZE, + else if (ctx->total_size_threshold > maximum_threshold_size) { + flb_utils_bytes_to_human_readable_size(maximum_threshold_size, human_readable_size, sizeof(human_readable_size) - 1); flb_plg_warn(ctx->ins, "read limit per cycle cannot exceed %s. Set up to %s", human_readable_size, human_readable_size); - ctx->total_size_threshold = (unsigned int) MAXIMUM_THRESHOLD_SIZE; + ctx->total_size_threshold = maximum_threshold_size; } else if (ctx->total_size_threshold < MINIMUM_THRESHOLD_SIZE){ - flb_utils_bytes_to_human_readable_size((size_t) MINIMUM_THRESHOLD_SIZE, + flb_utils_bytes_to_human_readable_size(MINIMUM_THRESHOLD_SIZE, human_readable_size, sizeof(human_readable_size) - 1); flb_plg_warn(ctx->ins, "read limit per cycle cannot under 1KiB. Set up to %s", human_readable_size); - ctx->total_size_threshold = (unsigned int) MINIMUM_THRESHOLD_SIZE; + ctx->total_size_threshold = MINIMUM_THRESHOLD_SIZE; } /* Open channels */ diff --git a/src/flb_config.c b/src/flb_config.c index b2c33312b2b..4599ee43dca 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -152,6 +152,9 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_MAX_CHUNKS_UP, FLB_CONF_TYPE_INT, offsetof(struct flb_config, storage_max_chunks_up)}, + {FLB_CONF_STORAGE_MAX_CHUNK_SIZE, + FLB_CONF_TYPE_STR, + offsetof(struct flb_config, storage_max_chunk_size)}, {FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS, FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, storage_del_bad_chunks)}, @@ -621,6 +624,9 @@ void flb_config_exit(struct flb_config *config) if (config->storage_bl_mem_limit) { flb_free(config->storage_bl_mem_limit); } + if (config->storage_max_chunk_size) { + flb_free(config->storage_max_chunk_size); + } if (config->storage_rejected_path) { flb_free(config->storage_rejected_path); } diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index dca829e3fc6..5b450383533 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -2632,6 +2633,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, size_t filtered_data_size; void *final_data_buffer; size_t final_data_size; + size_t flb_input_chunk_max_size; /* memory ring-buffer checker */ if (in->storage_type == FLB_STORAGE_MEMRB) { @@ -2828,8 +2830,9 @@ static int input_chunk_append_raw(struct flb_input_instance *in, real_diff = 0; } - /* Lock buffers where size > 2MB */ - if (content_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { + flb_input_chunk_max_size = flb_input_chunk_get_max_size(in->config); + + if (content_size > flb_input_chunk_max_size) { cio_chunk_lock(ic->chunk); } @@ -2896,8 +2899,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in, content_size = cio_chunk_get_content_size(ic->chunk); /* Do we have less than 1% available ? */ - min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01); - if (FLB_INPUT_CHUNK_FS_MAX_SIZE - content_size < min) { + min = (flb_input_chunk_max_size / 100); + if (flb_input_chunk_max_size - content_size < min) { cio_chunk_down(ic->chunk); } } @@ -3297,3 +3300,24 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, } } } + +/* + * Get value of 'storage.max_chunk_size' from configuration or set default value. + */ +size_t flb_input_chunk_get_max_size(struct flb_config *config) { + int64_t config_input_chunk_max_size; + + if (config != NULL) { + config_input_chunk_max_size = flb_utils_size_to_bytes(config->storage_max_chunk_size); + if (config_input_chunk_max_size > 0) { + flb_debug("[input chunk] using maximum chunk size: %" PRId64, config_input_chunk_max_size); + return (size_t) config_input_chunk_max_size; + } else if (config_input_chunk_max_size == 0) { + flb_debug("[input chunk] maximum chunk size was not set, using the default value: %zu", FLB_INPUT_CHUNK_FS_MAX_SIZE); + } else { + flb_debug("[input chunk] could not parse maximum chunk size, using the default value: %zu", FLB_INPUT_CHUNK_FS_MAX_SIZE); + } + } + + return FLB_INPUT_CHUNK_FS_MAX_SIZE; + }