From 64615375e26a0eec719fa3ff73e4871d2e3be35f Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 16 Sep 2022 20:38:50 +0000 Subject: [PATCH 01/17] out_s3: use retry_limit in fluent-bit to replace MAX_UPLOAD_ERROR and update s3 warn output messages with function s3_retry_warn() Signed-off-by: Clay Cheng --- plugins/out_s3/s3.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index b7475cbc8c6..6858a1b46bf 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -140,7 +140,7 @@ static void s3_retry_warn(struct flb_s3 *ctx, const char *tag, flb_plg_warn(ctx->ins, "chunk tag=%s, create_time=%s cannot be retried", tag, create_time_str); - } + } } else { if (less_than_limit == FLB_TRUE) { @@ -1118,12 +1118,12 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (chunk) { chunk->failures += 1; if (chunk->failures > ctx->ins->retry_limit){ - s3_retry_warn(ctx, tag, chunk->input_name, create_time, FLB_FALSE); + s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_FALSE); s3_store_file_delete(ctx, chunk); return -2; } else { - s3_retry_warn(ctx, tag, chunk->input_name, create_time, FLB_TRUE); + s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_TRUE); s3_store_file_unlock(chunk); return -1; } From a951e0beb62dac22aeb6dea96d4ef7dc277b34ed Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 23 Feb 2023 22:02:53 -0800 Subject: [PATCH 02/17] out_s3: remove unneeded code comments Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 6858a1b46bf..46aafe1b673 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1114,7 +1114,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, flb_free(payload_buf); } if (ret < 0) { - /* re-add chunk to list */ if (chunk) { chunk->failures += 1; if (chunk->failures > ctx->ins->retry_limit){ @@ 
-1173,7 +1172,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, flb_free(payload_buf); } - /* re-add chunk to list */ if (chunk) { chunk->failures += 1; if (chunk->failures > ctx->ins->retry_limit) { From 14e133c47dff42383320e2ffa9b467cf9c22db26 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 23 Feb 2023 22:04:48 -0800 Subject: [PATCH 03/17] out_s3: remove unnecesary chunk->failure check Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 8 -------- 1 file changed, 8 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 46aafe1b673..f7c65942ead 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -2213,14 +2213,6 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, m_upload_file, file_first_log_time); } - /* Discard upload_file if it has failed to upload ctx->ins->retry_limit times */ - if (upload_file != NULL && upload_file->failures > ctx->ins->retry_limit) { - s3_retry_warn(ctx, event_chunk->tag, out_flush->task->i_ins->name, - upload_file->create_time, FLB_FALSE); - s3_store_file_delete(ctx, upload_file); - upload_file = NULL; - } - /* If upload_timeout has elapsed, upload file */ if (upload_file != NULL && time(NULL) > (upload_file->create_time + ctx->upload_timeout)) { From f5cfe3cfebdf206ae950a88a8a7122311df7b1a4 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 23 Feb 2023 22:17:10 -0800 Subject: [PATCH 04/17] out_s3: fix logic to be send then check failures/retries in put_all_chunks Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index f7c65942ead..29d97ece316 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1281,13 +1281,6 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) continue; } - if (chunk->failures > ctx->ins->retry_limit) { - s3_retry_warn(ctx, (char *) fsf->meta_buf, - NULL, chunk->create_time, FLB_FALSE); 
- flb_fstore_file_delete(ctx->fs, fsf); - continue; - } - ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); if (ret < 0) { @@ -1319,9 +1312,18 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) if (ret < 0) { chunk->failures += 1; if (is_startup == FLB_TRUE) { - s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, - chunk->create_time, FLB_TRUE); - s3_store_file_unlock(chunk); + if (chunk->failures > ctx->ins->retry_limit){ + s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, + chunk->create_time, FLB_FALSE); + s3_store_file_delete(ctx, chunk); + return -1; + } + else { + s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, + chunk->create_time, FLB_TRUE); + s3_store_file_unlock(chunk); + return -1; + } } else { flb_plg_error(ctx->ins, "Failed to flush chunk tag=%s, " From 5eaefeec0013fdcd399fac2db6d3a2b2ae9b99ad Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 23 Feb 2023 22:38:08 -0800 Subject: [PATCH 05/17] out_s3: locking chunks is no longer needed for async http S3 now always uses sync IO. In the future, we will switch back to async with the concept of a daemon coroutine that is the only thing that can send data. Either way, we don't need to mark that chunks are actively being sent with the locked bool A subsequent commit will refactor this code to use locked for a similar but slightly different purpose. Locked will mean a chunk is ready/done and there should be no new appends to it. Locked chunks will be added to a queue for sending. Unlocked chunks are ready for new flushes to buffer new data. 
Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 28 ---------------------------- plugins/out_s3/s3_store.h | 2 +- 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 29d97ece316..a3c696d4ad5 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1123,7 +1123,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, } else { s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_TRUE); - s3_store_file_unlock(chunk); return -1; } } @@ -1141,9 +1140,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, m_upload = create_upload(ctx, tag, tag_len, file_first_log_time); if (!m_upload) { flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag); - if (chunk) { - s3_store_file_unlock(chunk); - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1155,9 +1151,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, ret = create_multipart_upload(ctx, m_upload); if (ret < 0) { flb_plg_error(ctx->ins, "Could not initiate multipart upload"); - if (chunk) { - s3_store_file_unlock(chunk); - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1200,7 +1193,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, else { s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name, chunk->create_time, FLB_TRUE); - s3_store_file_unlock(chunk); return -1; } } @@ -1276,11 +1268,6 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); chunk = fsf->data; - /* Locked chunks are being processed, skip */ - if (chunk->locked == FLB_TRUE) { - continue; - } - ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); if (ret < 0) { @@ -1321,7 +1308,6 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) else { s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, chunk->create_time, 
FLB_TRUE); - s3_store_file_unlock(chunk); return -1; } } @@ -1370,11 +1356,6 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, return -1; } - /* - * lock the chunk from buffer list- needed for async http so that the - * same chunk won't be sent more than once. - */ - s3_store_file_lock(chunk); body = buffered_data; body_size = buffer_size; } @@ -1390,9 +1371,6 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, if (!tmp) { flb_errno(); flb_free(buffered_data); - if (chunk) { - s3_store_file_unlock(chunk); - } return -1; } body = buffered_data = tmp; @@ -1839,11 +1817,6 @@ static void cb_s3_upload(struct flb_config *config, void *data) continue; /* Only send chunks which have timed out */ } - /* Locked chunks are being processed, skip */ - if (chunk->locked == FLB_TRUE) { - continue; - } - m_upload = get_upload(ctx, (const char *) fsf->meta_buf, fsf->meta_size); ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); @@ -2251,7 +2224,6 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, if (ret < 0) { FLB_OUTPUT_RETURN(FLB_RETRY); } - s3_store_file_lock(upload_file); /* Add chunk file to upload queue */ ret = add_to_queue(ctx, upload_file, m_upload_file, diff --git a/plugins/out_s3/s3_store.h b/plugins/out_s3/s3_store.h index ff1be7bb78c..818d2231591 100644 --- a/plugins/out_s3/s3_store.h +++ b/plugins/out_s3/s3_store.h @@ -24,7 +24,7 @@ #include struct s3_file { - int locked; /* locked chunk is busy, cannot write to it */ + int locked; /* locked = no appends to this chunk */ int failures; /* delivery failures */ char *input_name; /* for s3_retry_warn output message gets input name */ size_t size; /* file size */ From cafd4709617c6465312220f03f1e2ea4bda5505e Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 24 Feb 2023 10:03:27 -0800 Subject: [PATCH 06/17] out_s3: remove all existing upload_queue code Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 171 
++------------------------------------------ plugins/out_s3/s3.h | 13 ---- 2 files changed, 4 insertions(+), 180 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index a3c696d4ad5..b0527597531 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -59,7 +59,6 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, int tag_len, time_t file_first_log_time); -static void remove_from_queue(struct upload_queue *entry); static struct flb_aws_header content_encoding_header = { .key = "Content-Encoding", @@ -489,7 +488,6 @@ static void s3_context_destroy(struct flb_s3 *ctx) struct mk_list *head; struct mk_list *tmp; struct multipart_upload *m_upload; - struct upload_queue *upload_contents; if (!ctx) { return; } @@ -541,13 +539,6 @@ static void s3_context_destroy(struct flb_s3 *ctx) multipart_upload_destroy(m_upload); } - mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { - upload_contents = mk_list_entry(head, struct upload_queue, _head); - s3_store_file_delete(ctx, upload_contents->upload_file); - multipart_upload_destroy(upload_contents->m_upload_file); - remove_from_queue(upload_contents); - } - flb_free(ctx); } @@ -575,9 +566,6 @@ static int cb_s3_init(struct flb_output_instance *ins, } ctx->ins = ins; mk_list_init(&ctx->uploads); - mk_list_init(&ctx->upload_queue); - - ctx->upload_queue_success = FLB_FALSE; /* Export context */ flb_output_set_context(ins, ctx); @@ -966,7 +954,7 @@ static int cb_s3_init(struct flb_output_instance *ins, /* * S3 must ALWAYS use sync mode - * In the timer thread we do a mk_list_foreach_safe on the queue of uplaods and chunks + * In the timer thread we do a mk_list_foreach_safe on the queue of uploads and chunks * Iterating over those lists is not concurrent safe. If a flush call ran at the same time * And deleted an item from the list, this could cause a crash/corruption. 
*/ @@ -1610,83 +1598,6 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *ta return m_upload; } -/* Adds an entry to upload queue */ -static int add_to_queue(struct flb_s3 *ctx, struct s3_file *upload_file, - struct multipart_upload *m_upload_file, const char *tag, int tag_len) -{ - struct upload_queue *upload_contents; - char *tag_cpy; - - /* Create upload contents object and add to upload queue */ - upload_contents = flb_malloc(sizeof(struct upload_queue)); - if (upload_contents == NULL) { - flb_plg_error(ctx->ins, "Error allocating memory for upload_queue entry"); - flb_errno(); - return -1; - } - upload_contents->upload_file = upload_file; - upload_contents->m_upload_file = m_upload_file; - upload_contents->tag_len = tag_len; - upload_contents->upload_time = -1; - - /* Necessary to create separate string for tag to prevent corruption */ - tag_cpy = flb_malloc(tag_len); - if (tag_cpy == NULL) { - flb_free(upload_contents); - flb_plg_error(ctx->ins, "Error allocating memory for tag in add_to_queue"); - flb_errno(); - return -1; - } - strncpy(tag_cpy, tag, tag_len); - upload_contents->tag = tag_cpy; - - /* Add entry to upload queue */ - mk_list_add(&upload_contents->_head, &ctx->upload_queue); - return 0; -} - -/* Removes an entry from upload_queue */ -void remove_from_queue(struct upload_queue *entry) -{ - mk_list_del(&entry->_head); - flb_free(entry->tag); - flb_free(entry); - return; -} - -/* Validity check for upload queue object */ -static int upload_queue_valid(struct upload_queue *upload_contents, time_t now, - void *out_context) -{ - struct flb_s3 *ctx = out_context; - - if (upload_contents == NULL) { - flb_plg_error(ctx->ins, "Error getting entry from upload_queue"); - return -1; - } - if (upload_contents->_head.next == NULL || upload_contents->_head.prev == NULL) { - flb_plg_debug(ctx->ins, "Encountered previously deleted entry in " - "upload_queue. 
Deleting invalid entry"); - mk_list_del(&upload_contents->_head); - return -1; - } - if (upload_contents->upload_file->locked == FLB_FALSE) { - flb_plg_debug(ctx->ins, "Encountered unlocked file in upload_queue. " - "Exiting"); - return -1; - } - if (upload_contents->upload_file->size <= 0) { - flb_plg_debug(ctx->ins, "Encountered empty chunk file in upload_queue. " - "Deleting empty chunk file"); - remove_from_queue(upload_contents); - return -1; - } - if (now < upload_contents->upload_time) { - flb_plg_debug(ctx->ins, "Found valid chunk file but not ready to upload"); - return -1; - } - return 0; -} static int send_upload_request(void *out_context, flb_sds_t chunk, struct s3_file *upload_file, @@ -1733,62 +1644,6 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file, return 0; } -/* Uploads all chunk files in queue synchronously */ -static void s3_upload_queue(struct flb_config *config, void *out_context) -{ - int ret; - time_t now; - struct upload_queue *upload_contents; - struct flb_s3 *ctx = out_context; - struct mk_list *tmp; - struct mk_list *head; - - flb_plg_debug(ctx->ins, "Running upload timer callback (upload_queue).."); - - /* No chunks in upload queue. Scan for timed out chunks. */ - if (mk_list_size(&ctx->upload_queue) == 0) { - flb_plg_debug(ctx->ins, "No files found in upload_queue. Scanning for timed " - "out chunks"); - cb_s3_upload(config, out_context); - } - - /* Iterate through each file in upload queue */ - mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { - upload_contents = mk_list_entry(head, struct upload_queue, _head); - - now = time(NULL); - - /* Checks if upload_contents is valid */ - ret = upload_queue_valid(upload_contents, now, ctx); - if (ret < 0) { - goto exit; - } - - /* Try to upload file. Return value can be 0, -1, -2. 
*/ - ret = send_upload_request(ctx, NULL, upload_contents->upload_file, - upload_contents->m_upload_file, - upload_contents->tag, upload_contents->tag_len); - - if (ret == 0) { - remove_from_queue(upload_contents); - ctx->upload_queue_success = FLB_TRUE; - } - else { - ctx->upload_queue_success = FLB_FALSE; - - /* If retry limit was reached, remove file from queue */ - if (ret == -2) { - remove_from_queue(upload_contents); - continue; - } - - break; - } - } - -exit: - return; -} static void cb_s3_upload(struct flb_config *config, void *data) { @@ -2091,14 +1946,9 @@ static void flush_init(void *out_context) sched = flb_sched_ctx_get(); - if (ctx->preserve_data_ordering) { - ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, s3_upload_queue, ctx, NULL); - } - else { - ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, cb_s3_upload, ctx, NULL); - } + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_ms, cb_s3_upload, ctx, NULL); + if (ret == -1) { flb_plg_error(ctx->ins, "Failed to create upload timer"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -2225,19 +2075,6 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - /* Add chunk file to upload queue */ - ret = add_to_queue(ctx, upload_file, m_upload_file, - event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_ERROR); - } - - /* Go through upload queue and return error if something went wrong */ - s3_upload_queue(config, ctx); - if (ctx->upload_queue_success == FLB_FALSE) { - ctx->upload_queue_success = FLB_TRUE; - FLB_OUTPUT_RETURN(FLB_ERROR); - } FLB_OUTPUT_RETURN(FLB_OK); } else { diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 6b08cc8c589..c7deff9dc43 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -47,17 +47,6 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 -struct upload_queue { - struct s3_file *upload_file; - struct multipart_upload 
*m_upload_file; - char *tag; - int tag_len; - - time_t upload_time; - - struct mk_list _head; -}; - struct multipart_upload { flb_sds_t s3_key; flb_sds_t tag; @@ -141,8 +130,6 @@ struct flb_s3 { struct mk_list uploads; int preserve_data_ordering; - int upload_queue_success; - struct mk_list upload_queue; size_t file_size; size_t upload_chunk_size; From 984fe8ec0d2bafb18ae8bb6dc98cf0b711a3a500 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 24 Feb 2023 13:55:29 -0800 Subject: [PATCH 07/17] out_s3: multipart timeouts are processed after data upload or in the timer callback only Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index b0527597531..92090c14327 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1699,7 +1699,7 @@ static void cb_s3_upload(struct flb_config *config, void *data) if (m_upload->complete_errors > ctx->ins->retry_limit) { flb_plg_error(ctx->ins, "Multipart Upload for %s has failed " - "s3: CompleteMultipartUpload more than configured retry_limit, " + "s3:CompleteMultipartUpload more than configured retry_limit, " "output will give up ", m_upload->s3_key); mk_list_del(&m_upload->_head); multipart_upload_destroy(m_upload); @@ -2049,13 +2049,6 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, m_upload_file = get_upload(ctx, event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (m_upload_file != NULL && time(NULL) > - (m_upload_file->init_time + ctx->upload_timeout)) { - upload_timeout_check = FLB_TRUE; - flb_plg_info(ctx->ins, "upload_timeout reached for %s", event_chunk->tag); - m_upload_file->input_name = out_flush->task->i_ins->name; - } - /* If total_file_size has been reached, upload file */ if ((upload_file && upload_file->size + chunk_size > ctx->upload_chunk_size) || (m_upload_file && m_upload_file->bytes + chunk_size > ctx->file_size)) { From 076c361a673a07751ac40e2db2cf1c7d378ccc7d Mon 
Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 26 Feb 2023 16:02:46 -0800 Subject: [PATCH 08/17] out_s3: refactor buffer_chunk to set input name Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 16 +++++++++------- plugins/out_s3/s3_store.c | 4 +++- plugins/out_s3/s3_store.h | 3 ++- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 92090c14327..64e30940520 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1628,13 +1628,14 @@ static int send_upload_request(void *out_context, flb_sds_t chunk, static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_t chunk, int chunk_size, const char *tag, int tag_len, - time_t file_first_log_time) + time_t file_first_log_time, + char *input_name) { int ret; struct flb_s3 *ctx = out_context; ret = s3_store_buffer_put(ctx, upload_file, tag, - tag_len, chunk, (size_t) chunk_size, file_first_log_time); + tag_len, chunk, (size_t) chunk_size, file_first_log_time, input_name); flb_sds_destroy(chunk); if (ret < 0) { flb_plg_warn(ctx->ins, "Could not buffer chunk. 
Data order preservation " @@ -1888,7 +1889,8 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char static void unit_test_flush(void *out_context, struct s3_file *upload_file, const char *tag, int tag_len, flb_sds_t chunk, int chunk_size, struct multipart_upload *m_upload_file, - time_t file_first_log_time) + time_t file_first_log_time, + char *input_name) { int ret; char *buffer; @@ -1896,7 +1898,7 @@ static void unit_test_flush(void *out_context, struct s3_file *upload_file, struct flb_s3 *ctx = out_context; s3_store_buffer_put(ctx, upload_file, tag, tag_len, - chunk, (size_t) chunk_size, file_first_log_time); + chunk, (size_t) chunk_size, file_first_log_time, input_name); ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); if (ret < 0) { flb_plg_error(ctx->ins, "Could not construct request buffer for %s", @@ -2035,7 +2037,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, unit_test_flush(ctx, upload_file, event_chunk->tag, flb_sds_len(event_chunk->tag), chunk, chunk_size, - m_upload_file, file_first_log_time); + m_upload_file, file_first_log_time, out_flush->task->i_ins->name); } /* If upload_timeout has elapsed, upload file */ @@ -2062,7 +2064,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, /* Buffer last chunk in file and lock file to prevent further changes */ ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, event_chunk->tag, flb_sds_len(event_chunk->tag), - file_first_log_time); + file_first_log_time, out_flush->task->i_ins->name); if (ret < 0) { FLB_OUTPUT_RETURN(FLB_RETRY); @@ -2085,7 +2087,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, /* Buffer current chunk in filesystem and wait for next chunk from engine */ ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, event_chunk->tag, flb_sds_len(event_chunk->tag), - file_first_log_time); + file_first_log_time, out_flush->task->i_ins->name); if (ret < 0) { FLB_OUTPUT_RETURN(FLB_RETRY); diff 
--git a/plugins/out_s3/s3_store.c b/plugins/out_s3/s3_store.c index 6f7c8717827..25de40c915e 100644 --- a/plugins/out_s3/s3_store.c +++ b/plugins/out_s3/s3_store.c @@ -126,7 +126,8 @@ struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag, int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, const char *tag, int tag_len, char *data, size_t bytes, - time_t file_first_log_time) + time_t file_first_log_time, + char* input_name) { int ret; flb_sds_t name; @@ -178,6 +179,7 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, s3_file->fsf = fsf; s3_file->first_log_time = file_first_log_time; s3_file->create_time = time(NULL); + s3_file->input_name = input_name; /* Use fstore opaque 'data' reference to keep our context */ fsf->data = s3_file; diff --git a/plugins/out_s3/s3_store.h b/plugins/out_s3/s3_store.h index 818d2231591..605ce37cbf5 100644 --- a/plugins/out_s3/s3_store.h +++ b/plugins/out_s3/s3_store.h @@ -37,7 +37,8 @@ struct s3_file { int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, const char *tag, int tag_len, char *data, size_t bytes, - time_t file_first_log_time); + time_t file_first_log_time, + char *input_name); int s3_store_init(struct flb_s3 *ctx); int s3_store_exit(struct flb_s3 *ctx); From 105b4ce2ba41debbb6eebfec7612d98ab04eaf44 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 26 Feb 2023 18:08:05 -0800 Subject: [PATCH 09/17] s3: set input_name on multipart from chunk Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 64e30940520..1546aee1eab 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -57,7 +57,8 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx, static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, int tag_len, - time_t file_first_log_time); + time_t file_first_log_time, + char 
*input_name); static struct flb_aws_header content_encoding_header = { @@ -1020,6 +1021,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, size_t payload_size = 0; size_t preCompress_size = 0; time_t file_first_log_time = time(NULL); + char* input_name = NULL; /* * When chunk does not exist, file_first_log_time will be the current time. @@ -1029,6 +1031,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, */ if (chunk != NULL) { file_first_log_time = chunk->first_log_time; + input_name = chunk->input_name; } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { @@ -1125,7 +1128,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, multipart: if (init_upload == FLB_TRUE) { - m_upload = create_upload(ctx, tag, tag_len, file_first_log_time); + m_upload = create_upload(ctx, tag, tag_len, file_first_log_time, input_name); if (!m_upload) { flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag); if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { @@ -1549,7 +1552,8 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,const char *tag, i } static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, - int tag_len, time_t file_first_log_time) + int tag_len, time_t file_first_log_time, + char *input_name) { int ret; struct multipart_upload *m_upload = NULL; @@ -1580,6 +1584,9 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *ta m_upload->upload_state = MULTIPART_UPLOAD_STATE_NOT_CREATED; m_upload->part_number = 1; m_upload->init_time = time(NULL); + if (input_name != NULL) { + m_upload->input_name = input_name; + } mk_list_add(&m_upload->_head, &ctx->uploads); /* Update file and increment index value right before request */ From a68f4848058bd43cfc9f205541f5c806c7ac6dd3 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 26 Feb 2023 19:28:29 -0800 Subject: [PATCH 10/17] out_s3: now sends only happen from upload timer callback Signed-off-by: 
Wesley Pettit --- plugins/out_s3/s3.c | 136 ++++++++++++++------------------------ plugins/out_s3/s3.h | 3 + plugins/out_s3/s3_store.h | 1 + 3 files changed, 54 insertions(+), 86 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 1546aee1eab..43de90c0403 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -567,6 +567,8 @@ static int cb_s3_init(struct flb_output_instance *ins, } ctx->ins = ins; mk_list_init(&ctx->uploads); + mk_list_init(&ctx->upload_queue); + /* Export context */ flb_output_set_context(ins, ctx); @@ -1109,7 +1111,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, chunk->failures += 1; if (chunk->failures > ctx->ins->retry_limit){ s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_FALSE); - s3_store_file_delete(ctx, chunk); return -2; } else { @@ -1119,10 +1120,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, } } - /* data was sent successfully- delete the local buffer */ - if (chunk) { - s3_store_file_delete(ctx, chunk); - } return 0; multipart: @@ -1161,7 +1158,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (chunk->failures > ctx->ins->retry_limit) { s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name, chunk->create_time, FLB_FALSE); - s3_store_file_delete(ctx, chunk); /* * part_number initializes with 1, if the number still is 1 which means * no data is uploaded and this upload file can be deleted , else set @@ -1188,12 +1184,8 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, } } } + m_upload->part_number += 1; - /* data was sent successfully- delete the local buffer */ - if (chunk) { - s3_store_file_delete(ctx, chunk); - chunk = NULL; - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1293,6 +1285,12 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) if (chunk->failures > ctx->ins->retry_limit){ s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, 
chunk->create_time, FLB_FALSE); + if (chunk->locked == FLB_TRUE) { + /* remove from upload_queue */ + if (chunk->_head.next != NULL && chunk->_head.prev != NULL) { + mk_list_del(&chunk->_head); + } + } s3_store_file_delete(ctx, chunk); return -1; } @@ -1312,6 +1310,12 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) } /* data was sent successfully- delete the local buffer */ + if (chunk->locked == FLB_TRUE) { + /* remove from upload_queue */ + if (chunk->_head.next != NULL && chunk->_head.prev != NULL) { + mk_list_del(&chunk->_head); + } + } s3_store_file_delete(ctx, chunk); } } @@ -1605,33 +1609,6 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *ta return m_upload; } - -static int send_upload_request(void *out_context, flb_sds_t chunk, - struct s3_file *upload_file, - struct multipart_upload *m_upload_file, - const char *tag, int tag_len) -{ - int ret; - char *buffer; - size_t buffer_size; - struct flb_s3 *ctx = out_context; - - /* Create buffer to upload to S3 */ - ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); - flb_sds_destroy(chunk); - if (ret < 0) { - flb_plg_error(ctx->ins, "Could not construct request buffer for %s", - upload_file->file_path); - return -1; - } - - /* Upload to S3 */ - ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len); - flb_free(buffer); - - return ret; -} - static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_t chunk, int chunk_size, const char *tag, int tag_len, @@ -1665,20 +1642,13 @@ static void cb_s3_upload(struct flb_config *config, void *data) struct mk_list *head; int complete; int ret; - time_t now; flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload).."); - now = time(NULL); - - /* Check all chunks and see if any have timed out */ - mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { - fsf = mk_list_entry(head, struct flb_fstore_file, _head); - chunk = fsf->data; 
- - if (now < (chunk->create_time + ctx->upload_timeout)) { - continue; /* Only send chunks which have timed out */ - } + /* send any chunks that are ready */ + mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { + chunk = mk_list_entry(head, struct s3_file, _head); + fsf = chunk->fsf; m_upload = get_upload(ctx, (const char *) fsf->meta_buf, fsf->meta_size); @@ -1686,16 +1656,26 @@ static void cb_s3_upload(struct flb_config *config, void *data) if (ret < 0) { flb_plg_error(ctx->ins, "Could not construct request buffer for %s", chunk->file_path); - continue; + if (ctx->preserve_data_ordering == FLB_TRUE) { + break; /* if preserve_data_ordering send in the queue order, do not skip over chunks */ + } else { + continue; + } } - /* FYI: if construct_request_buffer() succeedeed, the s3_file is locked */ ret = upload_data(ctx, chunk, m_upload, buffer, buffer_size, (const char *) fsf->meta_buf, fsf->meta_size); flb_free(buffer); - if (ret != 0) { - flb_plg_error(ctx->ins, "Could not send chunk with tag %s", - (char *) fsf->meta_buf); + if (ret == -2 || ret == 0) { + /* if we succeeded or retries expired, delete chunk file and remove from queue */ + mk_list_del(&chunk->_head); + s3_store_file_delete(ctx, chunk); + } + + if (ret < 0) { + if (ctx->preserve_data_ordering == FLB_TRUE) { + break; /* if preserve_data_ordering send in the queue order, do not skip over chunks */ + } } } @@ -2047,6 +2027,18 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, m_upload_file, file_first_log_time, out_flush->task->i_ins->name); } + /* + * Buffer new data in chunk in filesystem and wait for next data from engine + * If successful, data ordering is preserved to be the same as the engine sent us + */ + ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, + event_chunk->tag, flb_sds_len(event_chunk->tag), + file_first_log_time, out_flush->task->i_ins->name); + + if (ret < 0) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + /* If upload_timeout has elapsed, upload file */ if 
(upload_file != NULL && time(NULL) > (upload_file->create_time + ctx->upload_timeout)) { @@ -2064,41 +2056,13 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, total_file_size_check = FLB_TRUE; } - /* File is ready for upload, upload_file != NULL prevents from segfaulting. */ + /* lock chunk file so new new appends. Its ready to be sent. */ if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) { - upload_file->input_name = out_flush->task->i_ins->name; - if (ctx->preserve_data_ordering == FLB_TRUE) { - /* Buffer last chunk in file and lock file to prevent further changes */ - ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, - event_chunk->tag, flb_sds_len(event_chunk->tag), - file_first_log_time, out_flush->task->i_ins->name); - - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - FLB_OUTPUT_RETURN(FLB_OK); - } - else { - /* Send upload directly without upload queue */ - ret = send_upload_request(ctx, chunk, upload_file, m_upload_file, - event_chunk->tag, - flb_sds_len(event_chunk->tag)); - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - FLB_OUTPUT_RETURN(FLB_OK); - } + s3_store_file_lock(upload_file); + /* sends only happen from upload timer callback which iterates over queue */ + mk_list_add(&upload_file->_head, &ctx->upload_queue); } - /* Buffer current chunk in filesystem and wait for next chunk from engine */ - ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, - event_chunk->tag, flb_sds_len(event_chunk->tag), - file_first_log_time, out_flush->task->i_ins->name); - - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } FLB_OUTPUT_RETURN(FLB_OK); } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index c7deff9dc43..62b8fbd980c 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -129,6 +129,9 @@ struct flb_s3 { struct mk_list uploads; + /* list of locked chunks that are ready to send */ + struct mk_list upload_queue; + int preserve_data_ordering; size_t file_size; 
diff --git a/plugins/out_s3/s3_store.h b/plugins/out_s3/s3_store.h index 605ce37cbf5..38d85b90f7f 100644 --- a/plugins/out_s3/s3_store.h +++ b/plugins/out_s3/s3_store.h @@ -32,6 +32,7 @@ struct s3_file { time_t first_log_time; /* first log time */ flb_sds_t file_path; /* file path */ struct flb_fstore_file *fsf; /* reference to parent flb_fstore_file */ + struct mk_list _head; }; int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, From 78b0e2fe5858b80ef83715b7eb7e04a1c85b7da8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 27 Mar 2023 21:10:43 -0700 Subject: [PATCH 11/17] out_s3: daemon thread with code changes only in S3 plugin Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 145 +++++++++++++++++++++++++++++++++----------- plugins/out_s3/s3.h | 2 + 2 files changed, 111 insertions(+), 36 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 43de90c0403..5f3d1bc9382 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -18,9 +18,12 @@ */ #include +#include +#include #include #include #include +#include #include #include #include @@ -955,14 +958,6 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT; } - /* - * S3 must ALWAYS use sync mode - * In the timer thread we do a mk_list_foreach_safe on the queue of uploads and chunks - * Iterating over those lists is not concurrent safe. If a flush call ran at the same time - * And deleted an item from the list, this could cause a crash/corruption. 
- */ - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - /* clean up any old buffers found on startup */ if (ctx->has_old_buffers == FLB_TRUE) { flb_plg_info(ctx->ins, @@ -1098,10 +1093,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, put_object: - /* - * remove chunk from buffer list- needed for async http so that the - * same chunk won't be sent more than once - */ ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size); if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); @@ -1643,7 +1634,7 @@ static void cb_s3_upload(struct flb_config *config, void *data) int complete; int ret; - flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload).."); + flb_plg_debug(ctx->ins, "Running upload daemon coro uploader (cb_s3_upload).."); /* send any chunks that are ready */ mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { @@ -1899,11 +1890,9 @@ static void unit_test_flush(void *out_context, struct s3_file *upload_file, FLB_OUTPUT_RETURN(ret); } -static void flush_init(void *out_context) +static void flush_startup_chunks(struct flb_s3 *ctx) { int ret; - struct flb_s3 *ctx = out_context; - struct flb_sched *sched; /* clean up any old buffers found on startup */ if (ctx->has_old_buffers == FLB_TRUE) { @@ -1919,30 +1908,112 @@ static void flush_init(void *out_context) "Failed to send locally buffered data left over " "from previous executions; will retry. Buffer=%s", ctx->fs->root_path); - FLB_OUTPUT_RETURN(FLB_RETRY); } } +} + + +/* + * Same as flb_output_return_do() but no coro prepare destroy + * and no coro yield. + * Using for S3 daemon thread so we can clean up the task + * But keep the coroutine. + * This is the best way to do it because task clean up is + * handled by the control thread in the engine AFAICT + * But writing to the output pipe is done from coro. 
+ */ +static inline void flb_output_return_no_destroy(int ret) +{ + struct flb_coro *coro; + int n; + int pipe_fd; + uint32_t set; + uint64_t val; + struct flb_task *task; + struct flb_output_flush *out_flush; + struct flb_output_instance *o_ins; + struct flb_out_thread_instance *th_ins = NULL; + + coro = flb_coro_get(); + + out_flush = (struct flb_output_flush *) coro->data; + o_ins = out_flush->o_ins; + task = out_flush->task; + /* - * create a timer that will run periodically and check if uploads - * are ready for completion - * this is created once on the first flush + * To compose the signal event the relevant info is: + * + * - Unique Task events id: 2 in this case + * - Return value: FLB_OK (0), FLB_ERROR (1) or FLB_RETRY (2) + * - Task ID + * - Output Instance ID (struct flb_output_instance)->id + * + * We put together the return value with the task_id on the 32 bits at right */ - if (ctx->timer_created == FLB_FALSE) { - flb_plg_debug(ctx->ins, - "Creating upload timer with frequency %ds", - ctx->timer_ms / 1000); + set = FLB_TASK_SET(ret, task->id, o_ins->id); + val = FLB_BITS_U64_SET(2 /* FLB_ENGINE_TASK */, set); - sched = flb_sched_ctx_get(); + /* + * Set the target pipe channel: if this return code is running inside a + * thread pool worker, use the specific worker pipe/event loop to handle + * the return status, otherwise use the channel connected to the parent + * event loop. 
+ */ + if (flb_output_is_threaded(o_ins) == FLB_TRUE) { + /* Retrieve the thread instance and prepare pipe channel */ + th_ins = flb_output_thread_instance_get(); + pipe_fd = th_ins->ch_thread_events[1]; + } + else { + pipe_fd = out_flush->o_ins->ch_events[1]; + } - ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, cb_s3_upload, ctx, NULL); + /* Notify the event loop about our return status */ + n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val)); + if (n == -1) { + flb_errno(); + } +} - if (ret == -1) { - flb_plg_error(ctx->ins, "Failed to create upload timer"); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - ctx->timer_created = FLB_TRUE; +/* + * We need to use async IO for S3 because it is more stable + * However, S3 has unique needs. The chunk list, multipart code, etc. + * are not concurrency safe. + * Additionally, timer callbacks don't run in coroutines and + * can't use async IO. + * Solution: daemon coroutine + * The first coroutine that flushes to S3 never ends, + * and just uploads and sleeps. 
+ * + * We increment the metrics counters for the chunk originally + * associated with the coroutine and decrement the task users + * in release_chunk_upstream() + */ +static void daemon_coroutine(struct flb_config *config, struct flb_s3 *ctx) +{ + flb_plg_info(ctx->ins, "daemon coroutine starting..."); + + ctx->daemon_coro_started = FLB_TRUE; + + /* tell engine that this task did complete successfully */ + flb_output_return_no_destroy(FLB_OK); + + while (FLB_TRUE) { + /* Cleanup old buffers found on startup */ + flush_startup_chunks(ctx); + + /* upload any ready chunks */ + cb_s3_upload(config, ctx); + + /* + * special coroutine sleep + * Doesn't block any thread + * Puts an event on the event + * loop which will wake this coro back up + */ + flb_time_sleep(ctx->timer_ms); + flb_plg_info(ctx->ins, "daemon coroutine resumed..."); } } @@ -1966,9 +2037,6 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, struct flb_time tms; time_t file_first_log_time = 0; - /* Cleanup old buffers and initialize upload timer */ - flush_init(ctx); - /* Process chunk */ if (ctx->log_key) { chunk = flb_pack_msgpack_extract_log_key(ctx, @@ -2059,10 +2127,15 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, /* lock chunk file so new new appends. Its ready to be sent. 
*/ if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) { s3_store_file_lock(upload_file); - /* sends only happen from upload timer callback which iterates over queue */ + /* sends only happen from upload daemon coroutine which iterates over queue */ mk_list_add(&upload_file->_head, &ctx->upload_queue); } + if (ctx->daemon_coro_started == FLB_FALSE) { + daemon_coroutine(config, ctx); + } + + flb_plg_info(ctx->ins, "normal coro flush ending..."); FLB_OUTPUT_RETURN(FLB_OK); } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 62b8fbd980c..30a222c90ff 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -147,6 +147,8 @@ struct flb_s3 { flb_sds_t metadata_dir; flb_sds_t seq_index_file; + int daemon_coro_started; + struct flb_output_instance *ins; }; From e5f7be5a095d5ced4778694f89ffbd9b7c543304 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 27 Mar 2023 21:14:58 -0700 Subject: [PATCH 12/17] out_s3: do not use sync IO ever so do not upload in init callback? Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 63 +++++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 5f3d1bc9382..2c6b3385eeb 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -958,37 +958,38 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT; } - /* clean up any old buffers found on startup */ - if (ctx->has_old_buffers == FLB_TRUE) { - flb_plg_info(ctx->ins, - "Sending locally buffered data from previous " - "executions to S3; buffer=%s", - ctx->fs->root_path); - ctx->has_old_buffers = FLB_FALSE; - ret = put_all_chunks(ctx, FLB_TRUE); - if (ret < 0) { - ctx->has_old_buffers = FLB_TRUE; - flb_plg_error(ctx->ins, - "Failed to send locally buffered data left over " - "from previous executions; will retry. 
Buffer=%s", - ctx->fs->root_path); - } - } - - /* clean up any old uploads found on start up */ - if (ctx->has_old_uploads == FLB_TRUE) { - flb_plg_info(ctx->ins, - "Completing multipart uploads from previous " - "executions to S3; buffer=%s", - ctx->stream_upload->path); - ctx->has_old_uploads = FLB_FALSE; - - /* - * we don't need to worry if this fails; it will retry each - * time the upload callback is called - */ - cb_s3_upload(config, ctx); - } + /* this would use sync IO which we want to avoid */ + // /* clean up any old buffers found on startup */ + // if (ctx->has_old_buffers == FLB_TRUE) { + // flb_plg_info(ctx->ins, + // "Sending locally buffered data from previous " + // "executions to S3; buffer=%s", + // ctx->fs->root_path); + // ctx->has_old_buffers = FLB_FALSE; + // ret = put_all_chunks(ctx, FLB_TRUE); + // if (ret < 0) { + // ctx->has_old_buffers = FLB_TRUE; + // flb_plg_error(ctx->ins, + // "Failed to send locally buffered data left over " + // "from previous executions; will retry. 
Buffer=%s", + // ctx->fs->root_path); + // } + // } + + // /* clean up any old uploads found on start up */ + // if (ctx->has_old_uploads == FLB_TRUE) { + // flb_plg_info(ctx->ins, + // "Completing multipart uploads from previous " + // "executions to S3; buffer=%s", + // ctx->stream_upload->path); + // ctx->has_old_uploads = FLB_FALSE; + + // /* + // * we don't need to worry if this fails; it will retry each + // * time the upload callback is called + // */ + // cb_s3_upload(config, ctx); + // } /* this is done last since in the previous block we make calls to AWS */ ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins); From 0f6c2cc52cb9d633e6e7d01f806cc1bf16c14e9a Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 27 Mar 2023 21:28:38 -0700 Subject: [PATCH 13/17] out_s3: remove annoying info message Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 2c6b3385eeb..8f25620f136 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -2135,8 +2135,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, if (ctx->daemon_coro_started == FLB_FALSE) { daemon_coroutine(config, ctx); } - - flb_plg_info(ctx->ins, "normal coro flush ending..."); + FLB_OUTPUT_RETURN(FLB_OK); } From 44b7c09c08124eb629307ca76308acd0a623dd7d Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 28 Mar 2023 16:01:31 -0700 Subject: [PATCH 14/17] out_s3: ignore active stream in put_all_chunks on startup Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 8f25620f136..fc5ba4b202f 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1239,6 +1239,10 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) continue; } + if (is_startup == FLB_TRUE && fs_stream == ctx->stream_active) { + continue; + } + 
mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); chunk = fsf->data; @@ -2135,7 +2139,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, if (ctx->daemon_coro_started == FLB_FALSE) { daemon_coroutine(config, ctx); } - + FLB_OUTPUT_RETURN(FLB_OK); } From 2498ab748dd58a39063aae1b50f7989bb53844ba Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 10 Apr 2023 22:51:18 -0700 Subject: [PATCH 15/17] out_s3: support no_retries and no_limits for retry limit, exit daemon coro on shutdown, send timed out chunks in daemon coro Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 60 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index fc5ba4b202f..0ab2f7f3ff9 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1101,7 +1101,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (ret < 0) { if (chunk) { chunk->failures += 1; - if (chunk->failures > ctx->ins->retry_limit){ + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit){ s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_FALSE); return -2; } @@ -1147,7 +1147,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (chunk) { chunk->failures += 1; - if (chunk->failures > ctx->ins->retry_limit) { + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit) { s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name, chunk->create_time, FLB_FALSE); /* @@ -1234,12 +1234,15 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) if (fs_stream == ctx->stream_upload) { continue; } + /* skip metadata stream */ if (fs_stream == ctx->stream_metadata) { continue; } + /* on startup, we only send old chunks in this routine */ if (is_startup == FLB_TRUE && fs_stream == ctx->stream_active) { + flb_info("put_all_chunks: stream_active 
has %d chunks", mk_list_size(&fs_stream->files)); continue; } @@ -1278,7 +1281,7 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) if (ret < 0) { chunk->failures += 1; if (is_startup == FLB_TRUE) { - if (chunk->failures > ctx->ins->retry_limit){ + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit){ s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, chunk->create_time, FLB_FALSE); if (chunk->locked == FLB_TRUE) { @@ -1638,9 +1641,33 @@ static void cb_s3_upload(struct flb_config *config, void *data) struct mk_list *head; int complete; int ret; + time_t now; + now = time(NULL); + flb_plg_debug(ctx->ins, "Running upload daemon coro uploader (cb_s3_upload).."); + /* check chunks in active stream not marked as ready to be sent and see if any are timed out */ + mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { + fsf = mk_list_entry(head, struct flb_fstore_file, _head); + chunk = fsf->data; + + /* Locked chunks are already in the queue, skip */ + if (chunk->locked == FLB_TRUE) { + continue; + } + + if (now > (chunk->create_time + ctx->upload_timeout)) { + /* add to upload queue */ + if (chunk->input_name) { + flb_plg_info(ctx->ins, "upload_timeout reached for chunk from %s", + chunk->input_name); + } + s3_store_file_lock(chunk); + mk_list_add(&chunk->_head, &ctx->upload_queue); + } + } + /* send any chunks that are ready */ mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { chunk = mk_list_entry(head, struct s3_file, _head); @@ -1680,7 +1707,7 @@ static void cb_s3_upload(struct flb_config *config, void *data) m_upload = mk_list_entry(head, struct multipart_upload, _head); complete = FLB_FALSE; - if (m_upload->complete_errors > ctx->ins->retry_limit) { + if (ctx->ins->retry_limit >= 0 && m_upload->complete_errors > ctx->ins->retry_limit) { flb_plg_error(ctx->ins, "Multipart Upload for %s has failed " "s3:CompleteMultipartUpload more than configured retry_limit, " @@ -1913,6 +1940,11 @@ static void 
flush_startup_chunks(struct flb_s3 *ctx) "Failed to send locally buffered data left over " "from previous executions; will retry. Buffer=%s", ctx->fs->root_path); + } else { + flb_plg_info(ctx->ins, + "Successfully sent all locally buffered data left over " + "from previous executions. Buffer=%s", + ctx->fs->root_path); } } @@ -2004,13 +2036,21 @@ static void daemon_coroutine(struct flb_config *config, struct flb_s3 *ctx) /* tell engine that this task did complete successfully */ flb_output_return_no_destroy(FLB_OK); - while (FLB_TRUE) { + /* + * FLB engine uses a graceful cooperative shutdown model. + * If coroutines never end, the system won't stop. + * So the daemon coroutine must exit itself when the engine is in shutdown mode. + */ + while (config->is_running == FLB_TRUE) { /* Cleanup old buffers found on startup */ flush_startup_chunks(ctx); /* upload any ready chunks */ cb_s3_upload(config, ctx); + if (config->is_running == FLB_FALSE) { + break; + } /* * special coroutine sleep * Doesn't block any thread @@ -2097,7 +2137,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, unit_test_flush(ctx, upload_file, event_chunk->tag, flb_sds_len(event_chunk->tag), chunk, chunk_size, - m_upload_file, file_first_log_time, out_flush->task->i_ins->name); + m_upload_file, file_first_log_time, i_ins->name); } /* @@ -2106,7 +2146,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, */ ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, event_chunk->tag, flb_sds_len(event_chunk->tag), - file_first_log_time, out_flush->task->i_ins->name); + file_first_log_time, i_ins->name); if (ret < 0) { FLB_OUTPUT_RETURN(FLB_RETRY); @@ -2116,8 +2156,8 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, if (upload_file != NULL && time(NULL) > (upload_file->create_time + ctx->upload_timeout)) { upload_timeout_check = FLB_TRUE; - flb_plg_info(ctx->ins, "upload_timeout reached for %s", - event_chunk->tag); + flb_plg_info(ctx->ins, "upload_timeout 
reached for chunk from %s, tag=%s", + i_ins->name, event_chunk->tag); } m_upload_file = get_upload(ctx, @@ -2151,6 +2191,8 @@ static int cb_s3_exit(void *data, struct flb_config *config) struct mk_list *tmp; struct mk_list *head; + flb_plg_info(ctx->ins, "cb_exit"); + if (!ctx) { return 0; } From b035d6f717b744cc8414b17147ae457030396ca8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 2 May 2023 13:26:24 -0700 Subject: [PATCH 16/17] out_s3: in upload_data, use input_name variable instead of from chunk Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 0ab2f7f3ff9..da07b7f014b 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1102,11 +1102,11 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (chunk) { chunk->failures += 1; if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit){ - s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_FALSE); + s3_retry_warn(ctx, tag, input_name, file_first_log_time, FLB_FALSE); return -2; } else { - s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_TRUE); + s3_retry_warn(ctx, tag, input_name, file_first_log_time, FLB_TRUE); return -1; } } From 9efcca9003bd44288ee6ab0b61fa1c4d8b9ff721 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 2 May 2023 14:04:23 -0700 Subject: [PATCH 17/17] out_s3: re-enable send old chunks on startup Signed-off-by: Wesley Pettit --- plugins/out_s3/s3.c | 63 ++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index da07b7f014b..3c32f4010ac 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -958,38 +958,37 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT; } - /* this would use sync IO which we want to avoid */ - // /* clean up any old 
buffers found on startup */ - // if (ctx->has_old_buffers == FLB_TRUE) { - // flb_plg_info(ctx->ins, - // "Sending locally buffered data from previous " - // "executions to S3; buffer=%s", - // ctx->fs->root_path); - // ctx->has_old_buffers = FLB_FALSE; - // ret = put_all_chunks(ctx, FLB_TRUE); - // if (ret < 0) { - // ctx->has_old_buffers = FLB_TRUE; - // flb_plg_error(ctx->ins, - // "Failed to send locally buffered data left over " - // "from previous executions; will retry. Buffer=%s", - // ctx->fs->root_path); - // } - // } - - // /* clean up any old uploads found on start up */ - // if (ctx->has_old_uploads == FLB_TRUE) { - // flb_plg_info(ctx->ins, - // "Completing multipart uploads from previous " - // "executions to S3; buffer=%s", - // ctx->stream_upload->path); - // ctx->has_old_uploads = FLB_FALSE; - - // /* - // * we don't need to worry if this fails; it will retry each - // * time the upload callback is called - // */ - // cb_s3_upload(config, ctx); - // } + /* clean up any old buffers found on startup */ + if (ctx->has_old_buffers == FLB_TRUE) { + flb_plg_info(ctx->ins, + "Sending locally buffered data from previous " + "executions to S3; buffer=%s", + ctx->fs->root_path); + ctx->has_old_buffers = FLB_FALSE; + ret = put_all_chunks(ctx, FLB_TRUE); + if (ret < 0) { + ctx->has_old_buffers = FLB_TRUE; + flb_plg_error(ctx->ins, + "Failed to send locally buffered data left over " + "from previous executions; will retry. 
Buffer=%s", + ctx->fs->root_path); + } + } + + /* clean up any old uploads found on start up */ + if (ctx->has_old_uploads == FLB_TRUE) { + flb_plg_info(ctx->ins, + "Completing multipart uploads from previous " + "executions to S3; buffer=%s", + ctx->stream_upload->path); + ctx->has_old_uploads = FLB_FALSE; + + /* + * we don't need to worry if this fails; it will retry each + * time the upload callback is called + */ + cb_s3_upload(config, ctx); + } /* this is done last since in the previous block we make calls to AWS */ ctx->provider->provider_vtable->upstream_set(ctx->provider, ctx->ins);