diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index b7475cbc8c6..3c32f4010ac 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -18,9 +18,12 @@ */ #include +#include +#include #include #include #include +#include #include #include #include @@ -57,9 +60,9 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx, static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, int tag_len, - time_t file_first_log_time); + time_t file_first_log_time, + char *input_name); -static void remove_from_queue(struct upload_queue *entry); static struct flb_aws_header content_encoding_header = { .key = "Content-Encoding", @@ -140,7 +143,7 @@ static void s3_retry_warn(struct flb_s3 *ctx, const char *tag, flb_plg_warn(ctx->ins, "chunk tag=%s, create_time=%s cannot be retried", tag, create_time_str); - } + } } else { if (less_than_limit == FLB_TRUE) { @@ -489,7 +492,6 @@ static void s3_context_destroy(struct flb_s3 *ctx) struct mk_list *head; struct mk_list *tmp; struct multipart_upload *m_upload; - struct upload_queue *upload_contents; if (!ctx) { return; } @@ -541,13 +543,6 @@ static void s3_context_destroy(struct flb_s3 *ctx) multipart_upload_destroy(m_upload); } - mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { - upload_contents = mk_list_entry(head, struct upload_queue, _head); - s3_store_file_delete(ctx, upload_contents->upload_file); - multipart_upload_destroy(upload_contents->m_upload_file); - remove_from_queue(upload_contents); - } - flb_free(ctx); } @@ -577,7 +572,6 @@ static int cb_s3_init(struct flb_output_instance *ins, mk_list_init(&ctx->uploads); mk_list_init(&ctx->upload_queue); - ctx->upload_queue_success = FLB_FALSE; /* Export context */ flb_output_set_context(ins, ctx); @@ -964,14 +958,6 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT; } - /* - * S3 must ALWAYS use sync mode - * In the timer thread we do a mk_list_foreach_safe on the queue of uplaods and chunks - * Iterating over those lists is not concurrent safe. If a flush call ran at the same time - * And deleted an item from the list, this could cause a crash/corruption. - */ - ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC); - /* clean up any old buffers found on startup */ if (ctx->has_old_buffers == FLB_TRUE) { flb_plg_info(ctx->ins, @@ -1032,6 +1018,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, size_t payload_size = 0; size_t preCompress_size = 0; time_t file_first_log_time = time(NULL); + char* input_name = NULL; /* * When chunk does not exist, file_first_log_time will be the current time. @@ -1041,6 +1028,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, */ if (chunk != NULL) { file_first_log_time = chunk->first_log_time; + input_name = chunk->input_name; } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { @@ -1105,46 +1093,32 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, put_object: - /* - * remove chunk from buffer list- needed for async http so that the - * same chunk won't be sent more than once - */ ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size); if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } if (ret < 0) { - /* re-add chunk to list */ if (chunk) { chunk->failures += 1; - if (chunk->failures > ctx->ins->retry_limit){ - s3_retry_warn(ctx, tag, chunk->input_name, create_time, FLB_FALSE); - s3_store_file_delete(ctx, chunk); + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit){ + s3_retry_warn(ctx, tag, input_name, file_first_log_time, FLB_FALSE); return -2; } else { - s3_retry_warn(ctx, tag, chunk->input_name, create_time, FLB_TRUE); - s3_store_file_unlock(chunk); + s3_retry_warn(ctx, tag, input_name, file_first_log_time, FLB_TRUE); return -1; } } } - /* data was sent successfully- delete the local buffer */ - if (chunk) { - s3_store_file_delete(ctx, chunk); - } return 0; multipart: if (init_upload == FLB_TRUE) { - m_upload = create_upload(ctx, tag, tag_len, file_first_log_time); + m_upload = create_upload(ctx, tag, tag_len, file_first_log_time, input_name); if (!m_upload) { flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag); - if (chunk) { - s3_store_file_unlock(chunk); - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1156,9 +1130,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, ret = create_multipart_upload(ctx, m_upload); if (ret < 0) { flb_plg_error(ctx->ins, "Could not initiate multipart upload"); - if (chunk) { - s3_store_file_unlock(chunk); - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1173,13 +1144,11 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, flb_free(payload_buf); } - /* re-add chunk to list */ if (chunk) { chunk->failures += 1; - if (chunk->failures > ctx->ins->retry_limit) { + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit) { s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name, chunk->create_time, FLB_FALSE); - s3_store_file_delete(ctx, chunk); /* * part_number initializes with 1, if the number still is 1 which means * no data is uploaded and this upload file can be deleted , else set @@ -1202,17 +1171,12 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, else { s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name, chunk->create_time, FLB_TRUE); - s3_store_file_unlock(chunk); return -1; } } } + m_upload->part_number += 1; - /* data was sent successfully- delete the local buffer */ - if (chunk) { - s3_store_file_delete(ctx, chunk); - chunk = NULL; - } if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } @@ -1269,27 +1233,22 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) if (fs_stream == ctx->stream_upload) { continue; } + /* skip metadata stream */ if (fs_stream == ctx->stream_metadata) { continue; } + /* on startup, we only send old chunks in this routine */ + if (is_startup == FLB_TRUE && fs_stream == ctx->stream_active) { + flb_info("put_all_chunks: stream_active has %d chunks", mk_list_size(&fs_stream->files)); + continue; + } + mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); chunk = fsf->data; - /* Locked chunks are being processed, skip */ - if (chunk->locked == FLB_TRUE) { - continue; - } - - if (chunk->failures > ctx->ins->retry_limit) { - s3_retry_warn(ctx, (char *) fsf->meta_buf, - NULL, chunk->create_time, FLB_FALSE); - flb_fstore_file_delete(ctx->fs, fsf); - continue; - } - ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); if (ret < 0) { @@ -1321,9 +1280,23 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) if (ret < 0) { chunk->failures += 1; if (is_startup == FLB_TRUE) { - s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, - chunk->create_time, FLB_TRUE); - s3_store_file_unlock(chunk); + if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit){ + s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, + chunk->create_time, FLB_FALSE); + if (chunk->locked == FLB_TRUE) { + /* remove from upload_queue */ + if (chunk->_head.next != NULL && chunk->_head.prev != NULL) { + mk_list_del(&chunk->_head); + } + } + s3_store_file_delete(ctx, chunk); + return -1; + } + else { + s3_retry_warn(ctx, (char *) fsf->meta_buf, NULL, + chunk->create_time, FLB_TRUE); + return -1; + } } else { flb_plg_error(ctx->ins, "Failed to flush chunk tag=%s, " @@ -1335,6 +1308,12 @@ static int put_all_chunks(struct flb_s3 *ctx, int is_startup) } /* data was sent successfully- delete the local buffer */ + if (chunk->locked == FLB_TRUE) { + /* remove from upload_queue */ + if (chunk->_head.next != NULL && chunk->_head.prev != NULL) { + mk_list_del(&chunk->_head); + } + } s3_store_file_delete(ctx, chunk); } } @@ -1370,11 +1349,6 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, return -1; } - /* - * lock the chunk from buffer list- needed for async http so that the - * same chunk won't be sent more than once. - */ - s3_store_file_lock(chunk); body = buffered_data; body_size = buffer_size; } @@ -1390,9 +1364,6 @@ static int construct_request_buffer(struct flb_s3 *ctx, flb_sds_t new_data, if (!tmp) { flb_errno(); flb_free(buffered_data); - if (chunk) { - s3_store_file_unlock(chunk); - } return -1; } body = buffered_data = tmp; @@ -1583,7 +1554,8 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,const char *tag, i } static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag, - int tag_len, time_t file_first_log_time) + int tag_len, time_t file_first_log_time, + char *input_name) { int ret; struct multipart_upload *m_upload = NULL; @@ -1614,6 +1586,9 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *ta m_upload->upload_state = MULTIPART_UPLOAD_STATE_NOT_CREATED; m_upload->part_number = 1; m_upload->init_time = time(NULL); + if (input_name != NULL) { + m_upload->input_name = input_name; + } mk_list_add(&m_upload->_head, &ctx->uploads); /* Update file and increment index value right before request */ @@ -1632,120 +1607,17 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *ta return m_upload; } -/* Adds an entry to upload queue */ -static int add_to_queue(struct flb_s3 *ctx, struct s3_file *upload_file, - struct multipart_upload *m_upload_file, const char *tag, int tag_len) -{ - struct upload_queue *upload_contents; - char *tag_cpy; - - /* Create upload contents object and add to upload queue */ - upload_contents = flb_malloc(sizeof(struct upload_queue)); - if (upload_contents == NULL) { - flb_plg_error(ctx->ins, "Error allocating memory for upload_queue entry"); - flb_errno(); - return -1; - } - upload_contents->upload_file = upload_file; - upload_contents->m_upload_file = m_upload_file; - upload_contents->tag_len = tag_len; - upload_contents->upload_time = -1; - - /* Necessary to create separate string for tag to prevent corruption */ - tag_cpy = flb_malloc(tag_len); - if (tag_cpy == NULL) { - flb_free(upload_contents); - flb_plg_error(ctx->ins, "Error allocating memory for tag in add_to_queue"); - flb_errno(); - return -1; - } - strncpy(tag_cpy, tag, tag_len); - upload_contents->tag = tag_cpy; - - /* Add entry to upload queue */ - mk_list_add(&upload_contents->_head, &ctx->upload_queue); - return 0; -} - -/* Removes an entry from upload_queue */ -void remove_from_queue(struct upload_queue *entry) -{ - mk_list_del(&entry->_head); - flb_free(entry->tag); - flb_free(entry); - return; -} - -/* Validity check for upload queue object */ -static int upload_queue_valid(struct upload_queue *upload_contents, time_t now, - void *out_context) -{ - struct flb_s3 *ctx = out_context; - - if (upload_contents == NULL) { - flb_plg_error(ctx->ins, "Error getting entry from upload_queue"); - return -1; - } - if (upload_contents->_head.next == NULL || upload_contents->_head.prev == NULL) { - flb_plg_debug(ctx->ins, "Encountered previously deleted entry in " - "upload_queue. Deleting invalid entry"); - mk_list_del(&upload_contents->_head); - return -1; - } - if (upload_contents->upload_file->locked == FLB_FALSE) { - flb_plg_debug(ctx->ins, "Encountered unlocked file in upload_queue. " - "Exiting"); - return -1; - } - if (upload_contents->upload_file->size <= 0) { - flb_plg_debug(ctx->ins, "Encountered empty chunk file in upload_queue. " - "Deleting empty chunk file"); - remove_from_queue(upload_contents); - return -1; - } - if (now < upload_contents->upload_time) { - flb_plg_debug(ctx->ins, "Found valid chunk file but not ready to upload"); - return -1; - } - return 0; -} - -static int send_upload_request(void *out_context, flb_sds_t chunk, - struct s3_file *upload_file, - struct multipart_upload *m_upload_file, - const char *tag, int tag_len) -{ - int ret; - char *buffer; - size_t buffer_size; - struct flb_s3 *ctx = out_context; - - /* Create buffer to upload to S3 */ - ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); - flb_sds_destroy(chunk); - if (ret < 0) { - flb_plg_error(ctx->ins, "Could not construct request buffer for %s", - upload_file->file_path); - return -1; - } - - /* Upload to S3 */ - ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len); - flb_free(buffer); - - return ret; -} - static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_t chunk, int chunk_size, const char *tag, int tag_len, - time_t file_first_log_time) + time_t file_first_log_time, + char *input_name) { int ret; struct flb_s3 *ctx = out_context; ret = s3_store_buffer_put(ctx, upload_file, tag, - tag_len, chunk, (size_t) chunk_size, file_first_log_time); + tag_len, chunk, (size_t) chunk_size, file_first_log_time, input_name); flb_sds_destroy(chunk); if (ret < 0) { flb_plg_warn(ctx->ins, "Could not buffer chunk. Data order preservation " @@ -1755,62 +1627,6 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file, return 0; } -/* Uploads all chunk files in queue synchronously */ -static void s3_upload_queue(struct flb_config *config, void *out_context) -{ - int ret; - time_t now; - struct upload_queue *upload_contents; - struct flb_s3 *ctx = out_context; - struct mk_list *tmp; - struct mk_list *head; - - flb_plg_debug(ctx->ins, "Running upload timer callback (upload_queue).."); - - /* No chunks in upload queue. Scan for timed out chunks. */ - if (mk_list_size(&ctx->upload_queue) == 0) { - flb_plg_debug(ctx->ins, "No files found in upload_queue. Scanning for timed " - "out chunks"); - cb_s3_upload(config, out_context); - } - - /* Iterate through each file in upload queue */ - mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { - upload_contents = mk_list_entry(head, struct upload_queue, _head); - - now = time(NULL); - - /* Checks if upload_contents is valid */ - ret = upload_queue_valid(upload_contents, now, ctx); - if (ret < 0) { - goto exit; - } - - /* Try to upload file. Return value can be 0, -1, -2. */ - ret = send_upload_request(ctx, NULL, upload_contents->upload_file, - upload_contents->m_upload_file, - upload_contents->tag, upload_contents->tag_len); - - if (ret == 0) { - remove_from_queue(upload_contents); - ctx->upload_queue_success = FLB_TRUE; - } - else { - ctx->upload_queue_success = FLB_FALSE; - - /* If retry limit was reached, remove file from queue */ - if (ret == -2) { - remove_from_queue(upload_contents); - continue; - } - - break; - } - } - -exit: - return; -} static void cb_s3_upload(struct flb_config *config, void *data) { @@ -1826,40 +1642,62 @@ static void cb_s3_upload(struct flb_config *config, void *data) int ret; time_t now; - flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload).."); - now = time(NULL); + + flb_plg_debug(ctx->ins, "Running upload daemon coro uploader (cb_s3_upload).."); - /* Check all chunks and see if any have timed out */ + /* check chunks in active stream not marked as ready to be sent and see if any are timed out */ mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { fsf = mk_list_entry(head, struct flb_fstore_file, _head); chunk = fsf->data; - if (now < (chunk->create_time + ctx->upload_timeout)) { - continue; /* Only send chunks which have timed out */ - } - - /* Locked chunks are being processed, skip */ + /* Locked chunks are already in the queue, skip */ if (chunk->locked == FLB_TRUE) { continue; } + if (now > (chunk->create_time + ctx->upload_timeout)) { + /* add to upload queue */ + if (chunk->input_name) { + flb_plg_info(ctx->ins, "upload_timeout reached for chunk from %s", + chunk->input_name); + } + s3_store_file_lock(chunk); + mk_list_add(&chunk->_head, &ctx->upload_queue); + } + } + + /* send any chunks that are ready */ + mk_list_foreach_safe(head, tmp, &ctx->upload_queue) { + chunk = mk_list_entry(head, struct s3_file, _head); + fsf = chunk->fsf; + m_upload = get_upload(ctx, (const char *) fsf->meta_buf, fsf->meta_size); ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); if (ret < 0) { flb_plg_error(ctx->ins, "Could not construct request buffer for %s", chunk->file_path); - continue; + if (ctx->preserve_data_ordering == FLB_TRUE) { + break; /* if preserve_data_ordering send in the queue order, do not skip over chunks */ + } else { + continue; + } } - /* FYI: if construct_request_buffer() succeedeed, the s3_file is locked */ ret = upload_data(ctx, chunk, m_upload, buffer, buffer_size, (const char *) fsf->meta_buf, fsf->meta_size); flb_free(buffer); - if (ret != 0) { - flb_plg_error(ctx->ins, "Could not send chunk with tag %s", - (char *) fsf->meta_buf); + if (ret == -2 || ret == 0) { + /* if we succeeded or retries expired, delete chunk file and remove from queue */ + mk_list_del(&chunk->_head); + s3_store_file_delete(ctx, chunk); + } + + if (ret < 0) { + if (ctx->preserve_data_ordering == FLB_TRUE) { + break; /* if preserve_data_ordering send in the queue order, do not skip over chunks */ + } } } @@ -1868,10 +1706,10 @@ static void cb_s3_upload(struct flb_config *config, void *data) m_upload = mk_list_entry(head, struct multipart_upload, _head); complete = FLB_FALSE; - if (m_upload->complete_errors > ctx->ins->retry_limit) { + if (ctx->ins->retry_limit >= 0 && m_upload->complete_errors > ctx->ins->retry_limit) { flb_plg_error(ctx->ins, "Multipart Upload for %s has failed " - "s3: CompleteMultipartUpload more than configured retry_limit, " + "s3:CompleteMultipartUpload more than configured retry_limit, " "output will give up ", m_upload->s3_key); mk_list_del(&m_upload->_head); multipart_upload_destroy(m_upload); @@ -2060,7 +1898,8 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char static void unit_test_flush(void *out_context, struct s3_file *upload_file, const char *tag, int tag_len, flb_sds_t chunk, int chunk_size, struct multipart_upload *m_upload_file, - time_t file_first_log_time) + time_t file_first_log_time, + char *input_name) { int ret; char *buffer; @@ -2068,7 +1907,7 @@ static void unit_test_flush(void *out_context, struct s3_file *upload_file, struct flb_s3 *ctx = out_context; s3_store_buffer_put(ctx, upload_file, tag, tag_len, - chunk, (size_t) chunk_size, file_first_log_time); + chunk, (size_t) chunk_size, file_first_log_time, input_name); ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size); if (ret < 0) { flb_plg_error(ctx->ins, "Could not construct request buffer for %s", @@ -2082,11 +1921,9 @@ static void unit_test_flush(void *out_context, struct s3_file *upload_file, FLB_OUTPUT_RETURN(ret); } -static void flush_init(void *out_context) +static void flush_startup_chunks(struct flb_s3 *ctx) { int ret; - struct flb_s3 *ctx = out_context; - struct flb_sched *sched; /* clean up any old buffers found on startup */ if (ctx->has_old_buffers == FLB_TRUE) { @@ -2102,35 +1939,125 @@ static void flush_init(void *out_context) "Failed to send locally buffered data left over " "from previous executions; will retry. Buffer=%s", ctx->fs->root_path); - FLB_OUTPUT_RETURN(FLB_RETRY); + } else { + flb_plg_info(ctx->ins, + "Successfully sent all locally buffered data left over " + "from previous executions. Buffer=%s", + ctx->fs->root_path); } } +} + + +/* + * Same as flb_output_return_do() but no coro prepare destroy + * and no coro yield. + * Using for S3 daemon thread so we can clean up the task + * But keep the coroutine. + * This is the best way to do it because task clean up is + * handled by the control thread in the engine AFAICT + * But writing to the output pipe is done from coro. + */ +static inline void flb_output_return_no_destroy(int ret) +{ + struct flb_coro *coro; + int n; + int pipe_fd; + uint32_t set; + uint64_t val; + struct flb_task *task; + struct flb_output_flush *out_flush; + struct flb_output_instance *o_ins; + struct flb_out_thread_instance *th_ins = NULL; + + coro = flb_coro_get(); + + out_flush = (struct flb_output_flush *) coro->data; + o_ins = out_flush->o_ins; + task = out_flush->task; + /* - * create a timer that will run periodically and check if uploads - * are ready for completion - * this is created once on the first flush + * To compose the signal event the relevant info is: + * + * - Unique Task events id: 2 in this case + * - Return value: FLB_OK (0), FLB_ERROR (1) or FLB_RETRY (2) + * - Task ID + * - Output Instance ID (struct flb_output_instance)->id + * + * We put together the return value with the task_id on the 32 bits at right */ - if (ctx->timer_created == FLB_FALSE) { - flb_plg_debug(ctx->ins, - "Creating upload timer with frequency %ds", - ctx->timer_ms / 1000); + set = FLB_TASK_SET(ret, task->id, o_ins->id); + val = FLB_BITS_U64_SET(2 /* FLB_ENGINE_TASK */, set); - sched = flb_sched_ctx_get(); + /* + * Set the target pipe channel: if this return code is running inside a + * thread pool worker, use the specific worker pipe/event loop to handle + * the return status, otherwise use the channel connected to the parent + * event loop. + */ + if (flb_output_is_threaded(o_ins) == FLB_TRUE) { + /* Retrieve the thread instance and prepare pipe channel */ + th_ins = flb_output_thread_instance_get(); + pipe_fd = th_ins->ch_thread_events[1]; + } + else { + pipe_fd = out_flush->o_ins->ch_events[1]; + } - if (ctx->preserve_data_ordering) { - ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, s3_upload_queue, ctx, NULL); - } - else { - ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, cb_s3_upload, ctx, NULL); - } - if (ret == -1) { - flb_plg_error(ctx->ins, "Failed to create upload timer"); - FLB_OUTPUT_RETURN(FLB_RETRY); + /* Notify the event loop about our return status */ + n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val)); + if (n == -1) { + flb_errno(); + } +} + +/* + * We need to use async IO for S3 because it is more stable + * However, S3 has unique needs. The chunk list, multipart code, etc + * all is not concurrent safe. + * Additionally, timer callbacks don't run in coroutines and + * can't using async IO. + * Solution: daemon coroutine + * The first coroutine that that flushes to S3 never ends, + * and just uploads and sleeps. + * + * We increment the metrics counters for the chunk originally + * associated with the coroutine and decrement the task users + * in release_chunk_upstream() + */ +static void daemon_coroutine(struct flb_config *config, struct flb_s3 *ctx) +{ + flb_plg_info(ctx->ins, "daemon coroutine starting..."); + + ctx->daemon_coro_started = FLB_TRUE; + + /* tell engine that this task did complete successfully */ + flb_output_return_no_destroy(FLB_OK); + + /* + * FLB engine uses a graceful cooperative shutdown model. + * If coroutines never end, the system won't stop. + * So the daemon coroutine must exit itself when the engine is in shutdown mode. + */ + while (config->is_running == FLB_TRUE) { + /* Cleanup old buffers found on startup */ + flush_startup_chunks(ctx); + + /* upload any ready chunks */ + cb_s3_upload(config, ctx); + + if (config->is_running == FLB_FALSE) { + break; } - ctx->timer_created = FLB_TRUE; + /* + * special coroutine sleep + * Doesn't block any thread + * Puts an event on the event + * loop which will wake this coro back up + */ + flb_time_sleep(ctx->timer_ms); + flb_plg_info(ctx->ins, "daemon coroutine resumed..."); } } @@ -2154,9 +2081,6 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, struct flb_time tms; time_t file_first_log_time = 0; - /* Cleanup old buffers and initialize upload timer */ - flush_init(ctx); - /* Process chunk */ if (ctx->log_key) { chunk = flb_pack_msgpack_extract_log_key(ctx, @@ -2212,90 +2136,49 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, unit_test_flush(ctx, upload_file, event_chunk->tag, flb_sds_len(event_chunk->tag), chunk, chunk_size, - m_upload_file, file_first_log_time); + m_upload_file, file_first_log_time, i_ins->name); } - /* Discard upload_file if it has failed to upload ctx->ins->retry_limit times */ - if (upload_file != NULL && upload_file->failures > ctx->ins->retry_limit) { - s3_retry_warn(ctx, event_chunk->tag, out_flush->task->i_ins->name, - upload_file->create_time, FLB_FALSE); - s3_store_file_delete(ctx, upload_file); - upload_file = NULL; + /* + * Buffer new data in chunk in filesystem and wait for next data from engine + * If successful, data ordering is preserved to be the same as the engine sent us + */ + ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, + event_chunk->tag, flb_sds_len(event_chunk->tag), + file_first_log_time, i_ins->name); + + if (ret < 0) { + FLB_OUTPUT_RETURN(FLB_RETRY); } /* If upload_timeout has elapsed, upload file */ if (upload_file != NULL && time(NULL) > (upload_file->create_time + ctx->upload_timeout)) { upload_timeout_check = FLB_TRUE; - flb_plg_info(ctx->ins, "upload_timeout reached for %s", - event_chunk->tag); + flb_plg_info(ctx->ins, "upload_timeout reached for chunk from %s, tag=%s", + i_ins->name, event_chunk->tag); } m_upload_file = get_upload(ctx, event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (m_upload_file != NULL && time(NULL) > - (m_upload_file->init_time + ctx->upload_timeout)) { - upload_timeout_check = FLB_TRUE; - flb_plg_info(ctx->ins, "upload_timeout reached for %s", event_chunk->tag); - m_upload_file->input_name = out_flush->task->i_ins->name; - } - /* If total_file_size has been reached, upload file */ if ((upload_file && upload_file->size + chunk_size > ctx->upload_chunk_size) || (m_upload_file && m_upload_file->bytes + chunk_size > ctx->file_size)) { total_file_size_check = FLB_TRUE; } - /* File is ready for upload, upload_file != NULL prevents from segfaulting. */ + /* lock chunk file so new new appends. Its ready to be sent. */ if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) { - upload_file->input_name = out_flush->task->i_ins->name; - if (ctx->preserve_data_ordering == FLB_TRUE) { - /* Buffer last chunk in file and lock file to prevent further changes */ - ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, - event_chunk->tag, flb_sds_len(event_chunk->tag), - file_first_log_time); - - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - s3_store_file_lock(upload_file); - - /* Add chunk file to upload queue */ - ret = add_to_queue(ctx, upload_file, m_upload_file, - event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_ERROR); - } - - /* Go through upload queue and return error if something went wrong */ - s3_upload_queue(config, ctx); - if (ctx->upload_queue_success == FLB_FALSE) { - ctx->upload_queue_success = FLB_TRUE; - FLB_OUTPUT_RETURN(FLB_ERROR); - } - FLB_OUTPUT_RETURN(FLB_OK); - } - else { - /* Send upload directly without upload queue */ - ret = send_upload_request(ctx, chunk, upload_file, m_upload_file, - event_chunk->tag, - flb_sds_len(event_chunk->tag)); - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - FLB_OUTPUT_RETURN(FLB_OK); - } + s3_store_file_lock(upload_file); + /* sends only happen from upload daemon coroutine which iterates over queue */ + mk_list_add(&upload_file->_head, &ctx->upload_queue); } - /* Buffer current chunk in filesystem and wait for next chunk from engine */ - ret = buffer_chunk(ctx, upload_file, chunk, chunk_size, - event_chunk->tag, flb_sds_len(event_chunk->tag), - file_first_log_time); - - if (ret < 0) { - FLB_OUTPUT_RETURN(FLB_RETRY); + if (ctx->daemon_coro_started == FLB_FALSE) { + daemon_coroutine(config, ctx); } + FLB_OUTPUT_RETURN(FLB_OK); } @@ -2307,6 +2190,8 @@ static int cb_s3_exit(void *data, struct flb_config *config) struct mk_list *tmp; struct mk_list *head; + flb_plg_info(ctx->ins, "cb_exit"); + if (!ctx) { return 0; } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 6b08cc8c589..30a222c90ff 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -47,17 +47,6 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 -struct upload_queue { - struct s3_file *upload_file; - struct multipart_upload *m_upload_file; - char *tag; - int tag_len; - - time_t upload_time; - - struct mk_list _head; -}; - struct multipart_upload { flb_sds_t s3_key; flb_sds_t tag; @@ -140,10 +129,11 @@ struct flb_s3 { struct mk_list uploads; - int preserve_data_ordering; - int upload_queue_success; + /* list of locked chunks that are ready to send */ struct mk_list upload_queue; + int preserve_data_ordering; + size_t file_size; size_t upload_chunk_size; time_t upload_timeout; @@ -157,6 +147,8 @@ struct flb_s3 { flb_sds_t metadata_dir; flb_sds_t seq_index_file; + int daemon_coro_started; + struct flb_output_instance *ins; }; diff --git a/plugins/out_s3/s3_store.c b/plugins/out_s3/s3_store.c index 6f7c8717827..25de40c915e 100644 --- a/plugins/out_s3/s3_store.c +++ b/plugins/out_s3/s3_store.c @@ -126,7 +126,8 @@ struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag, int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, const char *tag, int tag_len, char *data, size_t bytes, - time_t file_first_log_time) + time_t file_first_log_time, + char* input_name) { int ret; flb_sds_t name; @@ -178,6 +179,7 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, s3_file->fsf = fsf; s3_file->first_log_time = file_first_log_time; s3_file->create_time = time(NULL); + s3_file->input_name = input_name; /* Use fstore opaque 'data' reference to keep our context */ fsf->data = s3_file; diff --git a/plugins/out_s3/s3_store.h b/plugins/out_s3/s3_store.h index ff1be7bb78c..38d85b90f7f 100644 --- a/plugins/out_s3/s3_store.h +++ b/plugins/out_s3/s3_store.h @@ -24,7 +24,7 @@ #include struct s3_file { - int locked; /* locked chunk is busy, cannot write to it */ + int locked; /* locked = no appends to this chunk */ int failures; /* delivery failures */ char *input_name; /* for s3_retry_warn output message gets input name */ size_t size; /* file size */ @@ -32,12 +32,14 @@ struct s3_file { time_t first_log_time; /* first log time */ flb_sds_t file_path; /* file path */ struct flb_fstore_file *fsf; /* reference to parent flb_fstore_file */ + struct mk_list _head; }; int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file, const char *tag, int tag_len, char *data, size_t bytes, - time_t file_first_log_time); + time_t file_first_log_time, + char *input_name); int s3_store_init(struct flb_s3 *ctx); int s3_store_exit(struct flb_s3 *ctx);