Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 133 additions & 50 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,17 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
ssize_t offset,
struct flb_tail_config *ctx)
{
int fd;
/*
* Cleanup uses staged-goto labels in reverse acquisition order;
* jumping to err_X cleans up everything acquired up to and including X
* and falls through to subsequent labels. Each label undoes exactly one
* acquisition step. Construction-failure paths must NOT route through
* flb_tail_file_remove() because that helper increments the
* cmt_files_closed counter unconditionally and would unbalance the
* cmt_files_opened/closed pair (the matching opened increment lives at
* the very end of this function, immediately before return 0).
*/
int fd = -1;
int ret;
uint64_t stream_id;
uint64_t ts;
Expand Down Expand Up @@ -1305,12 +1315,14 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
file = flb_calloc(1, sizeof(struct flb_tail_file));
if (!file) {
flb_errno();
goto error;
goto err_close_fd;
}

/* Initialize */
file->watch_fd = -1;
file->config = ctx;
file->fd = fd;
file->watch_fd = -1;
file->tail_mode = mode;

/* On non-windows environments check if the original path is a link */
ret = lstat(path, &lst);
Expand All @@ -1325,15 +1337,15 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
ret = stat_to_hash_bits(ctx, st, &hash_bits);
if (ret != 0) {
flb_plg_error(ctx->ins, "error procesisng hash bits for file %s", path);
goto error;
goto err_free_file;
}
file->hash_bits = hash_bits;

/* store the hash key used for hash_bits */
ret = stat_to_hash_key(ctx, st, &hash_key);
if (ret != 0) {
flb_plg_error(ctx->ins, "error procesisng hash key for file %s", path);
goto error;
goto err_free_file;
}
file->hash_key = hash_key;

Expand All @@ -1342,8 +1354,6 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
file->size = st->st_size;
file->buf_len = 0;
file->parsed = 0;
file->config = ctx;
file->tail_mode = mode;
file->tag_len = 0;
file->tag_buf = NULL;
file->rotated = 0;
Expand All @@ -1365,7 +1375,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
ctx->buf_max_size);

if (file->decompression_context == NULL) {
goto error;
goto err_free_hash_key;
}
}

Expand All @@ -1383,17 +1393,15 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
ret = flb_tail_file_name_dup(path, file);
if (!file->name) {
flb_errno();
goto error;
goto err_free_decomp;
}

/* We keep a copy of the initial filename in orig_name. This is required
* for path_key to continue working after rotation. */
file->orig_name = flb_strdup(file->name);
if (!file->orig_name) {
flb_errno();
flb_free(file->name);
file->name = NULL;
goto error;
goto err_free_name;
}
file->orig_name_len = file->name_len;

Expand All @@ -1407,7 +1415,15 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
file->dmode_flush_timeout = 0;
file->dmode_complete = true;
file->dmode_buf = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 65536 : 0);
if (!file->dmode_buf) {
flb_errno();
goto err_destroy_sbuf;
}
file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0);
if (!file->dmode_lastline) {
flb_errno();
goto err_free_dmode_buf;
}
file->dmode_firstline = false;
#ifdef FLB_HAVE_SQLDB
file->db_id = 0;
Expand Down Expand Up @@ -1440,7 +1456,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
"could not create multiline stream for file: %s",
inode_str);
flb_sds_destroy(inode_str);
goto error;
goto err_free_dmode_lastline;
}
file->ml_stream_id = stream_id;
flb_sds_destroy(inode_str);
Expand All @@ -1461,7 +1477,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
file->buf_data = flb_malloc(file->buf_size);
if (!file->buf_data) {
flb_errno();
goto error;
goto err_destroy_ml_stream;
}

/* Initialize (optional) dynamic tag */
Expand All @@ -1471,7 +1487,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
if (!tag) {
flb_errno();
flb_plg_error(ctx->ins, "failed to allocate tag buffer");
goto error;
goto err_free_buf_data;
}
#ifdef FLB_HAVE_REGEX
ret = tag_compose(ctx->ins->tag, ctx->tag_regex, path, tag, &tag_len, ctx);
Expand All @@ -1485,7 +1501,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
flb_free(tag);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to compose tag for file: %s", path);
goto error;
goto err_free_buf_data;
}
}
else {
Expand All @@ -1495,83 +1511,138 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
if (!file->tag_buf) {
flb_plg_error(ctx->ins, "failed to set tag for file: %s", path);
flb_errno();
goto error;
goto err_free_buf_data;
}

if (mode == FLB_TAIL_STATIC) {
mk_list_add(&file->_head, &ctx->files_static);
ctx->files_static_count++;
flb_hash_table_add(ctx->static_hash, file->hash_key, flb_sds_len(file->hash_key),
file, sizeof(file));

ret = flb_hash_table_add(ctx->static_hash,
file->hash_key, flb_sds_len(file->hash_key),
file, sizeof(file));
if (ret < 0) {
flb_plg_error(ctx->ins, "could not register file in static hash");
goto err_unlist;
}

tail_signal_manager(file->config);
}
else if (mode == FLB_TAIL_EVENT) {
mk_list_add(&file->_head, &ctx->files_event);
flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key),
file, sizeof(file));

ret = flb_hash_table_add(ctx->event_hash,
file->hash_key, flb_sds_len(file->hash_key),
file, sizeof(file));
if (ret < 0) {
flb_plg_error(ctx->ins, "could not register file in event hash");
goto err_unlist;
}

/* Register this file into the fs_event monitoring */
ret = flb_tail_fs_add(ctx, file);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not register file into fs_events");
goto error;
goto err_unlist;
}
}

