diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index f4e8965bc86..224a4735c96 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -153,47 +153,58 @@ int produce_message(struct flb_time *tm, msgpack_object *map, msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) { - /* Make room for the timestamp */ - size = map->via.map.size + 1; + if (ctx->add_timestamp) { + /* Make room for the timestamp */ + size = map->via.map.size + 1; + } + else { + size = map->via.map.size; + } msgpack_pack_map(&mp_pck, size); - /* Pack timestamp */ - msgpack_pack_str(&mp_pck, ctx->timestamp_key_len); - msgpack_pack_str_body(&mp_pck, - ctx->timestamp_key, ctx->timestamp_key_len); - switch (ctx->timestamp_format) { - case FLB_JSON_DATE_DOUBLE: - msgpack_pack_double(&mp_pck, flb_time_to_double(tm)); - break; - - case FLB_JSON_DATE_ISO8601: - case FLB_JSON_DATE_ISO8601_NS: - { - size_t date_len; - int len; - struct tm _tm; - char time_formatted[36]; - - /* Format the time; use microsecond precision (not nanoseconds). */ - gmtime_r(&tm->tm.tv_sec, &_tm); - date_len = strftime(time_formatted, sizeof(time_formatted) - 1, - FLB_JSON_DATE_ISO8601_FMT, &_tm); - - if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) { - len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len, - ".%06" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec / 1000); - } - else { - /* FLB_JSON_DATE_ISO8601_NS */ - len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len, - ".%09" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec); - } - date_len += len; + if (ctx->add_timestamp) { + /* Pack timestamp */ + msgpack_pack_str(&mp_pck, ctx->timestamp_key_len); + msgpack_pack_str_body(&mp_pck, + ctx->timestamp_key, ctx->timestamp_key_len); + switch (ctx->timestamp_format) { + case FLB_JSON_DATE_DOUBLE: + msgpack_pack_double(&mp_pck, flb_time_to_double(tm)); + break; + + case FLB_JSON_DATE_ISO8601: + case FLB_JSON_DATE_ISO8601_NS: + { + size_t date_len; + int len; + struct tm _tm; + char time_formatted[36]; + + /* Format the time; use microsecond precision (not nanoseconds). */ + gmtime_r(&tm->tm.tv_sec, &_tm); + date_len = strftime(time_formatted, sizeof(time_formatted) - 1, + FLB_JSON_DATE_ISO8601_FMT, &_tm); + + if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) { + len = snprintf(time_formatted + date_len, + sizeof(time_formatted) - 1 - date_len, + ".%06" PRIu64 "Z", + (uint64_t) tm->tm.tv_nsec / 1000); + } + else { + /* FLB_JSON_DATE_ISO8601_NS */ + len = snprintf(time_formatted + date_len, + sizeof(time_formatted) - 1 - date_len, + ".%09" PRIu64 "Z", + (uint64_t) tm->tm.tv_nsec); + } + date_len += len; - msgpack_pack_str(&mp_pck, date_len); - msgpack_pack_str_body(&mp_pck, time_formatted, date_len); - } - break; + msgpack_pack_str(&mp_pck, date_len); + msgpack_pack_str_body(&mp_pck, time_formatted, date_len); + } + break; + } } } else { @@ -602,6 +613,12 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_format_str), "Set the format the timestamp is in." }, + { + FLB_CONFIG_MAP_BOOL, "add_timestamp", "true", + 0, FLB_TRUE, offsetof(struct flb_out_kafka, add_timestamp), + "When enabled, a timestamp field (timestamp_key) is appended to the record. " + "Set to false to suppress the timestamp field entirely." + }, { FLB_CONFIG_MAP_INT, "queue_full_retries", FLB_KAFKA_QUEUE_FULL_RETRIES, 0, FLB_TRUE, offsetof(struct flb_out_kafka, queue_full_retries), diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index 71bfad88966..43813f300ae 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -75,6 +75,7 @@ struct flb_out_kafka { char *timestamp_key; int timestamp_format; flb_sds_t timestamp_format_str; + int add_timestamp; int message_key_len; char *message_key;