From eb84b317dad4ccde847433029970c8358ca651b8 Mon Sep 17 00:00:00 2001 From: Guillermo Facundo Colunga Date: Fri, 13 Mar 2026 17:55:55 +0100 Subject: [PATCH] input_metric: split large metric contexts into chunk-sized batches Addresses https://github.com/fluent/fluent-bit/issues/9653 Large metric scrapes (e.g., 200K+ metrics from prometheus_scrape) were encoded as a single chunk exceeding the 2MB limit, breaking downstream plugins like out_opentelemetry. Add a size-aware split path to flb_input_metrics_append() that batches metric families into chunk-sized cmt contexts using cmt_cat_*() before encoding. Small payloads are unaffected (fast path preserved). Signed-off-by: Guillermo Facundo Colunga Co-Authored-By: Claude Opus 4.6 (1M context) --- src/flb_input_metric.c | 354 +++++++++++++++++++++++++++- tests/internal/CMakeLists.txt | 1 + tests/internal/input_metric.c | 429 ++++++++++++++++++++++++++++++++++ 3 files changed, 776 insertions(+), 8 deletions(-) create mode 100644 tests/internal/input_metric.c diff --git a/src/flb_input_metric.c b/src/flb_input_metric.c index 1f55b848905..e48a88d632a 100644 --- a/src/flb_input_metric.c +++ b/src/flb_input_metric.c @@ -17,12 +17,328 @@ * limitations under the License. */ +#include + #include #include #include #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * Copy static labels from a source cmt context to a destination cmt context. + * This ensures each batch retains the same static labels as the original. 
+ */
+static int copy_static_labels(struct cmt *dst, struct cmt *src)
+{
+    struct cfl_list *node;
+    struct cmt_label *entry;
+
+    /* A source without a static label set has nothing to replicate */
+    if (!src->static_labels) {
+        return 0;
+    }
+
+    cfl_list_foreach(node, &src->static_labels->list) {
+        entry = cfl_list_entry(node, struct cmt_label, _head);
+
+        /* Any non-zero result from cmt_label_add() is reported as -1 */
+        if (cmt_label_add(dst, entry->key, entry->val) != 0) {
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Insert a copy of a single key/value pair into 'dst', dispatching on the
+ * variant type of the value.
+ *
+ * Returns the result of the matching cfl_kvlist_insert_*() call, or 0 when
+ * the value type is not handled and the pair is skipped.
+ */
+static int copy_kvpair(struct cfl_kvlist *dst, struct cfl_kvpair *pair)
+{
+    struct cfl_variant *value = pair->val;
+
+    switch (value->type) {
+    case CFL_VARIANT_STRING:
+        return cfl_kvlist_insert_string(dst, pair->key,
+                                        value->data.as_string);
+    case CFL_VARIANT_INT:
+        return cfl_kvlist_insert_int64(dst, pair->key,
+                                       value->data.as_int64);
+    case CFL_VARIANT_UINT:
+        return cfl_kvlist_insert_uint64(dst, pair->key,
+                                        value->data.as_uint64);
+    case CFL_VARIANT_DOUBLE:
+        return cfl_kvlist_insert_double(dst, pair->key,
+                                        value->data.as_double);
+    case CFL_VARIANT_BOOL:
+        return cfl_kvlist_insert_bool(dst, pair->key,
+                                      value->data.as_bool);
+    case CFL_VARIANT_BYTES:
+        /* CFL_FALSE: do not insert the bytes as a reference */
+        return cfl_kvlist_insert_bytes(dst, pair->key,
+                                       value->data.as_bytes,
+                                       value->size, CFL_FALSE);
+    default:
+        /* Every other variant type is skipped and counts as success */
+        return 0;
+    }
+}
+
+/*
+ * Copy the top-level entries of 'src' into 'dst'.
+ *
+ * String, int64, uint64, double, bool and bytes values are inserted under
+ * the same key. Entries of any other variant type are skipped without an
+ * error. A NULL 'src' or 'dst' is treated as "nothing to do".
+ *
+ * Returns 0 on success, -1 as soon as one insertion fails; entries that
+ * were inserted before the failure stay in 'dst'.
+ *
+ * NOTE(review): skipped entries are dropped silently. If OTLP
+ * resource/scope attributes are stored as nested kvlists or arrays they
+ * would not survive the copy - confirm against the metadata producers.
+ */
+static int copy_kvlist(struct cfl_kvlist *dst, struct cfl_kvlist *src)
+{
+    struct cfl_list *node;
+    struct cfl_kvpair *pair;
+
+    if (!src || !dst) {
+        return 0;
+    }
+
+    cfl_list_foreach(node, &src->list) {
+        pair = cfl_list_entry(node, struct cfl_kvpair, _head);
+
+        if (copy_kvpair(dst, pair) != 0) {
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Copy internal and external metadata from a source cmt context to a
+ * destination cmt context.
+ * This is meant to preserve OTLP resource/scope metadata across batch
+ * splits.
+ *
+ * Returns 0 when both kvlists were copied, -1 as soon as one copy fails.
+ */
+static int copy_metadata(struct cmt *dst, struct cmt *src)
+{
+    /* Internal metadata goes first; external is not attempted if it fails */
+    if (copy_kvlist(dst->internal_metadata, src->internal_metadata) != 0 ||
+        copy_kvlist(dst->external_metadata, src->external_metadata) != 0) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Split a large cmt context into multiple smaller batches and append each
+ * one individually. This is used when the encoded metrics exceed the chunk
+ * size limit (FLB_INPUT_CHUNK_FS_MAX_SIZE = 2MB).
+ *
+ * The approach: count total metric families across all 6 metric type lists,
+ * estimate how many families fit per batch, then iterate through families
+ * in order, building up temporary cmt contexts and flushing them.
+ *
+ * Note: if a mid-batch error occurs, previously appended batches are NOT
+ * rolled back. This is acceptable because metrics consumers (e.g.,
+ * Prometheus, OTLP) handle partial scrapes gracefully.
+ *
+ * Note: internal/external metadata (kvlists) are deep-copied into each
+ * batch via copy_metadata() to preserve OTLP resource/scope attributes.
+ */
+static int input_metrics_split_and_append(struct flb_input_instance *ins,
+                                          const char *tag, size_t tag_len,
+                                          struct cmt *src,
+                                          size_t total_encoded_size);
+
+/*
+ * Create an empty batch context carrying the static labels and the
+ * internal/external metadata of 'src'.
+ *
+ * Returns the new context, or NULL after logging which step failed; a
+ * partially initialised context is destroyed before returning.
+ */
+static struct cmt *metric_batch_create(struct flb_input_instance *ins,
+                                       struct cmt *src)
+{
+    struct cmt *batch;
+
+    batch = cmt_create();
+    if (batch == NULL) {
+        flb_plg_error(ins, "could not create batch cmt context");
+        return NULL;
+    }
+
+    if (copy_static_labels(batch, src) != 0) {
+        flb_plg_error(ins, "could not copy static labels to batch");
+        cmt_destroy(batch);
+        return NULL;
+    }
+
+    if (copy_metadata(batch, src) != 0) {
+        flb_plg_error(ins, "could not copy metadata to batch");
+        cmt_destroy(batch);
+        return NULL;
+    }
+
+    return batch;
+}
+
+/*
+ * Encode 'batch' as msgpack and append the payload to the input chunk.
+ *
+ * ins                - input instance used for logging and for the append
+ * tag, tag_len       - tag handed to flb_input_chunk_append_raw()
+ * batch              - context to flush; it is ALWAYS destroyed here, on
+ *                      success and on failure, so the caller must drop its
+ *                      pointer afterwards
+ * what               - noun used in the error messages
+ * families_per_batch - optional; when not NULL, larger than 1 and the
+ *                      payload exceeds FLB_INPUT_CHUNK_FS_MAX_SIZE, the value
+ *                      is halved (rounding up) for the batches that follow
+ *
+ * An oversized payload is still appended: the halving only affects later
+ * batches, and when no further reduction is possible a warning is logged.
+ *
+ * Returns 0 on success, -1 if encoding or appending failed.
+ */
+static int metric_batch_flush(struct flb_input_instance *ins,
+                              const char *tag, size_t tag_len,
+                              struct cmt *batch, const char *what,
+                              int *families_per_batch)
+{
+    int ret;
+    char *mt_buf = NULL;
+    size_t mt_size = 0;
+
+    ret = cmt_encode_msgpack_create(batch, &mt_buf, &mt_size);
+    cmt_destroy(batch);
+    if (ret != 0) {
+        flb_plg_error(ins, "could not encode %s", what);
+        return -1;
+    }
+
+    if (mt_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
+        if (families_per_batch != NULL && *families_per_batch > 1) {
+            *families_per_batch = (*families_per_batch + 1) / 2;
+            flb_plg_debug(ins,
+                          "batch %zu bytes exceeds limit, reducing "
+                          "families_per_batch to %d",
+                          mt_size, *families_per_batch);
+        }
+        else {
+            flb_plg_warn(ins,
+                         "metric batch (%zu bytes) still exceeds "
+                         "chunk limit; cannot split further", mt_size);
+        }
+    }
+
+    ret = flb_input_chunk_append_raw(ins, FLB_INPUT_METRICS, 0,
+                                     tag, tag_len, mt_buf, mt_size);
+    cmt_encode_msgpack_destroy(mt_buf);
+    if (ret != 0) {
+        flb_plg_error(ins, "could not append %s", what);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Definition of input_metrics_split_and_append(); the overall approach is
+ * described above its prototype.
+ *
+ * ins                - input instance that receives the batches
+ * tag, tag_len       - tag used for every appended batch
+ * src                - context whose families are split; it is only read
+ * total_encoded_size - msgpack size of 'src' encoded as a whole, used to
+ *                      estimate how many families fit in one batch
+ *
+ * Returns 0 when every batch was appended (or 'src' holds no families),
+ * -1 on the first failure.
+ */
+static int input_metrics_split_and_append(struct flb_input_instance *ins,
+                                          const char *tag, size_t tag_len,
+                                          struct cmt *src,
+                                          size_t total_encoded_size)
+{
+    int ret;
+    int total_families;
+    int families_per_batch;
+    int batch_count = 0;
+    int batches_sent = 0;
+    uint64_t estimate;
+    struct cmt *batch = NULL;
+    struct cfl_list *head;
+    struct cfl_list *tmp_head;
+
+    /* Iteration variables used by the PROCESS_METRIC_LIST macro */
+    struct cmt_counter *counter;
+    struct cmt_gauge *gauge;
+    struct cmt_untyped *untyped;
+    struct cmt_histogram *histogram;
+    struct cmt_exp_histogram *exp_histogram;
+    struct cmt_summary *summary;
+
+    /* Count total metric families across all type lists */
+    total_families = cfl_list_size(&src->counters) +
+                     cfl_list_size(&src->gauges) +
+                     cfl_list_size(&src->untypeds) +
+                     cfl_list_size(&src->histograms) +
+                     cfl_list_size(&src->exp_histograms) +
+                     cfl_list_size(&src->summaries);
+
+    if (total_families == 0) {
+        return 0;
+    }
+
+    /*
+     * Estimate how many families fit within the chunk size limit, using
+     * 64-bit arithmetic so the product cannot overflow on 32-bit platforms.
+     *
+     * The estimate is clamped to [1, total_families] before it is narrowed
+     * to int: at least one family per batch guarantees forward progress,
+     * and a batch can never hold more families than exist. A size of zero
+     * carries no information (and must not be used as a divisor), so in
+     * that case everything is placed in a single batch.
+     */
+    if (total_encoded_size == 0) {
+        families_per_batch = total_families;
+    }
+    else {
+        estimate = ((uint64_t) total_families * FLB_INPUT_CHUNK_FS_MAX_SIZE) /
+                   total_encoded_size;
+        if (estimate < 1) {
+            families_per_batch = 1;
+        }
+        else if (estimate > (uint64_t) total_families) {
+            families_per_batch = total_families;
+        }
+        else {
+            families_per_batch = (int) estimate;
+        }
+    }
+
+    flb_plg_debug(ins,
+                  "metric batch split: total_families=%d "
+                  "families_per_batch=%d encoded_size=%zu limit=%zu",
+                  total_families, families_per_batch,
+                  total_encoded_size, (size_t) FLB_INPUT_CHUNK_FS_MAX_SIZE);
+
+/*
+ * Macro to iterate one metric type list, adding families to the current
+ * batch. When the batch reaches families_per_batch it is flushed through
+ * metric_batch_flush() and a new batch is started for the next family.
+ *
+ * Parameters:
+ *   list     - the cfl_list head in the source cmt (e.g., src->counters)
+ *   type     - the C struct type (e.g., cmt_counter)
+ *   cat_func - the cmt_cat function (e.g., cmt_cat_counter)
+ *   var      - a local variable of the correct pointer type
+ */
+#define PROCESS_METRIC_LIST(list, type, cat_func, var)                      \
+    cfl_list_foreach_safe(head, tmp_head, &(list)) {                        \
+        var = cfl_list_entry(head, struct type, _head);                     \
+                                                                            \
+        /* Start a batch lazily, only once a family needs one */            \
+        if (batch == NULL) {                                                \
+            batch = metric_batch_create(ins, src);                          \
+            if (batch == NULL) {                                            \
+                goto error;                                                 \
+            }                                                               \
+            batch_count = 0;                                                \
+        }                                                                   \
+                                                                            \
+        ret = cat_func(batch, var, NULL);                                   \
+        if (ret != 0) {                                                     \
+            flb_plg_error(ins,                                              \
+                "could not concatenate metric family into batch");          \
+            goto error;                                                     \
+        }                                                                   \
+        batch_count++;                                                      \
+                                                                            \
+        /* The flush helper consumes the batch even when it fails */        \
+        if (batch_count >= families_per_batch) {                            \
+            ret = metric_batch_flush(ins, tag, tag_len, batch,              \
+                                     "metric batch",                        \
+                                     &families_per_batch);                  \
+            batch = NULL;                                                   \
+            if (ret != 0) {                                                 \
+                goto error;                                                 \
+            }                                                               \
+            batches_sent++;                                                 \
+        }                                                                   \
+    }
+
+    /* Process all 6 metric type lists in order */
+    PROCESS_METRIC_LIST(src->counters, cmt_counter, cmt_cat_counter, counter);
+    PROCESS_METRIC_LIST(src->gauges, cmt_gauge, cmt_cat_gauge, gauge);
+    PROCESS_METRIC_LIST(src->untypeds, cmt_untyped, cmt_cat_untyped, untyped);
+    PROCESS_METRIC_LIST(src->histograms, cmt_histogram,
+                        cmt_cat_histogram, histogram);
+    PROCESS_METRIC_LIST(src->exp_histograms, cmt_exp_histogram,
+                        cmt_cat_exp_histogram, exp_histogram);
+    PROCESS_METRIC_LIST(src->summaries, cmt_summary,
+                        cmt_cat_summary, summary);
+
+#undef PROCESS_METRIC_LIST
+
+    /*
+     * Flush any remaining families in the last partial batch. No pointer to
+     * families_per_batch is passed: nothing follows, so there is nothing
+     * left to resize and an oversized payload only produces the warning.
+     */
+    if (batch != NULL) {
+        ret = metric_batch_flush(ins, tag, tag_len, batch,
+                                 "final metric batch", NULL);
+        batch = NULL;
+        if (ret != 0) {
+            goto error;
+        }
+        batches_sent++;
+    }
+
+    flb_plg_debug(ins, "metric batch split complete: %d batches sent",
+                  batches_sent);
+
+    return 0;
+
+error:
+    /* Only a batch that was never handed to the flush helper is still owned */
+    if (batch != NULL) {
+        cmt_destroy(batch);
+    }
+    return -1;
+}
 
 static int input_metrics_append(struct flb_input_instance *ins,
                                 size_t processor_starting_stage,
@@ -79,22 +395,44 @@ static int input_metrics_append(struct flb_input_instance *ins,
 
     /* Convert metrics to msgpack */
     ret = cmt_encode_msgpack_create(encode_context, &mt_buf, &mt_size);
-
-    if (out_context && out_context != cmt) {
-        cmt_destroy(out_context);
-    }
-
     if
(ret != 0) { + if (out_context && out_context != cmt) { + cmt_destroy(out_context); + } flb_plg_error(ins, "could not encode metrics"); return -1; } - /* Append packed metrics */ - ret = flb_input_chunk_append_raw(ins, FLB_INPUT_METRICS, 0, - tag, tag_len, mt_buf, mt_size); + /* Fast path: encoded metrics fit within the chunk size limit */ + if (mt_size <= FLB_INPUT_CHUNK_FS_MAX_SIZE) { + if (out_context && out_context != cmt) { + cmt_destroy(out_context); + } + + ret = flb_input_chunk_append_raw(ins, FLB_INPUT_METRICS, 0, + tag, tag_len, mt_buf, mt_size); + cmt_encode_msgpack_destroy(mt_buf); + return ret; + } + + /* + * Slow path: encoded metrics exceed the chunk size limit. + * Free the oversized buffer and split into smaller batches. + * We need encode_context alive for iteration, so defer its cleanup. + */ + flb_plg_debug(ins, + "encoded metrics size %zu exceeds chunk limit %zu, splitting", + mt_size, (size_t) FLB_INPUT_CHUNK_FS_MAX_SIZE); cmt_encode_msgpack_destroy(mt_buf); + ret = input_metrics_split_and_append(ins, tag, tag_len, + encode_context, mt_size); + + if (out_context && out_context != cmt) { + cmt_destroy(out_context); + } + return ret; } diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index ec84a3bcdf0..94009227e82 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -27,6 +27,7 @@ set(UNIT_TESTS_FILES mp_chunk_cobj.c input_chunk.c input_chunk_routes.c + input_metric.c flb_time.c file.c csv.c diff --git a/tests/internal/input_metric.c b/tests/internal/input_metric.c new file mode 100644 index 00000000000..3b1a441c421 --- /dev/null +++ b/tests/internal/input_metric.c @@ -0,0 +1,429 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "flb_tests_internal.h" + +/* + * Test: copy_static_labels round-trip + * + * Create a cmt with 
+ * static labels, copy them to a new cmt using cmt_label_add, and verify
+ * that the destination holds the same number of labels and that every
+ * source key/value pair can be found in it.
+ */
+static void test_static_labels_copy(void)
+{
+    int ret;
+    int count;
+    int matched;
+    struct cmt *src;
+    struct cmt *dst;
+    struct cfl_list *head;
+    struct cfl_list *dst_head;
+    struct cmt_label *label;
+    struct cmt_label *copy;
+
+    /* Stop early on allocation failure instead of dereferencing NULL */
+    src = cmt_create();
+    if (!TEST_CHECK(src != NULL)) {
+        return;
+    }
+
+    /* Add static labels to source */
+    ret = cmt_label_add(src, "env", "production");
+    TEST_CHECK(ret == 0);
+    ret = cmt_label_add(src, "region", "eu-west-1");
+    TEST_CHECK(ret == 0);
+
+    /* Create destination and copy labels */
+    dst = cmt_create();
+    if (!TEST_CHECK(dst != NULL)) {
+        cmt_destroy(src);
+        return;
+    }
+
+    cfl_list_foreach(head, &src->static_labels->list) {
+        label = cfl_list_entry(head, struct cmt_label, _head);
+        ret = cmt_label_add(dst, label->key, label->val);
+        TEST_CHECK(ret == 0);
+    }
+
+    /* Verify destination has the same number of labels */
+    count = 0;
+    cfl_list_foreach(dst_head, &dst->static_labels->list) {
+        count++;
+    }
+    TEST_CHECK(count == 2);
+
+    /* Verify the content too: each source pair must exist in destination */
+    matched = 0;
+    cfl_list_foreach(head, &src->static_labels->list) {
+        label = cfl_list_entry(head, struct cmt_label, _head);
+
+        cfl_list_foreach(dst_head, &dst->static_labels->list) {
+            copy = cfl_list_entry(dst_head, struct cmt_label, _head);
+            if (strcmp(label->key, copy->key) == 0 &&
+                strcmp(label->val, copy->val) == 0) {
+                matched++;
+                break;
+            }
+        }
+    }
+    TEST_CHECK(matched == 2);
+
+    cmt_destroy(src);
+    cmt_destroy(dst);
+}
+
+/*
+ * Test: cmt_cat_counter copies a single counter family correctly
+ *
+ * Create a counter in one cmt, copy it to another via cmt_cat_counter,
+ * encode both, and verify the copy has the same data.
+ */
+static void test_cat_single_counter(void)
+{
+    int ret;
+    size_t off = 0;
+    size_t size_src = 0;
+    size_t size_dst = 0;
+    char *buf_src = NULL;
+    char *buf_dst = NULL;
+    struct cmt *src;
+    struct cmt *dst = NULL;
+    struct cmt *decoded = NULL;
+    struct cmt_counter *c;
+
+    src = cmt_create();
+    if (!TEST_CHECK(src != NULL)) {
+        return;
+    }
+
+    /* Create a counter with 2 label keys */
+    c = cmt_counter_create(src, "test", "ns", "requests_total",
+                           "Total requests", 2,
+                           (char *[]){"method", "status"});
+    if (!TEST_CHECK(c != NULL)) {
+        goto cleanup;
+    }
+
+    ret = cmt_counter_set(c, 1000000000, 42.0, 2,
+                          (char *[]){"GET", "200"});
+    TEST_CHECK(ret == 0);
+
+    ret = cmt_counter_set(c, 1000000000, 7.0, 2,
+                          (char *[]){"POST", "500"});
+    TEST_CHECK(ret == 0);
+
+    /* Copy to destination */
+    dst = cmt_create();
+    if (!TEST_CHECK(dst != NULL)) {
+        goto cleanup;
+    }
+
+    ret = cmt_cat_counter(dst, c, NULL);
+    TEST_CHECK(ret == 0);
+
+    /* Encode the destination and decode it back */
+    ret = cmt_encode_msgpack_create(dst, &buf_dst, &size_dst);
+    if (!TEST_CHECK(ret == 0)) {
+        /* Do not release a buffer that a failed encode may have left set */
+        buf_dst = NULL;
+        goto cleanup;
+    }
+
+    /* Only look into the decoded context when decoding succeeded */
+    ret = cmt_decode_msgpack_create(&decoded, buf_dst, size_dst, &off);
+    if (TEST_CHECK(ret == 0)) {
+        /* Verify the decoded context has exactly 1 counter family */
+        TEST_CHECK(cfl_list_size(&decoded->counters) == 1);
+        cmt_decode_msgpack_destroy(decoded);
+    }
+
+    /* Verify the counter values by encoding source and comparing payloads */
+    ret = cmt_encode_msgpack_create(src, &buf_src, &size_src);
+    if (!TEST_CHECK(ret == 0)) {
+        buf_src = NULL;
+        goto cleanup;
+    }
+
+    /* Compare bytes only for equal sizes so memcmp never overreads */
+    TEST_CHECK(size_src == size_dst);
+    if (size_src == size_dst) {
+        TEST_CHECK(memcmp(buf_src, buf_dst, size_src) == 0);
+    }
+
+cleanup:
+    if (buf_src != NULL) {
+        cmt_encode_msgpack_destroy(buf_src);
+    }
+    if (buf_dst != NULL) {
+        cmt_encode_msgpack_destroy(buf_dst);
+    }
+    if (dst != NULL) {
+        cmt_destroy(dst);
+    }
+    cmt_destroy(src);
+}
+
+/*
+ * Test: batch splitting preserves all metric families
+ *
+ * Create a cmt with many counter families, simulate the splitting logic
+ * (same as input_metrics_split_and_append), and verify that re-assembling
+ * all batches yields the same total number of families.
+ */
+static void test_batch_split_preserves_families(void);
+
+/*
+ * Encode '*batch' to msgpack, decode the payload again and return how many
+ * counter and gauge families the decoded context holds (0 when encoding or
+ * decoding fails, which is also reported through TEST_CHECK).
+ *
+ * The batch is always destroyed and '*batch' reset to NULL so the caller
+ * starts a fresh one for the next family.
+ */
+static int flush_batch_and_count(struct cmt **batch)
+{
+    int ret;
+    int families = 0;
+    size_t off = 0;
+    size_t mt_size = 0;
+    char *mt_buf = NULL;
+    struct cmt *decoded = NULL;
+
+    ret = cmt_encode_msgpack_create(*batch, &mt_buf, &mt_size);
+    if (TEST_CHECK(ret == 0)) {
+        ret = cmt_decode_msgpack_create(&decoded, mt_buf, mt_size, &off);
+        if (TEST_CHECK(ret == 0)) {
+            families = cfl_list_size(&decoded->counters) +
+                       cfl_list_size(&decoded->gauges);
+            cmt_decode_msgpack_destroy(decoded);
+        }
+        cmt_encode_msgpack_destroy(mt_buf);
+    }
+
+    cmt_destroy(*batch);
+    *batch = NULL;
+
+    return families;
+}
+
+/*
+ * Definition of test_batch_split_preserves_families(); the scenario is
+ * described above its prototype.
+ */
+static void test_batch_split_preserves_families(void)
+{
+    int i;
+    int ret;
+    int total_families = 20;
+    int families_per_batch = 5;
+    int batch_count = 0;
+    int total_batches = 0;
+    int total_recovered = 0;
+    char name[64];
+    struct cmt *src;
+    struct cmt *batch = NULL;
+    struct cmt_counter *c;
+    struct cfl_list *head;
+    struct cfl_list *tmp;
+
+    src = cmt_create();
+    if (!TEST_CHECK(src != NULL)) {
+        return;
+    }
+
+    /* Create many counter families */
+    for (i = 0; i < total_families; i++) {
+        snprintf(name, sizeof(name), "metric_%d", i);
+        c = cmt_counter_create(src, "test", "ns", name,
+                               "A test counter", 0, NULL);
+        if (!TEST_CHECK(c != NULL)) {
+            continue;
+        }
+        ret = cmt_counter_set(c, 1000000000, (double) i, 0, NULL);
+        TEST_CHECK(ret == 0);
+    }
+
+    /* Verify source has the expected number of counter families */
+    TEST_CHECK(cfl_list_size(&src->counters) == total_families);
+
+    /* Simulate the splitting logic from input_metrics_split_and_append */
+    cfl_list_foreach_safe(head, tmp, &src->counters) {
+        c = cfl_list_entry(head, struct cmt_counter, _head);
+
+        if (batch == NULL) {
+            batch = cmt_create();
+            if (!TEST_CHECK(batch != NULL)) {
+                break;
+            }
+            batch_count = 0;
+        }
+
+        ret = cmt_cat_counter(batch, c, NULL);
+        TEST_CHECK(ret == 0);
+        batch_count++;
+
+        if (batch_count >= families_per_batch) {
+            total_recovered += flush_batch_and_count(&batch);
+            total_batches++;
+        }
+    }
+
+    /* Flush remaining */
+    if (batch != NULL) {
+        total_recovered += flush_batch_and_count(&batch);
+        total_batches++;
+    }
+
+    /* Verify all families were recovered across all batches */
+    TEST_CHECK(total_recovered == total_families);
+
+    /* Verify expected number of batches (20 families / 5 per batch = 4) */
+    TEST_CHECK(total_batches == 4);
+
+    cmt_destroy(src);
+}
+
+/*
+ * Test: batch splitting with mixed metric types
+ *
+ * Create a cmt with counters and gauges, split into batches, and verify
+ * the total family count across all batches matches the original. A batch
+ * may span the boundary between the counter list and the gauge list.
+ */
+static void test_batch_split_mixed_types(void)
+{
+    int i;
+    int ret;
+    int total_families;
+    int families_per_batch = 3;
+    int batch_count = 0;
+    int total_recovered = 0;
+    char name[64];
+    struct cmt *src;
+    struct cmt *batch = NULL;
+    struct cmt_counter *c;
+    struct cmt_gauge *g;
+    struct cfl_list *head;
+    struct cfl_list *tmp;
+
+    src = cmt_create();
+    if (!TEST_CHECK(src != NULL)) {
+        return;
+    }
+
+    /* Create 5 counters and 5 gauges = 10 total families */
+    for (i = 0; i < 5; i++) {
+        snprintf(name, sizeof(name), "counter_%d", i);
+        c = cmt_counter_create(src, "test", "ns", name,
+                               "A counter", 0, NULL);
+        if (!TEST_CHECK(c != NULL)) {
+            continue;
+        }
+        ret = cmt_counter_set(c, 1000000000, (double) i, 0, NULL);
+        TEST_CHECK(ret == 0);
+    }
+    for (i = 0; i < 5; i++) {
+        snprintf(name, sizeof(name), "gauge_%d", i);
+        g = cmt_gauge_create(src, "test", "ns", name,
+                             "A gauge", 0, NULL);
+        if (!TEST_CHECK(g != NULL)) {
+            continue;
+        }
+        ret = cmt_gauge_set(g, 1000000000, (double) (i * 10), 0, NULL);
+        TEST_CHECK(ret == 0);
+    }
+
+    total_families = cfl_list_size(&src->counters) +
+                     cfl_list_size(&src->gauges);
+    TEST_CHECK(total_families == 10);
+
+    /*
+     * Simulate split: process counters first, then gauges.
+     * Same order as input_metrics_split_and_append.
+     */
+    cfl_list_foreach_safe(head, tmp, &src->counters) {
+        c = cfl_list_entry(head, struct cmt_counter, _head);
+
+        if (batch == NULL) {
+            batch = cmt_create();
+            if (!TEST_CHECK(batch != NULL)) {
+                break;
+            }
+            batch_count = 0;
+        }
+
+        ret = cmt_cat_counter(batch, c, NULL);
+        TEST_CHECK(ret == 0);
+        batch_count++;
+
+        if (batch_count >= families_per_batch) {
+            total_recovered += flush_batch_and_count(&batch);
+        }
+    }
+
+    cfl_list_foreach_safe(head, tmp, &src->gauges) {
+        g = cfl_list_entry(head, struct cmt_gauge, _head);
+
+        if (batch == NULL) {
+            batch = cmt_create();
+            if (!TEST_CHECK(batch != NULL)) {
+                break;
+            }
+            batch_count = 0;
+        }
+
+        ret = cmt_cat_gauge(batch, g, NULL);
+        TEST_CHECK(ret == 0);
+        batch_count++;
+
+        if (batch_count >= families_per_batch) {
+            total_recovered += flush_batch_and_count(&batch);
+        }
+    }
+
+    /* Flush remaining */
+    if (batch != NULL) {
+        total_recovered += flush_batch_and_count(&batch);
+    }
+
+    /* All 10 families must be recovered */
+    TEST_CHECK(total_recovered == 10);
+
+    cmt_destroy(src);
+}
+
+/*
+ * Test: empty cmt context produces zero families
+ */
+static void test_empty_context(void)
+{
+    struct cmt *src;
+
+    src = cmt_create();
+    if (!TEST_CHECK(src != NULL)) {
+        return;
+    }
+
+    TEST_CHECK(cfl_list_size(&src->counters) == 0);
+    TEST_CHECK(cfl_list_size(&src->gauges) == 0);
+    TEST_CHECK(cfl_list_size(&src->histograms) == 0);
+    TEST_CHECK(cfl_list_size(&src->summaries) == 0);
+    TEST_CHECK(cfl_list_size(&src->untypeds) == 0);
+    TEST_CHECK(cfl_list_size(&src->exp_histograms) == 0);
+
+    cmt_destroy(src);
+}
+
+/*
+ * Batch size estimate exercised by the test below: the share of families
+ * that fits into FLB_INPUT_CHUNK_FS_MAX_SIZE, computed in 64-bit arithmetic
+ * and never smaller than 1. 'total_encoded_size' must not be zero.
+ */
+static int estimate_families_per_batch(int total_families,
+                                       size_t total_encoded_size)
+{
+    int families_per_batch;
+    uint64_t numerator;
+
+    numerator = (uint64_t) total_families * FLB_INPUT_CHUNK_FS_MAX_SIZE;
+    families_per_batch = (int) (numerator / total_encoded_size);
+    if (families_per_batch < 1) {
+        families_per_batch = 1;
+    }
+
+    return families_per_batch;
+}
+
+/*
+ * Test: families_per_batch calculation
+ *
+ * Verify the batch size estimation: given total_families, total_encoded_size,
+ * and FLB_INPUT_CHUNK_FS_MAX_SIZE (2MB), the formula should produce sensible
+ * batch sizes.
+ */
+static void test_families_per_batch_calculation(void)
+{
+    /* Case 1: 100 families, 10MB total: 100 * 2048000 / 10485760 = 19 */
+    TEST_CHECK(estimate_families_per_batch(100, 10 * 1024 * 1024) == 19);
+
+    /* Case 2: 1 family, 5MB total: quotient is 0, clamped to 1 */
+    TEST_CHECK(estimate_families_per_batch(1, 5 * 1024 * 1024) == 1);
+
+    /* Case 3: 1000 families, 4MB total: 1000 * 2048000 / 4194304 = 488 */
+    TEST_CHECK(estimate_families_per_batch(1000, 4 * 1024 * 1024) == 488);
+}
+
+TEST_LIST = {
+    { "static_labels_copy", test_static_labels_copy},
+    { "cat_single_counter", test_cat_single_counter},
+    { "batch_split_preserves_families", test_batch_split_preserves_families},
+    { "batch_split_mixed_types", test_batch_split_mixed_types},
+    { "empty_context", test_empty_context},
+    { "families_per_batch_calculation", test_families_per_batch_calculation},
+    { 0 }
+};