Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 85 additions & 9 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,32 @@
#include "azure_msiauth.h"
#include "azure_kusto_store.h"

/**
* Retrieve an OAuth2 access token using Managed Service Identity (MSI).
*
* @param ctx Plugin's context containing the OAuth2 configuration.
* @return int 0 on success, -1 on failure.
*/
static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx)
{
char *token;

/* Retrieve access token */
token = flb_azure_msiauth_token_get(ctx->o);
if (!token) {
flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
flb_plg_error(ctx->ins, "error retrieving oauth2 access token (MSI access token is NULL)");
return -1;
}

return 0;
}

/**
* Retrieve an OAuth2 access token using workload identity federation.
*
* @param ctx Plugin's context containing workload identity configuration.
* @return int 0 on success, -1 on failure.
*/
static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx)
{
int ret;
Expand All @@ -70,6 +82,15 @@ static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx)
return 0;
}

/**
* Retrieve an OAuth2 access token using service principal credentials.
*
* Constructs the OAuth2 payload with client credentials and requests
* an access token from the configured OAuth2 endpoint.
*
* @param ctx Plugin's context containing client credentials and OAuth2 config.
* @return int 0 on success, -1 on failure.
*/
static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
{
int ret;
Expand Down Expand Up @@ -100,25 +121,36 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
flb_plg_error(ctx->ins, "error appending oauth2 params");
return -1;
}

/* Retrieve access token */
char *token = flb_oauth2_token_get(ctx->o);
if (!token) {
flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
flb_plg_error(ctx->ins, "error retrieving oauth2 access token - "
"check Fluent Bit logs for '[oauth2]' errors "
"(common causes: connection failure to '%s', invalid credentials, "
"or malformed response)", ctx->oauth_url ? ctx->oauth_url : "unknown");
return -1;
}

flb_plg_debug(ctx->ins, "OAuth2 token retrieval process completed successfully");
return 0;
}

