Skip to content

input: input_chunk: Handle flow with events metrics per window#11698

Open
cosmo0920 wants to merge 8 commits into
masterfrom
cosmo0920-handle-flow-with-rate-gate
Open

input: input_chunk: Handle flow with events metrics per window#11698
cosmo0920 wants to merge 8 commits into
masterfrom
cosmo0920-handle-flow-with-rate-gate

Conversation

@cosmo0920
Copy link
Copy Markdown
Contributor

@cosmo0920 cosmo0920 commented Apr 10, 2026

This branch adds hysteresis-based resume control for input rate gating (rate_gate.resume_ratio), centralizes backpressure-adjusted effective limit computation, and applies those limits consistently to pause/resume decisions. It also fixes filesystem-mode enforcement gaps and adds periodic scheduler-driven resume checks so paused inputs can recover without requiring new ingestion events.

Testing

Includes expanded internal tests for parsing/window/hysteresis/pause-resume safety and updated config_rate_gate integration scenarios for fanout+retry and rollout paths across memrb/filesystem cases.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Using with integrated tests' configurations, there's no memory leaks:

fluent_bit_results_20260413_194530/valgrind.log
==40796== Memcheck, a memory error detector
==40796== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40796== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40796== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194530/fluent_bit.log
==40796== Parent PID: 40708
==40796== 
==40796== 
==40796== HEAP SUMMARY:
==40796==     in use at exit: 0 bytes in 0 blocks
==40796==   total heap usage: 10,690 allocs, 10,690 frees, 2,647,824 bytes allocated
==40796== 
==40796== All heap blocks were freed -- no leaks are possible
==40796== 
==40796== For lists of detected and suppressed errors, rerun with: -s
==40796== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194526/valgrind.log
==40750== Memcheck, a memory error detector
==40750== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40750== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40750== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194526/fluent_bit.log
==40750== Parent PID: 40708
==40750== 
==40750== 
==40750== HEAP SUMMARY:
==40750==     in use at exit: 0 bytes in 0 blocks
==40750==   total heap usage: 12,294 allocs, 12,294 frees, 3,004,701 bytes allocated
==40750== 
==40750== All heap blocks were freed -- no leaks are possible
==40750== 
==40750== For lists of detected and suppressed errors, rerun with: -s
==40750== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194523/valgrind.log
==40741== Memcheck, a memory error detector
==40741== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40741== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40741== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194523/fluent_bit.log
==40741== Parent PID: 40708
==40741== 
==40741== 
==40741== HEAP SUMMARY:
==40741==     in use at exit: 0 bytes in 0 blocks
==40741==   total heap usage: 9,995 allocs, 9,995 frees, 3,601,066 bytes allocated
==40741== 
==40741== All heap blocks were freed -- no leaks are possible
==40741== 
==40741== For lists of detected and suppressed errors, rerun with: -s
==40741== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194519/valgrind.log
==40731== Memcheck, a memory error detector
==40731== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40731== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40731== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194519/fluent_bit.log
==40731== Parent PID: 40708
==40731== 
==40731== 
==40731== HEAP SUMMARY:
==40731==     in use at exit: 0 bytes in 0 blocks
==40731==   total heap usage: 9,639 allocs, 9,639 frees, 2,753,979 bytes allocated
==40731== 
==40731== All heap blocks were freed -- no leaks are possible
==40731== 
==40731== For lists of detected and suppressed errors, rerun with: -s
==40731== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194516/valgrind.log
==40720== Memcheck, a memory error detector
==40720== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40720== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40720== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194516/fluent_bit.log
==40720== Parent PID: 40708
==40720== 
==40720== 
==40720== HEAP SUMMARY:
==40720==     in use at exit: 0 bytes in 0 blocks
==40720==   total heap usage: 11,180 allocs, 11,180 frees, 3,133,653 bytes allocated
==40720== 
==40720== All heap blocks were freed -- no leaks are possible
==40720== 
==40720== For lists of detected and suppressed errors, rerun with: -s
==40720== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194513/valgrind.log
==40710== Memcheck, a memory error detector
==40710== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40710== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40710== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194513/fluent_bit.log
==40710== Parent PID: 40708
==40710== 
==40710== 
==40710== HEAP SUMMARY:
==40710==     in use at exit: 0 bytes in 0 blocks
==40710==   total heap usage: 6,779 allocs, 6,779 frees, 1,777,537 bytes allocated
==40710== 
==40710== All heap blocks were freed -- no leaks are possible
==40710== 
==40710== For lists of detected and suppressed errors, rerun with: -s
==40710== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Input rate-gating with configurable time window, byte/record limits, optional backpressure, and resume hysteresis; exposes runtime rate and gate-state metrics.
  • Improvements
    • Pause/resume now considers both buffer and rate-gate state for more reliable backpressure and recovery; resume logic tightened for safety.
  • Tests
    • New integration and unit tests covering burst recovery, fanout/retries, filesystem/memory backends, steady overrate, and hysteresis behavior.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 10, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a rate-gate feature for inputs: per-window ingestion accounting, rate-based pausing/resuming with hysteresis and optional backpressure, CMetrics gauges for rates and gate state, public rate-control APIs, and integration + unit tests exercising burst, fanout, filesystem, and memrb scenarios.

