Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ if(FLB_AVRO_ENCODER)
# jansson
option(JANSSON_BUILD_DOCS OFF)
option(JANSSON_EXAMPLES OFF)
option(JANSSON_WITHOUT_TESTS ON)
option(JANSSON_BUILD_SHARED_LIBS OFF)
set(JANSSON_WITHOUT_TESTS ON CACHE BOOL "" FORCE)
add_subdirectory(${FLB_PATH_LIB_JANSSON})

#avro
Expand Down
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_avro.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#define MEMORY_POOL_MINIMUM_SIZE sizeof(void *)

struct flb_avro_fields {
flb_sds_t schema_id;
int32_t schema_id;
flb_sds_t schema_str;
};

Expand Down
20 changes: 10 additions & 10 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
// an embedded schema ID which is used;
// the embedding is a single magic byte
// followed by a 4-byte big-endian schema ID
#define AVRO_SCHEMA_OVERHEAD 16 + 1
#define AVRO_SCHEMA_OVERHEAD 4 + 1
#endif

flb_debug("in produce_message\n");
Expand Down Expand Up @@ -312,29 +312,29 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
#ifdef FLB_HAVE_AVRO_ENCODER
else if (ctx->format == FLB_KAFKA_FMT_AVRO) {

flb_plg_debug(ctx->ins, "avro schema ID:%s:\n", ctx->avro_fields.schema_id);
flb_plg_debug(ctx->ins, "avro schema ID:%d:\n", ctx->avro_fields.schema_id);
flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str);

// if there's no data then log it and return
if (mp_sbuf.size == 0) {
flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%s:\n", ctx->avro_fields.schema_id);
flb_plg_error(ctx->ins, "got zero bytes decoding to avro AVRO:schemaID:%d:\n", ctx->avro_fields.schema_id);
msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_OK;
}

// if the line is too long, log it and return
if (mp_sbuf.size > AVRO_LINE_MAX_LEN) {
flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id);
flb_plg_warn(ctx->ins, "skipping long line AVRO:len:%zu:limit:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, (size_t)AVRO_LINE_MAX_LEN, ctx->avro_fields.schema_id);
msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_OK;
}

flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id);
flb_plg_debug(ctx->ins, "using default buffer AVRO:len:%zu:limit:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, (size_t)AVRO_DEFAULT_BUFFER_SIZE, ctx->avro_fields.schema_id);
out_buf = avro_buff;
out_size = AVRO_DEFAULT_BUFFER_SIZE;

if (mp_sbuf.size + AVRO_SCHEMA_OVERHEAD >= AVRO_DEFAULT_BUFFER_SIZE) {
flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%s:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id);
flb_plg_info(ctx->ins, "upsizing to dynamic buffer AVRO:len:%zu:schemaID:%d:\n", (size_t)mp_sbuf.size, ctx->avro_fields.schema_id);
avro_fast_buffer = false;
// avro will always be smaller than msgpack
// it contains no meta-info aside from the schemaid
Expand All @@ -344,14 +344,14 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
out_size = mp_sbuf.size + AVRO_SCHEMA_OVERHEAD;
out_buf = flb_malloc(out_size);
if (!out_buf) {
flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
flb_plg_error(ctx->ins, "error allocating memory for decoding to AVRO:schema:%s:schemaID:%d:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_ERROR;
}
}

if(!flb_msgpack_raw_to_avro_sds(mp_sbuf.data, mp_sbuf.size, &ctx->avro_fields, out_buf, &out_size)) {
flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%s:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
flb_plg_error(ctx->ins, "error encoding to AVRO:schema:%s:schemaID:%d:\n", ctx->avro_fields.schema_str, ctx->avro_fields.schema_id);
msgpack_sbuffer_destroy(&mp_sbuf);
if (!avro_fast_buffer) {
flb_free(out_buf);
Expand Down Expand Up @@ -639,8 +639,8 @@ static struct flb_config_map config_map[] = {
"Set AVRO schema."
},
{
FLB_CONFIG_MAP_STR, "schema_id", (char *)NULL,
0, FLB_FALSE, 0,
FLB_CONFIG_MAP_INT, "schema_id", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, avro_fields) + offsetof(struct flb_avro_fields, schema_id),
"Set AVRO schema ID."
},
#endif
Expand Down
7 changes: 1 addition & 6 deletions plugins/out_kafka/kafka_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,6 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
if (tmp) {
ctx->avro_fields.schema_str = flb_sds_create(tmp);
}
tmp = flb_output_get_property("schema_id", ins);
if (tmp) {
ctx->avro_fields.schema_id = flb_sds_create(tmp);
}
#endif

/* Config: Topic */
Expand Down Expand Up @@ -282,7 +278,7 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,

flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->kafka.brokers, tmp);
#ifdef FLB_HAVE_AVRO_ENCODER
flb_plg_info(ctx->ins, "schemaID='%s' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str);
flb_plg_info(ctx->ins, "schemaID='%d' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str);
#endif

return ctx;
Expand Down Expand Up @@ -324,7 +320,6 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)

#ifdef FLB_HAVE_AVRO_ENCODER
// avro
flb_sds_destroy(ctx->avro_fields.schema_id);
flb_sds_destroy(ctx->avro_fields.schema_str);
#endif

Expand Down
34 changes: 19 additions & 15 deletions src/flb_avro.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ bool flb_msgpack_raw_to_avro_sds(const void *in_buf, size_t in_size, struct flb_

avro_writer_t awriter;
flb_debug("in flb_msgpack_raw_to_avro_sds\n");
flb_debug("schemaID:%s:\n", ctx->schema_id);
flb_debug("schemaID:%d:\n", ctx->schema_id);
flb_debug("schema string:%s:\n", ctx->schema_str);

size_t schema_json_len = flb_sds_len(ctx->schema_str);
Expand Down Expand Up @@ -340,19 +340,26 @@ bool flb_msgpack_raw_to_avro_sds(const void *in_buf, size_t in_size, struct flb_
return false;
}

// write the schemaid
// its md5hash of the avro schema
// it looks like this c4b52aaf22429c7f9eb8c30270bc1795
const char *pos = ctx->schema_id;
unsigned char val[16];
size_t count;
for (count = 0; count < sizeof val/sizeof *val; count++) {
sscanf(pos, "%2hhx", &val[count]);
pos += 2;
// write the schemaID
if (ctx->schema_id <= 0) {
flb_error("Invalid schema_id=%d (must be > 0)\n", ctx->schema_id);
avro_writer_free(awriter);
avro_value_decref(&aobject);
avro_value_iface_decref(aclass);
avro_schema_decref(aschema);
msgpack_unpacked_destroy(&result);
return false;
}


int32_t id = ctx->schema_id;
unsigned char val[4];
val[0] = (id >> 24) & 0xFF;
val[1] = (id >> 16) & 0xFF;
val[2] = (id >> 8) & 0xFF;
val[3] = id & 0xFF;

// write it into a buffer which can be passed to librdkafka
rval = avro_write(awriter, val, 16);
rval = avro_write(awriter, val, 4);
if (rval != 0) {
flb_error("Unable to write schemaid\n");
avro_writer_free(awriter);
Expand All @@ -373,9 +380,6 @@ bool flb_msgpack_raw_to_avro_sds(const void *in_buf, size_t in_size, struct flb_
return false;
}

// null terminate it
avro_write(awriter, "\0", 1);

flb_debug("before avro_writer_flush\n");

avro_writer_flush(awriter);
Expand Down
Loading