/**
* Obtain the current Azure Kusto bearer token as a formatted string.
*
* Acquires the token mutex, refreshes the token if expired based on the
* configured authentication type, and returns a copy of the token string
* in the format "<token_type> <access_token>".
*
* @param ctx Plugin's context.
* @return flb_sds_t The bearer token string, or NULL on error.
*/
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
{
int ret = 0;
flb_sds_t output = NULL;

if (pthread_mutex_lock(&ctx->token_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return NULL;
}

Expand All @@ -144,6 +176,9 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
flb_sds_len(ctx->o->access_token) + 2);
if (!output) {
flb_plg_error(ctx->ins, "error creating token buffer");
if (pthread_mutex_unlock(&ctx->token_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
}
return NULL;
}
flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,
Expand Down Expand Up @@ -411,6 +446,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
struct mk_list *tmp;
struct mk_list *head;
struct mk_list *f_head;
struct mk_list *f_tmp;
struct flb_fstore_file *fsf;
struct flb_fstore_stream *fs_stream;
flb_sds_t payload = NULL;
Expand All @@ -428,10 +464,15 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con
continue;
}

mk_list_foreach_safe(f_head, tmp, &fs_stream->files) {
mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) {
fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
chunk = fsf->data;

/* Skip files with no associated chunk data (may happen during shutdown) */
if (chunk == NULL) {
continue;
}

/* Locked chunks are being processed, skip */
if (chunk->locked == FLB_TRUE) {
continue;
Expand Down Expand Up @@ -893,21 +934,28 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s",
ctx->store_dir);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
ctx->has_old_buffers = azure_kusto_store_has_data(ctx);

/* validate 'total_file_size' */
if (ctx->file_size <= 0) {
flb_plg_error(ctx->ins, "Failed to parse upload_file_size");
azure_kusto_store_exit(ctx);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
if (ctx->file_size < 1000000) {
flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB");
azure_kusto_store_exit(ctx);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
if (ctx->file_size > MAX_FILE_SIZE) {
flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE);
azure_kusto_store_exit(ctx);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}

Expand All @@ -934,15 +982,22 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
* Create upstream context for Kusto Ingestion endpoint
*/
ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
if (!ctx->u) {
flb_plg_error(ctx->ins, "upstream creation failed");
if (ctx->buffering_enabled == FLB_TRUE) {
azure_kusto_store_exit(ctx);
}
pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
if (ctx->buffering_enabled == FLB_TRUE){
flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC);
ctx->u->base.net.io_timeout = ctx->io_timeout;
ctx->has_old_buffers = azure_kusto_store_has_data(ctx);
}
if (!ctx->u) {
flb_plg_error(ctx->ins, "upstream creation failed");
return -1;
}

flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));

Expand All @@ -951,6 +1006,14 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
if (!ctx->o) {
flb_plg_error(ctx->ins, "cannot create oauth2 context");
if (ctx->buffering_enabled == FLB_TRUE) {
azure_kusto_store_exit(ctx);
}
flb_upstream_destroy(ctx->u);
pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
flb_output_upstream_set(ctx->u, ins);
Expand Down Expand Up @@ -1114,6 +1177,19 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int
return 0;
}

/**
* Buffer a data chunk into the file storage for later ingestion.
*
* Writes the formatted chunk data to the upload file via the store layer.
*
* @param out_context Plugin's context (cast to struct flb_azure_kusto).
* @param upload_file Target file handle for buffered storage.
* @param chunk Data chunk to buffer.
* @param chunk_size Size of the data chunk.
* @param tag Fluent Bit tag associated with the chunk.
* @param tag_len Length of the tag string.
* @return int 0 on success, -1 on failure.
*/
static int buffer_chunk(void *out_context, struct azure_kusto_file *upload_file,
flb_sds_t chunk, int chunk_size,
flb_sds_t tag, size_t tag_len)
Expand Down
9 changes: 9 additions & 0 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ struct flb_azure_kusto_resources {

/* used to reload resouces after some time */
uint64_t load_time;

/* flag to prevent concurrent coroutines from reloading simultaneously */
int loading_in_progress;

/* Old resources pending cleanup - deferred destruction to avoid use-after-free
* when other threads may still be using them during high-volume operations */
struct flb_upstream_ha *old_blob_ha;
struct flb_upstream_ha *old_queue_ha;
flb_sds_t old_identity_token;
};

struct flb_azure_kusto {
Expand Down
101 changes: 101 additions & 0 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ static int flb_azure_kusto_resources_clear(struct flb_azure_kusto_resources *res
resources->identity_token = NULL;
}

/* Also clean up any old resources pending destruction */
if (resources->old_blob_ha) {
flb_upstream_ha_destroy(resources->old_blob_ha);
resources->old_blob_ha = NULL;
}

if (resources->old_queue_ha) {
flb_upstream_ha_destroy(resources->old_queue_ha);
resources->old_queue_ha = NULL;
}

if (resources->old_identity_token) {
flb_sds_destroy(resources->old_identity_token);
resources->old_identity_token = NULL;
}

resources->load_time = 0;

return 0;
Expand Down Expand Up @@ -548,14 +564,48 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
flb_plg_debug(ctx->ins, "difference is %" PRIu64, now - ctx->resources->load_time);
flb_plg_debug(ctx->ins, "effective ingestion resource interval is %d", ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer);

/* Acquire the mutex for the staleness check to prevent concurrent coroutines
* from both deciding to reload and rotate resources simultaneously.
* Without this, two flushes can race: the second rotation destroys old_blob_ha
* which is still being used by the first flush's in-flight blob upload,
* causing a use-after-free SIGSEGV in cmt_gauge_inc.
*/
if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking resources mutex for staleness check");
return -1;
}

/* check if we have all resources and they are not stale */
if (ctx->resources->blob_ha && ctx->resources->queue_ha &&
ctx->resources->identity_token &&
now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer) {
flb_plg_debug(ctx->ins, "resources are already loaded and are not stale");
pthread_mutex_unlock(&ctx->resources_mutex);
ret = 0;
}
else if (ctx->resources->loading_in_progress) {
/*
* Another coroutine is already loading resources. Return error to
* trigger FLB_RETRY so this flush will be retried once the resources
* are available. This prevents concurrent rotations that caused
* use-after-free SIGSEGV.
*/
flb_plg_info(ctx->ins, "ingestion resources loading already in progress by another coroutine, will retry");
pthread_mutex_unlock(&ctx->resources_mutex);
/* Return -1 directly without going through cleanup, since cleanup
* would reset load_time and loading_in_progress which belong to
* the coroutine that is actually doing the loading.
*/
return -1;
}
else {
/*
* Mark loading in progress (while holding mutex) to prevent other
* concurrent coroutines from also starting a reload.
*/
ctx->resources->loading_in_progress = FLB_TRUE;
/* Release the mutex before making network calls (which may yield the coroutine) */
pthread_mutex_unlock(&ctx->resources_mutex);
flb_plg_info(ctx->ins, "loading kusto ingestion resources and refresh interval is %d", ctx->ingestion_resources_refresh_interval * 1000 + generated_random_integer);
response = execute_ingest_csl_command(ctx, ".get ingestion resources");

Expand Down Expand Up @@ -598,16 +648,57 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
parse_ingestion_identity_token(ctx, response);

if (identity_token) {
/*
Deferred cleanup: destroy resources from two refresh cycles ago,
then move current resources to 'old' before assigning new ones.
This avoids use-after-free when other threads may still be using
the current resources during high-volume operations.

With a 1-hour refresh interval, the race condition requires an
ingest operation to take >1 hour (the deferred cleanup grace period).
This is extremely unlikely under normal conditions (and hence a lock based
mechanism is avoided for performance).
*/
if (ctx->resources->old_blob_ha) {
flb_upstream_ha_destroy(ctx->resources->old_blob_ha);
flb_plg_debug(ctx->ins, "clearing up old blob HA");
}
if (ctx->resources->old_queue_ha) {
flb_upstream_ha_destroy(ctx->resources->old_queue_ha);
flb_plg_debug(ctx->ins, "clearing up old queue HA");
}
if (ctx->resources->old_identity_token) {
flb_sds_destroy(ctx->resources->old_identity_token);
flb_plg_debug(ctx->ins, "clearing up old identity token");
}

/* Move current to old */
ctx->resources->old_blob_ha = ctx->resources->blob_ha;
ctx->resources->old_queue_ha = ctx->resources->queue_ha;
ctx->resources->old_identity_token = ctx->resources->identity_token;

/* Assign new resources */
ctx->resources->blob_ha = blob_ha;
ctx->resources->queue_ha = queue_ha;
ctx->resources->identity_token = identity_token;
ctx->resources->load_time = now;

flb_plg_info(ctx->ins, "ingestion resources rotated successfully, "
"previous resources moved to deferred cleanup");

/* Clear the loading flag on success */
ctx->resources->loading_in_progress = FLB_FALSE;
ret = 0;
}
else {
flb_plg_error(ctx->ins,
"error parsing ingestion identity token");
/*
* Must unlock the mutex before goto cleanup to avoid
* deadlock (cleanup also acquires the mutex to reset
* load_time). This was a pre-existing bug.
*/
pthread_mutex_unlock(&ctx->resources_mutex);
ret = -1;
goto cleanup;
}
Expand Down Expand Up @@ -665,6 +756,16 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,

cleanup:
if (ret == -1) {
/*
* Reset load_time to 0 so the next call will retry the reload.
* We set load_time = now at the start to prevent concurrent reloads,
* but if the reload failed, we need to allow future retries.
*/
if (pthread_mutex_lock(&ctx->resources_mutex) == 0) {
ctx->resources->load_time = 0;
ctx->resources->loading_in_progress = FLB_FALSE;
pthread_mutex_unlock(&ctx->resources_mutex);
}
if (queue_ha) {
flb_upstream_ha_destroy(queue_ha);
}
Expand Down
Loading
Loading