Changes

Cohort / File(s) Summary
Header Declarations
include/fluent-bit/flb_input.h
Added FLB_NSEC_IN_SEC, new rate metric pointers and rate-gate/accounting fields on struct flb_input_instance, inline flb_input_paused() and three public rate-control function declarations.
Core Rate-Gate Implementation
src/flb_input.c
Parsed rate_window/rate_gate.* properties, initialized rate-gate state and CMetrics gauges, implemented flb_input_rate_update, flb_input_rate_gate_protect, flb_input_rate_gate_maybe_resume, and tightened flb_input_resume() to require ins->context.
Chunk Processing Integration
src/flb_input_chunk.c
Hooked rate accounting into chunk restore/append flows, replaced buffer-only pause checks with flb_input_paused(), prioritized rate-gate protection in flb_input_chunk_protect(), updated resume logic in flb_input_chunk_set_limits().
Integration Test Scenarios
tests/integration/scenarios/config_rate_gate/config/config_rate_gate_*.yaml
Added six scenario configs (burst_recovery, fanout_retry, filesystem, memrb, pipeline, steady_overrate) exercising various rate-gate/backpressure configurations.
Integration Test Suite
tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
New test module with Service harness, optional HTTP receiver, log/request polling helpers, and tests validating pause/resume, retry/fanout interactions, and rollout scenarios.
Unit Test Infrastructure
tests/internal/CMakeLists.txt, tests/internal/input_rate_gate.c
Registered new internal unit test executable and added unit tests covering backpressure behavior, window rollover, hysteresis resume, property parsing, and pause/resume edge cases.

Sequence Diagram(s)

sequenceDiagram
    participant Chunk as Input Chunk
    participant Input as Input Instance
    participant RateGate as Rate-Gate Control
    participant Metrics as CMetrics
    participant Pauser as Pause/Resume Handler

    Chunk->>Input: flb_input_rate_update(timestamp, records, bytes)
    Input->>RateGate: accumulate window counts
    alt window elapsed
        RateGate->>RateGate: compute per-second rates
        RateGate->>Metrics: publish rate gauges
    end

    Chunk->>Input: flb_input_chunk_protect()
    Input->>RateGate: flb_input_rate_gate_protect()
    RateGate->>RateGate: sample backpressure (busy chunks, retries)
    RateGate->>RateGate: evaluate limits
    alt limits exceeded && backpressure enabled
        RateGate->>Input: set rate_gate_status = PAUSED
        RateGate->>Metrics: update gate gauges
        RateGate-->>Input: return FLB_TRUE (pause)
    else within limits
        RateGate-->>Input: return FLB_FALSE
    end

    Chunk->>Input: flb_input_chunk_set_limits()
    Input->>RateGate: flb_input_rate_gate_maybe_resume()
    RateGate->>RateGate: apply resume_ratio hysteresis
    alt rates below resume threshold
        RateGate->>Input: set rate_gate_status = RUNNING
        RateGate->>Pauser: signal resume (flb_input_resume)
        RateGate->>Metrics: update gate gauges
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • http: add workers support #11563 — modifies struct flb_input_instance and related input/core files; likely overlapping changes to input declarations and rate accounting.

Suggested reviewers

  • edsiper
  • pwhelan

Poem

