diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 25244c1df04..38b82c08d88 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -1267,7 +1267,17 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, ssize_t offset, struct flb_tail_config *ctx) { - int fd; + /* + * Cleanup uses staged-goto labels in reverse acquisition order; + * jumping to err_X cleans up everything acquired up to and including X + * and falls through to subsequent labels. Each label undoes exactly one + * acquisition step. Construction-failure paths must NOT route through + * flb_tail_file_remove() because that helper increments the + * cmt_files_closed counter unconditionally and would unbalance the + * cmt_files_opened/closed pair (the matching opened increment lives at + * the very end of this function, immediately before return 0). + */ + int fd = -1; int ret; uint64_t stream_id; uint64_t ts; @@ -1305,12 +1315,14 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file = flb_calloc(1, sizeof(struct flb_tail_file)); if (!file) { flb_errno(); - goto error; + goto err_close_fd; } /* Initialize */ - file->watch_fd = -1; + file->config = ctx; file->fd = fd; + file->watch_fd = -1; + file->tail_mode = mode; /* On non-windows environments check if the original path is a link */ ret = lstat(path, &lst); @@ -1325,7 +1337,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, ret = stat_to_hash_bits(ctx, st, &hash_bits); if (ret != 0) { flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path); - goto error; + goto err_free_file; } file->hash_bits = hash_bits; @@ -1333,7 +1345,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, ret = stat_to_hash_key(ctx, st, &hash_key); if (ret != 0) { flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path); - goto error; + goto err_free_file; } file->hash_key = hash_key; @@ -1342,8 +1354,6 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file->size = st->st_size; file->buf_len = 0; file->parsed = 0; - file->config = ctx; - file->tail_mode = mode; file->tag_len = 0; file->tag_buf = NULL; file->rotated = 0; @@ -1365,7 +1375,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, ctx->buf_max_size); if (file->decompression_context == NULL) { - goto error; + goto err_free_hash_key; } } @@ -1383,7 +1393,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, ret = flb_tail_file_name_dup(path, file); if (!file->name) { flb_errno(); - goto error; + goto err_free_decomp; } /* We keep a copy of the initial filename in orig_name. This is required @@ -1391,9 +1401,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file->orig_name = flb_strdup(file->name); if (!file->orig_name) { flb_errno(); - flb_free(file->name); - file->name = NULL; - goto error; + goto err_free_name; } file->orig_name_len = file->name_len; @@ -1407,7 +1415,15 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file->dmode_flush_timeout = 0; file->dmode_complete = true; file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0); + if (!file->dmode_buf) { + flb_errno(); + goto err_destroy_sbuf; + } file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0); + if (!file->dmode_lastline) { + flb_errno(); + goto err_free_dmode_buf; + } file->dmode_firstline = false; #ifdef FLB_HAVE_SQLDB file->db_id = 0; @@ -1440,7 +1456,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, "could not create multiline stream for file: %s", inode_str); flb_sds_destroy(inode_str); - goto error; + goto err_free_dmode_lastline; } file->ml_stream_id = stream_id; flb_sds_destroy(inode_str); @@ -1461,7 +1477,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file->buf_data = flb_malloc(file->buf_size); if (!file->buf_data) { flb_errno(); - goto error; + goto err_destroy_ml_stream; } /* Initialize (optional) dynamic tag */ @@ -1471,7 +1487,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, if (!tag) { flb_errno(); flb_plg_error(ctx->ins, "failed to allocate tag buffer"); - goto error; + goto err_free_buf_data; } #ifdef FLB_HAVE_REGEX ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx); @@ -1485,7 +1501,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, flb_free(tag); if (ret != 0) { flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path); - goto error; + goto err_free_buf_data; } } else { @@ -1495,83 +1511,138 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, if (!file->tag_buf) { flb_plg_error(ctx->ins, "failed to set tag for file: %s", path); flb_errno(); - goto error; + goto err_free_buf_data; } if (mode == FLB_TAIL_STATIC) { mk_list_add(&file->_head, &ctx->files_static); ctx->files_static_count++; - flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); + + ret = flb_hash_table_add(ctx->static_hash, + file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + if (ret < 0) { + flb_plg_error(ctx->ins, "could not register file in static hash"); + goto err_unlist; + } + tail_signal_manager(file->config); } else if (mode == FLB_TAIL_EVENT) { mk_list_add(&file->_head, &ctx->files_event); - flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); + + ret = flb_hash_table_add(ctx->event_hash, + file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + if (ret < 0) { + flb_plg_error(ctx->ins, "could not register file in event hash"); + goto err_unlist; + } /* Register this file into the fs_event monitoring */ ret = flb_tail_fs_add(ctx, file); if (ret == -1) { flb_plg_error(ctx->ins, "could not register file into fs_events"); - goto error; + goto err_unlist; } } /* Set the file position (database offset, head or tail) */ ret = set_file_position(ctx, file); if (ret == -1) { - flb_tail_file_remove(file); - goto error; + goto err_fs_remove; } /* Remaining bytes to read */ file->pending_bytes = file->size - file->offset; -#ifdef FLB_HAVE_METRICS - name = (char *) flb_input_name(ctx->ins); - ts = cfl_time_now(); - cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name}); - - /* Old api */ - flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics); -#endif - file->sl_log_event_encoder = flb_log_event_encoder_create( FLB_LOG_EVENT_FORMAT_DEFAULT); if (file->sl_log_event_encoder == NULL) { - flb_tail_file_remove(file); - - goto error; + goto err_set_pos; } file->ml_log_event_encoder = flb_log_event_encoder_create( FLB_LOG_EVENT_FORMAT_DEFAULT); if (file->ml_log_event_encoder == NULL) { - flb_tail_file_remove(file); - - goto error; + goto err_destroy_sl; } flb_plg_debug(ctx->ins, "inode=%"PRIu64" with offset=%"PRId64" appended as %s", file->inode, file->offset, path); + +#ifdef FLB_HAVE_METRICS + name = (char *) flb_input_name(ctx->ins); + ts = cfl_time_now(); + cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name}); + + /* Old api */ + flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics); +#endif + return 0; -error: - if (file) { - if (file->buf_data) { - flb_free(file->buf_data); - } - if (file->name) { - flb_free(file->name); +/* + * Cleanup ladder: each label undoes exactly one acquisition and falls + * through to the next. Failed-acquisition gotos jump to the label that + * cleans up the LAST resource that was successfully acquired, skipping + * the cleanup of the resource that was never created. + */ +err_destroy_sl: + flb_log_event_encoder_destroy(file->sl_log_event_encoder); +err_set_pos: + /* set_file_position has no resource of its own; fall through */ +err_fs_remove: + if (mode == FLB_TAIL_EVENT) { + flb_tail_fs_remove(ctx, file); + } +err_unlist: + if (mode == FLB_TAIL_STATIC) { + mk_list_del(&file->_head); + if (ctx->files_static_count > 0) { + ctx->files_static_count--; } - flb_free(file); + flb_hash_table_del(ctx->static_hash, file->hash_key); } + else if (mode == FLB_TAIL_EVENT) { + mk_list_del(&file->_head); + flb_hash_table_del(ctx->event_hash, file->hash_key); + } + flb_free(file->tag_buf); +err_free_buf_data: + flb_free(file->buf_data); +err_destroy_ml_stream: + if (ctx->ml_ctx && file->ml_stream_id > 0) { + flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id); + } +err_free_dmode_lastline: + flb_sds_destroy(file->dmode_lastline); +err_free_dmode_buf: + flb_sds_destroy(file->dmode_buf); +err_destroy_sbuf: + msgpack_sbuffer_destroy(&file->mult_sbuf); + /* orig_name was acquired immediately after name with no fallible step + * between, so its cleanup is folded into err_destroy_sbuf rather than + * carrying a label that no goto can reach */ + flb_free(file->orig_name); +err_free_name: + flb_free(file->name); + if (file->real_name) { + flb_free(file->real_name); + } +err_free_decomp: + if (file->decompression_context) { + flb_decompression_context_destroy(file->decompression_context); + } +err_free_hash_key: + flb_sds_destroy(file->hash_key); +err_free_file: + flb_free(file); +err_close_fd: close(fd); - return -1; } @@ -1620,6 +1691,11 @@ void flb_tail_file_remove(struct flb_tail_file *file) flb_sds_destroy(file->dmode_buf); flb_sds_destroy(file->dmode_lastline); + + if (file->tail_mode == FLB_TAIL_STATIC && ctx->files_static_count > 0) { + ctx->files_static_count--; + } + mk_list_del(&file->_head); flb_tail_fs_remove(ctx, file); @@ -2094,14 +2170,21 @@ int flb_tail_file_to_event(struct flb_tail_file *file) return -1; } + ret = flb_hash_table_add(ctx->event_hash, + file->hash_key, flb_sds_len(file->hash_key), + file, sizeof(file)); + if (ret < 0) { + flb_plg_error(ctx->ins, "could not register file in event hash"); + flb_tail_fs_remove(ctx, file); + return -1; + } + /* List swap: change from 'static' to 'event' list */ mk_list_del(&file->_head); ctx->files_static_count--; flb_hash_table_del(ctx->static_hash, file->hash_key); mk_list_add(&file->_head, &file->config->files_event); - flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key), - file, sizeof(file)); file->tail_mode = FLB_TAIL_EVENT; diff --git a/plugins/in_tail/tail_fs_stat.c b/plugins/in_tail/tail_fs_stat.c index 74b6b95dc77..18f3fb13695 100644 --- a/plugins/in_tail/tail_fs_stat.c +++ b/plugins/in_tail/tail_fs_stat.c @@ -246,8 +246,9 @@ int flb_tail_fs_stat_add(struct flb_tail_file *file) int flb_tail_fs_stat_remove(struct flb_tail_file *file) { - if (file->tail_mode == FLB_TAIL_EVENT) { + if (file->fs_backend != NULL) { flb_free(file->fs_backend); + file->fs_backend = NULL; } return 0; }