/* Set the file position (database offset, head or tail) */
ret = set_file_position(ctx, file);
if (ret == -1) {
flb_tail_file_remove(file);
goto error;
goto err_fs_remove;
}

/* Remaining bytes to read */
file->pending_bytes = file->size - file->offset;

#ifdef FLB_HAVE_METRICS
name = (char *) flb_input_name(ctx->ins);
ts = cfl_time_now();
cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name});

/* Old api */
flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics);
#endif

file->sl_log_event_encoder = flb_log_event_encoder_create(
FLB_LOG_EVENT_FORMAT_DEFAULT);

if (file->sl_log_event_encoder == NULL) {
flb_tail_file_remove(file);

goto error;
goto err_set_pos;
}

file->ml_log_event_encoder = flb_log_event_encoder_create(
FLB_LOG_EVENT_FORMAT_DEFAULT);

if (file->ml_log_event_encoder == NULL) {
flb_tail_file_remove(file);

goto error;
goto err_destroy_sl;
}

flb_plg_debug(ctx->ins,
"inode=%"PRIu64" with offset=%"PRId64" appended as %s",
file->inode, file->offset, path);

#ifdef FLB_HAVE_METRICS
name = (char *) flb_input_name(ctx->ins);
ts = cfl_time_now();
cmt_counter_inc(ctx->cmt_files_opened, ts, 1, (char *[]) {name});

/* Old api */
flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics);
#endif

return 0;

error:
if (file) {
if (file->buf_data) {
flb_free(file->buf_data);
}
if (file->name) {
flb_free(file->name);
/*
* Cleanup ladder: each label undoes exactly one acquisition and falls
* through to the next. Failed-acquisition gotos jump to the label that
* cleans up the LAST resource that was successfully acquired, skipping
* the cleanup of the resource that was never created.
*/
err_destroy_sl:
flb_log_event_encoder_destroy(file->sl_log_event_encoder);
err_set_pos:
/* set_file_position has no resource of its own; fall through */
err_fs_remove:
if (mode == FLB_TAIL_EVENT) {
flb_tail_fs_remove(ctx, file);
}
err_unlist:
if (mode == FLB_TAIL_STATIC) {
mk_list_del(&file->_head);
if (ctx->files_static_count > 0) {
ctx->files_static_count--;
}
flb_free(file);
flb_hash_table_del(ctx->static_hash, file->hash_key);
}
else if (mode == FLB_TAIL_EVENT) {
mk_list_del(&file->_head);
flb_hash_table_del(ctx->event_hash, file->hash_key);
}
flb_free(file->tag_buf);
err_free_buf_data:
flb_free(file->buf_data);
err_destroy_ml_stream:
if (ctx->ml_ctx && file->ml_stream_id > 0) {
flb_ml_stream_id_destroy_all(ctx->ml_ctx, file->ml_stream_id);
}
err_free_dmode_lastline:
flb_sds_destroy(file->dmode_lastline);
err_free_dmode_buf:
flb_sds_destroy(file->dmode_buf);
err_destroy_sbuf:
msgpack_sbuffer_destroy(&file->mult_sbuf);
/* orig_name was acquired immediately after name with no fallible step
* between, so its cleanup is folded into err_destroy_sbuf rather than
* carrying a label that no goto can reach */
flb_free(file->orig_name);
err_free_name:
flb_free(file->name);
if (file->real_name) {
flb_free(file->real_name);
}
err_free_decomp:
if (file->decompression_context) {
flb_decompression_context_destroy(file->decompression_context);
}
err_free_hash_key:
flb_sds_destroy(file->hash_key);
err_free_file:
flb_free(file);
err_close_fd:
close(fd);

return -1;
}

Expand Down Expand Up @@ -1620,6 +1691,11 @@ void flb_tail_file_remove(struct flb_tail_file *file)

flb_sds_destroy(file->dmode_buf);
flb_sds_destroy(file->dmode_lastline);

if (file->tail_mode == FLB_TAIL_STATIC && ctx->files_static_count > 0) {
ctx->files_static_count--;
}

mk_list_del(&file->_head);
flb_tail_fs_remove(ctx, file);

Expand Down Expand Up @@ -2094,14 +2170,21 @@ int flb_tail_file_to_event(struct flb_tail_file *file)
return -1;
}

ret = flb_hash_table_add(ctx->event_hash,
file->hash_key, flb_sds_len(file->hash_key),
file, sizeof(file));
if (ret < 0) {
flb_plg_error(ctx->ins, "could not register file in event hash");
flb_tail_fs_remove(ctx, file);
Comment thread
sandy2008 marked this conversation as resolved.
return -1;
}

Comment thread
sandy2008 marked this conversation as resolved.
/* List swap: change from 'static' to 'event' list */
mk_list_del(&file->_head);
ctx->files_static_count--;
flb_hash_table_del(ctx->static_hash, file->hash_key);

mk_list_add(&file->_head, &file->config->files_event);
flb_hash_table_add(ctx->event_hash, file->hash_key, flb_sds_len(file->hash_key),
file, sizeof(file));

file->tail_mode = FLB_TAIL_EVENT;

Expand Down
3 changes: 2 additions & 1 deletion plugins/in_tail/tail_fs_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ int flb_tail_fs_stat_add(struct flb_tail_file *file)

int flb_tail_fs_stat_remove(struct flb_tail_file *file)
{
if (file->tail_mode == FLB_TAIL_EVENT) {
if (file->fs_backend != NULL) {
flb_free(file->fs_backend);
file->fs_backend = NULL;
}
return 0;
}
Expand Down