-
Notifications
You must be signed in to change notification settings - Fork 1.9k
out_opentelemetry: Add resource attributes option #11510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ec1db67
07e1ed1
d265e3b
272c9d5
3af1efd
75af5bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| #include <fluent-bit/flb_log_event_decoder.h> | ||
| #include <fluent-bit/flb_ra_key.h> | ||
| #include <fluent-bit/flb_gzip.h> | ||
| #include <fluent-bit/flb_slist.h> | ||
|
|
||
| #include <fluent-otel-proto/fluent-otel.h> | ||
|
|
||
|
|
@@ -780,6 +781,163 @@ static int logs_flush_to_otel(struct opentelemetry_context *ctx, struct flb_even | |
| return ret; | ||
| } | ||
|
|
||
| /* | ||
| * For each key name in ctx->ra_resource_attributes_message_list, look it up in | ||
| * the msgpack message body and promote the value to an OTLP resource attribute. | ||
| */ | ||
| static void set_resource_attributes_from_message_body( | ||
| struct opentelemetry_context *ctx, | ||
| msgpack_object *body, | ||
| Opentelemetry__Proto__Resource__V1__Resource *resource) | ||
| { | ||
| int i; | ||
| size_t key_len; | ||
| size_t map_key_len; | ||
| char *map_key_ptr; | ||
| struct mk_list *head; | ||
| struct flb_config_map_val *mv; | ||
| struct flb_slist_entry *entry; | ||
| const char *normalized_key; | ||
| msgpack_object_kv *kv; | ||
| Opentelemetry__Proto__Common__V1__KeyValue *attr; | ||
| Opentelemetry__Proto__Common__V1__KeyValue **tmp_attrs; | ||
|
|
||
| /* | ||
| * Use ctx->ra_resource_attributes_message directly — this is the pointer | ||
| * managed by the Fluent Bit config-map framework and is reliably available | ||
| * in every worker thread context, unlike an embedded mk_list copy. | ||
| */ | ||
| if (!ctx->ra_resource_attributes_message || | ||
| mk_list_size(ctx->ra_resource_attributes_message) == 0) { | ||
| return; | ||
| } | ||
|
|
||
| if (body == NULL || body->type != MSGPACK_OBJECT_MAP) { | ||
| return; | ||
| } | ||
|
|
||
| /* Iterate directly over the config-map-managed list */ | ||
| flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) { | ||
| if (mk_list_size(mv->val.list) != 1) { | ||
| continue; | ||
| } | ||
|
|
||
| entry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); | ||
| normalized_key = entry->str; | ||
| key_len = flb_sds_len(entry->str); | ||
|
|
||
| if (key_len == 0) { | ||
| continue; | ||
| } | ||
|
|
||
| /* | ||
| * Allow optional record accessor prefix so both "service.name" and | ||
| * "$service.name" are treated as the same map key. | ||
| */ | ||
| if (key_len > 0 && normalized_key[0] == '$') { | ||
| normalized_key++; | ||
| key_len--; | ||
| } | ||
|
|
||
| /* | ||
| * Also tolerate bracket forms like $['service.name'] and | ||
| * $["service.name"] for literal keys. | ||
| */ | ||
| if (key_len >= 4 && normalized_key[0] == '[') { | ||
| if ((normalized_key[1] == '\'' && normalized_key[key_len - 2] == '\'' && | ||
| normalized_key[key_len - 1] == ']') || | ||
| (normalized_key[1] == '"' && normalized_key[key_len - 2] == '"' && | ||
| normalized_key[key_len - 1] == ']')) { | ||
| normalized_key += 2; | ||
| key_len -= 4; | ||
| } | ||
| } | ||
|
|
||
| if (key_len == 0) { | ||
| continue; | ||
| } | ||
|
|
||
| /* tolerate quoted key names like "service.name" or 'service.name' */ | ||
| if (key_len >= 2) { | ||
| if ((normalized_key[0] == '"' && normalized_key[key_len - 1] == '"') || | ||
| (normalized_key[0] == '\'' && normalized_key[key_len - 1] == '\'')) { | ||
| normalized_key++; | ||
| key_len -= 2; | ||
| } | ||
| } | ||
|
|
||
| if (key_len == 0) { | ||
| continue; | ||
| } | ||
|
|
||
| for (i = 0; i < body->via.map.size; i++) { | ||
| kv = &body->via.map.ptr[i]; | ||
|
|
||
| if (kv->key.type == MSGPACK_OBJECT_STR) { | ||
| map_key_ptr = kv->key.via.str.ptr; | ||
| map_key_len = kv->key.via.str.size; | ||
| } | ||
| else if (kv->key.type == MSGPACK_OBJECT_BIN) { | ||
| map_key_ptr = (char *) kv->key.via.bin.ptr; | ||
| map_key_len = kv->key.via.bin.size; | ||
| } | ||
| else { | ||
| continue; | ||
| } | ||
|
|
||
| if (map_key_len != key_len) { | ||
| continue; | ||
| } | ||
|
|
||
| if (strncmp(map_key_ptr, normalized_key, key_len) != 0) { | ||
| continue; | ||
| } | ||
|
|
||
| /* Found the key — convert to OTLP KeyValue */ | ||
| if (kv->key.type == MSGPACK_OBJECT_STR) { | ||
| attr = msgpack_kv_to_otlp_any_value(kv); | ||
| } | ||
| else { | ||
| attr = otlp_kvpair_value_initialize(); | ||
| if (attr != NULL) { | ||
| attr->key = flb_strndup(map_key_ptr, map_key_len); | ||
|
|
||
| if (attr->key != NULL) { | ||
| attr->value = msgpack_object_to_otlp_any_value(&kv->val); | ||
| } | ||
|
|
||
| if (attr->key == NULL || attr->value == NULL) { | ||
| otlp_kvpair_destroy(attr); | ||
| attr = NULL; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (!attr) { | ||
| flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue", | ||
| entry->str); | ||
| break; | ||
| } | ||
|
|
||
| /* Grow the resource attributes array by one slot */ | ||
| tmp_attrs = flb_realloc(resource->attributes, | ||
| (resource->n_attributes + 1) * | ||
| sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); | ||
| if (!tmp_attrs) { | ||
| flb_plg_error(ctx->ins, "resource attributes: memory allocation failed for key '%s'", | ||
| entry->str); | ||
| otlp_kvpair_destroy(attr); | ||
| break; | ||
| } | ||
|
|
||
| resource->attributes = tmp_attrs; | ||
| resource->attributes[resource->n_attributes] = attr; | ||
| resource->n_attributes++; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| static int set_resource_attributes(struct flb_record_accessor *ra, | ||
| msgpack_object *map, | ||
| Opentelemetry__Proto__Resource__V1__Resource *resource) | ||
|
|
@@ -1021,6 +1179,18 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, | |
|
|
||
| ret = FLB_OK; | ||
| while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) { | ||
| /* | ||
| * For standalone records (non-native OTLP groups), resource attributes | ||
| * promoted from message keys are record-specific. Force a fresh | ||
| * resource/scope context per record when this feature is enabled to | ||
| * avoid carrying stale values across subsequent log lines. | ||
| */ | ||
| if (native_otel == FLB_FALSE && | ||
| ctx->ra_resource_attributes_message && | ||
| mk_list_size(ctx->ra_resource_attributes_message) > 0) { | ||
| resource_id = -1; | ||
| scope_id = -1; | ||
| } | ||
|
Comment on lines
+1188
to
+1193
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid forcing a new resource/scope for every standalone record. At Line 1188, reset is unconditional once Suggested direction- if (native_otel == FLB_FALSE &&
- ctx->ra_resource_attributes_message &&
- mk_list_size(ctx->ra_resource_attributes_message) > 0) {
+ if (native_otel == FLB_FALSE &&
+ ctx->ra_resource_attributes_message &&
+ mk_list_size(ctx->ra_resource_attributes_message) > 0 &&
+ has_promotable_resource_key(ctx, event.body) == FLB_TRUE) {
resource_id = -1;
scope_id = -1;
}Add a lightweight 🤖 Prompt for AI Agents |
||
| /* Check if the record is special (group) or a normal one */ | ||
| ret = flb_log_event_decoder_get_record_type(&event, &record_type); | ||
| if (ret != 0) { | ||
|
|
@@ -1131,6 +1301,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, | |
| /* group body: $schema_url */ | ||
| set_resource_schema_url(ctx->ra_resource_schema_url, event.body, resource_log); | ||
|
|
||
| /* message body: promote configured keys to resource attributes */ | ||
| set_resource_attributes_from_message_body(ctx, event.body, resource_log->resource); | ||
cb645j marked this conversation as resolved.
Show resolved
Hide resolved
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /* prepare the scopes */ | ||
| if (!resource_log->scope_logs) { | ||
| if (max_scopes_limit > 0) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent duplicate resource-attribute keys during promotion.
The helper appends promoted keys unconditionally. If a key already exists in
resource->attributes(e.g., from$resource['attributes']), you can emit duplicate keys for the same resource.💡 Suggested fix (skip already-present keys before append)
static void set_resource_attributes_from_message_body( struct opentelemetry_context *ctx, msgpack_object *body, Opentelemetry__Proto__Resource__V1__Resource *resource) { int i; + size_t j; + int duplicate; size_t key_len; @@ if (strncmp(kv->key.via.str.ptr, entry->str, key_len) != 0) { continue; } + duplicate = FLB_FALSE; + for (j = 0; j < resource->n_attributes; j++) { + if (resource->attributes[j] != NULL && + resource->attributes[j]->key != NULL && + strcmp(resource->attributes[j]->key, entry->str) == 0) { + duplicate = FLB_TRUE; + break; + } + } + + if (duplicate == FLB_TRUE) { + break; + } + /* Found the key — convert to OTLP KeyValue */ attr = msgpack_kv_to_otlp_any_value(kv); if (!attr) { flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue", entry->str);🤖 Prompt for AI Agents
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
making change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cb645j, sounds good! The suggested diff should cover it — scanresource->attributes[0..n_attributes-1]and compare each.keyagainstattr->key(viastrcmp) before callingflb_realloc. If a match is found, callotlp_kvpair_destroy(attr)andbreakto skip the append. Let me know if you run into anything while implementing it.