opentelemetry: add OTLP JSON and OTLP Proto helpers#11624
Conversation
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
📝 Walkthrough
Made yyjson integration optional via CMake; added a new flb_json public API and implementation (immutable and mutable JSON); added OTLP JSON/proto encoding for logs, metrics, and traces; extended the Kafka and stdout plugins to emit OTLP; added metric unit support in cmetrics; adjusted the packer/parser to conditionally use yyjson; expanded tests and added synchronization for metrics chunks.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Event Producer
    participant KafkaPlugin as out_kafka
    participant OTLPConv as OTLP Converter
    participant Kafka as Kafka Broker
    Client->>KafkaPlugin: push event_chunk (msgpack)
    activate KafkaPlugin
    KafkaPlugin->>KafkaPlugin: inspect ctx->format
    alt OTLP JSON
        KafkaPlugin->>OTLPConv: msgpack -> OTLP JSON
        activate OTLPConv
        OTLPConv->>OTLPConv: decode msgpack, build OTLP JSON
        OTLPConv-->>KafkaPlugin: JSON bytes
        deactivate OTLPConv
    else OTLP Proto
        KafkaPlugin->>OTLPConv: msgpack -> OTLP Proto
        activate OTLPConv
        OTLPConv->>OTLPConv: decode to cmt/ctr, build protobuf messages
        OTLPConv-->>KafkaPlugin: binary proto bytes
        deactivate OTLPConv
    end
    KafkaPlugin->>KafkaPlugin: handle queue-full retries
    KafkaPlugin->>Kafka: produce_raw_payload(bytes)
    KafkaPlugin-->>Client: return status
    deactivate KafkaPlugin
```
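The flush path in the diagram above reduces to a format dispatch that can be decided once per instance. A minimal self-contained sketch of that dispatch; the `select_payload` helper and `PAYLOAD_*` constants are hypothetical stand-ins, not the plugin's real symbols:

```c
#include <string.h>

/* Hypothetical format identifiers standing in for out_kafka's
 * configured "format" values */
enum payload_format {
    PAYLOAD_JSON,
    PAYLOAD_OTLP_JSON,
    PAYLOAD_OTLP_PROTO,
    PAYLOAD_UNKNOWN
};

/* Map a configured format string to an identifier, as a plugin
 * would do once at init rather than on every flush */
enum payload_format select_payload(const char *format)
{
    if (strcmp(format, "json") == 0) {
        return PAYLOAD_JSON;
    }
    if (strcmp(format, "otlp_json") == 0) {
        return PAYLOAD_OTLP_JSON;
    }
    if (strcmp(format, "otlp_proto") == 0) {
        return PAYLOAD_OTLP_PROTO;
    }
    return PAYLOAD_UNKNOWN;
}
```

With the identifier computed at init, the flush callback only branches on it, as the `alt OTLP JSON / else OTLP Proto` block in the diagram shows.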
```mermaid
sequenceDiagram
    participant App as Consumer/Library user
    participant flb_json as flb_json API
    participant flb_pack as packer (msgpack)
    participant yyjson as yyjson (optional)
    App->>flb_json: flb_json_read(json, len)
    activate flb_json
    flb_json->>flb_pack: flb_pack_json(...)
    flb_pack-->>flb_json: msgpack buffer
    flb_json->>flb_json: wrap msgpack into flb_json_doc
    flb_json-->>App: flb_json_doc*
    deactivate flb_json
    App->>flb_json: flb_json_write_pretty(doc)
    activate flb_json
    alt FLB_HAVE_YYJSON
        flb_json->>yyjson: convert -> yyjson_mut, yyjson_write(pretty)
        yyjson-->>flb_json: pretty JSON
    else
        flb_json->>flb_json: render msgpack -> pretty JSON
    end
    flb_json-->>App: JSON string
    deactivate flb_json
```
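The `flb_json_write_pretty()` branch in the diagram is a compile-time dispatch between an optional backend and a built-in fallback. A minimal sketch of that pattern, assuming a hypothetical `HAVE_YYJSON` guard and a stand-in fallback that simply copies its input (the real fallback renders msgpack to pretty JSON):

```c
#include <stdlib.h>
#include <string.h>

/* Stand-in for the always-available internal renderer: here it
 * just duplicates the input string */
static char *json_pretty_fallback(const char *json)
{
    char *out = malloc(strlen(json) + 1);

    if (out != NULL) {
        strcpy(out, json);
    }
    return out;
}

/* Compile-time dispatch: use the optional backend when it was
 * compiled in, otherwise take the internal path */
char *json_write_pretty(const char *json)
{
#ifdef HAVE_YYJSON
    return yyjson_backend_pretty(json); /* hypothetical backend call */
#else
    return json_pretty_fallback(json);
#endif
}
```

The point of the pattern is that callers get one entry point and one ownership contract regardless of which backend was compiled in.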
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a1ae9ea8fc
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7f4b0719f2
plugins/out_kafka/kafka.c (outdated)

```c
    if (ret != CTR_MPACK_INSUFFICIENT_DATA) {
```
There was a problem hiding this comment.
Accept decoder EOF status for OTLP trace flushes
After the trace decode loop, this check rejects every status except CTR_MPACK_INSUFFICIENT_DATA, but ctr_decode_msgpack_create() reports end-of-buffer as CTR_MPACK_ENGINE_ERROR once off has consumed the whole input (the JSON helper in this same change already treats that EOF pattern as valid). With format otlp_proto, valid trace chunks can therefore be produced and then still reported as flush errors, causing retries and duplicate Kafka messages.
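The acceptance rule this comment suggests can be sketched with mock status codes; the enum and helper below are illustrative, not the ctraces API:

```c
#include <stddef.h>

/* Mock statuses mirroring the shape of the ctraces msgpack
 * decoder's result family */
enum decode_status {
    DECODE_OK,
    DECODE_INSUFFICIENT_DATA,
    DECODE_ENGINE_ERROR
};

/* Decide whether a finished decode loop represents success.
 * An "engine error" raised exactly when the offset has consumed
 * the whole buffer is the decoder's way of signalling EOF, so it
 * is accepted; the same error mid-buffer stays an error. */
int decode_loop_succeeded(enum decode_status last, size_t off, size_t size)
{
    if (last == DECODE_INSUFFICIENT_DATA) {
        return 1;
    }
    if (last == DECODE_ENGINE_ERROR && off >= size) {
        return 1; /* EOF reached after consuming all input */
    }
    return 0;
}
```

The key property is that a truncated buffer (error before `off` reaches `size`) is still rejected, so only the benign end-of-input pattern is whitelisted.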
```c
    formatted = flb_sds_create_len(buffer, length);
    flb_free(buffer);
```
Use matching deallocator for prettified OTLP JSON buffer
flb_json_prettify() can return an SDS buffer on non-YYJSON builds (its fallback path returns buffer directly), but this site always calls flb_free(buffer). When FLB_YYJSON is disabled and stdout uses otlp_json_pretty, this frees an SDS pointer with the wrong deallocator and can corrupt memory or crash.
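Why the deallocators must match can be shown with a minimal SDS-style string (illustrative only; Fluent Bit's real `flb_sds_t` carries a richer header): the pointer handed to callers is not the address `malloc()` returned, so only a matching destroy function that rewinds to the header frees it safely.

```c
#include <stdlib.h>
#include <string.h>

/* A length header precedes the buffer returned to callers, so the
 * caller-visible pointer is offset from the real allocation.
 * Passing it to plain free() would hand the allocator a pointer it
 * never returned, corrupting the heap. */
struct str_header {
    size_t len;
};

char *str_create(const char *src, size_t len)
{
    struct str_header *h = malloc(sizeof(*h) + len + 1);
    char *buf;

    if (h == NULL) {
        return NULL;
    }
    h->len = len;
    buf = (char *) (h + 1);     /* caller sees memory past the header */
    memcpy(buf, src, len);
    buf[len] = '\0';
    return buf;
}

void str_destroy(char *s)
{
    if (s != NULL) {
        /* rewind to the original allocation before freeing */
        free((struct str_header *) s - 1);
    }
}
```

This is the same hazard the comment describes: a caller that calls `flb_free()` (plain `free()`) on such a pointer frees the wrong address.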
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/flb_pack.c (1)
1237-1256: ⚠️ Potential issue | 🟠 Major
Guard OTLP format enums against the shared formatter in four plugins.
`flb_pack_to_json_format_type()` now returns `FLB_PACK_JSON_FORMAT_OTLP` (4) and `FLB_PACK_JSON_FORMAT_OTLP_PRETTY` (5), but `flb_pack_msgpack_to_json_format()` only handles formats 0-3 and returns `NULL` for anything else. Four output plugins (tcp, websocket, udp, and null) parse the config string and pass the resulting enum directly to the generic formatter without validation, so they accept `otlp_json` and `otlp_json_pretty` but fail at runtime with null payload errors. Protected plugins (http, stdout) use explicit format allowlists or OTLP-specific branches and are unaffected.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_pack.c` around lines 1237 - 1256, flb_pack_msgpack_to_json_format currently only handles enums 0–3 causing otlp enums (returned by flb_pack_to_json_format_type) to yield NULL; either extend flb_pack_msgpack_to_json_format to handle FLB_PACK_JSON_FORMAT_OTLP and FLB_PACK_JSON_FORMAT_OTLP_PRETTY (returning an appropriate non-NULL formatter or a dedicated OTLP formatter) or add validation in the four plugins that parse the format string (tcp, websocket, udp, null) to reject "otlp_json" and "otlp_json_pretty" before calling flb_pack_msgpack_to_json_format; update the code paths that call flb_pack_to_json_format_type and flb_pack_msgpack_to_json_format to ensure every enum value returned by flb_pack_to_json_format_type is either supported by flb_pack_msgpack_to_json_format or explicitly rejected with a clear error.
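The guard the comment asks for can be sketched as follows, with illustrative enum names rather than the real `FLB_PACK_JSON_FORMAT_*` constants: every value the string parser can return is either supported by the generic formatter or rejected up front at config time.

```c
/* Illustrative format enum mirroring the shape of the real one:
 * values 0-3 are handled by the generic formatter, 4-5 are not */
enum json_format {
    FMT_INVALID     = -1,
    FMT_JSON        = 0,
    FMT_JSON_STREAM = 1,
    FMT_JSON_LINES  = 2,
    FMT_GELF        = 3,
    FMT_OTLP        = 4,
    FMT_OTLP_PRETTY = 5
};

/* Plugins that delegate to the generic formatter apply this check
 * at init, so unsupported formats fail loudly at config time
 * instead of yielding NULL payloads at flush time */
int format_supported_by_generic(enum json_format f)
{
    return f == FMT_JSON || f == FMT_JSON_STREAM ||
           f == FMT_JSON_LINES || f == FMT_GELF;
}
```

A plugin init routine would call this after parsing the format string and return an error when it yields 0.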
🧹 Nitpick comments (1)
tests/runtime/filter_log_to_metrics.c (1)
208-229: `wait_with_timeout()` now always burns the full timeout.
Since this loop no longer exits on new or stable output, every case in this file now sleeps the full 2s even when the callback finished immediately. Consider stopping once `data_size` has stopped changing for a couple of polls so the buffering fix doesn't add avoidable CI time.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/runtime/filter_log_to_metrics.c` around lines 208 - 229, The wait_with_timeout loop currently always waits the full timeout; update wait_with_timeout to exit early when output size stabilizes by watching a passed-in size counter (e.g., size_t *data_size_ptr) or similar observable (rather than only elapsed time). Inside wait_with_timeout (the function shown), poll the observed data size each iteration, track the previous value and a stable_count, and break if the value stays unchanged for two consecutive polls (or another small threshold) or when timeout elapses; keep the existing sleep/poll cadence and preserve the out_result behavior.
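The early-exit idea can be sketched over a recorded sequence of observed sizes (names are hypothetical; the real loop would poll `data_size` live and sleep between iterations):

```c
#include <stddef.h>

/* Count polls until a sampled value has been unchanged for
 * `stable_needed` consecutive polls; returns n when the value
 * never stabilizes within the n recorded samples (the timeout). */
int polls_until_stable(const size_t *samples, int n, int stable_needed)
{
    size_t prev = 0;
    int have_prev = 0;
    int stable = 0;
    int i;

    for (i = 0; i < n; i++) {
        if (have_prev && samples[i] == prev) {
            stable++;
            if (stable >= stable_needed) {
                return i + 1;   /* stabilized at this poll */
            }
        }
        else {
            stable = 0;         /* value moved, restart the count */
        }
        prev = samples[i];
        have_prev = 1;
    }
    return n;                   /* ran out the full timeout */
}
```

In the common case where the callback finishes immediately, the observed size stops changing after a couple of polls and the wait returns early instead of sleeping the full timeout.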
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 924-925: The plugin currently sets .event_type = FLB_OUTPUT_LOGS |
FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES on the static plugin descriptor which
incorrectly advertises metrics/traces for every out_kafka instance; change the
static descriptor to only advertise FLB_OUTPUT_LOGS, and when parsing the
instance format in the out_kafka init/config routine (e.g., kafka_config_init or
the function that sets instance->event_type based on format) conditionally OR in
FLB_OUTPUT_METRICS and FLB_OUTPUT_TRACES only for OTLP/OTLP-compatible formats
so non-OTLP instances (json/msgpack/gelf/raw) are not advertised as
metrics/traces-capable.
In `@src/flb_input_metric.c`:
- Around line 102-115: The ht_metric_chunks access in flb_input_metrics_append
is not synchronized and can race when an input instance is threaded
(FLB_INPUT_THREADED / is_threaded); add a mutex (e.g., metrics_mutex) to struct
flb_input_instance and use it to protect all operations on ht_metric_chunks
(get, add, del) including the get-then-delete sequence shown, and update
flb_input_chunk.c to also acquire the same mutex around its add/delete calls so
all hash-table mutations are serialized for that instance.
In `@src/flb_json.c`:
- Around line 1577-1608: The helpers flb_json_mut_obj_add_int,
flb_json_mut_obj_add_real (and the siblings flb_json_mut_obj_add_uint and
flb_json_mut_obj_add_str around 1611-1667) call mut_value_create and/or
strlen(value) without first validating inputs; mirror the NULL-guard behavior
used in flb_json_mut_obj_add_bool by checking that 'document' (and for the _str
helper also 'value') are non-NULL before calling mut_value_create or strlen, and
return 0 on invalid inputs so they don't dereference NULL; apply these same
input checks to flb_json_mut_obj_add_uint and flb_json_mut_obj_add_str as well.
- Around line 1401-1407: Ensure values added to a document belong to that same
document before linking to avoid dangling pointers: in
flb_json_mut_doc_set_root, verify the incoming root's owner matches document
(e.g., check root->parent_doc or root->doc == document) and refuse to set if it
doesn't; apply the same ownership check in flb_json_mut_arr_add_val and
flb_json_mut_obj_add_val for the value being added (ensure val->parent_doc ==
document) and return an error / no-op when the check fails. Include clear
fail-return paths in those functions so callers can detect the rejection.
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c`:
- Around line 1056-1064: The binary branch handling MSGPACK_OBJECT_BIN currently
copies any length; mirror the string-path OTLP ID length validation by rejecting
or skipping binary values whose value->via.bin.size is not the expected OTLP ID
lengths (16 for trace_id, 8 for span_id) before allocating field->data and
memcpy; update the logic around field->data, memcpy(value->via.bin.ptr, ...),
and field->len in flb_opentelemetry_otlp_proto.c to perform the same size checks
as the string handling so malformed/corrupted IDs are not emitted.
- Around line 395-416: msgpack_kv_to_otlp_any_value() currently assumes
input_pair->key is a string and reads input_pair->key.via.str directly; first
check the key type (e.g., input_pair->key.type == MSGPACK_OBJECT_STR) and
reject/return NULL (or clean up and return NULL) if it's not a string to avoid
reading the wrong union member; do this check immediately after allocating kv
(before accessing key.via.str) and ensure you free kv (and any allocated
subfields) on the non-string-path so no leaks occur.
In `@tests/internal/opentelemetry.c`:
- Around line 2143-2149: The test currently uses TEST_CHECK which doesn't stop
execution on failure; if flb_opentelemetry_metrics_json_to_cmt or
flb_opentelemetry_json_traces_to_ctrace fails the subsequent
cfl_list_entry_first and cmt_encode_msgpack_create / ctr_encode_msgpack_create
calls run on invalid data and can crash. Change the checks after
flb_opentelemetry_metrics_json_to_cmt and
flb_opentelemetry_json_traces_to_ctrace to abort the test on failure (e.g.,
replace TEST_CHECK with a fail-fast macro such as TEST_REQUIRE or add an
immediate return/abort when ret != 0) so that cfl_list_entry_first and
cmt_encode_msgpack_create / ctr_encode_msgpack_create are only called when setup
succeeded.
- Around line 592-599: The buffer returned by flb_json_write() is allocated via
Fluent Bit's allocator and must be freed with flb_free(), not libc free();
locate the code using flb_json_write() (the variable named buffer alongside
flb_json_doc_destroy and flb_sds_create_len) and replace the call to
free(buffer) with flb_free(buffer) so the Fluent Bit allocator is used
consistently.
---
Outside diff comments:
In `@src/flb_pack.c`:
- Around line 1237-1256: flb_pack_msgpack_to_json_format currently only handles
enums 0–3 causing otlp enums (returned by flb_pack_to_json_format_type) to yield
NULL; either extend flb_pack_msgpack_to_json_format to handle
FLB_PACK_JSON_FORMAT_OTLP and FLB_PACK_JSON_FORMAT_OTLP_PRETTY (returning an
appropriate non-NULL formatter or a dedicated OTLP formatter) or add validation
in the four plugins that parse the format string (tcp, websocket, udp, null) to
reject "otlp_json" and "otlp_json_pretty" before calling
flb_pack_msgpack_to_json_format; update the code paths that call
flb_pack_to_json_format_type and flb_pack_msgpack_to_json_format to ensure every
enum value returned by flb_pack_to_json_format_type is either supported by
flb_pack_msgpack_to_json_format or explicitly rejected with a clear error.
---
Nitpick comments:
In `@tests/runtime/filter_log_to_metrics.c`:
- Around line 208-229: The wait_with_timeout loop currently always waits the
full timeout; update wait_with_timeout to exit early when output size stabilizes
by watching a passed-in size counter (e.g., size_t *data_size_ptr) or similar
observable (rather than only elapsed time). Inside wait_with_timeout (the
function shown), poll the observed data size each iteration, track the previous
value and a stable_count, and break if the value stays unchanged for two
consecutive polls (or another small threshold) or when timeout elapses; keep the
existing sleep/poll cadence and preserve the out_result behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0b2e1eb7-387b-475c-b150-d96ef70c35d8
📒 Files selected for processing (32)
- CMakeLists.txt
- cmake/headers.cmake
- include/fluent-bit/flb_json.h
- include/fluent-bit/flb_opentelemetry.h
- include/fluent-bit/flb_pack.h
- include/fluent-bit/flb_time.h
- lib/cmetrics/CMakeLists.txt
- lib/cmetrics/src/cmt_decode_msgpack.c
- lib/cmetrics/src/cmt_encode_msgpack.c
- lib/cmetrics/tests/cat.c
- lib/cmetrics/tests/encoding.c
- lib/cmetrics/tests/opentelemetry.c
- lib/ctraces/.github/workflows/packages.yaml
- lib/ctraces/CMakeLists.txt
- lib/ctraces/include/ctraces/ctr_encode_opentelemetry.h
- lib/ctraces/src/ctr_id.c
- plugins/out_kafka/kafka.c
- plugins/out_kafka/kafka_config.c
- plugins/out_kafka/kafka_config.h
- plugins/out_stdout/stdout.c
- src/CMakeLists.txt
- src/flb_input_metric.c
- src/flb_json.c
- src/flb_oauth2_jwt.c
- src/flb_pack.c
- src/flb_pack_json.c
- src/flb_parser_json.c
- src/opentelemetry/flb_opentelemetry_otlp_json.c
- src/opentelemetry/flb_opentelemetry_otlp_proto.c
- tests/internal/mp.c
- tests/internal/opentelemetry.c
- tests/runtime/filter_log_to_metrics.c
plugins/out_kafka/kafka.c (outdated)

```c
    .flags = 0,
    .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES
```
Don't advertise metrics/traces for every Kafka instance.
src/flb_output.c copies .event_type into each output instance, so this makes out_kafka look metrics/traces-capable even when the instance is configured for json, msgpack, gelf, or raw. Those instances then hit the new rejection path at Line 723, and builds without FLB_HAVE_METRICS still advertise metrics they cannot encode. Keep the broader mask behind the OTLP formats instead of setting it statically on the plugin.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@plugins/out_kafka/kafka.c` around lines 924 - 925, The plugin currently sets
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES on the
static plugin descriptor which incorrectly advertises metrics/traces for every
out_kafka instance; change the static descriptor to only advertise
FLB_OUTPUT_LOGS, and when parsing the instance format in the out_kafka
init/config routine (e.g., kafka_config_init or the function that sets
instance->event_type based on format) conditionally OR in FLB_OUTPUT_METRICS and
FLB_OUTPUT_TRACES only for OTLP/OTLP-compatible formats so non-OTLP instances
(json/msgpack/gelf/raw) are not advertised as metrics/traces-capable.
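The init-time computation the comment suggests can be sketched as follows; the flag names are stand-ins for the real `FLB_OUTPUT_*` constants and `instance_event_type` is a hypothetical helper:

```c
#include <string.h>

/* Capability flags, mirroring the shape of Fluent Bit's
 * FLB_OUTPUT_* event types */
#define OUT_LOGS    (1 << 0)
#define OUT_METRICS (1 << 1)
#define OUT_TRACES  (1 << 2)

/* Compute the event types an instance should advertise: every
 * instance handles logs; only OTLP formats add metrics and traces.
 * This keeps non-OTLP instances (json, msgpack, gelf, raw) from
 * claiming telemetry types they cannot encode. */
int instance_event_type(const char *format)
{
    int type = OUT_LOGS;

    if (strncmp(format, "otlp", 4) == 0) {
        type |= OUT_METRICS | OUT_TRACES;
    }
    return type;
}
```

The static plugin descriptor would then carry only the logs flag, and the init routine would OR in the rest per instance.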
```c
if (ret == 0 && tag != NULL) {
    void *chunk_ref;

    /*
     * Keep metric chunks short-lived per append. Reusing the same tag-bound
     * metric chunk can delay task creation for rapidly updated series,
     * which makes runtime consumers miss freshly generated metrics.
     */
    chunk_ref = flb_hash_table_get_ptr(ins->ht_metric_chunks, tag, tag_len);
    if (chunk_ref != NULL) {
        flb_hash_table_del_ptr(ins->ht_metric_chunks,
                               tag, tag_len, chunk_ref);
    }
}
```
🧩 Analysis chain
Several repository scripts were executed (ripgrep searches over src/flb_hash_table.c, include/fluent-bit/flb_hash_table.h, src/flb_input_metric.c, src/flb_input_chunk.c, and include/fluent-bit/flb_input.h) looking for mutex, lock, or thread-safety handling around the hash table and ht_metric_chunks; no synchronization was found protecting ht_metric_chunks in either flb_input_metric.c or the hash table implementation.
Add synchronization to protect ht_metric_chunks from concurrent access in threaded input plugins.
The hash table ht_metric_chunks lacks any synchronization mechanism. The code at lines 110-114 performs a non-atomic get-then-delete pattern without holding a lock. Since some input plugins run in separate threads (indicated by the FLB_INPUT_THREADED flag and is_threaded field), concurrent calls to flb_input_metrics_append can race when accessing the hash table. Additionally, flb_input_chunk.c also performs concurrent add/delete operations on the same hash table. Either:
- Protect all `ht_metric_chunks` operations with a mutex in `struct flb_input_instance`, or
- Ensure this code path is only reachable from a single execution context per instance.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/flb_input_metric.c` around lines 102 - 115, The ht_metric_chunks access
in flb_input_metrics_append is not synchronized and can race when an input
instance is threaded (FLB_INPUT_THREADED / is_threaded); add a mutex (e.g.,
metrics_mutex) to struct flb_input_instance and use it to protect all operations
on ht_metric_chunks (get, add, del) including the get-then-delete sequence
shown, and update flb_input_chunk.c to also acquire the same mutex around its
add/delete calls so all hash-table mutations are serialized for that instance.
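The core of the suggested fix is that the get and the delete must share one critical section. A toy sketch with a single-slot "table" (illustrative only; the real change would guard the `flb_hash_table_get_ptr`/`flb_hash_table_del_ptr` calls with an instance mutex):

```c
#include <pthread.h>
#include <string.h>

/* Toy single-slot table guarded by a per-instance mutex. The
 * point is the locking shape: no other thread can observe or
 * mutate the entry between the "get" and the "delete". */
struct instance {
    pthread_mutex_t lock;
    const char *tag;    /* key of the stored chunk, NULL if empty */
    void *chunk;
};

void instance_init(struct instance *ins)
{
    pthread_mutex_init(&ins->lock, NULL);
    ins->tag = NULL;
    ins->chunk = NULL;
}

/* Atomically remove and return the chunk for `tag`, if present */
void *instance_take_chunk(struct instance *ins, const char *tag)
{
    void *ref = NULL;

    pthread_mutex_lock(&ins->lock);
    if (ins->tag != NULL && strcmp(ins->tag, tag) == 0) {
        ref = ins->chunk;   /* "get" ...                       */
        ins->tag = NULL;    /* ... and "delete", same critical  */
        ins->chunk = NULL;  /*     section                      */
    }
    pthread_mutex_unlock(&ins->lock);
    return ref;
}
```

Every other code path touching the table (the add in flb_input_chunk.c included) would take the same mutex, serializing all mutations for that instance.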
Actionable comments posted: 10
♻️ Duplicate comments (1)
tests/internal/opentelemetry.c (1)
1994-2003: ⚠️ Potential issue | 🟡 Minor
Abort these roundtrip tests once a prerequisite fails.
`TEST_CHECK` does not stop execution. Several new tests keep going into encoder/converter calls after a failed setup step, which means they can dereference uninitialized buffers or empty context lists instead of reporting a clean test failure.
Proposed fix:

```diff
 ret = cmt_encode_msgpack_create(context, &msgpack_buffer, &msgpack_size);
 TEST_CHECK(ret == 0);
+if (ret != 0) {
+    destroy_metrics_context_list(&contexts);
+    return;
+}
```

```diff
 trace_context = flb_opentelemetry_json_traces_to_ctrace(expected,
                                                         strlen(expected),
                                                         &result);
 TEST_CHECK(trace_context != NULL);
 TEST_CHECK(result == 0);
 if (trace_context == NULL || result != 0) {
     return;
 }
 ret = ctr_encode_msgpack_create(trace_context, &msgpack_buffer, &msgpack_size);
 TEST_CHECK(ret == 0);
+if (ret != 0) {
+    ctr_destroy(trace_context);
+    return;
+}
```

Apply the same fail-fast pattern to the other new setup sequences in this group.
Also applies to: 2152-2158, 2212-2218, 2261-2271, 2327-2338
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/internal/opentelemetry.c` around lines 1994 - 2003, The tests use TEST_CHECK (which doesn't abort) for setup calls like flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT) and flb_opentelemetry_logs_json_to_msgpack(...), so if those fail later code dereferences uninitialized state; change the pattern to fail-fast: after calling flb_log_event_encoder_init and after the converter call check the return value and if it indicates failure, immediately abort the test (e.g., call the test framework's fatal/abort helper or return from the test) and perform any necessary cleanup of encoder/result; apply the same fail-fast replacement to the other setup sequences referenced (around the flb_log_event_encoder_init/flb_opentelemetry_logs_json_to_msgpack groups at the other ranges).
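A fail-fast macro of the kind the prompt suggests can be sketched as below. `TEST_REQUIRE` is a hypothetical name: the test framework used by these files only provides the non-aborting `TEST_CHECK`, and the real macro would record the failure through the framework rather than a bare counter.

```c
#include <stdio.h>

static int test_failures = 0;

/* Fail-fast counterpart to a non-aborting check: record the
 * failure, then return from the (void) test function so later
 * code never runs on invalid state */
#define TEST_REQUIRE(cond)                                         \
    do {                                                           \
        if (!(cond)) {                                             \
            test_failures++;                                       \
            fprintf(stderr, "required check failed: %s\n", #cond); \
            return;                                                \
        }                                                          \
    } while (0)

/* Example test body: the encode/convert calls after the check
 * only run when setup succeeded */
static int setup_ok = 0;
static int body_ran = 0;

static void sample_test(void)
{
    TEST_REQUIRE(setup_ok);
    body_ran = 1;   /* stands in for the encoder/converter calls */
}
```

Swapping `TEST_CHECK` for such a macro at setup boundaries turns a potential crash on uninitialized buffers into a clean recorded failure.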
🧹 Nitpick comments (1)
src/flb_input_metric.c (1)
25-26: Remove duplicate include.
`flb_input.h` is already included at line 21. This duplicate include is unnecessary.
🧹 Suggested fix

```diff
 #include <fluent-bit/flb_input_plugin.h>
 #include <fluent-bit/flb_hash_table.h>
-#include <fluent-bit/flb_input.h>
 #include <cfl/cfl.h>
```
Verify each finding against the current code and only fix it if needed. In `@src/flb_input_metric.c` around lines 25 - 26, Remove the duplicate include of flb_input.h: in flb_input_metric.c eliminate the second line that includes "fluent-bit/flb_input.h" (the duplicate of the existing include at the top) so only one `#include` <fluent-bit/flb_input.h> remains; verify build compiles after removing the redundant include.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 532-545: When a non-QUEUE_FULL produce error occurs in the ret ==
-1 branch, clear the blocked flag so future flushes don't incorrectly
short-circuit: set ctx->blocked = FLB_FALSE (or equivalent) before calling
flb_plg_error and returning FLB_ERROR; update the error-handling branch that
checks rd_kafka_last_error() != RD_KAFKA_RESP_ERR__QUEUE_FULL to ensure
ctx->blocked is reset prior to the return path (referencing ctx->blocked,
rd_kafka_last_error(), rd_kafka_err2str(), and the ret == -1 error branch).
- Around line 29-31: Include the missing decoder header
<ctraces/ctr_decode_msgpack.h> and update the trace buffer error checks around
ctr_decode_msgpack_create/usage: replace checks that compare return values to
CTR_MPACK_INSUFFICIENT_DATA and CTR_MPACK_ENGINE_ERROR with the decoder status
family and specifically test for CTR_DECODE_MSGPACK_SUCCESS to distinguish
successful decode from incomplete-data or engine errors; adjust the conditional
that currently reads like the shown snippet to instead check for !=
CTR_DECODE_MSGPACK_SUCCESS and handle incomplete/partial data vs real errors
using the decoder's status values when processing event_chunk and offset (refer
to ctr_decode_msgpack_create and the subsequent ret/off logic).
In `@src/flb_input.c`:
- Around line 369-370: The mutex instance->metrics_chunk_lock is initialized but
not destroyed on several error paths; update each error-handling branch that
frees or aborts the instance after pthread_mutex_init (e.g., the branches around
where the instance is freed in flb_input.c) to call
pthread_mutex_destroy(&instance->metrics_chunk_lock) before freeing or
returning, ensuring you call pthread_mutex_destroy in every early-exit path that
runs after the init so the mutex resource is not leaked.
In `@src/flb_json.c`:
- Around line 1486-1504: The helper flb_json_mut_arr_add_real (and the sibling
helpers listed) currently call mut_value_create() before checking that the
destination array/obj is non-NULL and of the correct type, which leaks the newly
created value into document->values on failure; update flb_json_mut_arr_add_real
to perform the ownership/type check (document != NULL, array != NULL, and
array->type == FLB_JSON_MUT_ARRAY) before calling mut_value_create(), and apply
the same change to the other convenience helpers referenced (the add functions
around lines 1507-1519, 1562-1580, 1583-1601, 1604-1622, 1655-1668, 1671-1689)
so no mut_value_create()/flb_json_mut_strncpy() is invoked unless the
destination is validated first.
- Around line 1144-1154: flb_json_prettify currently returns different ownership
types (plain heap buffer from yyjson branch vs flb_sds_t from fallback); make it
consistently return an flb_sds_t so callers can free uniformly. In the yyjson
path (symbols: yyjson_read_opts, yyjson_write_opts, yyjson_doc_free), create an
flb_sds_t from the yyjson output buffer (use flb_sds_create_len or
flb_sds_create from the output pointer and its length), free the yyjson output
buffer after copying, handle allocation failure by freeing document/output and
returning NULL, and return the flb_sds_t; similarly ensure the fallback branch
returns an flb_sds_t (not a raw buffer). Apply the same change to the other
similar block mentioned (lines ~1170-1298).
- Around line 754-758: The code currently passes msgpack-backed key->via.str.ptr
into flb_json_mut_obj_add_val which uses strlen() and can read past or truncate
length-delimited keys; replace this call with the length-aware variant (or
implement one) that accepts the key length (use key->via.str.size) — e.g. call
flb_json_mut_obj_add_val_len(document, result, key->via.str.ptr,
key->via.str.size, value) or create an internal helper
flb_json_mut_obj_add_val_with_len(document, result, key->via.str.ptr,
key->via.str.size, value); apply the same change to the other identical
occurrence that currently passes key->via.str.ptr directly.
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c`:
- Around line 1355-1375: The loop handling msgpack decoding currently treats
CMT_DECODE_MSGPACK_INSUFFICIENT_DATA as a benign terminator, which lets a
non-empty truncated/corrupt buffer (decoded_count == 0 or offset != size) become
a silently dropped empty export; update the post-loop checks around
cmt_decode_msgpack_create and cmt_cat (identify merged_context, context,
decoded_count, offset, ret, and functions cmt_decode_msgpack_create/cmt_cat) so
that CMT_DECODE_MSGPACK_INSUFFICIENT_DATA is only accepted when the entire input
was consumed (offset == size) and at least one message was decoded
(decoded_count > 0); otherwise treat it like an error: destroy merged_context,
call set_error with FLB_OPENTELEMETRY_OTLP_PROTO_INVALID_ARGUMENT and return
NULL. Apply the same fix to the second analogous block referenced around lines
1378-1387.
- Around line 503-512: The positive-integer branch in the MSGPACK handling
unconditionally casts object->via.u64 into value->int_value which will wrap for
values > INT64_MAX; update the MSGPACK_OBJECT_POSITIVE_INTEGER handling in the
block where otlp_any_value_initialize is called to check if object->via.u64 <=
INT64_MAX and only then assign to value->int_value, otherwise assign the raw
unsigned value into the OTLP unsigned field (value->uint_value) and ensure the
AnyValue is initialized accordingly; reference the
MSGPACK_OBJECT_POSITIVE_INTEGER case, otlp_any_value_initialize,
value->int_value, value->uint_value, and object->via.u64 when making the change
and use the INT64_MAX constant for the comparison.
- Around line 989-993: If flb_realloc for
resource_state->resource_log->scope_logs fails, free the partially-allocated
objects before returning NULL: release scope_log (free its ScopeLogs struct),
free scope_log->scope (and its schema_url string if set), and iterate and free
any copied scope attributes (both key and value strings and the attribute
structs) attached to scope_log->scope->attributes; finally free any array
containers allocated for attributes. Ensure you perform these frees in the tmp
== NULL branch so no detached allocations (scope_log, scope, schema_url, copied
attributes) leak before returning NULL.
In `@tests/internal/opentelemetry.c`:
- Around line 259-275: The helper json_strings_equal_allowing_otlp_timestamp
currently treats any actual string with an otlp.timestamp prefix as equal to an
empty otlp object, which can hide other OTLP fields; update this function to
parse the JSON (or at least locate the "{\"otlp\":{...}}\" object), remove only
the "timestamp" key from the otlp object in both expected and actual, then
compare the resulting JSON structures/strings for equality (so other keys like
trace_id are still checked). Ensure you operate only on the otlp object's
timestamp key and use robust parsing or key-aware string manipulation inside
json_strings_equal_allowing_otlp_timestamp to avoid false positives.
---
Duplicate comments:
In `@tests/internal/opentelemetry.c`:
- Around line 1994-2003: The tests use TEST_CHECK (which doesn't abort) for
setup calls like flb_log_event_encoder_init(&encoder,
FLB_LOG_EVENT_FORMAT_DEFAULT) and flb_opentelemetry_logs_json_to_msgpack(...),
so if those fail later code dereferences uninitialized state; change the pattern
to fail-fast: after calling flb_log_event_encoder_init and after the converter
call check the return value and if it indicates failure, immediately abort the
test (e.g., call the test framework's fatal/abort helper or return from the
test) and perform any necessary cleanup of encoder/result; apply the same
fail-fast replacement to the other setup sequences referenced (around the
flb_log_event_encoder_init/flb_opentelemetry_logs_json_to_msgpack groups at the
other ranges).
---
Nitpick comments:
In `@src/flb_input_metric.c`:
- Around line 25-26: Remove the duplicate include of flb_input.h: in
flb_input_metric.c eliminate the second line that includes
"fluent-bit/flb_input.h" (the duplicate of the existing include at the top) so
only one `#include <fluent-bit/flb_input.h>` remains; verify the build compiles after
removing the redundant include.
📒 Files selected for processing (10)
- include/fluent-bit/flb_input.h
- plugins/out_kafka/kafka.c
- plugins/out_kafka/kafka_config.c
- plugins/out_stdout/stdout.c
- src/flb_input.c
- src/flb_input_chunk.c
- src/flb_input_metric.c
- src/flb_json.c
- src/opentelemetry/flb_opentelemetry_otlp_proto.c
- tests/internal/opentelemetry.c
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/out_kafka/kafka_config.c
#include <cmetrics/cmt_encode_opentelemetry.h>
#include <ctraces/ctr_encode_opentelemetry.h>
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== kafka.c includes =="
sed -n '1,80p' plugins/out_kafka/kafka.c
echo
echo "== trace decoder symbols =="
rg -n -C2 --type=c --type=h 'ctr_decode_msgpack_create|CTR_DECODE_MSGPACK_|CTR_MPACK_' .
echo
echo "== decoder header location =="
fd 'ctr_decode_msgpack.h$' .

Repository: fluent/fluent-bit
Length of output: 38773
Add missing decoder header and use correct status code family in trace buffer handling.
Lines 661–689 call ctr_decode_msgpack_create() but the decoder header <ctraces/ctr_decode_msgpack.h> is not included, and the error checks use CTR_MPACK_INSUFFICIENT_DATA and CTR_MPACK_ENGINE_ERROR instead of the decoder status family. Include the decoder header and replace the termination check with CTR_DECODE_MSGPACK_SUCCESS to correctly distinguish between incomplete data and actual errors.
Current error handling (lines 683–684):

if (ret != CTR_MPACK_INSUFFICIENT_DATA &&
    !(ret == CTR_MPACK_ENGINE_ERROR && off >= event_chunk->size)) {

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@plugins/out_kafka/kafka.c` around lines 29 - 31, Include the missing decoder
header <ctraces/ctr_decode_msgpack.h> and update the trace buffer error checks
around ctr_decode_msgpack_create/usage: replace checks that compare return
values to CTR_MPACK_INSUFFICIENT_DATA and CTR_MPACK_ENGINE_ERROR with the
decoder status family and specifically test for CTR_DECODE_MSGPACK_SUCCESS to
distinguish successful decode from incomplete-data or engine errors; adjust the
conditional that currently reads like the shown snippet to instead check for !=
CTR_DECODE_MSGPACK_SUCCESS and handle incomplete/partial data vs real errors
using the decoder's status values when processing event_chunk and offset (refer
to ctr_decode_msgpack_create and the subsequent ret/off logic).
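The termination rule described above can be sketched in isolation. This is a minimal, self-contained model: `stub_decode()` stands in for the real ctraces decoder, and the `CTR_DECODE_MSGPACK_*` names are local stand-ins mirroring the decoder status family, not the actual header definitions.

```c
#include <assert.h>
#include <stddef.h>

#define CTR_DECODE_MSGPACK_SUCCESS            0
#define CTR_DECODE_MSGPACK_INSUFFICIENT_DATA  1
#define CTR_DECODE_MSGPACK_INVALID_ARGUMENT   2

/* stub decoder: consumes 4 bytes per "record", reports insufficient
 * data once the buffer is exhausted, and an error on a short tail */
static int stub_decode(const char *buf, size_t size, size_t *off)
{
    (void) buf;
    if (*off >= size) {
        return CTR_DECODE_MSGPACK_INSUFFICIENT_DATA;
    }
    if (size - *off < 4) {
        return CTR_DECODE_MSGPACK_INVALID_ARGUMENT;
    }
    *off += 4;
    return CTR_DECODE_MSGPACK_SUCCESS;
}

/* returns the number of decoded records, or -1 on a real decode error */
static int decode_all(const char *buf, size_t size)
{
    size_t off = 0;
    int count = 0;
    int ret;

    while ((ret = stub_decode(buf, size, &off)) == CTR_DECODE_MSGPACK_SUCCESS) {
        count++;
    }

    /* insufficient data is only a clean terminator once the whole
     * buffer has been consumed; anything else is a hard error */
    if (ret != CTR_DECODE_MSGPACK_INSUFFICIENT_DATA || off != size) {
        return -1;
    }
    return count;
}
```

The point of the sketch is the post-loop check: testing the terminating status against the decoder's own status family, plus the offset, is what separates "cleanly drained" from "truncated or corrupt".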
if (ret == -1) {
    if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
        ctx->blocked = FLB_TRUE;
        flb_time_sleep(1000);
        rd_kafka_poll(ctx->kafka.rk, 0);
        queue_full_retries++;
        goto retry;
    }

    flb_plg_error(ctx->ins,
                  "failed to produce OTLP payload to topic %s: %s",
                  rd_kafka_topic_name(topic->tp),
                  rd_kafka_err2str(rd_kafka_last_error()));
    return FLB_ERROR;
Clear ctx->blocked before returning a terminal produce error.
After a queue-full retry, a later non-queue-full failure exits here with ctx->blocked still set. Every subsequent flush then short-circuits at Line 712 as FLB_RETRY, even though the local queue is no longer the reason.
Proposed fix
flb_plg_error(ctx->ins,
"failed to produce OTLP payload to topic %s: %s",
rd_kafka_topic_name(topic->tp),
rd_kafka_err2str(rd_kafka_last_error()));
+ ctx->blocked = FLB_FALSE;
return FLB_ERROR;
}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@plugins/out_kafka/kafka.c` around lines 532 - 545, When a non-QUEUE_FULL
produce error occurs in the ret == -1 branch, clear the blocked flag so future
flushes don't incorrectly short-circuit: set ctx->blocked = FLB_FALSE (or
equivalent) before calling flb_plg_error and returning FLB_ERROR; update the
error-handling branch that checks rd_kafka_last_error() !=
RD_KAFKA_RESP_ERR__QUEUE_FULL to ensure ctx->blocked is reset prior to the
return path (referencing ctx->blocked, rd_kafka_last_error(),
rd_kafka_err2str(), and the ret == -1 error branch).
pthread_mutex_init(&instance->metrics_chunk_lock, NULL);
Missing mutex destruction in error paths.
The mutex is initialized at line 369, but subsequent error paths (e.g., lines 381-391, 440-458, 473-493, 546-563) free the instance without calling pthread_mutex_destroy(&instance->metrics_chunk_lock). This leaks the mutex resource on initialization failures.
🔧 Suggested fix for one error path (line 381-391)
ctx = flb_calloc(1, sizeof(struct flb_plugin_input_proxy_context));
if (!ctx) {
flb_errno();
flb_hash_table_destroy(instance->ht_log_chunks);
flb_hash_table_destroy(instance->ht_metric_chunks);
flb_hash_table_destroy(instance->ht_trace_chunks);
flb_hash_table_destroy(instance->ht_profile_chunks);
+ pthread_mutex_destroy(&instance->metrics_chunk_lock);
flb_free(instance->http_server_config);
flb_free(instance);
return NULL;
}

Similar fixes should be applied to the other error paths after line 369.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/flb_input.c` around lines 369 - 370, The mutex
instance->metrics_chunk_lock is initialized but not destroyed on several error
paths; update each error-handling branch that frees or aborts the instance after
pthread_mutex_init (e.g., the branches around where the instance is freed in
flb_input.c) to call pthread_mutex_destroy(&instance->metrics_chunk_lock) before
freeing or returning, ensuring you call pthread_mutex_destroy in every
early-exit path that runs after the init so the mutex resource is not leaked.
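The init/destroy pairing the prompt asks for follows a standard cleanup-ladder shape. Below is a self-contained sketch where a counter-backed stand-in lock replaces `pthread_mutex_init`/`pthread_mutex_destroy` purely so the pairing is observable; the structure and function names are illustrative, not the fluent-bit API.

```c
#include <assert.h>
#include <stdlib.h>

/* stand-in lock so init/destroy pairing is checkable; in the real code
 * this is pthread_mutex_init / pthread_mutex_destroy on
 * instance->metrics_chunk_lock */
static int live_locks = 0;
static void lock_init(void)    { live_locks++; }
static void lock_destroy(void) { live_locks--; }

struct instance {
    void *ctx;
};

/* every error exit that runs after lock_init() must run lock_destroy() */
static struct instance *instance_create(int fail_ctx)
{
    struct instance *ins;

    ins = calloc(1, sizeof(*ins));
    if (ins == NULL) {
        return NULL;
    }

    lock_init();

    ins->ctx = fail_ctx ? NULL : malloc(16);
    if (ins->ctx == NULL) {
        lock_destroy();   /* the fix: release the lock on this path too */
        free(ins);
        return NULL;
    }
    return ins;
}
```

A `goto err_mutex;` ladder achieves the same thing with less repetition when there are many error paths after the init, which is the situation the review describes.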
if (key->type != MSGPACK_OBJECT_STR || value == NULL ||
    !flb_json_mut_obj_add_val(document,
                              result,
                              key->via.str.ptr,
                              value)) {
Don't pass raw msgpack keys into the C-string object-add helper.
key->via.str.ptr is length-delimited msgpack data, but flb_json_mut_obj_add_val() immediately calls strlen(key). That can read past the msgpack buffer or truncate embedded-NUL keys while copying an immutable map into a mutable document.
Proposed fix
- if (key->type != MSGPACK_OBJECT_STR || value == NULL ||
- !flb_json_mut_obj_add_val(document,
- result,
- key->via.str.ptr,
- value)) {
+ if (key->type != MSGPACK_OBJECT_STR || value == NULL ||
+ !mut_obj_add_val_len(document,
+ result,
+ key->via.str.ptr,
+ key->via.str.size,
+ value)) {
return NULL;
}

Use an internal length-aware helper here instead of routing msgpack-backed keys through the public strlen()-based API.
Also applies to: 1709-1710
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/flb_json.c` around lines 754 - 758, The code currently passes
msgpack-backed key->via.str.ptr into flb_json_mut_obj_add_val which uses
strlen() and can read past or truncate length-delimited keys; replace this call
with the length-aware variant (or implement one) that accepts the key length
(use key->via.str.size) — e.g. call flb_json_mut_obj_add_val_len(document,
result, key->via.str.ptr, key->via.str.size, value) or create an internal helper
flb_json_mut_obj_add_val_with_len(document, result, key->via.str.ptr,
key->via.str.size, value); apply the same change to the other identical
occurrence that currently passes key->via.str.ptr directly.
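The core of a length-aware variant is copying exactly `key_len` bytes and NUL-terminating the copy, never calling `strlen()` on the source. This sketch uses a minimal stand-in slot type rather than the real `flb_json_mut_*` structures, which are not reproduced here.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* minimal stand-in for a mutable-document key slot */
struct kv {
    char  *key;
    size_t key_len;
};

/* msgpack strings are length-delimited and may not be NUL terminated,
 * so the key must be copied by explicit length, not by strlen() */
static int obj_add_key_len(struct kv *slot, const char *key, size_t key_len)
{
    char *copy;

    copy = malloc(key_len + 1);
    if (copy == NULL) {
        return 0;
    }
    memcpy(copy, key, key_len);   /* copy exactly key_len bytes */
    copy[key_len] = '\0';

    slot->key = copy;
    slot->key_len = key_len;
    return 1;
}
```

With `strlen()`, a key buffer like `"traceXYZ"` with msgpack length 5 would pull in the trailing bytes; the length-aware copy yields exactly `"trace"`.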
document = yyjson_read_opts((char *) input, input_length, 0,
                            &allocator, &read_error);
if (document != NULL) {
    flags = YYJSON_WRITE_PRETTY_TWO_SPACES;
    output = yyjson_write_opts(document, flags, &allocator, length, NULL);
    yyjson_doc_free(document);

    if (output != NULL) {
        return output;
    }
}
Return a single ownership model from flb_json_prettify().
The yyjson branch returns a plain heap buffer, while the fallback branch returns the live flb_sds_t. Callers now have to branch on FLB_HAVE_YYJSON just to free the result, which makes this public API unsafe to consume generically.
Proposed fix
- if (length != NULL) {
- *length = flb_sds_len(buffer);
- }
-
- return buffer;
+ {
+ char *output;
+ size_t output_length;
+
+ output_length = flb_sds_len(buffer);
+ output = flb_malloc(output_length + 1);
+ if (output == NULL) {
+ flb_sds_destroy(buffer);
+ return NULL;
+ }
+
+ memcpy(output, buffer, output_length);
+ output[output_length] = '\0';
+ flb_sds_destroy(buffer);
+
+ if (length != NULL) {
+ *length = output_length;
+ }
+
+ return output;
+ }

Also applies to: 1170-1298
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/flb_json.c` around lines 1144 - 1154, flb_json_prettify currently returns
different ownership types (plain heap buffer from yyjson branch vs flb_sds_t
from fallback); make it consistently return an flb_sds_t so callers can free
uniformly. In the yyjson path (symbols: yyjson_read_opts, yyjson_write_opts,
yyjson_doc_free), create an flb_sds_t from the yyjson output buffer (use
flb_sds_create_len or flb_sds_create from the output pointer and its length),
free the yyjson output buffer after copying, handle allocation failure by
freeing document/output and returning NULL, and return the flb_sds_t; similarly
ensure the fallback branch returns an flb_sds_t (not a raw buffer). Apply the
same change to the other similar block mentioned (lines ~1170-1298).
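The "single ownership model" idea is to funnel every branch's output through one copy step, so the caller always frees the result the same way. This is an illustrative sketch: `have_fast_path` stands in for the FLB_HAVE_YYJSON compile-time switch, and the function names are not the real flb_json API.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* both branches converge here: caller always gets a plain malloc()'d,
 * NUL-terminated buffer that is released with free() */
static char *to_owned(const char *buf, size_t len, size_t *out_len)
{
    char *copy = malloc(len + 1);
    if (copy == NULL) {
        return NULL;
    }
    memcpy(copy, buf, len);
    copy[len] = '\0';
    if (out_len != NULL) {
        *out_len = len;
    }
    return copy;
}

static char *prettify(const char *input, size_t len,
                      int have_fast_path, size_t *out_len)
{
    char *owned;

    if (have_fast_path) {
        /* pretend a third-party writer produced its own heap buffer */
        char *writer_buf = malloc(len + 1);
        if (writer_buf == NULL) {
            return NULL;
        }
        memcpy(writer_buf, input, len);
        writer_buf[len] = '\0';

        owned = to_owned(writer_buf, len, out_len);
        free(writer_buf);          /* the foreign buffer never escapes */
        return owned;
    }

    /* fallback branch hands back the identical ownership model */
    return to_owned(input, len, out_len);
}
```

The extra copy on the fast path costs a little, but it removes the need for callers to branch on a build flag just to know which deallocator to use.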
int flb_json_mut_arr_add_real(struct flb_json_mut_doc *document,
                              struct flb_json_mut_val *array,
                              double value)
{
    struct flb_json_mut_val *entry;

    if (document == NULL || array == NULL ||
        array->type != FLB_JSON_MUT_ARRAY) {
        return 0;
    }

    entry = mut_value_create(document, FLB_JSON_MUT_REAL);
    if (entry == NULL) {
        return 0;
    }

    entry->data.real = value;

    return flb_json_mut_arr_add_val(array, entry);
Validate the destination container before allocating the child node.
These helpers allocate a new mutable value first and only discover the owner/type mismatch inside _add_val(). On a rejected call, the new node stays chained in document->values, so repeated invalid API usage grows the document until teardown.
Proposed fix
int flb_json_mut_arr_add_real(struct flb_json_mut_doc *document,
struct flb_json_mut_val *array,
double value)
{
struct flb_json_mut_val *entry;
- if (document == NULL || array == NULL ||
- array->type != FLB_JSON_MUT_ARRAY) {
+ if (document == NULL || array == NULL ||
+ array->type != FLB_JSON_MUT_ARRAY ||
+ array->owner != document) {
return 0;
}

int flb_json_mut_obj_add_int(struct flb_json_mut_doc *document,
struct flb_json_mut_val *object,
const char *key,
long long value)
{
struct flb_json_mut_val *entry;
- if (document == NULL || object == NULL || key == NULL) {
+ if (document == NULL || object == NULL || key == NULL ||
+ object->type != FLB_JSON_MUT_OBJECT ||
+ object->owner != document) {
return 0;
}

Mirror the same upfront ownership/type check across the other convenience helpers before calling mut_value_create() / flb_json_mut_strncpy().
Also applies to: 1507-1519, 1562-1580, 1583-1601, 1604-1622, 1655-1668, 1671-1689
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/flb_json.c` around lines 1486 - 1504, The helper
flb_json_mut_arr_add_real (and the sibling helpers listed) currently call
mut_value_create() before checking that the destination array/obj is non-NULL
and of the correct type, which leaks the newly created value into
document->values on failure; update flb_json_mut_arr_add_real to perform the
ownership/type check (document != NULL, array != NULL, and array->type ==
FLB_JSON_MUT_ARRAY) before calling mut_value_create(), and apply the same change
to the other convenience helpers referenced (the add functions around lines
1507-1519, 1562-1580, 1583-1601, 1604-1622, 1655-1668, 1671-1689) so no
mut_value_create()/flb_json_mut_strncpy() is invoked unless the destination is
validated first.
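The validate-before-allocate pattern is easy to demonstrate with a toy document model; the types below are illustrative stand-ins for the mutable JSON structures, not the real ones. The key property: a rejected call leaves the document's value chain untouched.

```c
#include <assert.h>
#include <stdlib.h>

enum { T_ARRAY = 1, T_REAL = 2 };

struct val { int type; double real; struct val *next; };
struct doc { struct val *values; int n_values; };

static struct val *value_create(struct doc *d, int type)
{
    struct val *v = calloc(1, sizeof(*v));
    if (v == NULL) {
        return NULL;
    }
    v->type = type;
    v->next = d->values;   /* every created value is chained into the doc */
    d->values = v;
    d->n_values++;
    return v;
}

/* check the destination first so a rejected call allocates nothing */
static int arr_add_real(struct doc *d, struct val *array, double value)
{
    struct val *entry;

    if (d == NULL || array == NULL || array->type != T_ARRAY) {
        return 0;          /* no node created, doc size unchanged */
    }
    entry = value_create(d, T_REAL);
    if (entry == NULL) {
        return 0;
    }
    entry->real = value;
    return 1;
}
```

Because every `value_create()` result is chained into the document regardless of whether it is later attached, validating after allocation would let repeated invalid calls grow the chain until teardown, which is exactly the leak the review flags.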
case MSGPACK_OBJECT_POSITIVE_INTEGER:
case MSGPACK_OBJECT_NEGATIVE_INTEGER:
    value = otlp_any_value_initialize(object->type, 0);
    if (value != NULL) {
        if (object->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
            value->int_value = (int64_t) object->via.u64;
        }
        else {
            value->int_value = object->via.i64;
        }
Guard AnyValue.int_value against uint64 wraparound.
MSGPACK_OBJECT_POSITIVE_INTEGER can carry values above INT64_MAX, but this path unconditionally casts them into a signed OTLP field. Large unsigned values will flip negative and corrupt the emitted payload.
Proposed fix
case MSGPACK_OBJECT_POSITIVE_INTEGER:
case MSGPACK_OBJECT_NEGATIVE_INTEGER:
value = otlp_any_value_initialize(object->type, 0);
if (value != NULL) {
if (object->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
+ if (object->via.u64 > INT64_MAX) {
+ otlp_any_value_destroy(value);
+ return NULL;
+ }
value->int_value = (int64_t) object->via.u64;
}
else {
value->int_value = object->via.i64;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c` around lines 503 - 512, The
positive-integer branch in the MSGPACK handling unconditionally casts
object->via.u64 into value->int_value which will wrap for values > INT64_MAX;
update the MSGPACK_OBJECT_POSITIVE_INTEGER handling in the block where
otlp_any_value_initialize is called to check if object->via.u64 <= INT64_MAX and
only then assign to value->int_value, otherwise assign the raw unsigned value
into the OTLP unsigned field (value->uint_value) and ensure the AnyValue is
initialized accordingly; reference the MSGPACK_OBJECT_POSITIVE_INTEGER case,
otlp_any_value_initialize, value->int_value, value->uint_value, and
object->via.u64 when making the change and use the INT64_MAX constant for the
comparison.
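The wraparound guard itself is a one-line comparison against INT64_MAX. The sketch below models the branch with an illustrative struct; the `int_value`/`uint_value` field names follow the AI-agent prompt's suggestion and are stand-ins, not the exact OTLP AnyValue layout.

```c
#include <assert.h>
#include <stdint.h>

/* illustrative stand-in for an AnyValue-style tagged integer */
struct any_value {
    int      is_signed;
    int64_t  int_value;
    uint64_t uint_value;
};

/* map an unsigned msgpack integer without wrapping: values above
 * INT64_MAX must not be cast into the signed slot */
static void set_from_u64(struct any_value *v, uint64_t u)
{
    if (u <= (uint64_t) INT64_MAX) {
        v->is_signed = 1;
        v->int_value = (int64_t) u;   /* safe: fits in int64 */
    }
    else {
        v->is_signed = 0;
        v->uint_value = u;            /* preserve the raw magnitude */
    }
}
```

An unconditional `(int64_t)` cast of, say, UINT64_MAX would flip the sign; routing oversized values to the unsigned slot (or rejecting them, as the committable suggestion does) keeps the emitted payload faithful.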
tmp = flb_realloc(resource_state->resource_log->scope_logs,
                  sizeof(Opentelemetry__Proto__Logs__V1__ScopeLogs *) *
                  (resource_state->resource_log->n_scope_logs + 1));
if (tmp == NULL) {
    return NULL;
Free the detached scope allocations if scope_logs growth fails.
If this realloc() fails, scope_log, scope, schema_url, and any copied scope attributes are never attached anywhere and never released.
Proposed fix
tmp = flb_realloc(resource_state->resource_log->scope_logs,
sizeof(Opentelemetry__Proto__Logs__V1__ScopeLogs *) *
(resource_state->resource_log->n_scope_logs + 1));
if (tmp == NULL) {
+ flb_free(scope_log->schema_url);
+ flb_free(scope->name);
+ flb_free(scope->version);
+ otlp_kvarray_destroy(scope->attributes, scope->n_attributes);
+ flb_free(scope);
+ flb_free(scope_log);
return NULL;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c` around lines 989 - 993, If
flb_realloc for resource_state->resource_log->scope_logs fails, free the
partially-allocated objects before returning NULL: release scope_log (free its
ScopeLogs struct), free scope_log->scope (and its schema_url string if set), and
iterate and free any copied scope attributes (both key and value strings and the
attribute structs) attached to scope_log->scope->attributes; finally free any
array containers allocated for attributes. Ensure you perform these frees in the
tmp == NULL branch so no detached allocations (scope_log, scope, schema_url,
copied attributes) leak before returning NULL.
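The governing rule is ownership at the failure point: until the realloc succeeds and the pointer is stored in the array, nothing owns the new scope_log, so the `tmp == NULL` branch must free it and everything hanging off it. A simplified, self-contained sketch (the struct fields are reduced to one owned string for illustration):

```c
#include <assert.h>
#include <stdlib.h>

struct scope_log {
    char *schema_url;   /* stands in for all owned sub-allocations */
};

struct resource_log {
    struct scope_log **scope_logs;
    size_t             n_scope_logs;
};

/* grow the scope_logs array; on realloc failure the still-detached
 * scope_log (and its owned members) is released before returning */
static int attach_scope_log(struct resource_log *rl, struct scope_log *sl)
{
    struct scope_log **tmp;

    tmp = realloc(rl->scope_logs,
                  sizeof(struct scope_log *) * (rl->n_scope_logs + 1));
    if (tmp == NULL) {
        free(sl->schema_url);   /* nothing owns sl yet: free it all */
        free(sl);
        return 0;
    }
    rl->scope_logs = tmp;
    rl->scope_logs[rl->n_scope_logs] = sl;
    rl->n_scope_logs++;
    return 1;
}
```

In the real code the same branch additionally walks the copied scope attributes (keys, values, and the attribute array container) before the two `free()` calls.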
while ((ret = cmt_decode_msgpack_create(&context,
                                        (char *) data,
                                        size,
                                        &offset)) == CMT_DECODE_MSGPACK_SUCCESS) {
    ret = cmt_cat(merged_context, context);
    cmt_destroy(context);

    if (ret != 0) {
        cmt_destroy(merged_context);
        set_error(result, FLB_OPENTELEMETRY_OTLP_PROTO_NOT_SUPPORTED, ENOMEM);
        return NULL;
    }

    decoded_count++;
}

if (ret != CMT_DECODE_MSGPACK_INSUFFICIENT_DATA &&
    ret != CMT_DECODE_MSGPACK_SUCCESS) {
    cmt_destroy(merged_context);
    set_error(result, FLB_OPENTELEMETRY_OTLP_PROTO_INVALID_ARGUMENT, EINVAL);
    return NULL;
Don't turn undecodable metric msgpack into a successful empty export.
CMT_DECODE_MSGPACK_INSUFFICIENT_DATA is only a valid terminator after the whole buffer has been consumed. As written, a non-empty truncated/corrupt chunk with decoded_count == 0 falls into the empty-payload success path and gets silently dropped.
Proposed fix
- if (ret != CMT_DECODE_MSGPACK_INSUFFICIENT_DATA &&
- ret != CMT_DECODE_MSGPACK_SUCCESS) {
+ if ((ret == CMT_DECODE_MSGPACK_INSUFFICIENT_DATA && offset < size) ||
+ (ret != CMT_DECODE_MSGPACK_INSUFFICIENT_DATA &&
+ ret != CMT_DECODE_MSGPACK_SUCCESS)) {
cmt_destroy(merged_context);
set_error(result, FLB_OPENTELEMETRY_OTLP_PROTO_INVALID_ARGUMENT, EINVAL);
return NULL;
}
- if (decoded_count == 0) {
+ if (decoded_count == 0) {
cmt_destroy(merged_context);
- output = cfl_sds_create_size(0);
+ set_error(result, FLB_OPENTELEMETRY_OTLP_PROTO_INVALID_ARGUMENT, EINVAL);
+ return NULL;
+ }

Also applies to: 1378-1387
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/opentelemetry/flb_opentelemetry_otlp_proto.c` around lines 1355 - 1375,
The loop handling msgpack decoding currently treats
CMT_DECODE_MSGPACK_INSUFFICIENT_DATA as a benign terminator, which lets a
non-empty truncated/corrupt buffer (decoded_count == 0 or offset != size) become
a silently dropped empty export; update the post-loop checks around
cmt_decode_msgpack_create and cmt_cat (identify merged_context, context,
decoded_count, offset, ret, and functions cmt_decode_msgpack_create/cmt_cat) so
that CMT_DECODE_MSGPACK_INSUFFICIENT_DATA is only accepted when the entire input
was consumed (offset == size) and at least one message was decoded
(decoded_count > 0); otherwise treat it like an error: destroy merged_context,
call set_error with FLB_OPENTELEMETRY_OTLP_PROTO_INVALID_ARGUMENT and return
NULL. Apply the same fix to the second analogous block referenced around lines
1378-1387.
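The acceptance condition can be isolated into a small predicate. The status names below are local stand-ins mirroring the CMT_DECODE_MSGPACK_* family, and the rule is the one the review states: INSUFFICIENT_DATA is only a clean end-of-stream when the whole buffer was consumed and at least one context was decoded.

```c
#include <assert.h>
#include <stddef.h>

enum dec_status {
    DEC_SUCCESS           = 0,
    DEC_INSUFFICIENT_DATA = 1,
    DEC_ENGINE_ERROR      = 2
};

/* 1 = loop ended on a valid export, 0 = reject the input as invalid */
static int loop_end_is_valid(enum dec_status ret, size_t offset,
                             size_t size, int decoded_count)
{
    if (ret == DEC_SUCCESS) {
        return 1;
    }
    if (ret == DEC_INSUFFICIENT_DATA) {
        /* benign only when fully consumed and non-empty */
        return offset == size && decoded_count > 0;
    }
    return 0;   /* any other status is a hard decode error */
}
```

Without the `offset == size && decoded_count > 0` refinement, a truncated chunk that decodes zero contexts falls through to the empty-payload success path and is silently dropped, which is the bug being flagged.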
static int json_strings_equal_allowing_otlp_timestamp(const char *expected,
                                                      const char *actual)
{
    if (expected == NULL || actual == NULL) {
        return expected == actual;
    }

    if (strcmp(expected, actual) == 0) {
        return FLB_TRUE;
    }

    if (strcmp(expected, "{\"otlp\":{}}") == 0 &&
        strncmp(actual, "{\"otlp\":{\"timestamp\":", 21) == 0) {
        return FLB_TRUE;
    }

    return FLB_FALSE;
This comparison now ignores more than just the injected timestamp.
A plain prefix check makes {"otlp":{"timestamp":...,"trace_id":"..."}} equal to {"otlp":{}}, so these extended-output assertions will miss unexpected OTLP metadata additions. Strip only the timestamp key and compare the rest structurally.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/internal/opentelemetry.c` around lines 259 - 275, The helper
json_strings_equal_allowing_otlp_timestamp currently treats any actual string
with an otlp.timestamp prefix as equal to an empty otlp object, which can hide
other OTLP fields; update this function to parse the JSON (or at least locate
the "{\"otlp\":{...}}\" object), remove only the "timestamp" key from the otlp
object in both expected and actual, then compare the resulting JSON
structures/strings for equality (so other keys like trace_id are still checked).
Ensure you operate only on the otlp object's timestamp key and use robust
parsing or key-aware string manipulation inside
json_strings_equal_allowing_otlp_timestamp to avoid false positives.
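One way to implement the key-aware comparison without a full JSON parser is to splice only the `"timestamp":<value>` member out of both strings before comparing. This sketch carries a stated simplifying assumption: the timestamp value is a scalar (no nested braces or embedded commas), which holds for the numeric timestamps the tests inject.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* remove only the "timestamp" member (assumed scalar) so every other
 * key still participates in the comparison */
static void strip_otlp_timestamp(const char *in, char *out, size_t out_size)
{
    const char *ts = strstr(in, "\"timestamp\":");
    size_t head;
    const char *rest;

    if (ts == NULL) {
        snprintf(out, out_size, "%s", in);
        return;
    }

    head = (size_t) (ts - in);
    rest = ts + strlen("\"timestamp\":");
    while (*rest != '\0' && *rest != ',' && *rest != '}') {
        rest++;                      /* skip the scalar value */
    }
    if (*rest == ',') {
        rest++;                      /* drop the trailing comma */
    }
    else if (head > 0 && in[head - 1] == ',') {
        head--;                      /* or the comma before a last member */
    }
    snprintf(out, out_size, "%.*s%s", (int) head, in, rest);
}

static int equal_ignoring_otlp_timestamp(const char *a, const char *b)
{
    char sa[256];
    char sb[256];

    strip_otlp_timestamp(a, sa, sizeof(sa));
    strip_otlp_timestamp(b, sb, sizeof(sb));
    return strcmp(sa, sb) == 0;
}
```

Unlike a prefix check, this still fails when the actual output carries extra OTLP metadata (for example a `trace_id`) that the expected string does not, which is exactly the regression coverage the review wants preserved.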
It also extends the stdout and kafka output plugins to format payloads as OTLP.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Bug Fixes
Tests