From a99abc904f00c3448ea978e0fdfc826405dfcd5c Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Mon, 28 Apr 2025 10:59:07 +0530 Subject: [PATCH 1/3] out_azure_kusto: added workload identity Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 27 +++- plugins/out_azure_kusto/azure_kusto.h | 12 ++ plugins/out_azure_kusto/azure_kusto_conf.c | 89 ++++++++--- plugins/out_azure_kusto/azure_msiauth.c | 170 +++++++++++++++++++++ plugins/out_azure_kusto/azure_msiauth.h | 1 + 5 files changed, 271 insertions(+), 28 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 091f45e3d99..eabd566db70 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -56,7 +56,17 @@ static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx) static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx) { int ret; - char *token; + char *token = NULL; + + /* If using workload identity, handle token exchange */ + if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { + ret = flb_azure_workload_identity_token_get(ctx->o, ctx->workload_identity_token_file, ctx->client_id, ctx->tenant_id); + if (ret == -1) { + flb_plg_error(ctx->ins, "error retrieving workload identity token"); + return -1; + } + return 0; + } /* Clear any previous oauth2 payload content */ flb_oauth2_payload_clear(ctx->o); @@ -107,7 +117,10 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) } if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { - if (ctx->managed_identity_client_id != NULL) { + if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { + ret = azure_kusto_get_oauth2_token(ctx); + } + else if (ctx->managed_identity_client_id != NULL) { ret = azure_kusto_get_msi_token(ctx); } else { @@ -205,7 +218,7 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs ctx->ins, "Kusto ingestion command request http_do=%i, HTTP Status: %i", ret, c->resp.status); - flb_plg_debug(ctx->ins, "Kusto ingestion command HTTP request payload: %.*s", (int)c->resp.payload_size, c->resp.payload); + flb_plg_debug(ctx->ins, "Kusto ingestion command HTTP response payload: %.*s", (int)c->resp.payload_size, c->resp.payload); if (ret == 0) { if (c->resp.status == 200) { @@ -1571,6 +1584,14 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, io_timeout), "HTTP IO timeout. Default is 60s" }, + {FLB_CONFIG_MAP_STR, "auth_type", "service_principal", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, auth_type_str), + "Authentication type: 'service_principal', 'managed_identity', or 'workload_identity'" + }, + {FLB_CONFIG_MAP_STR, "workload_identity_token_file", NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, workload_identity_token_file), + "Path to the workload identity token file (default: /var/run/secrets/azure/tokens/azure-identity-token)" + }, /* EOF */ {0}}; diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index aa0c62c918b..4a2d2e5c51f 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -35,6 +35,11 @@ /* refresh token every 50 minutes */ #define FLB_AZURE_KUSTO_TOKEN_REFRESH 3000 +/* Authentication types */ +#define FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL 0 +#define FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY 1 +#define FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY 2 + /* Kusto streaming inserts oauth scope */ #define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default" @@ -92,6 +97,11 @@ struct flb_azure_kusto { int ingestion_endpoint_connect_timeout; int io_timeout; + /* Authentication */ + int auth_type; + char *auth_type_str; + char *workload_identity_token_file; + /* compress payload */ int compression_enabled; @@ -165,5 +175,7 @@ struct flb_azure_kusto { flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx); flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl); +/* Add function prototype for workload identity token exchange */ +int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, const char *client_id, const char *tenant_id); #endif \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 053d729623e..49cdcfdf3de 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -723,8 +723,9 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } - if (ctx->tenant_id == NULL && ctx->client_id == NULL && ctx->client_secret == NULL && ctx->managed_identity_client_id == NULL) { - flb_plg_error(ctx->ins, "Service Principal or Managed Identity is not defined"); + if (ctx->tenant_id == NULL && ctx->client_id == NULL && ctx->client_secret == NULL && + ctx->managed_identity_client_id == NULL && ctx->auth_type != FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { + flb_plg_error(ctx->ins, "Service Principal, Managed Identity, or Workload Identity is not defined"); flb_azure_kusto_conf_destroy(ctx); return NULL; } @@ -750,6 +751,65 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } + /* Set auth type */ + if (ctx->auth_type_str) { + if (strcasecmp(ctx->auth_type_str, "service_principal") == 0) { + ctx->auth_type = FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL; + } + else if (strcasecmp(ctx->auth_type_str, "managed_identity") == 0) { + ctx->auth_type = FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY; + } + else if (strcasecmp(ctx->auth_type_str, "workload_identity") == 0) { + ctx->auth_type = FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY; + } + else { + flb_plg_error(ins, "invalid auth_type '%s'", ctx->auth_type_str); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + } + + /* Validate configuration based on auth type */ + if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL) { + if (!ctx->tenant_id) { + flb_plg_error(ins, "tenant_id is required for service_principal auth_type"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + if (!ctx->client_id) { + flb_plg_error(ins, "client_id is required for service_principal auth_type"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + if (!ctx->client_secret) { + flb_plg_error(ins, "client_secret is required for service_principal auth_type"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + } + else if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { + if (!ctx->tenant_id) { + flb_plg_error(ins, "tenant_id is required for workload_identity auth_type"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + if (!ctx->client_id) { + flb_plg_error(ins, "client_id is required for workload_identity auth_type"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + } + else if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY) { + if (!ctx->managed_identity_client_id) { + flb_plg_error(ins, "managed_identity_client_id is required for managed_identity auth_type"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + } + + /* Create oauth2 context */ + if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY) { + /* MSI auth */ if (ctx->managed_identity_client_id != NULL) { /* system assigned managed identity */ if (strcasecmp(ctx->managed_identity_client_id, "system") == 0) { @@ -781,29 +841,8 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->managed_identity_client_id); } } - else { - /* config: 'tenant_id' */ - if (ctx->tenant_id == NULL) { - flb_plg_error(ctx->ins, "property 'tenant_id' is not defined."); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'client_id' */ - if (ctx->client_id == NULL) { - flb_plg_error(ctx->ins, "property 'client_id' is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'client_secret' */ - if (ctx->client_secret == NULL) { - flb_plg_error(ctx->ins, "property 'client_secret' is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* Create the auth URL */ + }else{ + /* Standard OAuth2 for service principal */ ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 + flb_sds_len(ctx->tenant_id)); if (!ctx->oauth_url) { diff --git a/plugins/out_azure_kusto/azure_msiauth.c b/plugins/out_azure_kusto/azure_msiauth.c index 934e0d6f686..d3121346b33 100644 --- a/plugins/out_azure_kusto/azure_msiauth.c +++ b/plugins/out_azure_kusto/azure_msiauth.c @@ -102,3 +102,173 @@ char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx) return NULL; } + +/** Read token from file */ +static flb_sds_t read_token_from_file(const char *token_file) +{ + FILE *fp; + flb_sds_t token = NULL; + char buf[4096]; /* Assuming token won't be larger than 4KB */ + size_t bytes_read; + + if (!token_file) { + flb_error("[azure workload identity] token file path is NULL"); + return NULL; + } + + fp = fopen(token_file, "r"); + if (!fp) { + flb_error("[azure workload identity] could not open token file: %s", token_file); + return NULL; + } + + bytes_read = fread(buf, 1, sizeof(buf) - 1, fp); + fclose(fp); + + if (bytes_read <= 0) { + flb_error("[azure workload identity] could not read token from file: %s", token_file); + return NULL; + } + + buf[bytes_read] = '\0'; + token = flb_sds_create(buf); + + return token; +} + +int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, const char *client_id, const char *tenant_id) +{ + int ret; + size_t b_sent; + struct flb_connection *u_conn; + struct flb_http_client *c; + flb_sds_t federated_token; + flb_sds_t body = NULL; + + flb_info("[azure workload identity] inside flb_azure_workload_identity_token_get"); + + /* Default token file location if not specified */ + if (!token_file) { + token_file = "/var/run/secrets/azure/tokens/azure-identity-token"; + } + + /* Read the federated token from file */ + federated_token = read_token_from_file(token_file); + if (!federated_token) { + flb_error("[azure workload identity] failed to read federated token"); + return -1; + } + + flb_info("[azure workload identity] after read token from file %s", federated_token); + + /* Build the form data for token exchange *before* creating the client */ + body = flb_sds_create_size(4096); + if (!body) { + flb_error("[azure workload identity] failed to allocate memory for request body"); + flb_sds_destroy(federated_token); + return -1; + } + + body = flb_sds_cat(body, "client_id=", 10); + body = flb_sds_cat(body, client_id, strlen(client_id)); + /* Use the correct grant_type and length for workload identity */ + body = flb_sds_cat(body, "&grant_type=client_credentials", 30); + body = flb_sds_cat(body, "&client_assertion_type=urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 77); + body = flb_sds_cat(body, "&client_assertion=", 18); + body = flb_sds_cat(body, federated_token, flb_sds_len(federated_token)); + /* Use the correct scope and length for Kusto */ + body = flb_sds_cat(body, "&scope=https://help.kusto.windows.net/.default", 46); + + if (!body) { + /* This check might be redundant if flb_sds_cat handles errors, but safe */ + flb_error("[azure workload identity] failed to build request body"); + flb_sds_destroy(federated_token); + return -1; + } + + /* Get upstream connection to Azure AD token endpoint */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_error("[azure workload identity] could not get an upstream connection"); + flb_sds_destroy(federated_token); + flb_sds_destroy(body); /* Clean up allocated body */ + return -1; + } + + /* Create HTTP client context, passing the body directly */ + c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + body, flb_sds_len(body), /* Pass body buffer and length here */ + ctx->host, atoi(ctx->port), NULL, 0); + if (!c) { + flb_error("[azure workload identity] error creating HTTP client context"); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(federated_token); + flb_sds_destroy(body); /* Clean up allocated body */ + return -1; + } + + /* Prepare token exchange request headers */ + flb_http_add_header(c, "Content-Type", 12, "application/x-www-form-urlencoded", 33); + + /* Remove the direct assignment as body is passed during creation */ + /* c->body_buf = body; */ + /* c->body_len = flb_sds_len(body); */ + + /* Add a debug log to verify the body content just before sending */ + flb_debug("[azure workload identity] Sending request body (len=%zu): %s", flb_sds_len(body), body); + + /* Issue request */ + ret = flb_http_do(c, &b_sent); + + /* Clean up the body sds now that the request is done or client creation failed */ + flb_sds_destroy(body); + body = NULL; + + if (ret != 0) { + flb_warn("[azure workload identity] error in HTTP request, http_do=%i", ret); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(federated_token); + /* body already destroyed */ + return -1; + } + + flb_debug("[azure workload identity] HTTP Status=%i", c->resp.status); + if (c->resp.payload_size > 0) { + if (c->resp.status == 200) { + flb_debug("[azure workload identity] token exchange successful"); + } + else { + flb_warn("[azure workload identity] token exchange failed: %s", c->resp.payload); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(federated_token); + /* body already destroyed */ + return -1; + } + } + + /* Parse the response and extract the token */ + if (c->resp.payload_size > 0 && c->resp.status == 200) { + ret = flb_oauth2_parse_json_response(c->resp.payload, + c->resp.payload_size, ctx); + if (ret == 0) { + flb_info("[azure workload identity] access token retrieved successfully"); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(federated_token); + /* body already destroyed */ + ctx->issued = time(NULL); + ctx->expires = ctx->issued + ctx->expires_in; + return 0; + } + } + + flb_error("[azure workload identity] failed to parse token response"); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + flb_sds_destroy(federated_token); + /* body already destroyed */ + + return -1; +} diff --git a/plugins/out_azure_kusto/azure_msiauth.h b/plugins/out_azure_kusto/azure_msiauth.h index 64c4ca8d8b3..c21efa16e0b 100644 --- a/plugins/out_azure_kusto/azure_msiauth.h +++ b/plugins/out_azure_kusto/azure_msiauth.h @@ -24,4 +24,5 @@ "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2021-02-01%s%s&resource=https://api.kusto.windows.net" char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx); +int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, const char *client_id, const char *tenant_id); From de2fda1c3a91b8aca80dbd6a9aa61df5731a2c2d Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Fri, 2 May 2025 06:49:27 +0530 Subject: [PATCH 2/3] out_azure_kusto: modified workload identity Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 35 +++++++++-------- plugins/out_azure_kusto/azure_kusto.h | 10 ++--- plugins/out_azure_kusto/azure_kusto_conf.c | 44 +++++++++++++++++++++- 3 files changed, 68 insertions(+), 21 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index eabd566db70..9f641576750 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -52,22 +52,27 @@ static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx) return 0; } -/* Create a new oauth2 context and get a oauth2 token */ -static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx) +static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx) { int ret; - char *token = NULL; - - /* If using workload identity, handle token exchange */ - if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { - ret = flb_azure_workload_identity_token_get(ctx->o, ctx->workload_identity_token_file, ctx->client_id, ctx->tenant_id); - if (ret == -1) { - flb_plg_error(ctx->ins, "error retrieving workload identity token"); - return -1; - } - return 0; + + ret = flb_azure_workload_identity_token_get(ctx->o, + ctx->workload_identity_token_file, + ctx->client_id, + ctx->tenant_id); + if (ret == -1) { + flb_plg_error(ctx->ins, "error retrieving workload identity token"); + return -1; } + + flb_plg_debug(ctx->ins, "Workload identity token retrieved successfully"); + return 0; +} +static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx) +{ + int ret; + /* Clear any previous oauth2 payload content */ flb_oauth2_payload_clear(ctx->o); @@ -96,7 +101,7 @@ static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx) } /* Retrieve access token */ - token = flb_oauth2_token_get(ctx->o); + char *token = flb_oauth2_token_get(ctx->o); if (!token) { flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); return -1; @@ -118,13 +123,13 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { - ret = azure_kusto_get_oauth2_token(ctx); + ret = azure_kusto_get_workload_identity_token(ctx); } else if (ctx->managed_identity_client_id != NULL) { ret = azure_kusto_get_msi_token(ctx); } else { - ret = azure_kusto_get_oauth2_token(ctx); + ret = azure_kusto_get_service_principal_token(ctx); } } diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 4a2d2e5c51f..ea367e066f6 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -36,9 +36,11 @@ #define FLB_AZURE_KUSTO_TOKEN_REFRESH 3000 /* Authentication types */ -#define FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL 0 -#define FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY 1 -#define FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY 2 +typedef enum { + FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL, + FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY, + FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY +} flb_azure_kusto_auth_type; /* Kusto streaming inserts oauth scope */ #define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default" @@ -175,7 +177,5 @@ struct flb_azure_kusto { flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx); flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl); -/* Add function prototype for workload identity token exchange */ -int flb_azure_workload_identity_token_get(struct flb_oauth2 *ctx, const char *token_file, const char *client_id, const char *tenant_id); #endif \ No newline at end of file diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 49cdcfdf3de..e1de47661ae 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -723,7 +723,49 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } - if (ctx->tenant_id == NULL && ctx->client_id == NULL && ctx->client_secret == NULL && + /* Auth method validation and setup */ + if (strcasecmp(ctx->auth_type_str, "service_principal") == 0) { + ctx->auth_type = FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL; + + /* Verify required parameters for Service Principal auth */ + if (!ctx->tenant_id || !ctx->client_id || !ctx->client_secret) { + flb_plg_error(ins, "When using service_principal auth, tenant_id, client_id, and client_secret are required"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + } + else if (strcasecmp(ctx->auth_type_str, "managed_identity") == 0) { + ctx->auth_type = FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY; + } + else if (strcasecmp(ctx->auth_type_str, "workload_identity") == 0) { + ctx->auth_type = FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY; + + /* Verify required parameters for Workload Identity auth */ + if (!ctx->tenant_id || !ctx->client_id) { + flb_plg_error(ins, "When using workload_identity auth, tenant_id and client_id are required"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* Set default token file path if not specified */ + if (!ctx->workload_identity_token_file) { + ctx->workload_identity_token_file = flb_strdup("/var/run/secrets/azure/tokens/azure-identity-token"); + if (!ctx->workload_identity_token_file) { + flb_errno(); + flb_plg_error(ins, "Could not allocate default workload identity token path"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + } + } + else { + flb_plg_error(ins, "Invalid auth_type '%s'. Valid options are: 'service_principal', 'managed_identity', or 'workload_identity'", + ctx->auth_type_str); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + if (ctx->tenant_id == NULL && ctx->client_id == NULL && ctx->client_secret == NULL && ctx->managed_identity_client_id == NULL && ctx->auth_type != FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { flb_plg_error(ctx->ins, "Service Principal, Managed Identity, or Workload Identity is not defined"); flb_azure_kusto_conf_destroy(ctx); From 72e4d54a14234b14373fbf0ea017eebd60829241 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Fri, 2 May 2025 16:34:59 +0530 Subject: [PATCH 3/3] out_azure_kusto: refactored auth Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 44 +++---- plugins/out_azure_kusto/azure_kusto.h | 7 +- plugins/out_azure_kusto/azure_kusto_conf.c | 108 ++++------------ tests/runtime/out_azure_kusto.c | 141 ++++++++++++++++++++- 4 files changed, 188 insertions(+), 112 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 9f641576750..5895d8bfe30 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -122,14 +122,18 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) } if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { - if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { - ret = azure_kusto_get_workload_identity_token(ctx); - } - else if (ctx->managed_identity_client_id != NULL) { - ret = azure_kusto_get_msi_token(ctx); - } - else { - ret = azure_kusto_get_service_principal_token(ctx); + switch (ctx->auth_type) { + case FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY: + ret = azure_kusto_get_workload_identity_token(ctx); + break; + case FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM: + case FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER: + ret = azure_kusto_get_msi_token(ctx); + break; + case FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL: + default: + ret = azure_kusto_get_service_principal_token(ctx); + break; } } @@ -1409,7 +1413,7 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Error handling and cleanup */ if (json) { flb_sds_destroy(json); - } + } if (is_compressed && final_payload) { flb_free(final_payload); } @@ -1490,16 +1494,18 @@ static struct flb_config_map config_map[] = { "Set the tenant ID of the AAD application used for authentication"}, {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, client_id), - "Set the client ID (Application ID) of the AAD application used for authentication"}, + "Set the client ID (Application ID) of the AAD application or the user-assigned managed identity's client ID when using managed identity authentication"}, {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, client_secret), "Set the client secret (Application Password) of the AAD application used for " "authentication"}, - {FLB_CONFIG_MAP_STR, "managed_identity_client_id", (char *)NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, managed_identity_client_id), - "A managed identity client id to authenticate with. " - "Set to 'system' for system-assigned managed identity. " - "Set the MI client ID (GUID) for user-assigned managed identity."}, + {FLB_CONFIG_MAP_STR, "workload_identity_token_file", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, workload_identity_token_file), + "Set the token file path for workload identity authentication"}, + {FLB_CONFIG_MAP_STR, "auth_type", "service_principal", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, auth_type_str), + "Set the authentication type: 'service_principal', 'managed_identity', or 'workload_identity'. " + "For managed_identity, use 'system' as client_id for system-assigned identity, or specify the managed identity's client ID"}, {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint), "Set the Kusto cluster's ingestion endpoint URL (e.g. " @@ -1589,14 +1595,6 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, io_timeout), "HTTP IO timeout. Default is 60s" }, - {FLB_CONFIG_MAP_STR, "auth_type", "service_principal", 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, auth_type_str), - "Authentication type: 'service_principal', 'managed_identity', or 'workload_identity'" - }, - {FLB_CONFIG_MAP_STR, "workload_identity_token_file", NULL, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, workload_identity_token_file), - "Path to the workload identity token file (default: /var/run/secrets/azure/tokens/azure-identity-token)" - }, /* EOF */ {0}}; diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index ea367e066f6..362b1379533 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -37,9 +37,10 @@ /* Authentication types */ typedef enum { - FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL, - FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY, - FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY + FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL = 0, /* Client ID + Client Secret */ + FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM, /* System-assigned managed identity */ + FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER, /* User-assigned managed identity */ + FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY /* Workload Identity */ } flb_azure_kusto_auth_type; /* Kusto streaming inserts oauth scope */ diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 8fe3f8e03a1..fa899eab686 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -734,7 +734,18 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * } } else if (strcasecmp(ctx->auth_type_str, "managed_identity") == 0) { - ctx->auth_type = FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY; + /* Check if client_id indicates system-assigned or user-assigned managed identity */ + if (!ctx->client_id) { + flb_plg_error(ins, "When using managed_identity auth, client_id must be set to 'system' for system-assigned or the managed identity client ID"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + if (strcasecmp(ctx->client_id, "system") == 0) { + ctx->auth_type = FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM; + } else { + ctx->auth_type = FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER; + } } else if (strcasecmp(ctx->auth_type_str, "workload_identity") == 0) { ctx->auth_type = FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY; @@ -764,13 +775,6 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } - if (ctx->tenant_id == NULL && ctx->client_id == NULL && ctx->client_secret == NULL && - ctx->managed_identity_client_id == NULL && ctx->auth_type != FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { - flb_plg_error(ctx->ins, "Service Principal, Managed Identity, or Workload Identity is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - /* config: 'ingestion_endpoint' */ if (ctx->ingestion_endpoint == NULL) { flb_plg_error(ctx->ins, "property 'ingestion_endpoint' is not defined"); @@ -792,98 +796,35 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } - /* Set auth type */ - if (ctx->auth_type_str) { - if (strcasecmp(ctx->auth_type_str, "service_principal") == 0) { - ctx->auth_type = FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL; - } - else if (strcasecmp(ctx->auth_type_str, "managed_identity") == 0) { - ctx->auth_type = FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY; - } - else if (strcasecmp(ctx->auth_type_str, "workload_identity") == 0) { - ctx->auth_type = FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY; - } - else { - flb_plg_error(ins, "invalid auth_type '%s'", ctx->auth_type_str); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - } - - /* Validate configuration based on auth type */ - if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL) { - if (!ctx->tenant_id) { - flb_plg_error(ins, "tenant_id is required for service_principal auth_type"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - if (!ctx->client_id) { - flb_plg_error(ins, "client_id is required for service_principal auth_type"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - if (!ctx->client_secret) { - flb_plg_error(ins, "client_secret is required for service_principal auth_type"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - } - else if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY) { - if (!ctx->tenant_id) { - flb_plg_error(ins, "tenant_id is required for workload_identity auth_type"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - if (!ctx->client_id) { - flb_plg_error(ins, "client_id is required for workload_identity auth_type"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - } - else if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY) { - if (!ctx->managed_identity_client_id) { - flb_plg_error(ins, "managed_identity_client_id is required for managed_identity auth_type"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - } - /* Create oauth2 context */ - if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY) { + if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM || + ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) { /* MSI auth */ - if (ctx->managed_identity_client_id != NULL) { - /* system assigned managed identity */ - if (strcasecmp(ctx->managed_identity_client_id, "system") == 0) { + /* Construct the URL template with or without client_id for managed identity */ + if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM) { ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1); - if (!ctx->oauth_url) { flb_errno(); flb_azure_kusto_conf_destroy(ctx); return NULL; } - flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", ""); - - } - else { - /* user assigned managed identity */ + } else { + /* User-assigned managed identity */ ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 + - sizeof("&client_id=") - 1 + - flb_sds_len(ctx->managed_identity_client_id)); - + sizeof("&client_id=") - 1 + + flb_sds_len(ctx->client_id)); if (!ctx->oauth_url) { flb_errno(); flb_azure_kusto_conf_destroy(ctx); return NULL; } - flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), - FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->managed_identity_client_id); + FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->client_id); } - } - }else{ - /* Standard OAuth2 for service principal */ + } else { + /* Standard OAuth2 for service principal or workload identity */ ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 + flb_sds_len(ctx->tenant_id)); if (!ctx->oauth_url) { @@ -892,10 +833,9 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), - FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id); + FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id); } - ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources)); if (!ctx->resources) { flb_errno(); diff --git a/tests/runtime/out_azure_kusto.c b/tests/runtime/out_azure_kusto.c index dc4efe41d35..6bf8499ba13 100644 --- a/tests/runtime/out_azure_kusto.c +++ b/tests/runtime/out_azure_kusto.c @@ -26,10 +26,18 @@ /* Test functions */ void flb_test_azure_kusto_json_invalid(void); +void flb_test_azure_kusto_managed_identity_system(void); +void flb_test_azure_kusto_managed_identity_user(void); +void flb_test_azure_kusto_service_principal(void); +void flb_test_azure_kusto_workload_identity(void); /* Test list */ TEST_LIST = { - {"json_invalid", flb_test_azure_kusto_json_invalid }, + {"json_invalid", flb_test_azure_kusto_json_invalid}, + {"managed_identity_system", flb_test_azure_kusto_managed_identity_system}, + {"managed_identity_user", flb_test_azure_kusto_managed_identity_user}, + {"service_principal", flb_test_azure_kusto_service_principal}, + {"workload_identity", flb_test_azure_kusto_workload_identity}, {NULL, NULL} }; @@ -54,7 +62,8 @@ void flb_test_azure_kusto_json_invalid(void) out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); TEST_CHECK(out_ffd >= 0); flb_output_set(ctx, out_ffd, "match", "test", NULL); - flb_output_set(ctx, out_ffd, "managed_identity_client_id", "SYSTEM", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); @@ -71,6 +80,134 @@ void flb_test_azure_kusto_json_invalid(void) sleep(1); /* waiting flush */ + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Test for system-assigned managed identity */ +void flb_test_azure_kusto_managed_identity_system(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Test for user-assigned managed identity */ +void flb_test_azure_kusto_managed_identity_user(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "00000000-0000-0000-0000-000000000000", NULL); /* Example UUID */ + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Test for service principal authentication */ +void flb_test_azure_kusto_service_principal(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "service_principal", NULL); + flb_output_set(ctx, out_ffd, "tenant_id", "your-tenant-id", NULL); + flb_output_set(ctx, out_ffd, "client_id", "your-client-id", NULL); + flb_output_set(ctx, out_ffd, "client_secret", "your-client-secret", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Test for workload identity authentication */ +void flb_test_azure_kusto_workload_identity(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "workload_identity", NULL); + flb_output_set(ctx, out_ffd, "tenant_id", "your-tenant-id", NULL); + flb_output_set(ctx, out_ffd, "client_id", "your-client-id", NULL); + flb_output_set(ctx, out_ffd, "workload_identity_token_file", "/path/to/token/file", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + flb_stop(ctx); flb_destroy(ctx); } \ No newline at end of file