Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,12 @@ static struct flb_config_map config_map[] = {
add_labels),
"Adds a custom label to the metrics use format: 'add_label name value'"
},
{
FLB_CONFIG_MAP_SLIST_1, "logs_resource_attributes_message_key", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context,
ra_resource_attributes_message),
"Specify a resource attribute key"
},
{
FLB_CONFIG_MAP_STR, "http2", "off",
0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_http2),
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ struct opentelemetry_context {
flb_sds_t logs_severity_number_message_key;
struct flb_record_accessor *ra_severity_number_message;

struct mk_list *ra_resource_attributes_message;

/* Number of logs to flush at a time */
int batch_size;

Expand Down
46 changes: 44 additions & 2 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,43 @@ static int metadata_mp_accessor_create(struct opentelemetry_context *ctx)
return 0;
}

static int config_add_labels(struct flb_output_instance *ins,
/*
* config_ra_resource_attributes_message: validate the configured keys at init
* time. The actual key lookup at flush time reads ctx->ra_resource_attributes_message
* directly (config-map-managed pointer) to avoid per-worker context list issues.
*/
static int config_ra_resource_attributes_message(struct flb_output_instance *ins,
struct opentelemetry_context *ctx)
{
struct mk_list *head;
struct flb_config_map_val *mv;
struct flb_slist_entry *k;

if (!ctx->ra_resource_attributes_message ||
mk_list_size(ctx->ra_resource_attributes_message) == 0) {
return 0;
}

flb_plg_debug(ins, "resource attributes: %d key(s) configured",
mk_list_size(ctx->ra_resource_attributes_message));

/* validate each entry has exactly one value */
flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) {
if (mk_list_size(mv->val.list) != 1) {
flb_plg_error(ins, "'logs_resource_attributes_message_key' expects a single key name, "
"e.g: 'logs_resource_attributes_message_key service.name' "
"(or '$service.name')");
return -1;
}

k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
flb_plg_debug(ins, "resource attributes: registered key '%s'", k->str);
}

return 0;
}

static int config_log_body(struct flb_output_instance *ins,
struct opentelemetry_context *ctx)
{
struct mk_list *head;
Expand Down Expand Up @@ -294,7 +330,13 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
}

/* Parse 'add_label' */
ret = config_add_labels(ins, ctx);
ret = config_log_body(ins, ctx);
if (ret == -1) {
flb_opentelemetry_context_destroy(ctx);
return NULL;
}

