From 9f31ffbba0f8848188e87b732e1441f6c5c72480 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 26 Jan 2026 17:12:01 +0900 Subject: [PATCH 1/3] in_splunk: Extract x-forwarded-for as a remote_addr record Signed-off-by: Hiroshi Hatake --- plugins/in_splunk/splunk.c | 10 ++ plugins/in_splunk/splunk.h | 6 + plugins/in_splunk/splunk_config.c | 3 + plugins/in_splunk/splunk_prot.c | 278 +++++++++++++++++++++++++++--- plugins/in_splunk/splunk_prot.h | 2 + 5 files changed, 273 insertions(+), 26 deletions(-) diff --git a/plugins/in_splunk/splunk.c b/plugins/in_splunk/splunk.c index 096c9078705..13a18ce4013 100644 --- a/plugins/in_splunk/splunk.c +++ b/plugins/in_splunk/splunk.c @@ -261,6 +261,16 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_splunk, tag_key), "Set a record key to specify the tag of the record" }, + { + FLB_CONFIG_MAP_BOOL, "add_remote_addr", "false", + 0, FLB_TRUE, offsetof(struct flb_splunk, add_remote_addr), + "Inject a remote address using the X-Forwarded-For header or connection address" + }, + { + FLB_CONFIG_MAP_STR, "remote_addr_key", "remote_addr", + 0, FLB_TRUE, offsetof(struct flb_splunk, remote_addr_key), + "Set a record key for storing the remote address" + }, /* EOF */ diff --git a/plugins/in_splunk/splunk.h b/plugins/in_splunk/splunk.h index 56a45bf2d05..9140747a7fb 100644 --- a/plugins/in_splunk/splunk.h +++ b/plugins/in_splunk/splunk.h @@ -54,6 +54,8 @@ struct flb_splunk { size_t ingested_auth_header_len; int store_token_in_metadata; flb_sds_t store_token_key; + int add_remote_addr; + flb_sds_t remote_addr_key; struct flb_log_event_encoder log_encoder; @@ -71,6 +73,10 @@ struct flb_splunk { struct flb_downstream *downstream; /* Client manager */ struct mk_list connections; /* linked list of connections */ struct mk_server *server; + + /* Remote address */ + flb_sds_t current_remote_addr; + size_t current_remote_addr_len; }; diff --git a/plugins/in_splunk/splunk_config.c b/plugins/in_splunk/splunk_config.c index 7e3d058f208..014dc8aab69 100644 --- a/plugins/in_splunk/splunk_config.c +++ b/plugins/in_splunk/splunk_config.c @@ -146,6 +146,9 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins) ctx->ingested_auth_header = NULL; + ctx->current_remote_addr = NULL; + ctx->current_remote_addr_len = 0; + ret = setup_hec_tokens(ctx); if (ret != 0) { splunk_config_destroy(ctx); diff --git a/plugins/in_splunk/splunk_prot.c b/plugins/in_splunk/splunk_prot.c index 536a939f108..8541575e832 100644 --- a/plugins/in_splunk/splunk_prot.c +++ b/plugins/in_splunk/splunk_prot.c @@ -20,10 +20,14 @@ #include #include #include +#include #include #include #include #include +#include +#include +#include #include #include @@ -155,6 +159,152 @@ static int send_json_message_response(struct splunk_conn *conn, int http_status, return 0; } +/* + * We use two backends for HTTP parsing and it depends on the version of the + * protocol: + * + * http/1.x: we use Monkey HTTP parser: struct mk_http_session.parser + */ +static int http_header_lookup(int version, void *ptr, char *key, + char **val, size_t *val_len) +{ + int key_len; + + /* HTTP/1.1 */ + struct mk_list *head; + struct mk_http_session *session; + struct mk_http_request *request_11; + struct mk_http_header *header; + + /* HTTP/2.0 */ + char *value; + struct flb_http_request *request_20; + + if (!key) { + return -1; + } + + key_len = strlen(key); + if (key_len <= 0) { + return -1; + } + + if (version <= HTTP_PROTOCOL_VERSION_11) { + if (!ptr) { + return -1; + } + + request_11 = (struct mk_http_request *) ptr; + session = request_11->session; + mk_list_foreach(head, &session->parser.header_list) { + header = mk_list_entry(head, struct mk_http_header, _head); + if (header->key.len == key_len && + strncasecmp(header->key.data, key, key_len) == 0) { + *val = header->val.data; + *val_len = header->val.len; + return 0; + } + } + return -1; + } + else if (version == HTTP_PROTOCOL_VERSION_20) { + request_20 = ptr; + if (!request_20) { + return -1; + } + + value = flb_http_request_get_header(request_20, key); + if (!value) { + return -1; + } + + *val = value; + *val_len = strlen(value); + return 0; + } + + return -1; +} + +static void extract_xff_value(const char *value, size_t value_len, + const char **out_value, size_t *out_len) +{ + const char *start; + const char *end; + const char *comma; + + *out_value = NULL; + *out_len = 0; + + if (value == NULL || value_len == 0) { + return; + } + + start = value; + end = value + value_len; + + while (start < end && (*start == ' ' || *start == '\t')) { + start++; + } + + comma = memchr(start, ',', end - start); + if (comma != NULL) { + end = comma; + } + + while (end > start && (end[-1] == ' ' || end[-1] == '\t')) { + end--; + } + + if (end > start) { + *out_value = start; + *out_len = end - start; + } +} + +static int extract_remote_address(const char *xff_value, + size_t xff_value_len, + struct flb_connection *connection, + char **out, + size_t *out_len) +{ + const char *value = NULL; + size_t len = 0; + + extract_xff_value(xff_value, xff_value_len, &value, &len); + + if (value == NULL && connection != NULL) { + value = flb_connection_get_remote_address(connection); + if (value != NULL) { + len = strlen(value); + } + } + + if (value == NULL || len == 0) { + return -1; + } + + *out = value; + *out_len = len; + return 0; +} + +static int append_remote_addr(struct flb_splunk *ctx, + const char *addr, + size_t addr_len) +{ + if (ctx->add_remote_addr != FLB_TRUE || + ctx->remote_addr_key == NULL || + addr == NULL || addr_len == 0) { + return FLB_EVENT_ENCODER_SUCCESS; + } + + return flb_log_event_encoder_append_body_values( + &ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(ctx->remote_addr_key), + FLB_LOG_EVENT_STRING_VALUE(addr, addr_len)); +} + /* implements functionality to get tag from key in record */ static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map) { @@ -236,6 +386,12 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char } } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = append_remote_addr(ctx, + ctx->current_remote_addr, + ctx->current_remote_addr_len); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); } @@ -267,7 +423,10 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *record, flb_sds_t tag, flb_sds_t tag_from_record, - struct flb_time tm) { + struct flb_time tm, + const char *remote_addr, + size_t remote_addr_len) +{ int ret; int i; msgpack_object_kv *kv; @@ -280,13 +439,25 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor &tm); } - if (ctx->store_token_in_metadata == FLB_TRUE) { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - record); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + /* Always build body by appending map entries so we can extend it */ + if (record->type == MSGPACK_OBJECT_MAP) { + kv = record->via.map.ptr; + for (i = 0; i < record->via.map.size && + ret == FLB_EVENT_ENCODER_SUCCESS; i++) { + ret = flb_log_event_encoder_append_body_values( + &ctx->log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val)); + } + } + else { + ret = flb_log_event_encoder_set_body_from_msgpack_object(&ctx->log_encoder, + record); } + } + if (ctx->store_token_in_metadata == FLB_TRUE) { if (ctx->ingested_auth_header != NULL) { if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_append_metadata_values( @@ -299,15 +470,6 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor } else { if (ctx->ingested_auth_header != NULL) { - /* iterate through the old record map to create the appendable new buffer */ - kv = record->via.map.ptr; - for(i = 0; i < record->via.map.size && ret == FLB_EVENT_ENCODER_SUCCESS; i++) { - ret = flb_log_event_encoder_append_body_values( - &ctx->log_encoder, - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val)); - } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_append_body_values( &ctx->log_encoder, @@ -316,13 +478,12 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor ctx->ingested_auth_header_len)); } } - else { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - record); - } - } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = append_remote_addr(ctx, + ctx->current_remote_addr, + ctx->current_remote_addr_len); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -378,7 +539,9 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char tag_from_record = tag_key(ctx, &result.data); } - process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm); + process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm, + ctx->current_remote_addr, + ctx->current_remote_addr_len); flb_log_event_encoder_reset(&ctx->log_encoder); } @@ -393,7 +556,9 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char tag_from_record = tag_key(ctx, &record); } - process_flb_log_append(ctx, &record, tag, tag_from_record, tm); + process_flb_log_append(ctx, &record, tag, tag_from_record, tm, + ctx->current_remote_addr, + ctx->current_remote_addr_len); /* TODO : Optimize this * @@ -616,7 +781,9 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *conn, flb_sds_t tag, struct mk_http_session *session, - struct mk_http_request *request) + struct mk_http_request *request, + const char *remote_addr, + size_t remote_addr_len) { int ret = -1; struct mk_http_header *header; @@ -691,6 +858,9 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, off_t diff; flb_sds_t tag; struct mk_http_header *header; + char *hval = NULL; + size_t hlen = 0; + const char *peer; if (request->uri.data[0] != '/') { send_response(conn, 400, "error: invalid request\n"); @@ -835,13 +1005,34 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, request->data.len = out_chunked_size; } + /* Resolve per-request remote address */ + ctx->current_remote_addr = NULL; + ctx->current_remote_addr_len = 0; + + if (http_header_lookup(HTTP_PROTOCOL_VERSION_11, request, + SPLUNK_XFF_HEADER, &hval, &hlen) == 0) { + extract_remote_address(hval, hlen, conn->connection, + &ctx->current_remote_addr, + &ctx->current_remote_addr_len); + } + else { + /* fallback to peer addr */ + peer = flb_connection_get_remote_address(conn->connection); + if (peer) { + ctx->current_remote_addr = peer; + ctx->current_remote_addr_len = strlen(peer); + } + } + /* Handle every ingested payload cleanly */ flb_log_event_encoder_reset(&ctx->log_encoder); if (request->method == MK_METHOD_POST) { if (strcasecmp(uri, "/services/collector/raw/1.0") == 0 || strcasecmp(uri, "/services/collector/raw") == 0) { - ret = process_hec_raw_payload(ctx, conn, tag, session, request); + ret = process_hec_raw_payload(ctx, conn, tag, session, request, + ctx->current_remote_addr, + ctx->current_remote_addr_len); if (ret == -2) { /* Response already sent, skip further response */ @@ -927,6 +1118,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, request->data.data = original_data; request->data.len = original_data_size; + /* Clear per-request remote address to avoid leakage across keep-alive/pipeline */ + ctx->current_remote_addr = NULL; + ctx->current_remote_addr_len = 0; + return ret; } @@ -1138,6 +1333,10 @@ int splunk_prot_handle_ng(struct flb_http_request *request, struct flb_splunk *context; int ret = -1; flb_sds_t tag; + struct flb_http_server_session *parent_session; + char *hval = NULL; + size_t hlen = 0; + const char *peer; context = (struct flb_splunk *) response->stream->user_data; @@ -1185,6 +1384,28 @@ int splunk_prot_handle_ng(struct flb_http_request *request, /* Handle every ingested payload cleanly */ flb_log_event_encoder_reset(&context->log_encoder); + /* Resolve per-request remote address */ + context->current_remote_addr = NULL; + context->current_remote_addr_len = 0; + + parent_session = (struct flb_http_server_session *) request->stream->parent; + if (parent_session != NULL) { + if (http_header_lookup(HTTP_PROTOCOL_VERSION_20, request, + SPLUNK_XFF_HEADER, &hval, &hlen) == 0) { + extract_remote_address(hval, hlen, parent_session->connection, + &context->current_remote_addr, + &context->current_remote_addr_len); + } + else { + /* fallback to peer addr */ + peer = flb_connection_get_remote_address(parent_session->connection); + if (peer) { + context->current_remote_addr = peer; + context->current_remote_addr_len = strlen(peer); + } + } + } + if (request->method != HTTP_METHOD_POST) { /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/ send_response_ng(response, 400, "error: invalid HTTP method\n"); @@ -1239,5 +1460,10 @@ int splunk_prot_handle_ng(struct flb_http_request *request, } flb_sds_destroy(tag); + + /* Clear per-request remote address to avoid leakage across keep-alive/pipeline */ + context->current_remote_addr = NULL; + context->current_remote_addr_len = 0; + return ret; } diff --git a/plugins/in_splunk/splunk_prot.h b/plugins/in_splunk/splunk_prot.h index f979d4755a8..5c965155686 100644 --- a/plugins/in_splunk/splunk_prot.h +++ b/plugins/in_splunk/splunk_prot.h @@ -25,6 +25,8 @@ #define SPLUNK_AUTH_MISSING_CRED -1 #define SPLUNK_AUTH_UNAUTHORIZED -2 +#define SPLUNK_XFF_HEADER "x-forwarded-for" + #include int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, From c6ca81e69e2912909d3aa00d7d15bb0d6aa13689 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 26 Jan 2026 17:33:04 +0900 Subject: [PATCH 2/3] tests: in_splunk: Add a remote_addr extraction case Signed-off-by: Hiroshi Hatake --- tests/runtime/in_splunk.c | 88 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/tests/runtime/in_splunk.c b/tests/runtime/in_splunk.c index 666866442c7..8221b9857e3 100644 --- a/tests/runtime/in_splunk.c +++ b/tests/runtime/in_splunk.c @@ -27,6 +27,7 @@ #include #include #include "flb_tests_runtime.h" +#include "../../plugins/in_splunk/splunk_prot.h" #define JSON_CONTENT_TYPE "application/json" @@ -922,6 +923,92 @@ void flb_test_splunk_collector_event_1_0() flb_test_splunk(8819, "/services/collector/event/1.0"); } +void flb_test_splunk_xff_extract() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + char *buf = "{\"event\": \"Pony 1 has left the barn\"}"; + char *expected = "\"xff\":\"203.0.113.1\""; + char *xff_value = " 203.0.113.1, 70.41.3.18, 150.172.238.178"; + char sport[16]; + int port = 8820; + + snprintf(sport, 16, "%d", port); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + "add_remote_addr", "true", + "remote_addr_key", "xff", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = splunk_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/services/collector/event", + buf, strlen(buf), "127.0.0.1", port, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, + strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + ret = flb_http_add_header(c, SPLUNK_XFF_HEADER, + strlen(SPLUNK_XFF_HEADER), + xff_value, strlen(xff_value)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("splunk_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + TEST_LIST = { {"health", flb_test_splunk_health}, {"collector", flb_test_splunk_collector}, @@ -936,5 +1023,6 @@ TEST_LIST = { {"tag_key", flb_test_splunk_tag_key}, {"collector_event_with_auth_key", flb_test_splunk_collector_event_hec_token_key}, {"collector_raw_with_auth_key", flb_test_splunk_collector_raw_hec_token_key}, + {"collector_xff_extract", flb_test_splunk_xff_extract}, {NULL, NULL} }; From f01436783c7f591980cafa1d230451f26b315381 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 17 Feb 2026 16:20:20 +0900 Subject: [PATCH 3/3] in_splunk: Plug memory issues Signed-off-by: Hiroshi Hatake --- plugins/in_splunk/splunk.h | 4 - plugins/in_splunk/splunk_config.c | 3 - plugins/in_splunk/splunk_prot.c | 134 ++++++++++++++++-------------- 3 files changed, 73 insertions(+), 68 deletions(-) diff --git a/plugins/in_splunk/splunk.h b/plugins/in_splunk/splunk.h index 9140747a7fb..93db1f444bf 100644 --- a/plugins/in_splunk/splunk.h +++ b/plugins/in_splunk/splunk.h @@ -73,10 +73,6 @@ struct flb_splunk { struct flb_downstream *downstream; /* Client manager */ struct mk_list connections; /* linked list of connections */ struct mk_server *server; - - /* Remote address */ - flb_sds_t current_remote_addr; - size_t current_remote_addr_len; }; diff --git a/plugins/in_splunk/splunk_config.c b/plugins/in_splunk/splunk_config.c index 014dc8aab69..7e3d058f208 100644 --- a/plugins/in_splunk/splunk_config.c +++ b/plugins/in_splunk/splunk_config.c @@ -146,9 +146,6 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins) ctx->ingested_auth_header = NULL; - ctx->current_remote_addr = NULL; - ctx->current_remote_addr_len = 0; - ret = setup_hec_tokens(ctx); if (ret != 0) { splunk_config_destroy(ctx); diff --git a/plugins/in_splunk/splunk_prot.c b/plugins/in_splunk/splunk_prot.c index 8541575e832..0d81ef26304 100644 --- a/plugins/in_splunk/splunk_prot.c +++ b/plugins/in_splunk/splunk_prot.c @@ -341,7 +341,10 @@ static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map) * Process a raw text payload for Splunk HEC requests, uses the delimited character to split records, * return the number of processed bytes */ -static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size) +static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, + char *buf, size_t size, + const char *remote_addr, + size_t remote_addr_len) { int ret = FLB_EVENT_ENCODER_SUCCESS; @@ -388,8 +391,8 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = append_remote_addr(ctx, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + remote_addr, + remote_addr_len); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -481,9 +484,7 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = append_remote_addr(ctx, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + ret = append_remote_addr(ctx, remote_addr, remote_addr_len); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -519,7 +520,10 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor } } -static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size) +static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, + char *buf, size_t size, + const char *remote_addr, + size_t remote_addr_len) { size_t off = 0; msgpack_unpacked result; @@ -540,8 +544,8 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char } process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + remote_addr, + remote_addr_len); flb_log_event_encoder_reset(&ctx->log_encoder); } @@ -557,8 +561,8 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char } process_flb_log_append(ctx, &record, tag, tag_from_record, tm, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + remote_addr, + remote_addr_len); /* TODO : Optimize this * @@ -588,7 +592,9 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char } static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag, - char *payload, size_t size) + char *payload, size_t size, + const char *remote_addr, + size_t remote_addr_len) { int ret; int out_size; @@ -617,7 +623,8 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag, } /* Process the packaged JSON and return the last byte used */ - process_json_payload_pack(ctx, tag, pack, out_size); + process_json_payload_pack(ctx, tag, pack, out_size, + remote_addr, remote_addr_len); flb_free(pack); return 0; @@ -675,22 +682,28 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request * } static int handle_hec_payload(struct flb_splunk *ctx, int content_type, - flb_sds_t tag, char *buf, size_t size) + flb_sds_t tag, char *buf, size_t size, + const char *remote_addr, + size_t remote_addr_len) { int ret = -1; if (content_type == HTTP_CONTENT_JSON) { - ret = parse_hec_payload_json(ctx, tag, buf, size); + ret = parse_hec_payload_json(ctx, tag, buf, size, + remote_addr, remote_addr_len); } else if (content_type == HTTP_CONTENT_TEXT) { - ret = process_raw_payload_pack(ctx, tag, buf, size); + ret = process_raw_payload_pack(ctx, tag, buf, size, + remote_addr, remote_addr_len); } else if (content_type == HTTP_CONTENT_UNKNOWN) { if (buf[0] == '{') { - ret = parse_hec_payload_json(ctx, tag, buf, size); + ret = parse_hec_payload_json(ctx, tag, buf, size, + remote_addr, remote_addr_len); } else { - ret = process_raw_payload_pack(ctx, tag, buf, size); + ret = process_raw_payload_pack(ctx, tag, buf, size, + remote_addr, remote_addr_len); } } @@ -700,7 +713,9 @@ static int handle_hec_payload(struct flb_splunk *ctx, int content_type, static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, flb_sds_t tag, struct mk_http_session *session, - struct mk_http_request *request) + struct mk_http_request *request, + const char *remote_addr, + size_t remote_addr_len) { int i = 0; int ret = 0; @@ -768,11 +783,13 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, return -1; } - ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size); + ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size, + remote_addr, remote_addr_len); flb_free(gz_data); } else { - ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len); + ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len, + remote_addr, remote_addr_len); } return ret; @@ -814,7 +831,8 @@ static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *c } /* Always handle as raw type of payloads here */ - ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len); + ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len, + remote_addr, remote_addr_len); return ret; } @@ -861,6 +879,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, char *hval = NULL; size_t hlen = 0; const char *peer; + const char *remote_addr = NULL; + size_t remote_addr_len = 0; if (request->uri.data[0] != '/') { send_response(conn, 400, "error: invalid request\n"); @@ -1005,22 +1025,17 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, request->data.len = out_chunked_size; } - /* Resolve per-request remote address */ - ctx->current_remote_addr = NULL; - ctx->current_remote_addr_len = 0; - if (http_header_lookup(HTTP_PROTOCOL_VERSION_11, request, SPLUNK_XFF_HEADER, &hval, &hlen) == 0) { extract_remote_address(hval, hlen, conn->connection, - &ctx->current_remote_addr, - &ctx->current_remote_addr_len); + (char **) &remote_addr, + &remote_addr_len); } - else { - /* fallback to peer addr */ + if (remote_addr == NULL || remote_addr_len == 0) { peer = flb_connection_get_remote_address(conn->connection); - if (peer) { - ctx->current_remote_addr = peer; - ctx->current_remote_addr_len = strlen(peer); + if (peer != NULL) { + remote_addr = peer; + remote_addr_len = strlen(peer); } } @@ -1031,8 +1046,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, if (strcasecmp(uri, "/services/collector/raw/1.0") == 0 || strcasecmp(uri, "/services/collector/raw") == 0) { ret = process_hec_raw_payload(ctx, conn, tag, session, request, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + remote_addr, remote_addr_len); if (ret == -2) { /* Response already sent, skip further response */ @@ -1057,7 +1071,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, strcasecmp(uri, "/services/collector/event") == 0 || strcasecmp(uri, "/services/collector") == 0) { - ret = process_hec_payload(ctx, conn, tag, session, request); + ret = process_hec_payload(ctx, conn, tag, session, request, + remote_addr, remote_addr_len); if (ret == -2) { flb_sds_destroy(tag); mk_mem_free(uri); @@ -1118,10 +1133,6 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, request->data.data = original_data; request->data.len = original_data_size; - /* Clear per-request remote address to avoid leakage across keep-alive/pipeline */ - ctx->current_remote_addr = NULL; - ctx->current_remote_addr_len = 0; - return ret; } @@ -1251,7 +1262,9 @@ static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_reque static int process_hec_payload_ng(struct flb_http_request *request, struct flb_http_response *response, flb_sds_t tag, - struct flb_splunk *ctx) + struct flb_splunk *ctx, + const char *remote_addr, + size_t remote_addr_len) { int type = -1; int ret = 0; @@ -1287,13 +1300,16 @@ static int process_hec_payload_ng(struct flb_http_request *request, return -2; } - return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body)); + return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body), + remote_addr, remote_addr_len); } static int process_hec_raw_payload_ng(struct flb_http_request *request, struct flb_http_response *response, flb_sds_t tag, - struct flb_splunk *ctx) + struct flb_splunk *ctx, + const char *remote_addr, + size_t remote_addr_len) { int ret = 0; size_t size = 0; @@ -1324,7 +1340,8 @@ static int process_hec_raw_payload_ng(struct flb_http_request *request, } /* Always handle as raw type of payloads here */ - return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body)); + return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body), + remote_addr, remote_addr_len); } int splunk_prot_handle_ng(struct flb_http_request *request, @@ -1337,6 +1354,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request, char *hval = NULL; size_t hlen = 0; const char *peer; + const char *remote_addr = NULL; + size_t remote_addr_len = 0; context = (struct flb_splunk *) response->stream->user_data; @@ -1384,24 +1403,19 @@ int splunk_prot_handle_ng(struct flb_http_request *request, /* Handle every ingested payload cleanly */ flb_log_event_encoder_reset(&context->log_encoder); - /* Resolve per-request remote address */ - context->current_remote_addr = NULL; - context->current_remote_addr_len = 0; - parent_session = (struct flb_http_server_session *) request->stream->parent; if (parent_session != NULL) { if (http_header_lookup(HTTP_PROTOCOL_VERSION_20, request, SPLUNK_XFF_HEADER, &hval, &hlen) == 0) { extract_remote_address(hval, hlen, parent_session->connection, - &context->current_remote_addr, - &context->current_remote_addr_len); + (char **) &remote_addr, + &remote_addr_len); } - else { - /* fallback to peer addr */ + if (remote_addr == NULL || remote_addr_len == 0) { peer = flb_connection_get_remote_address(parent_session->connection); - if (peer) { - context->current_remote_addr = peer; - context->current_remote_addr_len = strlen(peer); + if (peer != NULL) { + remote_addr = peer; + remote_addr_len = strlen(peer); } } } @@ -1421,7 +1435,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request, if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 || strcasecmp(request->path, "/services/collector/raw") == 0) { - ret = process_hec_raw_payload_ng(request, response, tag, context); + ret = process_hec_raw_payload_ng(request, response, tag, context, + remote_addr, remote_addr_len); if (ret == -2) { /* Response already sent, skip further response */ flb_sds_destroy(tag); @@ -1439,7 +1454,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request, else if (strcasecmp(request->path, "/services/collector/event/1.0") == 0 || strcasecmp(request->path, "/services/collector/event") == 0 || strcasecmp(request->path, "/services/collector") == 0) { - ret = process_hec_payload_ng(request, response, tag, context); + ret = process_hec_payload_ng(request, response, tag, context, + remote_addr, remote_addr_len); if (ret == -2) { /* Response already sent, skip further response */ flb_sds_destroy(tag); @@ -1461,9 +1477,5 @@ int splunk_prot_handle_ng(struct flb_http_request *request, flb_sds_destroy(tag); - /* Clear per-request remote address to avoid leakage across keep-alive/pipeline */ - context->current_remote_addr = NULL; - context->current_remote_addr_len = 0; - return ret; }