diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 88ff8cb0af3..b2409c489e0 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -52,9 +52,17 @@ FLB_EXPORT void flb_init_env(); FLB_EXPORT flb_ctx_t *flb_create(); FLB_EXPORT void flb_destroy(flb_ctx_t *ctx); FLB_EXPORT int flb_input(flb_ctx_t *ctx, const char *input, void *data); +FLB_EXPORT int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, + ...); FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); +FLB_EXPORT int flb_input_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc); FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb); +FLB_EXPORT int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, + ...); FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); +FLB_EXPORT int flb_output_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc); FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data); FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...); FLB_EXPORT int flb_input_set_test(flb_ctx_t *ctx, int ffd, char *test_name, diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index c6828f78bbd..c9d80974cec 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -229,7 +229,7 @@ int flb_processor_run(struct flb_processor *proc, struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, int event_type, - char *unit_name); + const char *unit_name); void flb_processor_unit_destroy(struct flb_processor_unit *pu); int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, struct cfl_variant *v); int flb_processor_unit_set_property_str(struct flb_processor_unit *pu, const char *k, const char *v); diff --git a/src/flb_lib.c b/src/flb_lib.c index 30d4d99dd40..6e8a1b20f51 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -329,6 +330,72 @@ int flb_input_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +static int flb_processor_event_type_get(const char *event_type) +{ + if (strcasecmp(event_type, "logs") == 0) { + return FLB_PROCESSOR_LOGS; + } + else if (strcasecmp(event_type, "metrics") == 0) { + return FLB_PROCESSOR_METRICS; + } + else if (strcasecmp(event_type, "traces") == 0) { + return FLB_PROCESSOR_TRACES; + } + else if (strcasecmp(event_type, "profiles") == 0) { + return FLB_PROCESSOR_PROFILES; + } + else { + return -1; + } +} + +/* Create a single processor unit for the input processor */ +int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, ...) +{ + int ret; + int type; + struct flb_input_instance *i_ins; + struct flb_processor *proc; + struct flb_processor_unit *pu; + char *key; + char *value; + va_list va; + + i_ins = in_instance_get(ctx, ffd); + if (!i_ins) { + return -1; + } + proc = i_ins->processor; + if (!proc) { + return -1; + } + type = flb_processor_event_type_get(event_type); + if (type == -1) { + return -1; + } + pu = flb_processor_unit_create(proc, type, processor_unit_name); + va_start(va, ffd); + while ((key = va_arg(va, char *))) { + value = va_arg(va, char *); + if (!value) { + /* Wrong parameter */ + va_end(va); + return -1; + } + struct cfl_variant cfl_value = { + .type = CFL_VARIANT_STRING, + .data.as_string = value, + }; + ret = flb_processor_unit_set_property(pu, key, &cfl_value); + if (ret != 0) { + va_end(va); + return -1; + } + } + return 0; +} + int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) { struct flb_input_instance *i_ins; @@ -347,6 +414,23 @@ int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) return 0; } +/* Retrieve the processor associated with a given input plugin. */ +int flb_input_get_processor(flb_ctx_t *ctx, int ffd, + struct flb_processor **proc) +{ + struct flb_input_instance *i_ins; + + i_ins = in_instance_get(ctx, ffd); + if (!i_ins) { + return -1; + } + if (!i_ins->processor) { + return -1; + } + *proc = i_ins->processor; + return 0; +} + int flb_input_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void (*in_callback) (void *, int, int, void *, size_t, void *), void *in_callback_data) @@ -545,6 +629,55 @@ int flb_output_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +/* Create a single processor unit for the input processor */ +int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, ...) +{ + int ret; + int type; + struct flb_output_instance *o_ins; + struct flb_processor *proc; + struct flb_processor_unit *pu; + char *key; + char *value; + va_list va; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + proc = o_ins->processor; + if (!proc) { + return -1; + } + type = flb_processor_event_type_get(event_type); + if (type == -1) { + return -1; + } + pu = flb_processor_unit_create(proc, type, processor_unit_name); + va_start(va, ffd); + while ((key = va_arg(va, char *))) { + value = va_arg(va, char *); + if (!value) { + /* Wrong parameter */ + va_end(va); + return -1; + } + + struct cfl_variant cfl_value = { + .type = CFL_VARIANT_STRING, + .data.as_string = value, + }; + + ret = flb_processor_unit_set_property(pu, key, &cfl_value); + if (ret != 0) { + va_end(va); + return -1; + } + } + return 0; +} + int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) { struct flb_output_instance *o_ins; @@ -563,6 +696,23 @@ int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc return 0; } +/* Retrieve the processor associated with a given output plugin. */ +int flb_output_get_processor(flb_ctx_t *ctx, int ffd, + struct flb_processor **proc) +{ + struct flb_output_instance *o_ins; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + if (!o_ins->processor) { + return -1; + } + *proc = o_ins->processor; + return 0; +} + int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)) { diff --git a/src/flb_processor.c b/src/flb_processor.c index 91222dfdb4d..b6c4dc82249 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -161,7 +161,7 @@ struct flb_processor *flb_processor_create(struct flb_config *config, struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, int event_type, - char *unit_name) + const char *unit_name) { int result; struct mk_list *head; diff --git a/tests/internal/processor.c b/tests/internal/processor.c index 679fd131b8d..f63d2a988d8 100644 --- a/tests/internal/processor.c +++ b/tests/internal/processor.c @@ -119,7 +119,99 @@ static void processor() flb_sds_destroy(hostname_prop_key); } +static void input_processor() +{ + flb_ctx_t *ctx; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct mk_list *head; + int ret; + int in_ffd; + int found = 0; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + in_ffd = flb_input(ctx, "dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + ret = flb_input_processor_unit(ctx, "logs", "opentelemetry_envelope", + in_ffd, NULL); + TEST_CHECK(ret == 0); + + ret = flb_input_get_processor(ctx, in_ffd, &proc); + TEST_CHECK(ret == 0); + TEST_CHECK(proc != NULL); + + /* Walk through logs processor units and verify one was added */ + mk_list_foreach(head, &proc->logs) { + pu = mk_list_entry(head, struct flb_processor_unit, _head); + if (strcmp(pu->name, "opentelemetry_envelope") == 0) { + found++; + break; + } + } + TEST_CHECK(found == 1); + + flb_destroy(ctx); +} + +static void output_processor() +{ + flb_ctx_t *ctx; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct flb_processor_instance *pi; + struct mk_list *head; + const char *val; + int ret; + int out_ffd; + int found = 0; + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + out_ffd = flb_output(ctx, "stdout", NULL); + TEST_CHECK(out_ffd >= 0); + + ret = flb_output_processor_unit(ctx, "metrics", + "metrics_selector", out_ffd, + "metric_name", "/storage/", + "action", "includeNULL", NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_get_processor(ctx, out_ffd, &proc); + TEST_CHECK(ret == 0); + TEST_CHECK(proc != NULL); + + /* Walk through logs processor units and verify one was added */ + mk_list_foreach(head, &proc->metrics) { + pu = mk_list_entry(head, struct flb_processor_unit, _head); + if (strcmp(pu->name, "metrics_selector") == 0) { + found++; + + pi = (struct flb_processor_instance *) pu->ctx; + TEST_CHECK(pi != NULL); + + val = flb_processor_instance_get_property("metric_name", pi); + TEST_CHECK(val != NULL); + TEST_CHECK(strcmp(val, "/storage/") == 0); + + val = flb_processor_instance_get_property("action", pi); + TEST_CHECK(val != NULL); + TEST_CHECK(strcmp(val, "includeNULL") == 0); + + break; + } + } + TEST_CHECK(found == 1); + + flb_destroy(ctx); +} + TEST_LIST = { { "processor", processor }, + {"input_processor", input_processor}, + {"output_processor", output_processor}, { 0 } };