diff --git a/CMakeLists.txt b/CMakeLists.txt index b790cf42a69..36771b12841 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -658,8 +658,8 @@ if(FLB_AVRO_ENCODER) # jansson option(JANSSON_BUILD_DOCS OFF) option(JANSSON_EXAMPLES OFF) - option(JANSSON_WITHOUT_TESTS ON) option(JANSSON_BUILD_SHARED_LIBS OFF) + set(JANSSON_WITHOUT_TESTS ON CACHE BOOL "" FORCE) add_subdirectory(${FLB_PATH_LIB_JANSSON}) #avro diff --git a/include/fluent-bit/flb_avro.h b/include/fluent-bit/flb_avro.h index 581baf622d5..abb10a35b99 100644 --- a/include/fluent-bit/flb_avro.h +++ b/include/fluent-bit/flb_avro.h @@ -31,7 +31,7 @@ #define MEMORY_POOL_MINIMUM_SIZE sizeof(void *) struct flb_avro_fields { - flb_sds_t schema_id; + int32_t schema_id; flb_sds_t schema_str; }; diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 1be9cd4ed6b..f4e8965bc86 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -141,7 +141,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, // an embedded schemaid which is used // the embedding is a null byte - // followed by a 16 byte schemaid + // followed by a 4 byte schemaid -#define AVRO_SCHEMA_OVERHEAD 16 + 1 +#define AVRO_SCHEMA_OVERHEAD (4 + 1) #endif flb_debug("in produce_message\n"); @@ -312,29 +312,29 @@ int produce_message(struct flb_time *tm, msgpack_object *map, #ifdef FLB_HAVE_AVRO_ENCODER else if (ctx->format == FLB_KAFKA_FMT_AVRO) { - flb_plg_debug(ctx->ins, "avro schema ID:%s:\n", ctx->avro_fields.schema_id); + flb_plg_debug(ctx->ins, "avro schema ID:%d:\n", ctx->avro_fields.schema_id); flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str); // if there's no data then log it and return if (mp_sbuf.size == 0) { - flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%s:\n", ctx->avro_fields.schema_id); + flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%d:\n", ctx->avro_fields.schema_id); msgpack_sbuffer_destroy(&mp_sbuf); return FLB_OK; } // is the line is too long log it 
and return if (mp_sbuf.size > AVRO_LINE_MAX_LEN) { - flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id); + flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id); msgpack_sbuffer_destroy(&mp_sbuf); return FLB_OK; } - flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id); + flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id); out_buf = avro_buff; out_size = AVRO_DEFAULT_BUFFER_SIZE; if (mp_sbuf.size + AVRO_SCHEMA_OVERHEAD >= AVRO_DEFAULT_BUFFER_SIZE) { - flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id); + flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id); avro_fast_buffer = false; // avro will always be smaller than msgpack // it contains no meta-info aside from the schemaid @@ -344,14 +344,14 @@ int produce_message(struct flb_time *tm, msgpack_object *map, out_size = mp_sbuf.size + AVRO_SCHEMA_OVERHEAD; out_buf = flb_malloc(out_size); if (!out_buf) { - flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id); + flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%d:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id); msgpack_sbuffer_destroy(&mp_sbuf); return FLB_ERROR; } } if(!flb_msgpack_raw_to_avro_sds(mp_sbuf.data, mp_sbuf.size, &ctx->avro_fields, out_buf, &out_size)) { - flb_plg_error(ctx->ins, "error encoding to 
AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id); + flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%d:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id); msgpack_sbuffer_destroy(&mp_sbuf); if (!avro_fast_buffer) { flb_free(out_buf); @@ -639,8 +639,8 @@ static struct flb_config_map config_map[] = { "Set AVRO schema." }, { - FLB_CONFIG_MAP_STR, "schema_id", (char *)NULL, - 0, FLB_FALSE, 0, + FLB_CONFIG_MAP_INT, "schema_id", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, avro_fields) + offsetof(struct flb_avro_fields, schema_id), "Set AVRO schema ID." }, #endif diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index b1f07458884..91e52e4323c 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -249,10 +249,6 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, if (tmp) { ctx->avro_fields.schema_str = flb_sds_create(tmp); } - tmp = flb_output_get_property("schema_id", ins); - if (tmp) { - ctx->avro_fields.schema_id = flb_sds_create(tmp); - } #endif /* Config: Topic */ @@ -282,7 +278,7 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->kafka.brokers, tmp); #ifdef FLB_HAVE_AVRO_ENCODER - flb_plg_info(ctx->ins, "schemaID='%s' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str); + flb_plg_info(ctx->ins, "schemaID='%d' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str); #endif return ctx; @@ -324,7 +320,6 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx) #ifdef FLB_HAVE_AVRO_ENCODER // avro - flb_sds_destroy(ctx->avro_fields.schema_id); flb_sds_destroy(ctx->avro_fields.schema_str); #endif diff --git a/src/flb_avro.c b/src/flb_avro.c index 57e35926c2a..6e978edf7d9 100644 --- a/src/flb_avro.c +++ b/src/flb_avro.c @@ -268,7 +268,7 @@ bool flb_msgpack_raw_to_avro_sds(const void 
*in_buf, size_t in_size, struct flb_ avro_writer_t awriter; flb_debug("in flb_msgpack_raw_to_avro_sds\n"); - flb_debug("schemaID:%s:\n", ctx->schema_id); + flb_debug("schemaID:%d:\n", ctx->schema_id); flb_debug("schema string:%s:\n", ctx->schema_str); size_t schema_json_len = flb_sds_len(ctx->schema_str); @@ -340,19 +340,27 @@ bool flb_msgpack_raw_to_avro_sds(const void *in_buf, size_t in_size, struct flb_ return false; } - // write the schemaid - // its md5hash of the avro schema - // it looks like this c4b52aaf22429c7f9eb8c30270bc1795 - const char *pos = ctx->schema_id; - unsigned char val[16]; - size_t count; - for (count = 0; count < sizeof val/sizeof *val; count++) { - sscanf(pos, "%2hhx", &val[count]); - pos += 2; + // write the schemaID in Confluent wire format: 0x00 magic byte then 4-byte big-endian ID + if (ctx->schema_id <= 0) { + flb_error("Invalid schema_id=%d (must be > 0)\n", ctx->schema_id); + avro_writer_free(awriter); + avro_value_decref(&aobject); + avro_value_iface_decref(aclass); + avro_schema_decref(aschema); + msgpack_unpacked_destroy(&result); + return false; } - + + int32_t id = ctx->schema_id; + unsigned char val[5]; + val[0] = 0x00; + val[1] = (id >> 24) & 0xFF; + val[2] = (id >> 16) & 0xFF; + val[3] = (id >> 8) & 0xFF; + val[4] = id & 0xFF; + // write it into a buffer which can be passed to librdkafka - rval = avro_write(awriter, val, 16); + rval = avro_write(awriter, val, 5); if (rval != 0) { flb_error("Unable to write schemaid\n"); avro_writer_free(awriter); @@ -373,9 +381,6 @@ bool flb_msgpack_raw_to_avro_sds(const void *in_buf, size_t in_size, struct flb_ return false; } - // null terminate it - avro_write(awriter, "\0", 1); - flb_debug("before avro_writer_flush\n"); avro_writer_flush(awriter);