🐰 I counted bytes beneath the moonlit log,
I gated streams where bursts would once clog,
Windows roll on, metrics hum their tune,
Hysteresis waits — then resume comes soon,
A tiny hop: ingestion back in motion.

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 17.07% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'input: input_chunk: Handle flow with events metrics per window' refers to a real part of the change (metrics and windowing), but is vague and does not highlight the main point: hysteresis-based resume control with rate-gate limits and backpressure. Consider a more descriptive title that emphasizes the primary change, such as 'input_chunk: Add hysteresis-based rate-gate resume control' or 'input: Add rate-gate resume logic with hysteresis and backpressure modeling'.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch cosmo0920-handle-flow-with-rate-gate

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@cosmo0920 cosmo0920 force-pushed the cosmo0920-handle-flow-with-rate-gate branch from 1ba07dc to 28ad2e6 Compare April 10, 2026 10:51
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/flb_input_chunk.c (2)

2451-2497: ⚠️ Potential issue | 🟠 Major

The new rate-gate guard creates a circular wait.

These resume branches now require rate_gate_status == FLB_INPUT_RUNNING, but flb_input_rate_gate_maybe_resume() only clears the rate gate after mem_buf_status and storage_buf_status are already running. In a mixed-pause state, neither side can transition first, so scheduler-driven recovery stalls.

