From ec29b6416424c9b34005f31967085e1578e2aeb3 Mon Sep 17 00:00:00 2001 From: Yu Yi Date: Fri, 26 Jun 2026 17:13:26 -0400 Subject: [PATCH 1/4] metrics: read shared counter with relaxed atomics flb_metrics_sum() updates a counter from the owning input or output worker thread while the metrics exporter reads the same counter from the main engine thread. With a threaded input this is a data race reported by ThreadSanitizer; it is benign on the supported hardware but undefined under the C memory model. Add a small flb_atomic.h helper with relaxed load/store/fetch_add and use it for the metric value on both the summing and the reading paths. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Yu Yi --- include/fluent-bit/flb_atomic.h | 101 ++++++++++++++++++++++++++++++++ src/flb_metrics.c | 12 +++- 2 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 include/fluent-bit/flb_atomic.h diff --git a/include/fluent-bit/flb_atomic.h b/include/fluent-bit/flb_atomic.h new file mode 100644 index 00000000000..9a9ab159cee --- /dev/null +++ b/include/fluent-bit/flb_atomic.h @@ -0,0 +1,101 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_ATOMIC_H +#define FLB_ATOMIC_H + +/* + * Minimal relaxed-atomic helpers for scalar values that are read and written + * by more than one thread (e.g. counters and one-shot status fields shared + * between a threaded input worker and the main engine). + * + * Aligned word-sized loads/stores are already atomic on every platform Fluent + * Bit targets, but accessing them from multiple threads with plain operators is + * a C-level data race (undefined behavior, and flagged by ThreadSanitizer). + * These helpers make such accesses well defined. Relaxed ordering is used on + * purpose: callers only require atomicity of the individual value, not ordering + * relative to other memory (for ordered hand-offs use a mutex instead). + * + * The helpers are type-generic (int, size_t, uint64_t, ...). + */ + +#if defined(__GNUC__) || defined(__clang__) + +#define flb_atomic_load(ptr) __atomic_load_n((ptr), __ATOMIC_RELAXED) +#define flb_atomic_store(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_RELAXED) +#define flb_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED) + +#elif defined(_MSC_VER) + +/* + * MSVC backend: the Interlocked intrinsics are atomic (full barrier, which is + * stronger than the relaxed ordering we need but always correct). The helpers + * dispatch on the operand width so they work for both 32-bit and 64-bit scalars + * on 32-bit and 64-bit targets. + */ +#include + +static __forceinline long long flb_atomic_load_n(volatile void *ptr, size_t width) +{ +#ifdef _WIN64 + if (width == 8) { + return (long long) _InterlockedOr64((volatile __int64 *) ptr, 0); + } +#endif + (void) width; + return (long long) _InterlockedOr((volatile long *) ptr, 0); +} + +static __forceinline void flb_atomic_store_n(volatile void *ptr, long long val, + size_t width) +{ +#ifdef _WIN64 + if (width == 8) { + (void) _InterlockedExchange64((volatile __int64 *) ptr, (__int64) val); + return; + } +#endif + (void) width; + (void) _InterlockedExchange((volatile long *) ptr, (long) val); +} + +static __forceinline long long flb_atomic_fetch_add_n(volatile void *ptr, + long long val, size_t width) +{ +#ifdef _WIN64 + if (width == 8) { + return (long long) _InterlockedExchangeAdd64((volatile __int64 *) ptr, + (__int64) val); + } +#endif + (void) width; + return (long long) _InterlockedExchangeAdd((volatile long *) ptr, (long) val); +} + +#define flb_atomic_load(ptr) flb_atomic_load_n((ptr), sizeof(*(ptr))) +#define flb_atomic_store(ptr, val) flb_atomic_store_n((ptr), (long long) (val), \ + sizeof(*(ptr))) +#define flb_atomic_fetch_add(ptr, v) flb_atomic_fetch_add_n((ptr), (long long) (v), \ + sizeof(*(ptr))) + +#else +#error "flb_atomic.h: no atomic backend available for this compiler" +#endif + +#endif diff --git a/src/flb_metrics.c b/src/flb_metrics.c index 4b9a17473a7..7e01ff1047e 100644 --- a/src/flb_metrics.c +++ b/src/flb_metrics.c @@ -27,6 +27,7 @@ #include #include #include +#include #include static int id_exists(int id, struct flb_metrics *metrics) @@ -181,7 +182,12 @@ int flb_metrics_sum(int id, size_t val, struct flb_metrics *metrics) return -1; } - m->val += val; + /* + * The counter is summed from the owning input/output worker thread while the + * metrics exporter reads it from the main engine thread; use a relaxed + * atomic so the access is well defined (see flb_metrics_dump_values()). + */ + flb_atomic_fetch_add(&m->val, val); return 0; } @@ -214,7 +220,7 @@ int flb_metrics_print(struct flb_metrics *metrics) mk_list_foreach(head, &metrics->list) { m = mk_list_entry(head, struct flb_metric, _head); - printf(", '%s' => %lu", m->title, m->val); + printf(", '%s' => %lu", m->title, (unsigned long) flb_atomic_load(&m->val)); } printf("\n"); @@ -240,7 +246,7 @@ int flb_metrics_dump_values(char **out_buf, size_t *out_size, m = mk_list_entry(head, struct flb_metric, _head); msgpack_pack_str(&mp_pck, flb_sds_len(m->title)); msgpack_pack_str_body(&mp_pck, m->title, flb_sds_len(m->title)); - msgpack_pack_uint64(&mp_pck, m->val); + msgpack_pack_uint64(&mp_pck, flb_atomic_load(&m->val)); } *out_buf = mp_sbuf.data; From bd0f9d964abce5daf4a003a8b9f9c1601a63320f Mon Sep 17 00:00:00 2001 From: Yu Yi Date: Fri, 26 Jun 2026 17:13:26 -0400 Subject: [PATCH 2/4] input: skip threaded inputs in flb_input_collector_fd flb_input_collector_fd() runs on the main engine thread but iterated the collectors of every input, including threaded ones. A threaded input initializes its collector descriptors from its own worker thread, so this races with the main thread reading them at startup. Those collectors are registered and dispatched in the input's own thread and event loop, so they are never matched here. Skipping threaded inputs removes the race and avoids needless iteration. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Yu Yi --- src/flb_input.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/flb_input.c b/src/flb_input.c index e990eee238c..2496c526869 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -2484,6 +2484,17 @@ int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config) mk_list_foreach(head, &config->inputs) { ins = mk_list_entry(head, struct flb_input_instance, _head); + + /* + * Collectors of a threaded input are registered and dispatched in the + * input's own thread/event loop (see flb_input_thread.c), never through + * this main-thread handler. Skipping them avoids a benign data race with + * the worker thread that concurrently initializes those collector fds. + */ + if (flb_input_is_threaded(ins)) { + continue; + } + mk_list_foreach(head_coll, &ins->collectors) { collector = mk_list_entry(head_coll, struct flb_input_collector, _head); if (collector->fd_event == fd) { From 706b07bc54da0ecdba117ea17d15030e75a58eff Mon Sep 17 00:00:00 2001 From: Yu Yi Date: Fri, 26 Jun 2026 17:13:26 -0400 Subject: [PATCH 3/4] engine: publish grace_input with a relaxed atomic store config->grace_input is written once by the engine thread during startup and read concurrently by the supervisor on the main thread, which ThreadSanitizer reports as a data race. Store it with a relaxed atomic; the matching atomic read is done at the supervisor entry point. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Yu Yi --- src/flb_engine.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/flb_engine.c b/src/flb_engine.c index 63178da843c..a0cf4a6c59e 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -1123,6 +1124,14 @@ int flb_engine_start(struct flb_config *config) return -1; } + /* + * Publish the supervisor grace window before signaling startup: the start + * notification is the happens-before edge the main thread synchronizes on, + * so storing grace_input afterwards could let it observe a stale value. + */ + flb_atomic_store(&config->grace_input, config->grace / 2); + flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input); + /* Signal that we have started */ flb_engine_started(config); @@ -1134,9 +1143,6 @@ int flb_engine_start(struct flb_config *config) return -2; } - config->grace_input = config->grace / 2; - flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input); - while (1) { rb_flush_flag = FLB_FALSE; From 6395b8cd55f90b5f3c7e88ff598ce71468a2e364 Mon Sep 17 00:00:00 2001 From: Yu Yi Date: Fri, 26 Jun 2026 17:13:26 -0400 Subject: [PATCH 4/4] bin: read grace_input with a relaxed atomic load The supervisor entry point reads config->grace_input, which the engine thread publishes during startup. Read it with a relaxed atomic so the cross-thread access is well defined, matching the engine-side store. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Yu Yi --- src/fluent-bit.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/fluent-bit.c b/src/fluent-bit.c index 14e5f5682a4..a1baf8720d1 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -60,6 +60,7 @@ #include #include #include +#include #ifdef FLB_HAVE_MTRACE #include @@ -1473,8 +1474,9 @@ static int flb_main_run(int argc, char **argv) ctx = flb_context_get(); if (ctx != NULL && ctx->config != NULL) { + /* grace_input is published by the engine thread (flb_engine_start) */ flb_supervisor_child_update_grace(ctx->config->grace, - ctx->config->grace_input); + flb_atomic_load(&ctx->config->grace_input)); } #ifdef FLB_HAVE_CHUNK_TRACE @@ -1514,7 +1516,7 @@ static int flb_main_run(int argc, char **argv) if (supervisor_reload_notified == FLB_FALSE && ctx != NULL && ctx->config != NULL) { flb_supervisor_child_signal_shutdown(ctx->config->grace, - ctx->config->grace_input); + flb_atomic_load(&ctx->config->grace_input)); supervisor_reload_notified = FLB_TRUE; } @@ -1526,7 +1528,7 @@ static int flb_main_run(int argc, char **argv) supervisor_reload_notified = FLB_FALSE; if (ctx != NULL && ctx->config != NULL) { flb_supervisor_child_update_grace(ctx->config->grace, - ctx->config->grace_input); + flb_atomic_load(&ctx->config->grace_input)); } } else {