diff --git a/include/fluent-bit/flb_atomic.h b/include/fluent-bit/flb_atomic.h new file mode 100644 index 00000000000..9a9ab159cee --- /dev/null +++ b/include/fluent-bit/flb_atomic.h @@ -0,0 +1,101 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_ATOMIC_H +#define FLB_ATOMIC_H + +/* + * Minimal relaxed-atomic helpers for scalar values that are read and written + * by more than one thread (e.g. counters and one-shot status fields shared + * between a threaded input worker and the main engine). + * + * Aligned word-sized loads/stores are already atomic on every platform Fluent + * Bit targets, but accessing them from multiple threads with plain operators is + * a C-level data race (undefined behavior, and flagged by ThreadSanitizer). + * These helpers make such accesses well defined. Relaxed ordering is used on + * purpose: callers only require atomicity of the individual value, not ordering + * relative to other memory (for ordered hand-offs use a mutex instead). + * + * The helpers are type-generic (int, size_t, uint64_t, ...). + */ + +#if defined(__GNUC__) || defined(__clang__) + +#define flb_atomic_load(ptr) __atomic_load_n((ptr), __ATOMIC_RELAXED) +#define flb_atomic_store(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_RELAXED) +#define flb_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED) + +#elif defined(_MSC_VER) + +/* + * MSVC backend: the Interlocked intrinsics are atomic (full barrier, which is + * stronger than the relaxed ordering we need but always correct). The helpers + * dispatch on the operand width so they work for both 32-bit and 64-bit scalars + * on 32-bit and 64-bit targets. + */ +#include + +static __forceinline long long flb_atomic_load_n(volatile void *ptr, size_t width) +{ +#ifdef _WIN64 + if (width == 8) { + return (long long) _InterlockedOr64((volatile __int64 *) ptr, 0); + } +#endif + (void) width; + return (long long) _InterlockedOr((volatile long *) ptr, 0); +} + +static __forceinline void flb_atomic_store_n(volatile void *ptr, long long val, + size_t width) +{ +#ifdef _WIN64 + if (width == 8) { + (void) _InterlockedExchange64((volatile __int64 *) ptr, (__int64) val); + return; + } +#endif + (void) width; + (void) _InterlockedExchange((volatile long *) ptr, (long) val); +} + +static __forceinline long long flb_atomic_fetch_add_n(volatile void *ptr, + long long val, size_t width) +{ +#ifdef _WIN64 + if (width == 8) { + return (long long) _InterlockedExchangeAdd64((volatile __int64 *) ptr, + (__int64) val); + } +#endif + (void) width; + return (long long) _InterlockedExchangeAdd((volatile long *) ptr, (long) val); +} + +#define flb_atomic_load(ptr) flb_atomic_load_n((ptr), sizeof(*(ptr))) +#define flb_atomic_store(ptr, val) flb_atomic_store_n((ptr), (long long) (val), \ + sizeof(*(ptr))) +#define flb_atomic_fetch_add(ptr, v) flb_atomic_fetch_add_n((ptr), (long long) (v), \ + sizeof(*(ptr))) + +#else +#error "flb_atomic.h: no atomic backend available for this compiler" +#endif + +#endif diff --git a/src/flb_engine.c b/src/flb_engine.c index 63178da843c..a0cf4a6c59e 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -1123,6 +1124,14 @@ int flb_engine_start(struct flb_config *config) return -1; } + /* + * Publish the supervisor grace window before signaling startup: the start + * notification is the happens-before edge the main thread synchronizes on, + * so storing grace_input afterwards could let it observe a stale value. + */ + flb_atomic_store(&config->grace_input, config->grace / 2); + flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input); + /* Signal that we have started */ flb_engine_started(config); @@ -1134,9 +1143,6 @@ int flb_engine_start(struct flb_config *config) return -2; } - config->grace_input = config->grace / 2; - flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input); - while (1) { rb_flush_flag = FLB_FALSE; diff --git a/src/flb_input.c b/src/flb_input.c index e990eee238c..2496c526869 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -2484,6 +2484,17 @@ int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config) mk_list_foreach(head, &config->inputs) { ins = mk_list_entry(head, struct flb_input_instance, _head); + + /* + * Collectors of a threaded input are registered and dispatched in the + * input's own thread/event loop (see flb_input_thread.c), never through + * this main-thread handler. Skipping them avoids a benign data race with + * the worker thread that concurrently initializes those collector fds. + */ + if (flb_input_is_threaded(ins)) { + continue; + } + mk_list_foreach(head_coll, &ins->collectors) { collector = mk_list_entry(head_coll, struct flb_input_collector, _head); if (collector->fd_event == fd) { diff --git a/src/flb_metrics.c b/src/flb_metrics.c index 4b9a17473a7..7e01ff1047e 100644 --- a/src/flb_metrics.c +++ b/src/flb_metrics.c @@ -27,6 +27,7 @@ #include #include #include +#include #include static int id_exists(int id, struct flb_metrics *metrics) @@ -181,7 +182,12 @@ int flb_metrics_sum(int id, size_t val, struct flb_metrics *metrics) return -1; } - m->val += val; + /* + * The counter is summed from the owning input/output worker thread while the + * metrics exporter reads it from the main engine thread; use a relaxed + * atomic so the access is well defined (see flb_metrics_dump_values()). + */ + flb_atomic_fetch_add(&m->val, val); return 0; } @@ -214,7 +220,7 @@ int flb_metrics_print(struct flb_metrics *metrics) mk_list_foreach(head, &metrics->list) { m = mk_list_entry(head, struct flb_metric, _head); - printf(", '%s' => %lu", m->title, m->val); + printf(", '%s' => %lu", m->title, (unsigned long) flb_atomic_load(&m->val)); } printf("\n"); @@ -240,7 +246,7 @@ int flb_metrics_dump_values(char **out_buf, size_t *out_size, m = mk_list_entry(head, struct flb_metric, _head); msgpack_pack_str(&mp_pck, flb_sds_len(m->title)); msgpack_pack_str_body(&mp_pck, m->title, flb_sds_len(m->title)); - msgpack_pack_uint64(&mp_pck, m->val); + msgpack_pack_uint64(&mp_pck, flb_atomic_load(&m->val)); } *out_buf = mp_sbuf.data; diff --git a/src/fluent-bit.c b/src/fluent-bit.c index 14e5f5682a4..a1baf8720d1 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -60,6 +60,7 @@ #include #include #include +#include #ifdef FLB_HAVE_MTRACE #include @@ -1473,8 +1474,9 @@ static int flb_main_run(int argc, char **argv) ctx = flb_context_get(); if (ctx != NULL && ctx->config != NULL) { + /* grace_input is published by the engine thread (flb_engine_start) */ flb_supervisor_child_update_grace(ctx->config->grace, - ctx->config->grace_input); + flb_atomic_load(&ctx->config->grace_input)); } #ifdef FLB_HAVE_CHUNK_TRACE @@ -1514,7 +1516,7 @@ static int flb_main_run(int argc, char **argv) if (supervisor_reload_notified == FLB_FALSE && ctx != NULL && ctx->config != NULL) { flb_supervisor_child_signal_shutdown(ctx->config->grace, - ctx->config->grace_input); + flb_atomic_load(&ctx->config->grace_input)); supervisor_reload_notified = FLB_TRUE; } @@ -1526,7 +1528,7 @@ static int flb_main_run(int argc, char **argv) supervisor_reload_notified = FLB_FALSE; if (ctx != NULL && ctx->config != NULL) { flb_supervisor_child_update_grace(ctx->config->grace, - ctx->config->grace_input); + flb_atomic_load(&ctx->config->grace_input)); } } else {