Suggested fix
-        in->mem_buf_status == FLB_INPUT_PAUSED &&
-        in->rate_gate_status == FLB_INPUT_RUNNING) {
+        in->mem_buf_status == FLB_INPUT_PAUSED) {
         in->mem_buf_status = FLB_INPUT_RUNNING;
-        if (in->p->cb_resume) {
+        if (in->rate_gate_status == FLB_INPUT_RUNNING &&
+            in->p->cb_resume) {
             flb_input_resume(in);
             flb_info("[input] %s resume (mem buf overlimit - buf size %zuB now below limit %zuB)",
                      flb_input_name(in),
                      in->mem_chunks_size,
                      in->mem_buf_limit);
         }
     }
@@
-        in->storage_buf_status == FLB_INPUT_PAUSED &&
-        in->rate_gate_status == FLB_INPUT_RUNNING) {
+        in->storage_buf_status == FLB_INPUT_PAUSED) {
         in->storage_buf_status = FLB_INPUT_RUNNING;
-        if (in->p->cb_resume) {
+        if (in->rate_gate_status == FLB_INPUT_RUNNING &&
+            in->p->cb_resume) {
             flb_input_resume(in);
             flb_info("[input] %s resume (storage buf overlimit %zu/%zu)",
                      flb_input_name(in),
                      ((struct flb_storage_input *) in->storage)->cio->total_chunks_up,
                      ((struct flb_storage_input *) in->storage)->cio->max_chunks_up);
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input_chunk.c` around lines 2451 - 2497, The new guard requiring
rate_gate_status == FLB_INPUT_RUNNING causes a circular wait between the
mem/storage resume checks and the rate-gate clearance: call
flb_input_rate_gate_maybe_resume() before the memory/storage resume blocks (or
alternatively remove the rate_gate_status == FLB_INPUT_RUNNING check from the
mem/storage resume branches) so the rate gate can clear first and allow
flb_input_chunk_is_mem_overlimit, flb_input_chunk_is_storage_overlimit and
subsequent flb_input_resume() calls to proceed; update the logic around
in->rate_gate_status, flb_input_rate_gate_maybe_resume(), flb_input_resume(),
flb_input_paused(), and the two checks that use rate_gate_status to break the
deadlock.

1103-1116: ⚠️ Potential issue | 🟠 Major

Don't count restored backlog as fresh ingress.

flb_input_chunk_map() marks these chunks as fs_backlog, so feeding them into flb_input_rate_update() makes old filesystem backlog count against the live rate gate. A large backlog can pause the input on startup/rollout before any new events arrive.

Suggested fix
-        flb_input_rate_update(in, ts, ic->total_records, buf_size);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input_chunk.c` around lines 1103 - 1116, The code is updating input
rate metrics for chunks restored from disk, which were marked as fs_backlog by
flb_input_chunk_map(), causing old backlog to be counted as fresh ingress;
change the logic so flb_input_rate_update() is not called for fs_backlog chunks
(check the chunk/backlog flag on ic, e.g. ic->fs_backlog or the corresponding
indicator set by flb_input_chunk_map()) while retaining the counters if
desired—wrap the flb_input_rate_update(in, ts, ic->total_records, buf_size) call
in a conditional that skips it when the chunk is a filesystem backlog.
🧹 Nitpick comments (1)
tests/internal/input_rate_gate.c (1)

326-335: Add a backlog-vs-live rate-gate regression.

The new accounting touches both live appends and chunk restoration, but this suite never exercises an fs_backlog chunk being mapped before live traffic. A focused case here would catch startup/filesystem false positives.

Based on learnings, "Applies to tests/internal/**/*.{c,h} : Add regression tests for: mixed signals, processor drop/modify paths, multi-route fan-out, backlog + live ingestion parity".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/internal/input_rate_gate.c` around lines 326 - 335, Add a regression
test that exercises an fs_backlog chunk being mapped before any live appends to
ensure rate-gate accounting treats backlog vs live correctly: implement a new
test function (e.g., test_rate_gate_backlog_vs_live) that maps/ restores a
backlog chunk into the filesystem, then starts live append traffic and asserts
the rate-gate metrics/state (pause/resume/hysteresis or counters) reflect both
backlog and live paths without falsely throttling new live writes; add this test
symbol to TEST_LIST (alongside existing test_rate_gate_* entries) so the suite
runs it and ensure the test covers mixed signals (backlog mapping then live
ingestion) and verifies parity between backlog accounting and live append
accounting.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/flb_input_chunk.c`:
- Around line 2451-2497: The new guard requiring rate_gate_status ==
FLB_INPUT_RUNNING causes a circular wait between the mem/storage resume checks
and the rate-gate clearance: call flb_input_rate_gate_maybe_resume() before the
memory/storage resume blocks (or alternatively remove the rate_gate_status ==
FLB_INPUT_RUNNING check from the mem/storage resume branches) so the rate gate
can clear first and allow flb_input_chunk_is_mem_overlimit,
flb_input_chunk_is_storage_overlimit and subsequent flb_input_resume() calls to
proceed; update the logic around in->rate_gate_status,
flb_input_rate_gate_maybe_resume(), flb_input_resume(), flb_input_paused(), and
the two checks that use rate_gate_status to break the deadlock.
- Around line 1103-1116: The code is updating input rate metrics for chunks
restored from disk, which were marked as fs_backlog by flb_input_chunk_map(),
causing old backlog to be counted as fresh ingress; change the logic so
flb_input_rate_update() is not called for fs_backlog chunks (check the
chunk/backlog flag on ic, e.g. ic->fs_backlog or the corresponding indicator set
by flb_input_chunk_map()) while retaining the counters if desired—wrap the
flb_input_rate_update(in, ts, ic->total_records, buf_size) call in a conditional
that skips it when the chunk is a filesystem backlog.

---

Nitpick comments:
In `@tests/internal/input_rate_gate.c`:
- Around line 326-335: Add a regression test that exercises an fs_backlog chunk
being mapped before any live appends to ensure rate-gate accounting treats
backlog vs live correctly: implement a new test function (e.g.,
test_rate_gate_backlog_vs_live) that maps/ restores a backlog chunk into the
filesystem, then starts live append traffic and asserts the rate-gate
metrics/state (pause/resume/hysteresis or counters) reflect both backlog and
live paths without falsely throttling new live writes; add this test symbol to
TEST_LIST (alongside existing test_rate_gate_* entries) so the suite runs it and
ensure the test covers mixed signals (backlog mapping then live ingestion) and
verifies parity between backlog accounting and live append accounting.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1b590a09-ca8b-412f-a77d-f630e579ae40

📥 Commits

Reviewing files that changed from the base of the PR and between 3a16ddf and b935e4b.

📒 Files selected for processing (12)
  • include/fluent-bit/flb_input.h
  • src/flb_input.c
  • src/flb_input_chunk.c
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
  • tests/internal/CMakeLists.txt
  • tests/internal/input_rate_gate.c
✅ Files skipped from review due to trivial changes (7)
  • tests/internal/CMakeLists.txt
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
tests/internal/input_rate_gate.c (1)

233-266: Test relies on implicit default values for resume conditions.

Looking at flb_input_rate_gate_maybe_resume() in src/flb_input.c, the resume check requires multiple conditions beyond just the rate threshold:

if (can_resume == FLB_TRUE &&
    ins->mem_buf_status == FLB_INPUT_RUNNING &&
    ins->storage_buf_status == FLB_INPUT_RUNNING &&
    ins->config->is_running == FLB_TRUE &&
    ins->config->is_ingestion_active == FLB_TRUE) {

The test at line 262-263 expects rate_gate_status to become FLB_INPUT_RUNNING, but doesn't explicitly set mem_buf_status, storage_buf_status, config->is_running, or config->is_ingestion_active. This relies on default initialization values being correct.

For test robustness and clarity, consider explicitly setting these preconditions:

🔧 Suggested improvement for explicit preconditions
     ctx.input->rate_gate_max_bytes = 100;
     ctx.input->rate_gate_resume_ratio = 0.80;
     ctx.input->rate_window_start = cfl_time_now();
     ctx.input->rate_window_size = 10 * FLB_NSEC_IN_SEC;
+    ctx.input->mem_buf_status = FLB_INPUT_RUNNING;
+    ctx.input->storage_buf_status = FLB_INPUT_RUNNING;
+    ctx.config->is_running = FLB_TRUE;
+    ctx.config->is_ingestion_active = FLB_TRUE;
 
     ctx.input->rate_bytes = 110.0;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/internal/input_rate_gate.c` around lines 233 - 266, The test
test_rate_gate_hysteresis_resume relies on implicit defaults when calling
flb_input_rate_gate_maybe_resume; explicitly set the preconditions used in the
resume check: set ctx.input->mem_buf_status = FLB_INPUT_RUNNING,
ctx.input->storage_buf_status = FLB_INPUT_RUNNING, ctx.input->config->is_running
= FLB_TRUE, and ctx.input->config->is_ingestion_active = FLB_TRUE before calling
flb_input_rate_gate_maybe_resume so the resume path in
flb_input_rate_gate_maybe_resume() can be exercised deterministically.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@tests/internal/input_rate_gate.c`:
- Around line 233-266: The test test_rate_gate_hysteresis_resume relies on
implicit defaults when calling flb_input_rate_gate_maybe_resume; explicitly set
the preconditions used in the resume check: set ctx.input->mem_buf_status =
FLB_INPUT_RUNNING, ctx.input->storage_buf_status = FLB_INPUT_RUNNING,
ctx.input->config->is_running = FLB_TRUE, and
ctx.input->config->is_ingestion_active = FLB_TRUE before calling
flb_input_rate_gate_maybe_resume so the resume path in
flb_input_rate_gate_maybe_resume() can be exercised deterministically.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d488da05-b9a8-4e56-98c0-82031eca786d

📥 Commits

Reviewing files that changed from the base of the PR and between b935e4b and 03c0db8.

📒 Files selected for processing (9)
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
  • tests/internal/CMakeLists.txt
  • tests/internal/input_rate_gate.c
✅ Files skipped from review due to trivial changes (6)
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/internal/CMakeLists.txt
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/flb_input_chunk.c`:
- Around line 1116-1118: The mapped-chunk path never triggers
flb_input_rate_update because ic->fs_backlog is left set by
flb_input_chunk_map(); to fix, ensure mapped/restored chunks get the same
rate-gate accounting: either clear ic->fs_backlog (set to FLB_FALSE) after
flb_input_chunk_map() when the chunk is restored into the active route, or
explicitly call flb_input_rate_update(in, ts, ic->total_records, buf_size) for
mapped chunks before this conditional; locate flb_input_chunk_map(), the
ic->fs_backlog field, and the flb_input_rate_update(...) call to implement the
chosen change so restored chunks preserve route state and accounting parity with
live-ingested chunks.

In `@src/flb_input.c`:
- Around line 981-987: The code currently parses rate_gate.max_records using
atoi (in the prop_key_check(...) branch) which silently accepts malformed
strings; replace atoi with strtol to strictly validate the entire tmp string:
call strtol(tmp, &endptr, 10), check errno for ERANGE, ensure endptr points to
the terminating NUL (no leftover characters), and verify the parsed value is >=
0 and fits into size_t (reject negatives and out-of-range values); on any
validation failure free tmp with flb_sds_destroy(tmp) and return -1, otherwise
assign ins->rate_gate_max_records = (size_t) parsed_value.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 04a9929e-c6a9-4276-9926-9db5414d48b5

📥 Commits

Reviewing files that changed from the base of the PR and between d4fe0b6 and 4fd1b83.

📒 Files selected for processing (12)
  • include/fluent-bit/flb_input.h
  • src/flb_input.c
  • src/flb_input_chunk.c
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
  • tests/internal/CMakeLists.txt
  • tests/internal/input_rate_gate.c
✅ Files skipped from review due to trivial changes (6)
  • tests/internal/CMakeLists.txt
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml

Comment thread src/flb_input_chunk.c
Comment thread src/flb_input.c Outdated
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant