Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ struct flb_config {
int storage_metrics; /* enable/disable storage metrics */
int storage_checksum; /* checksum enabled */
int storage_max_chunks_up; /* max number of chunks 'up' in memory */
char *storage_max_chunk_size; /* maximum chunk size in buffer */
int storage_del_bad_chunks; /* delete irrecoverable chunks */
char *storage_bl_mem_limit; /* storage backlog memory limit */
int storage_bl_flush_on_shutdown; /* enable/disable backlog chunks flush on shutdown */
Expand Down Expand Up @@ -412,6 +413,8 @@ enum conf_type {
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNK_SIZE \
"storage.max_chunk_size"
#define FLB_CONF_STORAGE_BL_FLUSH_ON_SHUTDOWN \
"storage.backlog.flush_on_shutdown"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
Expand Down
4 changes: 3 additions & 1 deletion include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ struct cio_chunk;
* Defines a maximum size for a Chunk in the file system: note that despite
* this is considered a limit, a Chunk size might get greater than this.
*/
#define FLB_INPUT_CHUNK_FS_MAX_SIZE 2048000 /* 2MB */
#define FLB_INPUT_CHUNK_FS_MAX_SIZE (size_t) 2048000 /* 2MB */

size_t flb_input_chunk_get_max_size(struct flb_config *config);

/* Number of bytes reserved for Metadata Header on Chunks */
#define FLB_INPUT_CHUNK_META_HEADER 4
Expand Down
19 changes: 11 additions & 8 deletions plugins/in_winevtlog/in_winevtlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_sqldb.h>
#include <fluent-bit/flb_input_chunk.h>
#include "winevtlog.h"

#define DEFAULT_INTERVAL_SEC 1
#define DEFAULT_INTERVAL_NSEC 0
#define DEFAULT_THRESHOLD_SIZE 0x7ffff /* Default reading buffer size */
/* (512kib = 524287bytes) */
#define MINIMUM_THRESHOLD_SIZE 0x0400 /* 1024 bytes */
#define MAXIMUM_THRESHOLD_SIZE (FLB_INPUT_CHUNK_FS_MAX_SIZE - (1024 * 200))
#define MAXIMUM_THRESHOLD_PERCENT 90

static int in_winevtlog_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context);
Expand Down Expand Up @@ -163,6 +164,7 @@ static int in_winevtlog_init(struct flb_input_instance *in,
int status = WINEVTLOG_SESSION_CREATE_OK;
double mult = 2.0;
DWORD tmp_ms = 0;
size_t maximum_threshold_size;

/* Initialize context */
ctx = flb_calloc(1, sizeof(struct winevtlog_config));
Expand Down Expand Up @@ -273,32 +275,33 @@ static int in_winevtlog_init(struct flb_input_instance *in,
ctx->session = session;

/* Set up total reading size threshold */
maximum_threshold_size = flb_input_chunk_get_max_size(config) / 100 * MAXIMUM_THRESHOLD_PERCENT;
Comment thread
castorsky marked this conversation as resolved.
if (ctx->total_size_threshold >= MINIMUM_THRESHOLD_SIZE &&
ctx->total_size_threshold <= MAXIMUM_THRESHOLD_SIZE) {
flb_utils_bytes_to_human_readable_size((size_t) ctx->total_size_threshold,
ctx->total_size_threshold <= maximum_threshold_size) {
flb_utils_bytes_to_human_readable_size(ctx->total_size_threshold,
human_readable_size,
sizeof(human_readable_size) - 1);
flb_plg_debug(ctx->ins,
"read limit per cycle is set up as %s",
human_readable_size);
}
else if (ctx->total_size_threshold > MAXIMUM_THRESHOLD_SIZE) {
flb_utils_bytes_to_human_readable_size((size_t) MAXIMUM_THRESHOLD_SIZE,
else if (ctx->total_size_threshold > maximum_threshold_size) {
flb_utils_bytes_to_human_readable_size(maximum_threshold_size,
human_readable_size,
sizeof(human_readable_size) - 1);
flb_plg_warn(ctx->ins,
"read limit per cycle cannot exceed %s. Set up to %s",
human_readable_size, human_readable_size);
ctx->total_size_threshold = (unsigned int) MAXIMUM_THRESHOLD_SIZE;
ctx->total_size_threshold = maximum_threshold_size;
}
else if (ctx->total_size_threshold < MINIMUM_THRESHOLD_SIZE){
flb_utils_bytes_to_human_readable_size((size_t) MINIMUM_THRESHOLD_SIZE,
flb_utils_bytes_to_human_readable_size(MINIMUM_THRESHOLD_SIZE,
human_readable_size,
sizeof(human_readable_size) - 1);
flb_plg_warn(ctx->ins,
"read limit per cycle cannot under 1KiB. Set up to %s",
human_readable_size);
ctx->total_size_threshold = (unsigned int) MINIMUM_THRESHOLD_SIZE;
ctx->total_size_threshold = MINIMUM_THRESHOLD_SIZE;
}
Comment thread
castorsky marked this conversation as resolved.

/* Open channels */
Expand Down
6 changes: 6 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ struct flb_service_config service_configs[] = {
{FLB_CONF_STORAGE_MAX_CHUNKS_UP,
FLB_CONF_TYPE_INT,
offsetof(struct flb_config, storage_max_chunks_up)},
{FLB_CONF_STORAGE_MAX_CHUNK_SIZE,
FLB_CONF_TYPE_STR,
offsetof(struct flb_config, storage_max_chunk_size)},
{FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_del_bad_chunks)},
Expand Down Expand Up @@ -621,6 +624,9 @@ void flb_config_exit(struct flb_config *config)
if (config->storage_bl_mem_limit) {
flb_free(config->storage_bl_mem_limit);
}
if (config->storage_max_chunk_size) {
flb_free(config->storage_max_chunk_size);
}
if (config->storage_rejected_path) {
flb_free(config->storage_rejected_path);
}
Expand Down
32 changes: 28 additions & 4 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/stream_processor/flb_sp.h>
#include <fluent-bit/flb_ring_buffer.h>
#include <fluent-bit/flb_utils.h>
#include <chunkio/chunkio.h>
#include <monkey/mk_core.h>
#include <string.h>
Expand Down Expand Up @@ -2632,6 +2633,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
size_t filtered_data_size;
void *final_data_buffer;
size_t final_data_size;
size_t flb_input_chunk_max_size;

/* memory ring-buffer checker */
if (in->storage_type == FLB_STORAGE_MEMRB) {
Expand Down Expand Up @@ -2828,8 +2830,9 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
real_diff = 0;
}

/* Lock buffers where size > 2MB */
if (content_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
flb_input_chunk_max_size = flb_input_chunk_get_max_size(in->config);

if (content_size > flb_input_chunk_max_size) {
cio_chunk_lock(ic->chunk);
}

Expand Down Expand Up @@ -2896,8 +2899,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
content_size = cio_chunk_get_content_size(ic->chunk);

/* Do we have less than 1% available ? */
min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01);
if (FLB_INPUT_CHUNK_FS_MAX_SIZE - content_size < min) {
min = (flb_input_chunk_max_size / 100);
if (flb_input_chunk_max_size - content_size < min) {
cio_chunk_down(ic->chunk);
}
}
Expand Down Expand Up @@ -3297,3 +3300,24 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
}
}
}

/*
* Get value of 'storage.max_chunk_size' from configuration or set default value.
*/
size_t flb_input_chunk_get_max_size(struct flb_config *config) {
int64_t config_input_chunk_max_size;

if (config != NULL) {
config_input_chunk_max_size = flb_utils_size_to_bytes(config->storage_max_chunk_size);
if (config_input_chunk_max_size > 0) {
flb_debug("[input chunk] using maximum chunk size: %" PRId64, config_input_chunk_max_size);
return (size_t) config_input_chunk_max_size;
} else if (config_input_chunk_max_size == 0) {
flb_debug("[input chunk] maximum chunk size was not set, using the default value: %zu", FLB_INPUT_CHUNK_FS_MAX_SIZE);
} else {
flb_debug("[input chunk] could not parse maximum chunk size, using the default value: %zu", FLB_INPUT_CHUNK_FS_MAX_SIZE);
}
}

return FLB_INPUT_CHUNK_FS_MAX_SIZE;
}