diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index c2ca41bd75e..ccc19dd8081 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -82,6 +82,9 @@ /* Owner-side ingress queue status */ #define FLB_INPUT_INGRESS_BUSY -2 +/* 1 second in nsec unit */ +#define FLB_NSEC_IN_SEC 1000000000ULL + struct flb_input_instance; struct flb_http_server_config; @@ -440,6 +443,33 @@ struct flb_input_instance { struct cmt_gauge *cmt_ingress_queue_pending_events; struct cmt_gauge *cmt_ingress_queue_pending_bytes; + /* flow rate metrics */ + struct cmt_gauge *cmt_rate_bytes; /* metric: input_rate_bytes/window */ + struct cmt_gauge *cmt_rate_records;/* metric: input_rate_records/window */ + struct cmt_gauge *cmt_rate_gate_limited; /* metric: input rate gate */ + struct cmt_gauge *cmt_rate_gate_busy_chunks; + struct cmt_gauge *cmt_rate_gate_retry_attempts; + + /* + * Input rate accounting state + * --------------------------- + */ + uint64_t rate_window_start; + /* Window length used by rate accounting, in nanoseconds */ + uint64_t rate_window_size; + size_t rate_window_bytes; + size_t rate_window_records; + double rate_bytes; + double rate_records; + int rate_gate_enabled; + int rate_gate_status; + int rate_gate_use_backpressure; + double rate_gate_resume_ratio; + size_t rate_gate_max_bytes; + size_t rate_gate_max_records; + size_t rate_gate_busy_chunks; + size_t rate_gate_retry_attempts; + /* * Indexes for generated chunks: simple hash tables that keeps the latest * available chunks for writing data operations. 
This optimizes the @@ -743,6 +773,19 @@ static inline int flb_input_buf_paused(struct flb_input_instance *i) return FLB_FALSE; } +static inline int flb_input_paused(struct flb_input_instance *i) +{ + if (flb_input_buf_paused(i) == FLB_TRUE) { + return FLB_TRUE; + } + + if (i->rate_gate_status == FLB_INPUT_PAUSED) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + static inline int flb_input_config_map_set(struct flb_input_instance *ins, void *context) { @@ -855,6 +898,12 @@ int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_second int flb_input_pause(struct flb_input_instance *ins); int flb_input_pause_all(struct flb_config *config); int flb_input_resume(struct flb_input_instance *ins); +void flb_input_rate_update(struct flb_input_instance *ins, + uint64_t timestamp, + size_t records, + size_t bytes); +int flb_input_rate_gate_protect(struct flb_input_instance *ins); +void flb_input_rate_gate_maybe_resume(struct flb_input_instance *ins); const char *flb_input_name(struct flb_input_instance *ins); int flb_input_name_exists(const char *name, struct flb_config *config); diff --git a/src/flb_input.c b/src/flb_input.c index e990eee238c..1fc4f5425f3 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -44,6 +44,7 @@ #include #include #include +#include /* input plugin macro helpers */ #include @@ -119,6 +120,7 @@ static int flb_input_ingress_primitives_init(struct flb_input_instance *ins) #define FLB_INPUT_RING_BUFFER_CAPACITY 1024 #define FLB_INPUT_RING_BUFFER_SIZE (sizeof(void *) * FLB_INPUT_RING_BUFFER_CAPACITY) #define FLB_INPUT_RING_BUFFER_WINDOW (5) +#define FLB_INPUT_RATE_WINDOW_DEFAULT "1s" /* config map to register options available for all input plugins */ struct flb_config_map input_global_properties[] = { @@ -185,6 +187,37 @@ struct flb_config_map input_global_properties[] = { 0, FLB_FALSE, 0, "Set custom ring buffer window percentage for threaded inputs" }, + { + FLB_CONFIG_MAP_TIME, "rate_window", FLB_INPUT_RATE_WINDOW_DEFAULT, + 0, 
FLB_FALSE, 0, + "Set input rate window using a time unit (for example: 1s, 1m, 1h). " + "The computed rate is always published in per-second units." + }, + { + FLB_CONFIG_MAP_BOOL, "rate_gate", "false", + 0, FLB_FALSE, 0, + "Enable input rate gate control." + }, + { + FLB_CONFIG_MAP_SIZE, "rate_gate.max_bytes", "0", + 0, FLB_FALSE, 0, + "Maximum input byte rate per second before pausing ingestion." + }, + { + FLB_CONFIG_MAP_INT, "rate_gate.max_records", "0", + 0, FLB_FALSE, 0, + "Maximum input record rate per second before pausing ingestion." + }, + { + FLB_CONFIG_MAP_BOOL, "rate_gate.backpressure", "true", + 0, FLB_FALSE, 0, + "Apply retry and busy chunk pressure when computing effective limits." + }, + { + FLB_CONFIG_MAP_DOUBLE, "rate_gate.resume_ratio", "0.80", + 0, FLB_FALSE, 0, + "Hysteresis threshold used for resuming input rate gate." + }, {0} }; @@ -568,6 +601,15 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, instance->mem_buf_status = FLB_INPUT_RUNNING; instance->mem_buf_limit = 0; instance->mem_chunks_size = 0; + instance->rate_window_size = FLB_NSEC_IN_SEC; + instance->rate_gate_enabled = FLB_FALSE; + instance->rate_gate_status = FLB_INPUT_RUNNING; + instance->rate_gate_use_backpressure = FLB_TRUE; + instance->rate_gate_resume_ratio = 0.80; + instance->rate_gate_max_bytes = 0; + instance->rate_gate_max_records = 0; + instance->rate_gate_busy_chunks = 0; + instance->rate_gate_retry_attempts = 0; instance->storage_buf_status = FLB_INPUT_RUNNING; mk_list_add(&instance->_head, &config->inputs); @@ -693,9 +735,12 @@ int flb_input_set_property(struct flb_input_instance *ins, { int len; int ret; + int seconds; int enabled; ssize_t limit; flb_sds_t tmp = NULL; + char *end; + long parsed; struct flb_kv *kv; len = strlen(k); @@ -910,6 +955,61 @@ int flb_input_set_property(struct flb_input_instance *ins, } ins->ring_buffer_window = (uint8_t) ret; } + else if (prop_key_check("rate_window", k, len) == 0 && tmp) { + seconds = 
flb_utils_time_to_seconds(tmp); + flb_sds_destroy(tmp); + if (seconds <= 0) { + flb_error("[input] invalid rate_window value"); + return -1; + } + ins->rate_window_size = ((uint64_t) seconds) * FLB_NSEC_IN_SEC; + } + else if (prop_key_check("rate_gate", k, len) == 0 && tmp) { + ret = flb_utils_bool(tmp); + flb_sds_destroy(tmp); + if (ret == -1) { + return -1; + } + ins->rate_gate_enabled = ret; + } + else if (prop_key_check("rate_gate.max_bytes", k, len) == 0 && tmp) { + limit = flb_utils_size_to_bytes(tmp); + flb_sds_destroy(tmp); + if (limit < 0) { + return -1; + } + ins->rate_gate_max_bytes = (size_t) limit; + } + else if (prop_key_check("rate_gate.max_records", k, len) == 0 && tmp) { + end = NULL; + parsed = strtol(tmp, &end, 10); + + if (end == tmp || *end != '\0' || parsed < 0) { + flb_sds_destroy(tmp); + return -1; + } + + flb_sds_destroy(tmp); + ins->rate_gate_max_records = (size_t) parsed; + } + else if (prop_key_check("rate_gate.backpressure", k, len) == 0 && tmp) { + ret = flb_utils_bool(tmp); + flb_sds_destroy(tmp); + if (ret == -1) { + return -1; + } + ins->rate_gate_use_backpressure = ret; + } + else if (prop_key_check("rate_gate.resume_ratio", k, len) == 0 && tmp) { + ins->rate_gate_resume_ratio = atof(tmp); + flb_sds_destroy(tmp); + + if (ins->rate_gate_resume_ratio <= 0.0 || + ins->rate_gate_resume_ratio >= 1.0) { + flb_error("[input] rate_gate.resume_ratio must be between 0 and 1"); + return -1; + } + } else if (prop_key_check("storage.pause_on_chunks_overlimit", k, len) == 0 && tmp) { ret = flb_utils_bool(tmp); flb_sds_destroy(tmp); @@ -1429,6 +1529,44 @@ int flb_input_instance_init(struct flb_input_instance *ins, 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_records, ts, 0, 1, (char *[]) {name}); + /* fluentbit_input_rate_bytes */ + ins->cmt_rate_bytes = \ + cmt_gauge_create(ins->cmt, + "fluentbit", "input", "rate_bytes", + "Current input bytes per second.", + 1, (char *[]) {"name"}); + cmt_gauge_set(ins->cmt_rate_bytes, ts, 0, 1, (char *[]) 
{name}); + + /* fluentbit_input_rate_records */ + ins->cmt_rate_records = \ + cmt_gauge_create(ins->cmt, + "fluentbit", "input", "rate_records", + "Current input records per second.", + 1, (char *[]) {"name"}); + cmt_gauge_set(ins->cmt_rate_records, ts, 0, 1, (char *[]) {name}); + + /* fluentbit_input_rate_gate_limited */ + ins->cmt_rate_gate_limited = \ + cmt_gauge_create(ins->cmt, + "fluentbit", "input", "rate_gate_limited", + "Is the input rate gate currently limiting ingestion?", + 1, (char *[]) {"name"}); + cmt_gauge_set(ins->cmt_rate_gate_limited, ts, 0, 1, (char *[]) {name}); + + ins->cmt_rate_gate_busy_chunks = \ + cmt_gauge_create(ins->cmt, + "fluentbit", "input", "rate_gate_busy_chunks", + "Busy chunks considered by the input rate gate.", + 1, (char *[]) {"name"}); + cmt_gauge_set(ins->cmt_rate_gate_busy_chunks, ts, 0, 1, (char *[]) {name}); + + ins->cmt_rate_gate_retry_attempts = \ + cmt_gauge_create(ins->cmt, + "fluentbit", "input", "rate_gate_retry_attempts", + "Retry attempts considered by the input rate gate.", + 1, (char *[]) {"name"}); + cmt_gauge_set(ins->cmt_rate_gate_retry_attempts, ts, 0, 1, (char *[]) {name}); + /* fluentbit_input_ingestion_paused */ ins->cmt_ingestion_paused = \ cmt_gauge_create(ins->cmt, @@ -2132,6 +2270,367 @@ int flb_input_collector_start(int coll_id, struct flb_input_instance *in) return -1; } +static void flb_input_rate_gate_effective_limit(struct flb_input_instance *ins, + double max_limit, + int for_resume, + double *effective_limit) +{ + size_t pressure; + double limit; + + limit = max_limit; + pressure = 0; + if (ins->rate_gate_use_backpressure == FLB_TRUE) { + pressure = ins->rate_gate_busy_chunks + ins->rate_gate_retry_attempts; + } + + if (pressure > 0 && limit > 0.0) { + limit = limit / (double) (1 + pressure); + } + + if (for_resume == FLB_TRUE) { + limit *= ins->rate_gate_resume_ratio; + } + + if (limit < 1.0) { + limit = 1.0; + } + + *effective_limit = limit; +} + +static int 
flb_input_rate_gate_is_limited(struct flb_input_instance *ins) +{ + double current_window_bytes_rate; + double current_window_records_rate; + double effective_max_bytes; + double effective_max_records; + double window_seconds; + uint64_t window_size; + + if (ins == NULL || ins->rate_gate_enabled != FLB_TRUE) { + return FLB_FALSE; + } + + /* * 1. Record-based limit check + */ + flb_input_rate_gate_effective_limit(ins, + (double) ins->rate_gate_max_records, + FLB_FALSE, + &effective_max_records); + + if (ins->rate_gate_max_records > 0) { + window_size = ins->rate_window_size; + if (window_size == 0) { + window_seconds = 1.0; + } + else { + window_seconds = (double) window_size / (double) FLB_NSEC_IN_SEC; + if (window_seconds <= 0.0) { + window_seconds = 1.0; + } + } + current_window_records_rate = (double) ins->rate_window_records / + window_seconds; + + /* Check the last calculated rate from the previous window */ + if (ins->rate_records > effective_max_records) { + return FLB_TRUE; + } + /* * ALSO check the current accumulated count in the ongoing window. + * This ensures the gate triggers even before the first window expires. + */ + if (current_window_records_rate > effective_max_records) { + return FLB_TRUE; + } + } + + /* * 2. 
Byte-based limit check + */ + flb_input_rate_gate_effective_limit(ins, + (double) ins->rate_gate_max_bytes, + FLB_FALSE, + &effective_max_bytes); + + if (ins->rate_gate_max_bytes > 0) { + window_size = ins->rate_window_size; + if (window_size == 0) { + window_seconds = 1.0; + } + else { + window_seconds = (double) window_size / (double) FLB_NSEC_IN_SEC; + if (window_seconds <= 0.0) { + window_seconds = 1.0; + } + } + current_window_bytes_rate = (double) ins->rate_window_bytes / + window_seconds; + + /* Check the last calculated rate from the previous window */ + if (ins->rate_bytes > effective_max_bytes) { + return FLB_TRUE; + } + /* Check the current accumulated bytes in the ongoing window */ + if (current_window_bytes_rate > effective_max_bytes) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int flb_input_rate_gate_can_resume(struct flb_input_instance *ins) +{ + double current_window_bytes_rate; + double current_window_records_rate; + double resume_max_bytes; + double resume_max_records; + double window_seconds; + uint64_t window_size; + + if (ins == NULL || ins->rate_gate_enabled != FLB_TRUE) { + return FLB_FALSE; + } + + flb_input_rate_gate_effective_limit(ins, + (double) ins->rate_gate_max_records, + FLB_TRUE, + &resume_max_records); + if (ins->rate_gate_max_records > 0) { + window_size = ins->rate_window_size; + if (window_size == 0) { + window_seconds = 1.0; + } + else { + window_seconds = (double) window_size / (double) FLB_NSEC_IN_SEC; + if (window_seconds <= 0.0) { + window_seconds = 1.0; + } + } + current_window_records_rate = (double) ins->rate_window_records / + window_seconds; + + if (ins->rate_records > resume_max_records) { + return FLB_FALSE; + } + if (current_window_records_rate > resume_max_records) { + return FLB_FALSE; + } + } + + flb_input_rate_gate_effective_limit(ins, + (double) ins->rate_gate_max_bytes, + FLB_TRUE, + &resume_max_bytes); + if (ins->rate_gate_max_bytes > 0) { + window_size = ins->rate_window_size; + if 
(window_size == 0) { + window_seconds = 1.0; + } + else { + window_seconds = (double) window_size / (double) FLB_NSEC_IN_SEC; + if (window_seconds <= 0.0) { + window_seconds = 1.0; + } + } + current_window_bytes_rate = (double) ins->rate_window_bytes / + window_seconds; + + if (ins->rate_bytes > resume_max_bytes) { + return FLB_FALSE; + } + if (current_window_bytes_rate > resume_max_bytes) { + return FLB_FALSE; + } + } + + return FLB_TRUE; +} + +static void flb_input_rate_gate_collect_backpressure(struct flb_input_instance *ins) +{ + struct mk_list *head; + struct mk_list *retry_head; + struct flb_input_chunk *ic; + struct flb_task *task; + struct flb_task_retry *retry; + + if (ins == NULL) { + return; + } + + ins->rate_gate_busy_chunks = 0; + ins->rate_gate_retry_attempts = 0; + + if (ins->rate_gate_use_backpressure != FLB_TRUE) { + return; + } + + mk_list_foreach(head, &ins->chunks) { + ic = mk_list_entry(head, struct flb_input_chunk, _head); + if (ic->busy == FLB_TRUE) { + ins->rate_gate_busy_chunks++; + } + } + + mk_list_foreach(head, &ins->tasks) { + task = mk_list_entry(head, struct flb_task, _head); + mk_list_foreach(retry_head, &task->retries) { + retry = mk_list_entry(retry_head, struct flb_task_retry, _head); + ins->rate_gate_retry_attempts += retry->attempts; + } + } +} + +int flb_input_rate_gate_protect(struct flb_input_instance *ins) +{ + uint64_t ts; + char *name; + + if (ins == NULL || ins->rate_gate_enabled != FLB_TRUE) { + return FLB_FALSE; + } + + ts = cfl_time_now(); + flb_input_rate_update(ins, ts, 0, 0); + flb_input_rate_gate_collect_backpressure(ins); + + if (flb_input_rate_gate_is_limited(ins) == FLB_FALSE) { + return FLB_FALSE; + } + + if (ins->rate_gate_status != FLB_INPUT_PAUSED) { + flb_warn("[input] %s paused (rate gate limit exceeded)", + flb_input_name(ins)); + flb_input_pause(ins); + ins->rate_gate_status = FLB_INPUT_PAUSED; + } + + name = (char *) flb_input_name(ins); + if (ins->cmt_rate_gate_limited != NULL) { + 
cmt_gauge_set(ins->cmt_rate_gate_limited, ts, 1, 1, (char *[]) {name}); + } + if (ins->cmt_rate_gate_busy_chunks != NULL) { + cmt_gauge_set(ins->cmt_rate_gate_busy_chunks, ts, + (double) ins->rate_gate_busy_chunks, 1, (char *[]) {name}); + } + if (ins->cmt_rate_gate_retry_attempts != NULL) { + cmt_gauge_set(ins->cmt_rate_gate_retry_attempts, ts, + (double) ins->rate_gate_retry_attempts, 1, (char *[]) {name}); + } + + return FLB_TRUE; +} + +void flb_input_rate_gate_maybe_resume(struct flb_input_instance *ins) +{ + int can_resume; + uint64_t ts; + char *name; + + if (ins == NULL || ins->rate_gate_enabled != FLB_TRUE) { + return; + } + + if (ins->rate_gate_status != FLB_INPUT_PAUSED) { + return; + } + + ts = cfl_time_now(); + flb_input_rate_update(ins, ts, 0, 0); + flb_input_rate_gate_collect_backpressure(ins); + can_resume = flb_input_rate_gate_can_resume(ins); + + if (can_resume == FLB_TRUE && + ins->mem_buf_status == FLB_INPUT_RUNNING && + ins->storage_buf_status == FLB_INPUT_RUNNING && + ins->config->is_running == FLB_TRUE && + ins->config->is_ingestion_active == FLB_TRUE) { + if (ins->p->cb_resume) { + flb_input_resume(ins); + flb_info("[input] %s resume (rate gate)", + flb_input_name(ins)); + } + ins->rate_gate_status = FLB_INPUT_RUNNING; + } + + name = (char *) flb_input_name(ins); + if (ins->cmt_rate_gate_limited != NULL && + ins->rate_gate_status == FLB_INPUT_RUNNING) { + cmt_gauge_set(ins->cmt_rate_gate_limited, ts, 0, 1, (char *[]) {name}); + } + else if (ins->cmt_rate_gate_limited != NULL) { + cmt_gauge_set(ins->cmt_rate_gate_limited, ts, 1, 1, (char *[]) {name}); + } + if (ins->cmt_rate_gate_busy_chunks != NULL) { + cmt_gauge_set(ins->cmt_rate_gate_busy_chunks, ts, + (double) ins->rate_gate_busy_chunks, 1, (char *[]) {name}); + } + if (ins->cmt_rate_gate_retry_attempts != NULL) { + cmt_gauge_set(ins->cmt_rate_gate_retry_attempts, ts, + (double) ins->rate_gate_retry_attempts, 1, (char *[]) {name}); + } +} + +void flb_input_rate_update(struct 
flb_input_instance *ins, + uint64_t timestamp, + size_t records, + size_t bytes) +{ + double elapsed_seconds; + char *name; + uint64_t rate_window_size; + + if (ins == NULL) { + return; + } + + if (timestamp == 0) { + timestamp = cfl_time_now(); + } + + if (ins->rate_window_start == 0) { + ins->rate_window_start = timestamp; + } + + rate_window_size = ins->rate_window_size; + if (rate_window_size == 0) { + rate_window_size = FLB_NSEC_IN_SEC; + } + + ins->rate_window_records += records; + ins->rate_window_bytes += bytes; + + if (timestamp < ins->rate_window_start || + (timestamp - ins->rate_window_start) < rate_window_size) { + return; + } + + elapsed_seconds = (double) (timestamp - ins->rate_window_start) / + (double) FLB_NSEC_IN_SEC; + if (elapsed_seconds <= 0.0) { + return; + } + + ins->rate_records = (double) ins->rate_window_records / elapsed_seconds; + ins->rate_bytes = (double) ins->rate_window_bytes / elapsed_seconds; + + name = (char *) flb_input_name(ins); + if (ins->cmt_rate_records != NULL) { + cmt_gauge_set(ins->cmt_rate_records, timestamp, ins->rate_records, + 1, (char *[]) {name}); + } + + if (ins->cmt_rate_bytes != NULL) { + cmt_gauge_set(ins->cmt_rate_bytes, timestamp, ins->rate_bytes, + 1, (char *[]) {name}); + } + + ins->rate_window_start = timestamp; + ins->rate_window_records = 0; + ins->rate_window_bytes = 0; +} + /* start collectors for main thread, no threaded plugins */ int flb_input_collectors_signal_start(struct flb_input_instance *ins) { @@ -2300,7 +2799,7 @@ int flb_input_pause(struct flb_input_instance *ins) int flb_input_resume(struct flb_input_instance *ins) { - if (ins->p->cb_resume) { + if (ins->p->cb_resume && ins->context) { if (flb_input_is_threaded(ins)) { /* signal the thread event loop about the 'resume' operation */ flb_input_thread_instance_resume(ins); diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 3d82fe1c381..9fc803b1b0c 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1113,6 +1113,9 @@ 
struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, /* fluentbit_input_bytes_total */ cmt_counter_add(in->cmt_bytes, ts, buf_size, 1, (char *[]) {(char *) flb_input_name(in)}); + if (ic->fs_backlog != FLB_TRUE) { + flb_input_rate_update(in, ts, ic->total_records, buf_size); + } /* OLD metrics */ flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics); @@ -2443,6 +2446,9 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) /* Register the total into the context variable */ in->mem_chunks_size = total; + /* Re-evaluate rate gate status first to avoid pause-state deadlocks. */ + flb_input_rate_gate_maybe_resume(in); + /* * After the adjustments, validate if the plugin is overlimit or paused * and perform further adjustments. @@ -2450,7 +2456,8 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE && in->config->is_running == FLB_TRUE && in->config->is_ingestion_active == FLB_TRUE && - in->mem_buf_status == FLB_INPUT_PAUSED) { + in->mem_buf_status == FLB_INPUT_PAUSED && + in->rate_gate_status == FLB_INPUT_RUNNING) { in->mem_buf_status = FLB_INPUT_RUNNING; if (in->p->cb_resume) { flb_input_resume(in); @@ -2463,7 +2470,8 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE && in->config->is_running == FLB_TRUE && in->config->is_ingestion_active == FLB_TRUE && - in->storage_buf_status == FLB_INPUT_PAUSED) { + in->storage_buf_status == FLB_INPUT_PAUSED && + in->rate_gate_status == FLB_INPUT_RUNNING) { in->storage_buf_status = FLB_INPUT_RUNNING; if (in->p->cb_resume) { flb_input_resume(in); @@ -2474,6 +2482,22 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) } } + /* * Check if ingestion should be resumed after rate limiting is cleared. + * Even if memory and storage limits are fine, we need this check to + * recover from a pure rate-gate pause. 
+ */ + if (in->rate_gate_status == FLB_INPUT_RUNNING && + in->mem_buf_status == FLB_INPUT_RUNNING && + in->storage_buf_status == FLB_INPUT_RUNNING && + in->config->is_ingestion_active == FLB_TRUE) { + + /* Check if the input is still internally marked as paused */ + if (flb_input_paused(in) == FLB_TRUE) { + flb_input_resume(in); + flb_plg_info(in, "resume (rate gate limit cleared)"); + } + } + return total; } @@ -2495,6 +2519,10 @@ static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t j return FLB_TRUE; } + if (flb_input_rate_gate_protect(i) == FLB_TRUE) { + return FLB_TRUE; + } + if (storage->type == FLB_STORAGE_FS) { return FLB_FALSE; } @@ -2705,7 +2733,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, } /* Check if the input plugin has been paused */ - if (flb_input_buf_paused(in) == FLB_TRUE) { + if (flb_input_paused(in) == FLB_TRUE) { flb_debug("[input chunk] %s is paused, cannot append records", flb_input_name(in)); return -1; @@ -2800,6 +2828,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, /* fluentbit_input_bytes_total */ cmt_counter_add(in->cmt_bytes, ts, buf_size, 1, (char *[]) {(char *) flb_input_name(in)}); + flb_input_rate_update(in, ts, ic->added_records, buf_size); /* OLD api */ flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics); @@ -3104,7 +3133,18 @@ void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data) cr = NULL; while (1) { - if (flb_input_buf_paused(ins) == FLB_TRUE) { + if (flb_input_paused(ins) == FLB_TRUE) { + /* + * When an input is paused only by rate gate, there might be no + * chunk lifecycle updates to trigger a resume re-check. Poll + * it here so time-window based rate limits can recover. 
+ */ + flb_input_rate_gate_maybe_resume(ins); + + if (flb_input_paused(ins) == FLB_FALSE) { + continue; + } + break; } diff --git a/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml new file mode 100644 index 00000000000..b43961af449 --- /dev/null +++ b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml @@ -0,0 +1,23 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: rate_gate.burst + dummy: '{"message":"burst and recovery"}' + copies: 50 + rate: 50 + rate_gate: true + rate_gate.max_bytes: 1024 + rate_gate.max_records: 25 + rate_window: 1s + rate_gate.backpressure: true + rate_gate.resume_ratio: 0.70 + + outputs: + - name: stdout + match: "*" diff --git a/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml new file mode 100644 index 00000000000..3931ee95928 --- /dev/null +++ b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml @@ -0,0 +1,30 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: rate_gate.fanout + dummy: '{"message":"fanout retry"}' + rate: 250 + rate_gate: true + rate_gate.max_bytes: 512 + rate_gate.max_records: 15 + rate_window: 1s + rate_gate.backpressure: true + + outputs: + - name: stdout + match: rate_gate.fanout + + - name: http + match: rate_gate.fanout + host: 127.0.0.1 + port: ${TEST_SUITE_HTTP_PORT} + uri: /data + format: json + json_date_key: false + retry_limit: 2 diff --git a/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml 
b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml new file mode 100644 index 00000000000..235bbbba513 --- /dev/null +++ b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml @@ -0,0 +1,27 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + storage.path: /tmp/fluent-bit-rate-gate-storage + storage.sync: normal + storage.checksum: off + storage.backlog.mem_limit: 4M + +pipeline: + inputs: + - name: dummy + tag: rate_gate.filesystem + dummy: '{"message":"filesystem path"}' + copies: 30 + rate: 120 + storage.type: filesystem + rate_gate: true + rate_gate.max_bytes: 1024 + rate_gate.max_records: 20 + rate_window: 1s + rate_gate.backpressure: true + + outputs: + - name: stdout + match: "*" diff --git a/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml new file mode 100644 index 00000000000..d1881cab498 --- /dev/null +++ b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml @@ -0,0 +1,23 @@ +service: + flush: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + tag: rate_gate.memrb + dummy: '{"message":"memrb path"}' + copies: 30 + rate: 120 + mem_buf_limit: 256k + rate_gate: true + rate_gate.max_bytes: 1024 + rate_gate.max_records: 20 + rate_window: 1s + rate_gate.backpressure: true + + outputs: + - name: stdout + match: "*" diff --git a/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml new file mode 100644 index 00000000000..09a7a7375c9 --- /dev/null +++ b/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml @@ -0,0 +1,21 @@ +service: + flush: 1 + log_level: info + http_server: on + 
class Service:
    """Wrapper around FluentBitTestService for the rate gate scenarios.

    config_file is used as-is when absolute; otherwise it is resolved against
    the ``../config`` directory next to this test module. When with_receiver
    is true, a local HTTP receiver is started before Fluent Bit and shut down
    after it, and its "payloads"/"requests" are tracked through data_storage.
    """

    def __init__(self, config_file, with_receiver=False):
        if os.path.isabs(config_file):
            self.config_file = config_file
        else:
            self.config_file = os.path.abspath(
                os.path.join(os.path.dirname(__file__), "../config", config_file)
            )
        self.with_receiver = with_receiver
        # Populated by start(); defined up front so the attribute always exists.
        self.flb = None
        if self.with_receiver:
            self.service = FluentBitTestService(
                self.config_file,
                data_storage=data_storage,
                data_keys=["payloads", "requests"],
                pre_start=self._start_receiver,
                post_stop=self._stop_receiver,
            )
        else:
            self.service = FluentBitTestService(self.config_file)

    def _start_receiver(self, service):
        """pre_start hook: launch the HTTP receiver and wait for its /ping."""
        http_server_run(service.test_suite_http_port)
        self.service.wait_for_http_endpoint(
            f"http://127.0.0.1:{service.test_suite_http_port}/ping",
            timeout=10,
            interval=0.5,
        )

    def _stop_receiver(self, service):
        """post_stop hook: ask the receiver to shut down (best effort)."""
        try:
            requests.post(f"http://127.0.0.1:{service.test_suite_http_port}/shutdown", timeout=2)
        except requests.RequestException:
            # The receiver may already be gone; nothing left to clean up.
            pass

    def start(self):
        self.service.start()
        self.flb = self.service.flb

    def stop(self):
        self.service.stop()

    def wait_for_log_contains(self, text, timeout=10):
        """Poll the Fluent Bit log until it contains text; return the log content."""

        def _log_if_present():
            # Read once per poll so the content returned is exactly the
            # content that was checked.
            content = read_file(self.flb.log_file)
            return content if text in content else None

        return self.service.wait_for_condition(
            _log_if_present,
            timeout=timeout,
            interval=0.5,
            description=f"log text {text!r}",
        )

    def wait_for_requests(self, minimum_count, timeout=10):
        """Poll until the receiver recorded at least minimum_count requests."""

        def _requests_if_enough():
            seen = data_storage["requests"]
            return seen if len(seen) >= minimum_count else None

        return self.service.wait_for_condition(
            _requests_if_enough,
            timeout=timeout,
            interval=0.5,
            description=f"{minimum_count} outbound HTTP requests",
        )
# (config file, text expected in the Fluent Bit log) for each rollout scenario
_ROLLOUT_SCENARIOS = [
    ("config_rate_gate_steady_overrate.yaml", "steady overrate"),
    ("config_rate_gate_burst_recovery.yaml", "burst and recovery"),
    ("config_rate_gate_memrb.yaml", "memrb path"),
    ("config_rate_gate_filesystem.yaml", "filesystem path"),
]


@pytest.mark.parametrize("config_file, expected_message", _ROLLOUT_SCENARIOS)
def test_rate_gate_rollout_scenarios(config_file, expected_message):
    """Each scenario must hit the rate gate pause and log its expected message."""
    service = Service(config_file)

    try:
        service.start()
    except FluentBitStartupError:
        # Only the filesystem scenario may be skipped on a startup failure.
        if config_file != "config_rate_gate_filesystem.yaml":
            raise
        pytest.skip("filesystem storage backend is unavailable in this test environment")

    try:
        for needle in ("dummy.0 paused (rate gate limit exceeded)", expected_message):
            service.wait_for_log_contains(needle)
    finally:
        service.stop()
"dummy", NULL, FLB_FALSE); + if (ctx->input == NULL) { + flb_config_exit(ctx->config); + return -1; + } + + return 0; +} + +static void test_context_destroy(struct test_context *ctx) +{ + flb_input_exit_all(ctx->config); + flb_config_exit(ctx->config); +} + +static void test_rate_gate_backpressure_limited() +{ + int ret; + struct test_context ctx; + struct flb_input_chunk *chunk; + struct flb_task *task; + struct flb_task_retry *retry; + + ret = test_context_init(&ctx); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + chunk = flb_calloc(1, sizeof(struct flb_input_chunk)); + task = flb_calloc(1, sizeof(struct flb_task)); + retry = flb_calloc(1, sizeof(struct flb_task_retry)); + TEST_CHECK(chunk != NULL); + TEST_CHECK(task != NULL); + TEST_CHECK(retry != NULL); + if (chunk == NULL || task == NULL || retry == NULL) { + flb_free(chunk); + flb_free(task); + flb_free(retry); + test_context_destroy(&ctx); + return; + } + + chunk->busy = FLB_TRUE; + mk_list_add(&chunk->_head, &ctx.input->chunks); + + mk_list_init(&task->retries); + mk_list_add(&task->_head, &ctx.input->tasks); + + retry->attempts = 3; + mk_list_add(&retry->_head, &task->retries); + + ctx.input->rate_window_start = cfl_time_now(); + ctx.input->rate_window_size = 10 * FLB_NSEC_IN_SEC; + ctx.input->rate_bytes = 26.0; + ctx.input->rate_gate_enabled = FLB_TRUE; + ctx.input->rate_gate_status = FLB_INPUT_RUNNING; + ctx.input->rate_gate_use_backpressure = FLB_TRUE; + ctx.input->rate_gate_max_bytes = 100; + ctx.input->rate_gate_max_records = 0; + + ret = flb_input_rate_gate_protect(ctx.input); + TEST_CHECK(ret == FLB_TRUE); + TEST_CHECK(ctx.input->rate_gate_status == FLB_INPUT_PAUSED); + TEST_CHECK(ctx.input->rate_gate_busy_chunks == 1); + TEST_CHECK(ctx.input->rate_gate_retry_attempts == 3); + + mk_list_del(&retry->_head); + mk_list_del(&task->_head); + mk_list_del(&chunk->_head); + flb_free(retry); + flb_free(task); + flb_free(chunk); + test_context_destroy(&ctx); +} + +static void 
test_rate_gate_backpressure_disabled() +{ + int ret; + struct test_context ctx; + struct flb_input_chunk *chunk; + struct flb_task *task; + struct flb_task_retry *retry; + + ret = test_context_init(&ctx); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + chunk = flb_calloc(1, sizeof(struct flb_input_chunk)); + task = flb_calloc(1, sizeof(struct flb_task)); + retry = flb_calloc(1, sizeof(struct flb_task_retry)); + TEST_CHECK(chunk != NULL); + TEST_CHECK(task != NULL); + TEST_CHECK(retry != NULL); + if (chunk == NULL || task == NULL || retry == NULL) { + flb_free(chunk); + flb_free(task); + flb_free(retry); + test_context_destroy(&ctx); + return; + } + + chunk->busy = FLB_TRUE; + mk_list_add(&chunk->_head, &ctx.input->chunks); + + mk_list_init(&task->retries); + mk_list_add(&task->_head, &ctx.input->tasks); + + retry->attempts = 3; + mk_list_add(&retry->_head, &task->retries); + + ctx.input->rate_window_start = cfl_time_now(); + ctx.input->rate_window_size = 10 * FLB_NSEC_IN_SEC; + ctx.input->rate_bytes = 26.0; + ctx.input->rate_gate_enabled = FLB_TRUE; + ctx.input->rate_gate_status = FLB_INPUT_RUNNING; + ctx.input->rate_gate_use_backpressure = FLB_FALSE; + ctx.input->rate_gate_max_bytes = 100; + ctx.input->rate_gate_max_records = 0; + + ret = flb_input_rate_gate_protect(ctx.input); + TEST_CHECK(ret == FLB_FALSE); + TEST_CHECK(ctx.input->rate_gate_status == FLB_INPUT_RUNNING); + TEST_CHECK(ctx.input->rate_gate_busy_chunks == 0); + TEST_CHECK(ctx.input->rate_gate_retry_attempts == 0); + + mk_list_del(&retry->_head); + mk_list_del(&task->_head); + mk_list_del(&chunk->_head); + flb_free(retry); + flb_free(task); + flb_free(chunk); + test_context_destroy(&ctx); +} + +static void test_rate_gate_property_parsing() +{ + int ret; + struct test_context ctx; + + ret = test_context_init(&ctx); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + ret = flb_input_set_property(ctx.input, "rate_window", "5"); + TEST_CHECK(ret == 0); + ret = 
flb_input_set_property(ctx.input, "rate_gate", "true"); + TEST_CHECK(ret == 0); + ret = flb_input_set_property(ctx.input, "rate_gate.max_bytes", "200"); + TEST_CHECK(ret == 0); + ret = flb_input_set_property(ctx.input, "rate_gate.max_records", "30"); + TEST_CHECK(ret == 0); + ret = flb_input_set_property(ctx.input, "rate_gate.backpressure", "false"); + TEST_CHECK(ret == 0); + ret = flb_input_set_property(ctx.input, "rate_gate.resume_ratio", "0.70"); + TEST_CHECK(ret == 0); + + TEST_CHECK(ctx.input->rate_window_size == (5 * FLB_NSEC_IN_SEC)); + TEST_CHECK(ctx.input->rate_gate_enabled == FLB_TRUE); + TEST_CHECK(ctx.input->rate_gate_max_bytes == 200); + TEST_CHECK(ctx.input->rate_gate_max_records == 30); + TEST_CHECK(ctx.input->rate_gate_use_backpressure == FLB_FALSE); + TEST_CHECK(ctx.input->rate_gate_resume_ratio > 0.69 && + ctx.input->rate_gate_resume_ratio < 0.71); + + test_context_destroy(&ctx); +} + +static void test_rate_update_window_rollover() +{ + int ret; + uint64_t ts; + struct test_context ctx; + + ret = test_context_init(&ctx); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + ctx.input->rate_window_size = FLB_NSEC_IN_SEC; + ts = cfl_time_now(); + ctx.input->rate_window_start = ts; + + flb_input_rate_update(ctx.input, ts + (100 * (FLB_NSEC_IN_SEC / 1000)), 20, 200); + TEST_CHECK(ctx.input->rate_records == 0.0); + TEST_CHECK(ctx.input->rate_bytes == 0.0); + TEST_CHECK(ctx.input->rate_window_records == 20); + TEST_CHECK(ctx.input->rate_window_bytes == 200); + + flb_input_rate_update(ctx.input, ts + FLB_NSEC_IN_SEC, 0, 0); + TEST_CHECK(ctx.input->rate_records == 20.0); + TEST_CHECK(ctx.input->rate_bytes == 200.0); + TEST_CHECK(ctx.input->rate_window_records == 0); + TEST_CHECK(ctx.input->rate_window_bytes == 0); + + flb_input_rate_update(ctx.input, ts + FLB_NSEC_IN_SEC + + (200 * (FLB_NSEC_IN_SEC / 1000)), 4, 40); + TEST_CHECK(ctx.input->rate_window_records == 4); + TEST_CHECK(ctx.input->rate_window_bytes == 40); + + test_context_destroy(&ctx); +} 
+ +static void test_rate_gate_hysteresis_resume() +{ + int ret; + struct test_context ctx; + + ret = test_context_init(&ctx); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + ctx.input->rate_gate_enabled = FLB_TRUE; + ctx.input->rate_gate_status = FLB_INPUT_RUNNING; + ctx.input->rate_gate_use_backpressure = FLB_FALSE; + ctx.input->rate_gate_max_bytes = 100; + ctx.input->rate_gate_resume_ratio = 0.80; + ctx.input->rate_window_start = cfl_time_now(); + ctx.input->rate_window_size = 10 * FLB_NSEC_IN_SEC; + + ctx.input->rate_bytes = 110.0; + ret = flb_input_rate_gate_protect(ctx.input); + TEST_CHECK(ret == FLB_TRUE); + TEST_CHECK(ctx.input->rate_gate_status == FLB_INPUT_PAUSED); + + ctx.input->rate_bytes = 85.0; + flb_input_rate_gate_maybe_resume(ctx.input); + TEST_CHECK(ctx.input->rate_gate_status == FLB_INPUT_PAUSED); + + ctx.input->rate_bytes = 75.0; + flb_input_rate_gate_maybe_resume(ctx.input); + TEST_CHECK(ctx.input->rate_gate_status == FLB_INPUT_RUNNING); + + test_context_destroy(&ctx); +} + +static void test_rate_gate_pause_resume_stability() +{ + int ret; + struct test_context ctx; + + ret = test_context_init(&ctx); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + ctx.input->rate_gate_enabled = FLB_TRUE; + ctx.input->rate_gate_status = FLB_INPUT_RUNNING; + ctx.input->rate_gate_use_backpressure = FLB_FALSE; + ctx.input->rate_gate_max_records = 10; + ctx.input->rate_window_start = cfl_time_now(); + ctx.input->rate_window_size = 10 * FLB_NSEC_IN_SEC; + + ctx.input->rate_records = 11.0; + ret = flb_input_rate_gate_protect(ctx.input); + TEST_CHECK(ret == FLB_TRUE); + TEST_CHECK(ctx.input->rate_gate_status == FLB_INPUT_PAUSED); + + ctx.input->mem_buf_status = FLB_INPUT_PAUSED; + ctx.input->rate_records = 1.0; + flb_input_rate_gate_maybe_resume(ctx.input); + TEST_CHECK(ctx.input->rate_gate_status == FLB_INPUT_PAUSED); + TEST_CHECK(ctx.input->mem_buf_status == FLB_INPUT_PAUSED); + + ctx.input->rate_gate_status = FLB_INPUT_PAUSED; + 
ctx.input->mem_buf_status = FLB_INPUT_RUNNING; + ctx.input->rate_window_records = 200; + flb_input_rate_gate_maybe_resume(ctx.input); + TEST_CHECK(ctx.input->rate_gate_status == FLB_INPUT_PAUSED); + TEST_CHECK(ctx.input->rate_window_records == 200); + + test_context_destroy(&ctx); +} + +static void test_resume_without_context_is_safe() +{ + int ret; + struct test_context ctx; + + ret = test_context_init(&ctx); + TEST_CHECK(ret == 0); + if (ret != 0) { + return; + } + + TEST_CHECK(ctx.input->context == NULL); + + ret = flb_input_resume(ctx.input); + TEST_CHECK(ret == 0); + + test_context_destroy(&ctx); +} + +TEST_LIST = { + {"rate_gate_backpressure_limited", test_rate_gate_backpressure_limited}, + {"rate_gate_backpressure_disabled", test_rate_gate_backpressure_disabled}, + {"rate_gate_property_parsing", test_rate_gate_property_parsing}, + {"rate_update_window_rollover", test_rate_update_window_rollover}, + {"rate_gate_hysteresis_resume", test_rate_gate_hysteresis_resume}, + {"rate_gate_pause_resume_stability", test_rate_gate_pause_resume_stability}, + {"resume_without_context_is_safe", test_resume_without_context_is_safe}, + {0} +};