ret = config_ra_resource_attributes_message(ins, ctx);
if (ret == -1) {
flb_opentelemetry_context_destroy(ctx);
return NULL;
Expand Down
173 changes: 173 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_ra_key.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_slist.h>

#include <fluent-otel-proto/fluent-otel.h>

Expand Down Expand Up @@ -780,6 +781,163 @@ static int logs_flush_to_otel(struct opentelemetry_context *ctx, struct flb_even
return ret;
}

/*
* For each key name in ctx->ra_resource_attributes_message_list, look it up in
* the msgpack message body and promote the value to an OTLP resource attribute.
*/
static void set_resource_attributes_from_message_body(
struct opentelemetry_context *ctx,
msgpack_object *body,
Opentelemetry__Proto__Resource__V1__Resource *resource)
{
int i;
size_t key_len;
size_t map_key_len;
char *map_key_ptr;
struct mk_list *head;
struct flb_config_map_val *mv;
struct flb_slist_entry *entry;
const char *normalized_key;
msgpack_object_kv *kv;
Opentelemetry__Proto__Common__V1__KeyValue *attr;
Opentelemetry__Proto__Common__V1__KeyValue **tmp_attrs;

/*
* Use ctx->ra_resource_attributes_message directly — this is the pointer
* managed by the Fluent Bit config-map framework and is reliably available
* in every worker thread context, unlike an embedded mk_list copy.
*/
if (!ctx->ra_resource_attributes_message ||
mk_list_size(ctx->ra_resource_attributes_message) == 0) {
return;
}

if (body == NULL || body->type != MSGPACK_OBJECT_MAP) {
return;
}

/* Iterate directly over the config-map-managed list */
flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) {
if (mk_list_size(mv->val.list) != 1) {
continue;
}

entry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
normalized_key = entry->str;
key_len = flb_sds_len(entry->str);

if (key_len == 0) {
continue;
}

/*
* Allow optional record accessor prefix so both "service.name" and
* "$service.name" are treated as the same map key.
*/
if (key_len > 0 && normalized_key[0] == '$') {
normalized_key++;
key_len--;
}

/*
* Also tolerate bracket forms like $['service.name'] and
* $["service.name"] for literal keys.
*/
if (key_len >= 4 && normalized_key[0] == '[') {
if ((normalized_key[1] == '\'' && normalized_key[key_len - 2] == '\'' &&
normalized_key[key_len - 1] == ']') ||
(normalized_key[1] == '"' && normalized_key[key_len - 2] == '"' &&
normalized_key[key_len - 1] == ']')) {
normalized_key += 2;
key_len -= 4;
}
}

if (key_len == 0) {
continue;
}

/* tolerate quoted key names like "service.name" or 'service.name' */
if (key_len >= 2) {
if ((normalized_key[0] == '"' && normalized_key[key_len - 1] == '"') ||
(normalized_key[0] == '\'' && normalized_key[key_len - 1] == '\'')) {
normalized_key++;
key_len -= 2;
}
}

if (key_len == 0) {
continue;
}

for (i = 0; i < body->via.map.size; i++) {
kv = &body->via.map.ptr[i];

if (kv->key.type == MSGPACK_OBJECT_STR) {
map_key_ptr = kv->key.via.str.ptr;
map_key_len = kv->key.via.str.size;
}
else if (kv->key.type == MSGPACK_OBJECT_BIN) {
map_key_ptr = (char *) kv->key.via.bin.ptr;
map_key_len = kv->key.via.bin.size;
}
else {
continue;
}

if (map_key_len != key_len) {
continue;
}

if (strncmp(map_key_ptr, normalized_key, key_len) != 0) {
continue;
}

/* Found the key — convert to OTLP KeyValue */
if (kv->key.type == MSGPACK_OBJECT_STR) {
attr = msgpack_kv_to_otlp_any_value(kv);
}
else {
attr = otlp_kvpair_value_initialize();
if (attr != NULL) {
attr->key = flb_strndup(map_key_ptr, map_key_len);

if (attr->key != NULL) {
attr->value = msgpack_object_to_otlp_any_value(&kv->val);
}

if (attr->key == NULL || attr->value == NULL) {
otlp_kvpair_destroy(attr);
attr = NULL;
}
}
}

if (!attr) {
flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue",
entry->str);
break;
}

/* Grow the resource attributes array by one slot */
tmp_attrs = flb_realloc(resource->attributes,
(resource->n_attributes + 1) *
sizeof(Opentelemetry__Proto__Common__V1__KeyValue *));
if (!tmp_attrs) {
flb_plg_error(ctx->ins, "resource attributes: memory allocation failed for key '%s'",
entry->str);
otlp_kvpair_destroy(attr);
break;
}

resource->attributes = tmp_attrs;
resource->attributes[resource->n_attributes] = attr;
resource->n_attributes++;
break;
Comment on lines +793 to +936
Copy link
Copy Markdown

@coderabbitai coderabbitai bot Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Prevent duplicate resource-attribute keys during promotion.

The helper appends promoted keys unconditionally. If a key already exists in resource->attributes (e.g., from $resource['attributes']), you can emit duplicate keys for the same resource.

💡 Suggested fix (skip already-present keys before append)
 static void set_resource_attributes_from_message_body(
     struct opentelemetry_context *ctx,
     msgpack_object *body,
     Opentelemetry__Proto__Resource__V1__Resource *resource)
 {
     int i;
+    size_t j;
+    int duplicate;
     size_t key_len;
@@
             if (strncmp(kv->key.via.str.ptr, entry->str, key_len) != 0) {
                 continue;
             }
 
+            duplicate = FLB_FALSE;
+            for (j = 0; j < resource->n_attributes; j++) {
+                if (resource->attributes[j] != NULL &&
+                    resource->attributes[j]->key != NULL &&
+                    strcmp(resource->attributes[j]->key, entry->str) == 0) {
+                    duplicate = FLB_TRUE;
+                    break;
+                }
+            }
+
+            if (duplicate == FLB_TRUE) {
+                break;
+            }
+
             /* Found the key — convert to OTLP KeyValue */
             attr = msgpack_kv_to_otlp_any_value(kv);
             if (!attr) {
                 flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue",
                              entry->str);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/out_opentelemetry/opentelemetry_logs.c` around lines 793 - 862, The
code currently appends promoted keys into resource->attributes unconditionally,
causing duplicates; after creating attr via msgpack_kv_to_otlp_any_value and
before calling flb_realloc, scan the existing resource->attributes
(resource->n_attributes entries) and compare their key strings to attr->key
(e.g., strcmp(resource->attributes[i]->key, attr->key)); if a match is found,
call otlp_kvpair_destroy(attr) and skip the append (break/continue out of
promotion loop) to avoid reallocating and adding a duplicate; if no match,
proceed with the existing flb_realloc / assignment logic.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

making change

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cb645j, sounds good! The suggested diff should cover it — scan resource->attributes[0..n_attributes-1] and compare each .key against attr->key (via strcmp) before calling flb_realloc. If a match is found, call otlp_kvpair_destroy(attr) and break to skip the append. Let me know if you run into anything while implementing it.

}
}
}

static int set_resource_attributes(struct flb_record_accessor *ra,
msgpack_object *map,
Opentelemetry__Proto__Resource__V1__Resource *resource)
Expand Down Expand Up @@ -1021,6 +1179,18 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,

ret = FLB_OK;
while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) {
/*
* For standalone records (non-native OTLP groups), resource attributes
* promoted from message keys are record-specific. Force a fresh
* resource/scope context per record when this feature is enabled to
* avoid carrying stale values across subsequent log lines.
*/
if (native_otel == FLB_FALSE &&
ctx->ra_resource_attributes_message &&
mk_list_size(ctx->ra_resource_attributes_message) > 0) {
resource_id = -1;
scope_id = -1;
}
Comment on lines +1188 to +1193
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid forcing a new resource/scope for every standalone record.

At Line 1188, reset is unconditional once logs_resource_attributes_message_key is configured. Even records with no matching key get split into fresh resources/scopes, which can fragment batches and hit max_resources unexpectedly.

Suggested direction
-        if (native_otel == FLB_FALSE &&
-            ctx->ra_resource_attributes_message &&
-            mk_list_size(ctx->ra_resource_attributes_message) > 0) {
+        if (native_otel == FLB_FALSE &&
+            ctx->ra_resource_attributes_message &&
+            mk_list_size(ctx->ra_resource_attributes_message) > 0 &&
+            has_promotable_resource_key(ctx, event.body) == FLB_TRUE) {
             resource_id = -1;
             scope_id = -1;
         }

Add a lightweight has_promotable_resource_key(...) pre-check so reset only happens when the current record actually contains one of the configured keys.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/out_opentelemetry/opentelemetry_logs.c` around lines 1188 - 1193, The
unconditional reset of resource_id/scope_id when
ctx->ra_resource_attributes_message (and logs_resource_attributes_message_key)
is configured causes standalone records without the key to be split into new
resources/scopes; add a lightweight pre-check function (e.g.,
has_promotable_resource_key(record, ctx->logs_resource_attributes_message_key))
and call it in the conditional so that resource_id = -1 and scope_id = -1 are
only set when native_otel == FLB_FALSE, ctx->ra_resource_attributes_message
exists, its list size > 0, AND has_promotable_resource_key(...) returns true for
the current record; implement has_promotable_resource_key to scan the record for
any of the configured keys and return a boolean.

/* Check if the record is special (group) or a normal one */
ret = flb_log_event_decoder_get_record_type(&event, &record_type);
if (ret != 0) {
Expand Down Expand Up @@ -1131,6 +1301,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
/* group body: $schema_url */
set_resource_schema_url(ctx->ra_resource_schema_url, event.body, resource_log);

/* message body: promote configured keys to resource attributes */
set_resource_attributes_from_message_body(ctx, event.body, resource_log->resource);

/* prepare the scopes */
if (!resource_log->scope_logs) {
if (max_scopes_limit > 0) {
Expand Down
Loading