From 2fe5de42fa3fb45631e18680963fc01eb1737bd1 Mon Sep 17 00:00:00 2001 From: radudiaconu Date: Mon, 15 Jun 2026 21:50:26 +0300 Subject: [PATCH 1/7] feat(connectors): add Meilisearch sink connector --- Cargo.lock | 92 ++ Cargo.toml | 6 + core/connectors/README.md | 1 + core/connectors/sinks/README.md | 1 + .../sinks/meilisearch_sink/Cargo.toml | 52 + .../sinks/meilisearch_sink/README.md | 45 + .../sinks/meilisearch_sink/config.toml | 43 + .../sinks/meilisearch_sink/src/lib.rs | 1074 +++++++++++++++++ .../fixtures/meilisearch/container.rs | 179 +++ .../connectors/fixtures/meilisearch/mod.rs | 22 + .../connectors/fixtures/meilisearch/sink.rs | 90 ++ .../tests/connectors/fixtures/mod.rs | 2 + .../meilisearch/meilisearch_sink.rs | 81 ++ .../tests/connectors/meilisearch/mod.rs | 18 + .../tests/connectors/meilisearch/sink.toml | 20 + core/integration/tests/connectors/mod.rs | 1 + 16 files changed, 1727 insertions(+) create mode 100644 core/connectors/sinks/meilisearch_sink/Cargo.toml create mode 100644 core/connectors/sinks/meilisearch_sink/README.md create mode 100644 core/connectors/sinks/meilisearch_sink/config.toml create mode 100644 core/connectors/sinks/meilisearch_sink/src/lib.rs create mode 100644 core/integration/tests/connectors/fixtures/meilisearch/container.rs create mode 100644 core/integration/tests/connectors/fixtures/meilisearch/mod.rs create mode 100644 core/integration/tests/connectors/fixtures/meilisearch/sink.rs create mode 100644 core/integration/tests/connectors/meilisearch/meilisearch_sink.rs create mode 100644 core/integration/tests/connectors/meilisearch/mod.rs create mode 100644 core/integration/tests/connectors/meilisearch/sink.toml diff --git a/Cargo.lock b/Cargo.lock index 4988f664fd..be23d9a18c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3196,6 +3196,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "convert_case" version = "0.9.0" @@ -6916,6 +6925,26 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_meilisearch_sink" +version = "0.4.0" +dependencies = [ + "async-trait", + "base64", + "dashmap", + "iggy_common", + "iggy_connector_sdk", + "meilisearch-sdk", + "once_cell", + "reqwest 0.13.4", + "secrecy", + "serde", + "serde_json", + "simd-json", + "tokio", + "tracing", +] + [[package]] name = "iggy_connector_mongodb_sink" version = "0.4.1-edge.1" @@ -7360,6 +7389,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "iso8601" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46" +dependencies = [ + "nom 8.0.0", +] + [[package]] name = "itertools" version = "0.13.0" @@ -8150,6 +8188,49 @@ dependencies = [ "digest 0.11.3", ] +[[package]] +name = "meilisearch-index-setting-macro" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93b5b21df781c820a9cc387b808d4128cbc164dd28d67ac6ed666a00996f8f15" +dependencies = [ + "convert_case 0.8.0", + "proc-macro2", + "quote", + "structmeta", + "syn 2.0.117", +] + +[[package]] +name = "meilisearch-sdk" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e6e3646ba2a9a306296c1edf4a050508a408c1b59ca456d9ad4965ec6e91e9" +dependencies = [ + "async-trait", + "bytes", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "iso8601", + "jsonwebtoken", + "log", + "meilisearch-index-setting-macro", + "pin-project-lite", + "reqwest 0.12.28", + "serde", + "serde_json", + "thiserror 2.0.18", + "time", + "tokio", + "uuid", + 
"wasm-bindgen-futures", + "web-sys", + "yaup", +] + [[package]] name = "memchr" version = "2.8.1" @@ -14823,6 +14904,17 @@ dependencies = [ "time", ] +[[package]] +name = "yaup" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0144f1a16a199846cb21024da74edd930b43443463292f536b7110b4855b5c6" +dependencies = [ + "form_urlencoded", + "serde", + "thiserror 1.0.69", +] + [[package]] name = "yew" version = "0.23.0" diff --git a/Cargo.toml b/Cargo.toml index 888d97014a..28d29910b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ members = [ "core/connectors/sinks/http_sink", "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/influxdb_sink", + "core/connectors/sinks/meilisearch_sink", "core/connectors/sinks/mongodb_sink", "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", @@ -200,6 +201,11 @@ lending-iterator = "0.1.7" libc = "0.2.186" log = "0.4.30" lz4_flex = "0.13.1" +meilisearch-sdk = { version = "0.33.0", default-features = false, features = [ + "reqwest", + "tls", + "jwt_rust_crypto", +] } message_bus = { path = "core/message_bus" } metadata = { path = "core/metadata" } mimalloc = "0.1" diff --git a/core/connectors/README.md b/core/connectors/README.md index b7d24cfbab..42d89ccd6e 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -83,6 +83,7 @@ Each sink should have its own, custom configuration, which is passed along with - **Doris Sink** - loads JSON messages into Apache Doris tables via the Stream Load HTTP API - **Elasticsearch Sink** - sends messages to Elasticsearch indices - **Iceberg Sink** - writes data to Apache Iceberg tables via REST catalog +- **Meilisearch Sink** - indexes messages in Meilisearch - **PostgreSQL Sink** - stores messages in PostgreSQL database tables - **Quickwit Sink** - indexes messages in Quickwit search engine - **Stdout Sink** - prints messages to standard output (useful for debugging/development) diff --git 
a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md index 9aaaa2ffaa..dcd6d0e554 100644 --- a/core/connectors/sinks/README.md +++ b/core/connectors/sinks/README.md @@ -12,6 +12,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s | **elasticsearch_sink** | Sends messages to Elasticsearch indices for full-text search and analytics | | **iceberg_sink** | Writes data to Apache Iceberg tables via REST catalog with S3/GCS/Azure storage | | **influxdb_sink** | Writes messages to InfluxDB as line-protocol points; supports both V2 (org/bucket, Flux) and V3 (db, SQL) | +| **meilisearch_sink** | Indexes messages in Meilisearch for full-text search | | **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas | | **quickwit_sink** | Indexes messages in Quickwit search engine for log analytics | | **stdout_sink** | Prints messages to standard output (useful for debugging and development) | diff --git a/core/connectors/sinks/meilisearch_sink/Cargo.toml b/core/connectors/sinks/meilisearch_sink/Cargo.toml new file mode 100644 index 0000000000..8bda76a40d --- /dev/null +++ b/core/connectors/sinks/meilisearch_sink/Cargo.toml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_meilisearch_sink" +version = "0.4.0" +description = "Iggy Meilisearch sink connector" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "search", "meilisearch"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +dashmap = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +meilisearch-sdk = { workspace = true } +once_cell = { workspace = true } +reqwest = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/core/connectors/sinks/meilisearch_sink/README.md b/core/connectors/sinks/meilisearch_sink/README.md new file mode 100644 index 0000000000..7a17d2a41f --- /dev/null +++ b/core/connectors/sinks/meilisearch_sink/README.md @@ -0,0 +1,45 @@ +# Meilisearch Sink Connector + +A sink connector that consumes messages from Iggy streams and writes them to a +Meilisearch index through the official Rust SDK. + +## Configuration + +- `url`: Meilisearch base URL. +- `index`: Target index UID. +- `api_key`: Optional Meilisearch API key sent as `Authorization: Bearer`. +- `primary_key`: Index primary key field. Defaults to `iggy_id`. +- `document_action`: `replace` uses SDK add-or-replace semantics; `update` uses SDK add-or-update semantics. Defaults to `replace`. 
+- `create_index_if_not_exists`: Create the index during `open()` when missing. Defaults to `true`. +- `include_metadata`: Add Iggy metadata fields to each document. Defaults to `true`. +- `batch_size`: Maximum documents per Meilisearch document request. Defaults to `1000`. +- `timeout`: Request timeout as a humantime string, for example `30s`. Defaults to `30s`. +- `wait_for_tasks`: Poll Meilisearch tasks until terminal state before returning from `consume()`. Defaults to `true`. +- `task_timeout`: Maximum time to wait for each Meilisearch task. Defaults to `30s`. +- `task_poll_interval`: Delay between task polls. Defaults to `100ms`. +- `max_retries`: Maximum transient retry attempts. Defaults to `3`. +- `retry_delay`: Initial transient retry delay. Defaults to `500ms`. +- `max_retry_delay`: Maximum transient retry delay. Defaults to `5s`. +- `max_open_retries`: Maximum transient retry attempts while opening the index. Defaults to `5`. + +## Behavior + +JSON object payloads are indexed as documents. JSON arrays or scalar values are +wrapped in a `value` field because Meilisearch documents must be objects. Raw +payloads are parsed as JSON when possible; otherwise, they are indexed as base64 +data. Text payloads are indexed in a `text` field. Messages with unsupported +payload schemas are skipped with a warning and counted as errors. + +When the configured primary key is absent, the connector injects a stable value +derived from the exact Iggy stream, topic, partition, offset, and message ID. +This avoids Meilisearch primary-key inference failures and keeps repeated +delivery idempotent for the same message. + +When `include_metadata` is enabled, metadata fields are only inserted when the +document does not already contain those names. Existing user fields are +preserved. + +`wait_for_tasks=false` only skips waiting for document indexing tasks during +`consume()`.
If `create_index_if_not_exists=true` and the connector creates the +index during `open()`, it still waits for that index-creation task so the first +batch cannot race the index creation. diff --git a/core/connectors/sinks/meilisearch_sink/config.toml b/core/connectors/sinks/meilisearch_sink/config.toml new file mode 100644 index 0000000000..6f29014616 --- /dev/null +++ b/core/connectors/sinks/meilisearch_sink/config.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "sink" +key = "meilisearch" +enabled = true +version = 0 +name = "Meilisearch sink" +path = "../../target/release/libiggy_connector_meilisearch_sink" +verbose = false + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "meilisearch_sink_cg" + +[plugin_config] +url = "http://localhost:7700" +index = "iggy_messages" +primary_key = "iggy_id" +document_action = "replace" +create_index_if_not_exists = true +include_metadata = true +batch_size = 100 +timeout = "30s" +wait_for_tasks = true diff --git a/core/connectors/sinks/meilisearch_sink/src/lib.rs b/core/connectors/sinks/meilisearch_sink/src/lib.rs new file mode 100644 index 0000000000..897e873e42 --- /dev/null +++ b/core/connectors/sinks/meilisearch_sink/src/lib.rs @@ -0,0 +1,1074 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use async_trait::async_trait; +use base64::{Engine as _, engine::general_purpose}; +use iggy_common::IggyTimestamp; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, + convert::owned_value_to_serde_json, + retry::{exponential_backoff, jitter, parse_duration}, + sink_connector, +}; +use meilisearch_sdk::{ + client::Client, + errors::{ + Error as MeilisearchSdkError, ErrorCode as MeilisearchErrorCode, + ErrorType as MeilisearchErrorType, + }, + indexes::Index, + task_info::TaskInfo, + tasks::Task, +}; +use reqwest::Url; +use secrecy::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value, json}; +use std::{future::Future, time::Duration}; +use tokio::sync::Mutex; +use tracing::{info, warn}; + +sink_connector!(MeilisearchSink); + +const DEFAULT_PRIMARY_KEY: &str = "iggy_id"; +const DEFAULT_CREATE_INDEX_IF_NOT_EXISTS: bool = true; +const DEFAULT_INCLUDE_METADATA: bool = true; +const DEFAULT_BATCH_SIZE: usize = 1000; +const DEFAULT_TIMEOUT: &str = "30s"; +const DEFAULT_WAIT_FOR_TASKS: bool = true; +const DEFAULT_TASK_TIMEOUT: &str = "30s"; +const DEFAULT_TASK_POLL_INTERVAL: &str = "100ms"; +const DEFAULT_RETRY_DELAY: &str = "500ms"; +const DEFAULT_MAX_RETRY_DELAY: &str = "5s"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_MAX_OPEN_RETRIES: u32 = 5; +const ENCODING_BASE64: &str = "base64"; + +#[derive(Debug)] +struct State { + invocations_count: usize, + documents_enqueued: usize, + documents_indexed: usize, + errors_count: usize, +} + +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MeilisearchDocumentAction { + #[default] + Replace, + Update, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MeilisearchSinkConfig { + pub url: String, + pub index: String, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")] + pub api_key: Option, + pub primary_key: 
Option, + pub document_action: Option, + pub create_index_if_not_exists: Option, + pub include_metadata: Option, + pub batch_size: Option, + pub timeout: Option, + pub wait_for_tasks: Option, + pub task_timeout: Option, + pub task_poll_interval: Option, + pub max_retries: Option, + pub retry_delay: Option, + pub max_retry_delay: Option, + pub max_open_retries: Option, +} + +#[derive(Debug)] +pub struct MeilisearchSink { + id: u32, + config: ResolvedMeilisearchSinkConfig, + client: Option, + state: Mutex, +} + +#[derive(Debug)] +struct ResolvedMeilisearchSinkConfig { + url: String, + index: String, + api_key: Option, + primary_key: String, + document_action: MeilisearchDocumentAction, + create_index_if_not_exists: bool, + include_metadata: bool, + batch_size: usize, + timeout: Duration, + wait_for_tasks: bool, + task_timeout: Duration, + task_poll_interval: Duration, + max_retries: u32, + retry_delay: Duration, + max_retry_delay: Duration, + max_open_retries: u32, +} + +impl From for ResolvedMeilisearchSinkConfig { + fn from(config: MeilisearchSinkConfig) -> Self { + let primary_key = config + .primary_key + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| DEFAULT_PRIMARY_KEY.to_string()); + let document_action = config.document_action.unwrap_or_default(); + let create_index_if_not_exists = config + .create_index_if_not_exists + .unwrap_or(DEFAULT_CREATE_INDEX_IF_NOT_EXISTS); + let include_metadata = config.include_metadata.unwrap_or(DEFAULT_INCLUDE_METADATA); + let batch_size = config.batch_size.unwrap_or(DEFAULT_BATCH_SIZE).max(1); + let timeout = parse_duration(config.timeout.as_deref(), DEFAULT_TIMEOUT); + let wait_for_tasks = config.wait_for_tasks.unwrap_or(DEFAULT_WAIT_FOR_TASKS); + let task_timeout = parse_duration(config.task_timeout.as_deref(), DEFAULT_TASK_TIMEOUT); + let task_poll_interval = parse_duration( + config.task_poll_interval.as_deref(), + DEFAULT_TASK_POLL_INTERVAL, + ); + let max_retries = 
config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES).max(1); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let max_retry_delay = + parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); + let max_open_retries = config + .max_open_retries + .unwrap_or(DEFAULT_MAX_OPEN_RETRIES) + .max(1); + + Self { + url: config.url, + index: config.index, + api_key: config.api_key, + primary_key, + document_action, + create_index_if_not_exists, + include_metadata, + batch_size, + timeout, + wait_for_tasks, + task_timeout, + task_poll_interval, + max_retries, + retry_delay, + max_retry_delay, + max_open_retries, + } + } +} + +impl MeilisearchSink { + pub fn new(id: u32, config: MeilisearchSinkConfig) -> Self { + Self { + id, + config: config.into(), + client: None, + state: Mutex::new(State { + invocations_count: 0, + documents_enqueued: 0, + documents_indexed: 0, + errors_count: 0, + }), + } + } + + fn create_client(&self) -> Result { + let url = normalize_host(&self.config.url)?; + let api_key = self.config.api_key.as_ref().map(|key| key.expose_secret()); + Client::new(url, api_key).map_err(|error| { + Error::Connection(format!("Failed to create Meilisearch client: {error}")) + }) + } + + async fn check_connectivity(&self, client: &Client) -> Result<(), Error> { + let health = self + .retry_sdk_open_operation("health check", || client.health()) + .await?; + if health.status == "available" { + return Ok(()); + } + + Err(Error::Connection(format!( + "Meilisearch health check returned status '{}'", + health.status + ))) + } + + async fn ensure_index_exists(&self, client: &Client) -> Result<(), Error> { + match self.get_index_if_exists(client).await? 
{ + Some(_) => { + info!("Meilisearch index '{}' already exists", self.config.index); + Ok(()) + } + None if self.config.create_index_if_not_exists => self.create_index(client).await, + None => Err(Error::InitError(format!( + "Meilisearch index '{}' does not exist and create_index_if_not_exists=false", + self.config.index + ))), + } + } + + async fn get_index_if_exists(&self, client: &Client) -> Result, Error> { + let max_attempts = self.config.max_open_retries.max(1); + let mut attempt = 0u32; + + loop { + match client.get_index(&self.config.index).await { + Ok(index) => return Ok(Some(index)), + Err(error) if is_index_not_found(&error) => return Ok(None), + Err(error) => { + attempt += 1; + let should_retry = attempt < max_attempts && is_transient_sdk_error(&error); + if !should_retry { + return Err(map_sdk_error(error)); + } + let delay = jitter(exponential_backoff( + self.config.retry_delay, + attempt, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch get index '{}' failed (attempt {attempt}/{max_attempts}): {error}. 
Retrying in {delay:?}...", + self.config.index + ); + tokio::time::sleep(delay).await; + } + } + } + } + + async fn create_index(&self, client: &Client) -> Result<(), Error> { + info!( + "Creating Meilisearch index '{}' with primary key '{}'", + self.config.index, self.config.primary_key + ); + + let task = self + .retry_sdk_open_operation("create index", || { + client.create_index(&self.config.index, Some(&self.config.primary_key)) + }) + .await?; + self.wait_for_index_creation_task(client, task).await?; + + info!("Created Meilisearch index '{}'", self.config.index); + Ok(()) + } + + fn prepare_document( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + message: ConsumedMessage, + ) -> Result, Error> { + let generated_id = generated_document_id(topic_metadata, messages_metadata, &message); + let ConsumedMessage { + id, + offset, + checksum, + timestamp, + origin_timestamp, + headers, + payload, + } = message; + + let mut document = match payload { + Payload::Json(value) => { + Self::document_from_json_value(owned_value_to_serde_json(&value)) + } + Payload::Raw(bytes) => { + let mut bytes_copy = bytes.clone(); + match simd_json::from_slice::(&mut bytes_copy) { + Ok(value) => Self::document_from_json_value(owned_value_to_serde_json(&value)), + Err(_) => json!({ + "data": general_purpose::STANDARD.encode(&bytes), + "data_type": "raw", + "data_encoding": ENCODING_BASE64, + }), + } + } + Payload::Text(text) => json!({ + "text": text, + "data_type": "text", + }), + _ => { + return Err(Error::InvalidRecordValue(format!( + "Unsupported payload format for Meilisearch sink: {}", + messages_metadata.schema + ))); + } + }; + + let object = document + .as_object_mut() + .expect("document_from_json_value always returns an object"); + + object + .entry(self.config.primary_key.clone()) + .or_insert_with(|| Value::String(generated_id.clone())); + + if self.config.include_metadata { + object + .entry(DEFAULT_PRIMARY_KEY.to_string()) + 
.or_insert_with(|| Value::String(generated_id)); + insert_metadata_field(object, "iggy_message_id", Value::String(id.to_string())); + insert_metadata_field(object, "iggy_offset", Value::from(offset)); + insert_metadata_field( + object, + "iggy_stream", + Value::from(topic_metadata.stream.as_str()), + ); + insert_metadata_field( + object, + "iggy_topic", + Value::from(topic_metadata.topic.as_str()), + ); + insert_metadata_field( + object, + "iggy_partition", + Value::from(messages_metadata.partition_id), + ); + insert_metadata_field(object, "iggy_checksum", Value::from(checksum)); + insert_metadata_field(object, "iggy_timestamp", Value::from(timestamp)); + insert_metadata_field( + object, + "iggy_origin_timestamp", + Value::from(origin_timestamp), + ); + insert_metadata_field( + object, + "iggy_ingested_at", + Value::from(IggyTimestamp::now().as_millis() as i64), + ); + if let Some(headers) = &headers + && let Ok(headers_value) = serde_json::to_value(headers) + { + insert_metadata_field(object, "iggy_headers", headers_value); + } + } + + Ok(Some(document)) + } + + fn document_from_json_value(value: Value) -> Value { + match value { + Value::Object(_) => value, + other => { + let mut object = Map::new(); + object.insert("value".to_string(), other); + Value::Object(object) + } + } + } + + async fn index_documents( + &self, + client: &Client, + documents: Vec, + ) -> Result { + let mut accepted = 0usize; + for chunk in documents.chunks(self.config.batch_size) { + match self.index_document_chunk(client, chunk).await { + Ok(indexed) => accepted += indexed, + Err(partial_error) => { + return Err(PartialIndexError { + accepted: accepted + partial_error.accepted, + error: partial_error.error, + }); + } + } + } + Ok(accepted) + } + + async fn index_document_chunk( + &self, + client: &Client, + documents: &[Value], + ) -> Result { + if documents.is_empty() { + return Ok(0); + } + + let index = client.index(&self.config.index); + let task = match self.config.document_action { 
+ MeilisearchDocumentAction::Replace => { + self.retry_sdk_operation("add or replace documents", || { + index.add_or_replace(documents, Some(&self.config.primary_key)) + }) + .await + } + MeilisearchDocumentAction::Update => { + self.retry_sdk_operation("add or update documents", || { + index.add_or_update(documents, Some(&self.config.primary_key)) + }) + .await + } + } + .map_err(|error| PartialIndexError { accepted: 0, error })?; + self.wait_for_task(client, task) + .await + .map_err(|error| PartialIndexError { + accepted: documents.len(), + error, + })?; + Ok(documents.len()) + } + + async fn wait_for_task(&self, client: &Client, task: TaskInfo) -> Result<(), Error> { + if !self.config.wait_for_tasks { + return Ok(()); + } + + self.wait_for_task_completion(client, task).await + } + + async fn wait_for_index_creation_task( + &self, + client: &Client, + task: TaskInfo, + ) -> Result<(), Error> { + let task = self + .wait_for_task_status(client, task, self.config.max_open_retries) + .await?; + + if task.is_success() { + return Ok(()); + } + + if task.is_failure() { + let failure = task.unwrap_failure(); + if failure.error_code == MeilisearchErrorCode::IndexAlreadyExists { + return Ok(()); + } + return Err(Error::PermanentHttpError(format!( + "Meilisearch task failed: {}", + failure + ))); + } + + Err(Error::HttpRequestFailed( + "Meilisearch task did not reach a terminal state".to_string(), + )) + } + + async fn wait_for_task_completion(&self, client: &Client, task: TaskInfo) -> Result<(), Error> { + let task = self + .wait_for_task_status(client, task, self.config.max_retries) + .await?; + + if task.is_success() { + return Ok(()); + } + + if task.is_failure() { + let failure = task.unwrap_failure(); + return Err(Error::PermanentHttpError(format!( + "Meilisearch task failed: {}", + failure + ))); + } + + Err(Error::HttpRequestFailed( + "Meilisearch task did not reach a terminal state".to_string(), + )) + } + + async fn wait_for_task_status( + &self, + client: 
&Client, + task: TaskInfo, + max_attempts: u32, + ) -> Result { + let task_uid = task.get_task_uid(); + let mut elapsed_time = Duration::ZERO; + + while self.config.task_timeout > elapsed_time { + let status = self + .retry_sdk_operation_with_attempts("get task status", max_attempts, || { + client.get_task(task.clone()) + }) + .await?; + + if status.is_success() || status.is_failure() { + return Ok(status); + } + + tokio::time::sleep(self.config.task_poll_interval).await; + elapsed_time += self.config.task_poll_interval; + } + + Err(Error::HttpRequestFailed(format!( + "Meilisearch task {task_uid} timed out after {:?}", + self.config.task_timeout + ))) + } + + async fn retry_sdk_operation( + &self, + operation: &str, + operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + self.retry_sdk_operation_with_attempts(operation, self.config.max_retries, operation_fn) + .await + } + + async fn retry_sdk_open_operation( + &self, + operation: &str, + operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + self.retry_sdk_operation_with_attempts( + operation, + self.config.max_open_retries, + operation_fn, + ) + .await + } + + async fn retry_sdk_operation_with_attempts( + &self, + operation: &str, + max_attempts: u32, + mut operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + let max_attempts = max_attempts.max(1); + let mut attempt = 0u32; + + loop { + let result = tokio::time::timeout(self.config.timeout, operation_fn()).await; + match result { + Ok(Ok(value)) => return Ok(value), + Ok(Err(error)) => { + attempt += 1; + let should_retry = attempt < max_attempts && is_transient_sdk_error(&error); + if !should_retry { + return Err(map_sdk_error(error)); + } + let delay = jitter(exponential_backoff( + self.config.retry_delay, + attempt, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch {operation} failed (attempt {attempt}/{max_attempts}): {error}. Retrying in {delay:?}..." 
+ ); + tokio::time::sleep(delay).await; + } + Err(_) => { + attempt += 1; + if attempt >= max_attempts { + return Err(Error::HttpRequestFailed(format!( + "Meilisearch {operation} timed out after {:?}", + self.config.timeout + ))); + } + let delay = jitter(exponential_backoff( + self.config.retry_delay, + attempt, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch {operation} timed out after {:?} (attempt {attempt}/{max_attempts}). Retrying in {delay:?}...", + self.config.timeout + ); + tokio::time::sleep(delay).await; + } + } + } + } + + async fn record_errors(&self, errors: usize) { + let mut state = self.state.lock().await; + state.errors_count += errors; + } +} + +#[async_trait] +impl Sink for MeilisearchSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Meilisearch sink connector with ID: {} for URL: {}, index: {}", + self.id, + sanitize_url_for_log(&self.config.url), + self.config.index + ); + + let client = self.create_client()?; + self.check_connectivity(&client).await?; + self.ensure_index_exists(&client).await?; + + self.client = Some(client); + info!( + "Successfully opened Meilisearch sink connector with ID: {}", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let mut state = self.state.lock().await; + state.invocations_count += 1; + let invocation = state.invocations_count; + drop(state); + + info!( + "Meilisearch sink with ID: {} received: {} messages, schema: {}, stream: {}, topic: {}, partition: {}, offset: {}, invocation: {}", + self.id, + messages.len(), + messages_metadata.schema, + topic_metadata.stream, + topic_metadata.topic, + messages_metadata.partition_id, + messages_metadata.current_offset, + invocation + ); + + let client = self + .client + .as_ref() + .ok_or_else(|| Error::Connection("Meilisearch client not initialized".to_string()))?; + + let messages_count = messages.len(); 
+ let mut documents = Vec::with_capacity(messages.len()); + let mut invalid_records = 0usize; + for message in messages { + match self.prepare_document(topic_metadata, &messages_metadata, message) { + Ok(Some(document)) => documents.push(document), + Ok(None) => {} + Err(Error::InvalidRecordValue(reason)) => { + invalid_records += 1; + warn!( + "Dropping invalid Meilisearch sink record for connector ID: {}, reason: {}", + self.id, reason + ); + } + Err(error) => return Err(error), + } + } + if invalid_records > 0 { + self.record_errors(invalid_records).await; + } + + if documents.is_empty() { + return Ok(()); + } + + match self.index_documents(client, documents).await { + Ok(accepted) => { + let mut state = self.state.lock().await; + state.documents_enqueued += accepted; + if self.config.wait_for_tasks { + state.documents_indexed += accepted; + } + info!( + "Accepted {} of {} messages into Meilisearch index '{}'", + accepted, messages_count, self.config.index + ); + Ok(()) + } + Err(partial_error) => { + let mut state = self.state.lock().await; + state.documents_enqueued += partial_error.accepted; + if self.config.wait_for_tasks { + state.documents_indexed += partial_error.accepted; + } + state.errors_count += 1; + drop(state); + Err(partial_error.error) + } + } + } + + async fn close(&mut self) -> Result<(), Error> { + let state = self.state.lock().await; + info!( + "Meilisearch sink connector with ID: {} is closing. 
Stats: {} invocations, {} documents enqueued, {} documents indexed, {} errors", + self.id, + state.invocations_count, + state.documents_enqueued, + state.documents_indexed, + state.errors_count + ); + drop(state); + + self.client = None; + info!("Meilisearch sink connector with ID: {} is closed.", self.id); + Ok(()) + } +} + +fn generated_document_id( + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + message: &ConsumedMessage, +) -> String { + let components = json!([ + topic_metadata.stream.as_str(), + topic_metadata.topic.as_str(), + messages_metadata.partition_id, + message.offset, + message.id + ]); + let encoded = serde_json::to_vec(&components) + .map(|bytes| general_purpose::URL_SAFE_NO_PAD.encode(bytes)) + .expect("generated ID components are always JSON-serializable"); + format!("iggy_{encoded}") +} + +fn insert_metadata_field(object: &mut Map, field: &str, value: Value) { + if object.contains_key(field) { + warn!( + "Document already contains Meilisearch metadata field '{field}', preserving original value" + ); + } else { + object.insert(field.to_string(), value); + } +} + +fn sanitize_url_for_log(raw: &str) -> String { + let normalized = normalize_host(raw).unwrap_or_else(|_| raw.trim().to_string()); + let Ok(mut url) = Url::parse(&normalized) else { + return "".to_string(); + }; + + if !url.username().is_empty() { + let _ = url.set_username(""); + } + if url.password().is_some() { + let _ = url.set_password(None); + } + url.to_string() +} + +fn normalize_host(raw: &str) -> Result { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Err(Error::Connection( + "Invalid Meilisearch URL: host cannot be empty".to_string(), + )); + } + + let with_scheme = if trimmed.starts_with("http://") || trimmed.starts_with("https://") { + trimmed.to_string() + } else { + format!("http://{trimmed}") + }; + let url = Url::parse(&with_scheme) + .map_err(|error| Error::Connection(format!("Invalid Meilisearch URL: {error}")))?; + let mut 
host = url.to_string(); + while host.ends_with('/') { + host.pop(); + } + Ok(host) +} + +#[derive(Debug)] +struct PartialIndexError { + accepted: usize, + error: Error, +} + +fn is_index_not_found(error: &MeilisearchSdkError) -> bool { + matches!( + error, + MeilisearchSdkError::Meilisearch(meilisearch_error) + if meilisearch_error.error_code == MeilisearchErrorCode::IndexNotFound + ) +} + +fn is_transient_sdk_error(error: &MeilisearchSdkError) -> bool { + match error { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + meilisearch_error.error_type == MeilisearchErrorType::Internal + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + communication_error.status_code == 429 || communication_error.status_code >= 500 + } + MeilisearchSdkError::HttpError(_) | MeilisearchSdkError::Timeout => true, + _ => false, + } +} + +struct SinkSdkError(MeilisearchSdkError); + +impl From for SinkSdkError { + fn from(error: MeilisearchSdkError) -> Self { + Self(error) + } +} + +impl From for Error { + fn from(error: SinkSdkError) -> Self { + match error.0 { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + if meilisearch_error.error_type == MeilisearchErrorType::Internal { + Error::HttpRequestFailed(meilisearch_error.to_string()) + } else { + Error::PermanentHttpError(meilisearch_error.to_string()) + } + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + if communication_error.status_code == 429 || communication_error.status_code >= 500 + { + Error::HttpRequestFailed(communication_error.to_string()) + } else { + Error::PermanentHttpError(communication_error.to_string()) + } + } + MeilisearchSdkError::ParseError(error) => { + Error::Serialization(format!("Invalid Meilisearch response: {error}")) + } + MeilisearchSdkError::Timeout => { + Error::HttpRequestFailed("Meilisearch task timed out".to_string()) + } + MeilisearchSdkError::HttpError(error) => Error::HttpRequestFailed(error.to_string()), + other => 
Error::HttpRequestFailed(other.to_string()), + } + } +} + +fn map_sdk_error(error: MeilisearchSdkError) -> Error { + Error::from(SinkSdkError::from(error)) +} + +#[cfg(test)] +mod tests { + use super::*; + use iggy_connector_sdk::Schema; + + fn topic_metadata() -> TopicMetadata { + TopicMetadata { + stream: "orders.stream".to_string(), + topic: "created/topic".to_string(), + } + } + + fn messages_metadata() -> MessagesMetadata { + MessagesMetadata { + partition_id: 7, + current_offset: 10, + schema: Schema::Json, + } + } + + fn message(payload: Payload) -> ConsumedMessage { + ConsumedMessage { + id: 42, + offset: 11, + checksum: 12, + timestamp: 13, + origin_timestamp: 14, + headers: None, + payload, + } + } + + fn sink_with_config(config: MeilisearchSinkConfig) -> MeilisearchSink { + MeilisearchSink::new(1, config) + } + + fn base_config() -> MeilisearchSinkConfig { + MeilisearchSinkConfig { + url: "http://localhost:7700".to_string(), + index: "messages".to_string(), + api_key: None, + primary_key: None, + document_action: None, + create_index_if_not_exists: None, + include_metadata: None, + batch_size: None, + timeout: None, + wait_for_tasks: None, + task_timeout: None, + task_poll_interval: None, + max_retries: None, + retry_delay: None, + max_retry_delay: None, + max_open_retries: None, + } + } + + #[test] + fn generated_ids_use_meilisearch_safe_characters() { + let id = generated_document_id( + &topic_metadata(), + &messages_metadata(), + &message(Payload::Text("x".to_string())), + ); + + assert!(id.starts_with("iggy_")); + assert!( + id.chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') + ); + } + + #[test] + fn generated_ids_do_not_collapse_sanitized_names() { + let first_topic = TopicMetadata { + stream: "orders.stream".to_string(), + topic: "created/topic".to_string(), + }; + let second_topic = TopicMetadata { + stream: "orders/stream".to_string(), + topic: "created.topic".to_string(), + }; + + let first = generated_document_id( + 
&first_topic, + &messages_metadata(), + &message(Payload::Text("x".to_string())), + ); + let second = generated_document_id( + &second_topic, + &messages_metadata(), + &message(Payload::Text("x".to_string())), + ); + + assert_ne!(first, second); + } + + #[test] + fn injects_default_primary_key_and_metadata() { + let sink = sink_with_config(base_config()); + let payload = Payload::Json(simd_json::json!({ + "name": "Alice" + })); + let message = message(payload); + let expected_id = generated_document_id(&topic_metadata(), &messages_metadata(), &message); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["name"], "Alice"); + assert_eq!(document["iggy_id"], expected_id); + assert_eq!(document["iggy_offset"], 11); + assert_eq!(document["iggy_stream"], "orders.stream"); + assert_eq!(document["iggy_topic"], "created/topic"); + } + + #[test] + fn preserves_existing_configured_primary_key() { + let mut config = base_config(); + config.primary_key = Some("id".to_string()); + let sink = sink_with_config(config); + let payload = Payload::Json(simd_json::json!({ + "id": "existing", + "name": "Alice" + })); + let message = message(payload); + let expected_id = generated_document_id(&topic_metadata(), &messages_metadata(), &message); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["id"], "existing"); + assert_eq!(document["iggy_id"], expected_id); + } + + #[test] + fn preserves_existing_metadata_fields() { + let sink = sink_with_config(base_config()); + let payload = Payload::Json(simd_json::json!({ + "name": "Alice", + "iggy_offset": 999, + "iggy_stream": "user-stream" + })); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message(payload)) + .expect("prepare document") + .expect("document"); + + 
assert_eq!(document["iggy_offset"], 999); + assert_eq!(document["iggy_stream"], "user-stream"); + } + + #[test] + fn wraps_non_object_json_payloads() { + let sink = sink_with_config(base_config()); + let message = message(Payload::Json(simd_json::json!(["a", "b"]))); + let expected_id = generated_document_id(&topic_metadata(), &messages_metadata(), &message); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["value"], json!(["a", "b"])); + assert_eq!(document["iggy_id"], expected_id); + } + + #[test] + fn raw_payloads_are_base64_encoded_when_not_json() { + let sink = sink_with_config(base_config()); + + let document = sink + .prepare_document( + &topic_metadata(), + &messages_metadata(), + message(Payload::Raw(vec![0, 1, 2, 3])), + ) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["data"], "AAECAw=="); + assert_eq!(document["data_encoding"], ENCODING_BASE64); + } + + #[test] + fn sanitize_url_should_redact_credentials_without_scheme() { + let url = sanitize_url_for_log("user:pass@localhost:7700/indexes"); + assert_eq!(url, "http://localhost:7700/indexes"); + } + + #[test] + fn unsupported_payloads_return_error() { + let sink = sink_with_config(base_config()); + let error = sink + .prepare_document( + &topic_metadata(), + &messages_metadata(), + message(Payload::Avro(vec![1, 2, 3])), + ) + .expect_err("unsupported payload should fail"); + + assert!(matches!(error, Error::InvalidRecordValue(_))); + } +} diff --git a/core/integration/tests/connectors/fixtures/meilisearch/container.rs b/core/integration/tests/connectors/fixtures/meilisearch/container.rs new file mode 100644 index 0000000000..ab2d34f9aa --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/container.rs @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::connectors::fixtures; +use integration::harness::TestBinaryError; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use reqwest_retry::RetryTransientMiddleware; +use reqwest_retry::policies::ExponentialBackoff; +use serde::Deserialize; +use std::time::Duration; +use testcontainers_modules::testcontainers::core::wait::HttpWaitStrategy; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tokio::time::sleep; +use tracing::info; +use uuid::Uuid; + +const MEILISEARCH_IMAGE: &str = "getmeili/meilisearch"; +const MEILISEARCH_TAG: &str = "v1.13"; +const MEILISEARCH_PORT: u16 = 7700; +const MEILISEARCH_HEALTH_ENDPOINT: &str = "/health"; +pub const TEST_INDEX: &str = "iggy_messages"; +const POLL_ATTEMPTS: usize = 100; +const POLL_INTERVAL_MS: u64 = 50; + +#[derive(Debug, Deserialize)] +pub struct MeilisearchDocumentsResponse { + pub results: Vec, +} + +pub struct MeilisearchContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub base_url: String, +} + +impl MeilisearchContainer { + pub async fn start() -> Result { + let unique_network = 
format!("iggy-meilisearch-{}", Uuid::new_v4()); + + let container = GenericImage::new(MEILISEARCH_IMAGE, MEILISEARCH_TAG) + .with_exposed_port(MEILISEARCH_PORT.tcp()) + .with_wait_for(WaitFor::http( + HttpWaitStrategy::new(MEILISEARCH_HEALTH_ENDPOINT) + .with_port(MEILISEARCH_PORT.tcp()) + .with_expected_status_code(200u16), + )) + .with_network(unique_network) + .with_container_name(fixtures::unique_container_name("meilisearch")) + .with_env_var("MEILI_ENV", "development") + .with_mapped_port(0, MEILISEARCH_PORT.tcp()) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: format!("Failed to start container: {e}"), + })?; + + info!("Started Meilisearch container"); + + let mapped_port = container + .ports() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: format!("Failed to get ports: {e}"), + })? + .map_to_host_port_ipv4(MEILISEARCH_PORT) + .ok_or_else(|| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: "No mapping for Meilisearch port".to_string(), + })?; + + let base_url = format!("http://localhost:{mapped_port}"); + info!("Meilisearch container available at {base_url}"); + + Ok(Self { + container, + base_url, + }) + } +} + +pub fn create_http_client() -> HttpClient { + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to build HTTP client"); + reqwest_middleware::ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build() +} + +pub trait MeilisearchOps: Sync { + fn container(&self) -> &MeilisearchContainer; + fn http_client(&self) -> &HttpClient; + + fn list_documents( + &self, + index_name: &str, + ) -> impl std::future::Future, TestBinaryError>> + Send + { + async move { + let url = format!( + 
"{}/indexes/{}/documents", + self.container().base_url, + index_name + ); + let response = self + .http_client() + .get(&url) + .query(&[("limit", "100")]) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to list Meilisearch documents: {e}"), + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!( + "Failed to list Meilisearch documents: status={status}, body={body}" + ), + }); + } + + response + .json::() + .await + .map(|documents| documents.results) + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to parse Meilisearch documents response: {e}"), + }) + } + } + + fn wait_for_documents( + &self, + expected_count: usize, + ) -> impl std::future::Future, TestBinaryError>> + Send + { + async move { + let mut last_count = 0usize; + for _ in 0..POLL_ATTEMPTS { + if let Ok(documents) = self.list_documents(TEST_INDEX).await { + last_count = documents.len(); + if documents.len() >= expected_count { + return Ok(documents); + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + Err(TestBinaryError::InvalidState { + message: format!( + "Expected {expected_count} Meilisearch documents, found {last_count} after {POLL_ATTEMPTS} attempts" + ), + }) + } + } +} diff --git a/core/integration/tests/connectors/fixtures/meilisearch/mod.rs b/core/integration/tests/connectors/fixtures/meilisearch/mod.rs new file mode 100644 index 0000000000..ba21bf1587 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod container; +mod sink; + +pub use container::MeilisearchOps; +pub use sink::MeilisearchSinkFixture; diff --git a/core/integration/tests/connectors/fixtures/meilisearch/sink.rs b/core/integration/tests/connectors/fixtures/meilisearch/sink.rs new file mode 100644 index 0000000000..01db6da3ba --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/sink.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use super::container::{MeilisearchContainer, MeilisearchOps, TEST_INDEX, create_http_client}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture, seeds}; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use std::collections::HashMap; + +const ENV_SINK_URL: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_URL"; +const ENV_SINK_INDEX: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_INDEX"; +const ENV_SINK_PRIMARY_KEY: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_PRIMARY_KEY"; +const ENV_SINK_TASK_POLL_INTERVAL: &str = + "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_TASK_POLL_INTERVAL"; +const ENV_SINK_TASK_TIMEOUT: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_TASK_TIMEOUT"; +const ENV_SINK_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_STREAMS_0_STREAM"; +const ENV_SINK_STREAMS_0_TOPICS: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_STREAMS_0_TOPICS"; +const ENV_SINK_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_STREAMS_0_SCHEMA"; +const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str = + "IGGY_CONNECTORS_SINK_MEILISEARCH_STREAMS_0_CONSUMER_GROUP"; +const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PATH"; + +pub struct MeilisearchSinkFixture { + container: MeilisearchContainer, + http_client: HttpClient, +} + +impl MeilisearchOps for MeilisearchSinkFixture { + fn container(&self) -> &MeilisearchContainer { + &self.container + } + + fn http_client(&self) -> &HttpClient { + &self.http_client + } +} + +#[async_trait] +impl TestFixture for MeilisearchSinkFixture { + async fn setup() -> Result { + let container = MeilisearchContainer::start().await?; + let http_client = create_http_client(); + + Ok(Self { + container, + http_client, + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + HashMap::from([ + (ENV_SINK_URL.to_string(), self.container.base_url.clone()), + (ENV_SINK_INDEX.to_string(), TEST_INDEX.to_string()), + (ENV_SINK_PRIMARY_KEY.to_string(), 
"iggy_id".to_string()), + (ENV_SINK_TASK_TIMEOUT.to_string(), "10s".to_string()), + (ENV_SINK_TASK_POLL_INTERVAL.to_string(), "25ms".to_string()), + ( + ENV_SINK_STREAMS_0_STREAM.to_string(), + seeds::names::STREAM.to_string(), + ), + ( + ENV_SINK_STREAMS_0_TOPICS.to_string(), + format!("[{}]", seeds::names::TOPIC), + ), + (ENV_SINK_STREAMS_0_SCHEMA.to_string(), "json".to_string()), + ( + ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_string(), + "meilisearch_sink_cg".to_string(), + ), + ( + ENV_SINK_PATH.to_string(), + "../../target/debug/libiggy_connector_meilisearch_sink".to_string(), + ), + ]) + } +} diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 0b2f264d03..c07641b94c 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -23,6 +23,7 @@ mod elasticsearch; mod http; mod iceberg; mod influxdb; +mod meilisearch; mod mongodb; mod postgres; mod quickwit; @@ -62,6 +63,7 @@ pub use influxdb::{ InfluxDbSinkNoMetadataFixture, InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture, InfluxDbSourceRawFixture, InfluxDbSourceTextFixture, }; +pub use meilisearch::{MeilisearchOps, MeilisearchSinkFixture}; pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture, diff --git a/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs b/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs new file mode 100644 index 0000000000..dba06a3f7c --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::connectors::fixtures::{MeilisearchOps, MeilisearchSinkFixture}; +use bytes::Bytes; +use iggy::prelude::{IggyMessage, Partitioning}; +use iggy_common::{Identifier, MessageClient}; +use integration::harness::seeds; +use integration::iggy_harness; + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/meilisearch/sink.toml")), + seed = seeds::connector_stream +)] +async fn meilisearch_sink_indexes_json_messages( + harness: &TestHarness, + fixture: MeilisearchSinkFixture, +) { + let client = harness.root_client().await.unwrap(); + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let payloads = [ + serde_json::json!({"name": "first", "category": "alpha"}), + serde_json::json!({"name": "second", "category": "beta"}), + ]; + + let mut messages = payloads + .iter() + .enumerate() + .map(|(i, payload)| { + IggyMessage::builder() + .id((i + 1) as u128) + .payload(Bytes::from(serde_json::to_vec(payload).expect("serialize"))) + .build() + .expect("build message") + }) + .collect::>(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("send messages"); + + let documents = fixture + .wait_for_documents(payloads.len()) + .await + .expect("wait for Meilisearch documents"); + + 
assert_eq!(documents.len(), payloads.len()); + assert!(documents.iter().any(|document| document["name"] == "first")); + assert!( + documents + .iter() + .any(|document| document["name"] == "second") + ); + assert!( + documents + .iter() + .all(|document| document["iggy_id"].as_str().is_some()) + ); +} diff --git a/core/integration/tests/connectors/meilisearch/mod.rs b/core/integration/tests/connectors/meilisearch/mod.rs new file mode 100644 index 0000000000..fd50dc3671 --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod meilisearch_sink; diff --git a/core/integration/tests/connectors/meilisearch/sink.toml b/core/integration/tests/connectors/meilisearch/sink.toml new file mode 100644 index 0000000000..0a846ac965 --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/sink.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sinks/meilisearch_sink" diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index fc624f897f..b5afedcbf6 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -24,6 +24,7 @@ mod http; mod http_config_provider; mod iceberg; mod influxdb; +mod meilisearch; mod mongodb; mod postgres; mod quickwit; From 7c306ab10e7028793b9c33c13359048cf0b5c156 Mon Sep 17 00:00:00 2001 From: radudiaconu Date: Tue, 16 Jun 2026 11:01:20 +0300 Subject: [PATCH 2/7] fix(connectors): address meilisearch sink review --- Cargo.lock | 2 - .../sinks/meilisearch_sink/Cargo.toml | 5 - .../sinks/meilisearch_sink/README.md | 3 +- .../sinks/meilisearch_sink/src/lib.rs | 201 +++++++++--------- 4 files changed, 103 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be23d9a18c..745c557274 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6931,11 +6931,9 @@ version = "0.4.0" dependencies = [ "async-trait", "base64", - "dashmap", "iggy_common", "iggy_connector_sdk", "meilisearch-sdk", - "once_cell", "reqwest 0.13.4", "secrecy", "serde", diff --git a/core/connectors/sinks/meilisearch_sink/Cargo.toml b/core/connectors/sinks/meilisearch_sink/Cargo.toml index 8bda76a40d..b7e1a66a31 100644 --- a/core/connectors/sinks/meilisearch_sink/Cargo.toml +++ 
b/core/connectors/sinks/meilisearch_sink/Cargo.toml @@ -29,20 +29,15 @@ repository = "https://github.com/apache/iggy" readme = "../../README.md" publish = false -[package.metadata.cargo-machete] -ignored = ["dashmap", "once_cell"] - [lib] crate-type = ["cdylib", "lib"] [dependencies] async-trait = { workspace = true } base64 = { workspace = true } -dashmap = { workspace = true } iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true } meilisearch-sdk = { workspace = true } -once_cell = { workspace = true } reqwest = { workspace = true } secrecy = { workspace = true } serde = { workspace = true } diff --git a/core/connectors/sinks/meilisearch_sink/README.md b/core/connectors/sinks/meilisearch_sink/README.md index 7a17d2a41f..24982ebd93 100644 --- a/core/connectors/sinks/meilisearch_sink/README.md +++ b/core/connectors/sinks/meilisearch_sink/README.md @@ -28,7 +28,8 @@ JSON object payloads are indexed as documents. JSON arrays or scalar values are wrapped in a `value` field because Meilisearch documents must be objects. Raw payloads are parsed as JSON when possible; otherwise, they are indexed as base64 data. Text payloads are indexed in a `text` field. Unsupported payload schemas -fail the batch instead of being silently dropped. +are skipped with a warning and counted as sink errors, matching the connector +runtime's per-record drop behavior for malformed records. When the configured primary key is absent, the connector injects a stable value derived from the exact Iggy stream, topic, partition, offset, and message ID. 
diff --git a/core/connectors/sinks/meilisearch_sink/src/lib.rs b/core/connectors/sinks/meilisearch_sink/src/lib.rs index 897e873e42..762f6d72ee 100644 --- a/core/connectors/sinks/meilisearch_sink/src/lib.rs +++ b/core/connectors/sinks/meilisearch_sink/src/lib.rs @@ -38,9 +38,12 @@ use reqwest::Url; use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value, json}; -use std::{future::Future, time::Duration}; -use tokio::sync::Mutex; -use tracing::{info, warn}; +use std::{cmp, future::Future, time::Duration}; +use tokio::{ + sync::Mutex, + time::{Instant, sleep}, +}; +use tracing::{debug, info, warn}; sink_connector!(MeilisearchSink); @@ -224,32 +227,14 @@ impl MeilisearchSink { } async fn get_index_if_exists(&self, client: &Client) -> Result, Error> { - let max_attempts = self.config.max_open_retries.max(1); - let mut attempt = 0u32; - - loop { + self.retry_sdk_open_operation("get index", || async { match client.get_index(&self.config.index).await { - Ok(index) => return Ok(Some(index)), - Err(error) if is_index_not_found(&error) => return Ok(None), - Err(error) => { - attempt += 1; - let should_retry = attempt < max_attempts && is_transient_sdk_error(&error); - if !should_retry { - return Err(map_sdk_error(error)); - } - let delay = jitter(exponential_backoff( - self.config.retry_delay, - attempt, - self.config.max_retry_delay, - )); - warn!( - "Meilisearch get index '{}' failed (attempt {attempt}/{max_attempts}): {error}. 
Retrying in {delay:?}...", - self.config.index - ); - tokio::time::sleep(delay).await; - } + Ok(index) => Ok(Some(index)), + Err(error) if is_index_not_found(&error) => Ok(None), + Err(error) => Err(error), } - } + }) + .await } async fn create_index(&self, client: &Client) -> Result<(), Error> { @@ -294,17 +279,23 @@ impl MeilisearchSink { let mut bytes_copy = bytes.clone(); match simd_json::from_slice::(&mut bytes_copy) { Ok(value) => Self::document_from_json_value(owned_value_to_serde_json(&value)), - Err(_) => json!({ - "data": general_purpose::STANDARD.encode(&bytes), - "data_type": "raw", - "data_encoding": ENCODING_BASE64, - }), + Err(_) => Map::from_iter([ + ( + "data".to_string(), + Value::String(general_purpose::STANDARD.encode(&bytes)), + ), + ("data_type".to_string(), Value::String("raw".to_string())), + ( + "data_encoding".to_string(), + Value::String(ENCODING_BASE64.to_string()), + ), + ]), } } - Payload::Text(text) => json!({ - "text": text, - "data_type": "text", - }), + Payload::Text(text) => Map::from_iter([ + ("text".to_string(), Value::String(text)), + ("data_type".to_string(), Value::String("text".to_string())), + ]), _ => { return Err(Error::InvalidRecordValue(format!( "Unsupported payload format for Meilisearch sink: {}", @@ -313,64 +304,64 @@ impl MeilisearchSink { } }; - let object = document - .as_object_mut() - .expect("document_from_json_value always returns an object"); - - object + document .entry(self.config.primary_key.clone()) .or_insert_with(|| Value::String(generated_id.clone())); if self.config.include_metadata { - object + document .entry(DEFAULT_PRIMARY_KEY.to_string()) .or_insert_with(|| Value::String(generated_id)); - insert_metadata_field(object, "iggy_message_id", Value::String(id.to_string())); - insert_metadata_field(object, "iggy_offset", Value::from(offset)); insert_metadata_field( - object, + &mut document, + "iggy_message_id", + Value::String(id.to_string()), + ); + insert_metadata_field(&mut document, "iggy_offset", 
Value::from(offset)); + insert_metadata_field( + &mut document, "iggy_stream", Value::from(topic_metadata.stream.as_str()), ); insert_metadata_field( - object, + &mut document, "iggy_topic", Value::from(topic_metadata.topic.as_str()), ); insert_metadata_field( - object, + &mut document, "iggy_partition", Value::from(messages_metadata.partition_id), ); - insert_metadata_field(object, "iggy_checksum", Value::from(checksum)); - insert_metadata_field(object, "iggy_timestamp", Value::from(timestamp)); + insert_metadata_field(&mut document, "iggy_checksum", Value::from(checksum)); + insert_metadata_field(&mut document, "iggy_timestamp", Value::from(timestamp)); insert_metadata_field( - object, + &mut document, "iggy_origin_timestamp", Value::from(origin_timestamp), ); insert_metadata_field( - object, + &mut document, "iggy_ingested_at", Value::from(IggyTimestamp::now().as_millis() as i64), ); if let Some(headers) = &headers && let Ok(headers_value) = serde_json::to_value(headers) { - insert_metadata_field(object, "iggy_headers", headers_value); + insert_metadata_field(&mut document, "iggy_headers", headers_value); } } - Ok(Some(document)) + Ok(Some(Value::Object(document))) } - fn document_from_json_value(value: Value) -> Value { + fn document_from_json_value(value: Value) -> Map { match value { - Value::Object(_) => value, + Value::Object(object) => object, other => { let mut object = Map::new(); object.insert("value".to_string(), other); - Value::Object(object) + object } } } @@ -495,9 +486,12 @@ impl MeilisearchSink { max_attempts: u32, ) -> Result { let task_uid = task.get_task_uid(); - let mut elapsed_time = Duration::ZERO; + let started = Instant::now(); - while self.config.task_timeout > elapsed_time { + loop { + if started.elapsed() >= self.config.task_timeout { + break; + } let status = self .retry_sdk_operation_with_attempts("get task status", max_attempts, || { client.get_task(task.clone()) @@ -508,8 +502,11 @@ impl MeilisearchSink { return Ok(status); } - 
tokio::time::sleep(self.config.task_poll_interval).await; - elapsed_time += self.config.task_poll_interval; + let remaining = self.config.task_timeout.saturating_sub(started.elapsed()); + if remaining.is_zero() { + break; + } + sleep(cmp::min(self.config.task_poll_interval, remaining)).await; } Err(Error::HttpRequestFailed(format!( @@ -558,7 +555,6 @@ impl MeilisearchSink { Op: FnMut() -> Fut, Fut: Future>, { - let max_attempts = max_attempts.max(1); let mut attempt = 0u32; loop { @@ -579,7 +575,7 @@ impl MeilisearchSink { warn!( "Meilisearch {operation} failed (attempt {attempt}/{max_attempts}): {error}. Retrying in {delay:?}..." ); - tokio::time::sleep(delay).await; + sleep(delay).await; } Err(_) => { attempt += 1; @@ -598,7 +594,7 @@ impl MeilisearchSink { "Meilisearch {operation} timed out after {:?} (attempt {attempt}/{max_attempts}). Retrying in {delay:?}...", self.config.timeout ); - tokio::time::sleep(delay).await; + sleep(delay).await; } } } @@ -749,7 +745,7 @@ fn generated_document_id( fn insert_metadata_field(object: &mut Map, field: &str, value: Value) { if object.contains_key(field) { - warn!( + debug!( "Document already contains Meilisearch metadata field '{field}', preserving original value" ); } else { @@ -821,48 +817,33 @@ fn is_transient_sdk_error(error: &MeilisearchSdkError) -> bool { } } -struct SinkSdkError(MeilisearchSdkError); - -impl From for SinkSdkError { - fn from(error: MeilisearchSdkError) -> Self { - Self(error) - } -} - -impl From for Error { - fn from(error: SinkSdkError) -> Self { - match error.0 { - MeilisearchSdkError::Meilisearch(meilisearch_error) => { - if meilisearch_error.error_type == MeilisearchErrorType::Internal { - Error::HttpRequestFailed(meilisearch_error.to_string()) - } else { - Error::PermanentHttpError(meilisearch_error.to_string()) - } - } - MeilisearchSdkError::MeilisearchCommunication(communication_error) => { - if communication_error.status_code == 429 || communication_error.status_code >= 500 - { - 
Error::HttpRequestFailed(communication_error.to_string()) - } else { - Error::PermanentHttpError(communication_error.to_string()) - } - } - MeilisearchSdkError::ParseError(error) => { - Error::Serialization(format!("Invalid Meilisearch response: {error}")) +fn map_sdk_error(error: MeilisearchSdkError) -> Error { + match error { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + if meilisearch_error.error_type == MeilisearchErrorType::Internal { + Error::HttpRequestFailed(meilisearch_error.to_string()) + } else { + Error::PermanentHttpError(meilisearch_error.to_string()) } - MeilisearchSdkError::Timeout => { - Error::HttpRequestFailed("Meilisearch task timed out".to_string()) + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + if communication_error.status_code == 429 || communication_error.status_code >= 500 { + Error::HttpRequestFailed(communication_error.to_string()) + } else { + Error::PermanentHttpError(communication_error.to_string()) } - MeilisearchSdkError::HttpError(error) => Error::HttpRequestFailed(error.to_string()), - other => Error::HttpRequestFailed(other.to_string()), } + MeilisearchSdkError::ParseError(error) => { + Error::Serialization(format!("Invalid Meilisearch response: {error}")) + } + MeilisearchSdkError::Timeout => { + Error::HttpRequestFailed("Meilisearch task timed out".to_string()) + } + MeilisearchSdkError::HttpError(error) => Error::HttpRequestFailed(error.to_string()), + other => Error::HttpRequestFailed(other.to_string()), } } -fn map_sdk_error(error: MeilisearchSdkError) -> Error { - Error::from(SinkSdkError::from(error)) -} - #[cfg(test)] mod tests { use super::*; @@ -1020,6 +1001,26 @@ mod tests { assert_eq!(document["iggy_stream"], "user-stream"); } + #[test] + fn omits_metadata_when_include_metadata_is_false() { + let mut config = base_config(); + config.include_metadata = Some(false); + let sink = sink_with_config(config); + let payload = Payload::Json(simd_json::json!({ + "name": "Alice" + 
})); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message(payload)) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["name"], "Alice"); + assert!(document["iggy_id"].as_str().is_some()); + assert!(document.get("iggy_offset").is_none()); + assert!(document.get("iggy_stream").is_none()); + } + #[test] fn wraps_non_object_json_payloads() { let sink = sink_with_config(base_config()); From aa8ba69e1bd2f00e9e537fe968adea3c33df0144 Mon Sep 17 00:00:00 2001 From: Diaconu Radu-Mihai <52667211+countradooku@users.noreply.github.com> Date: Wed, 17 Jun 2026 12:12:07 +0300 Subject: [PATCH 3/7] fix(connectors): address meilisearch sink review feedback --- .../sinks/meilisearch_sink/README.md | 7 +- .../sinks/meilisearch_sink/src/lib.rs | 107 +++++++++--------- 2 files changed, 59 insertions(+), 55 deletions(-) diff --git a/core/connectors/sinks/meilisearch_sink/README.md b/core/connectors/sinks/meilisearch_sink/README.md index 24982ebd93..62ad6d8abb 100644 --- a/core/connectors/sinks/meilisearch_sink/README.md +++ b/core/connectors/sinks/meilisearch_sink/README.md @@ -17,10 +17,10 @@ Meilisearch index through the official Rust SDK. - `wait_for_tasks`: Poll Meilisearch tasks until terminal state before returning from `consume()`. Defaults to `true`. - `task_timeout`: Maximum time to wait for each Meilisearch task. Defaults to `30s`. - `task_poll_interval`: Delay between task polls. Defaults to `100ms`. -- `max_retries`: Maximum transient retry attempts. Defaults to `3`. +- `max_retries`: Maximum transient retries after the initial request. Defaults to `3`. - `retry_delay`: Initial transient retry delay. Defaults to `500ms`. - `max_retry_delay`: Maximum transient retry delay. Defaults to `5s`. -- `max_open_retries`: Maximum transient retry attempts while opening the index. Defaults to `5`. +- `max_open_retries`: Maximum transient retries after the initial request while opening the index. Defaults to `5`. 
## Behavior @@ -38,7 +38,8 @@ delivery idempotent for the same message. When `include_metadata` is enabled, metadata fields are only inserted when the document does not already contain those names. Existing user fields are -preserved. +preserved. If `primary_key` is set to a field other than `iggy_id`, the +connector also inserts `iggy_id` as stable Iggy metadata when absent. `wait_for_tasks=false` only skips waiting for document indexing tasks during `consume()`. If `create_index_if_not_exists=true` and the connector creates the diff --git a/core/connectors/sinks/meilisearch_sink/src/lib.rs b/core/connectors/sinks/meilisearch_sink/src/lib.rs index 762f6d72ee..3b5b0fbba9 100644 --- a/core/connectors/sinks/meilisearch_sink/src/lib.rs +++ b/core/connectors/sinks/meilisearch_sink/src/lib.rs @@ -145,14 +145,11 @@ impl From for ResolvedMeilisearchSinkConfig { config.task_poll_interval.as_deref(), DEFAULT_TASK_POLL_INTERVAL, ); - let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES).max(1); + let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); let max_retry_delay = parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); - let max_open_retries = config - .max_open_retries - .unwrap_or(DEFAULT_MAX_OPEN_RETRIES) - .max(1); + let max_open_retries = config.max_open_retries.unwrap_or(DEFAULT_MAX_OPEN_RETRIES); Self { url: config.url, @@ -259,7 +256,7 @@ impl MeilisearchSink { topic_metadata: &TopicMetadata, messages_metadata: &MessagesMetadata, message: ConsumedMessage, - ) -> Result, Error> { + ) -> Result { let generated_id = generated_document_id(topic_metadata, messages_metadata, &message); let ConsumedMessage { id, @@ -309,9 +306,11 @@ impl MeilisearchSink { .or_insert_with(|| Value::String(generated_id.clone())); if self.config.include_metadata { - document - .entry(DEFAULT_PRIMARY_KEY.to_string()) - .or_insert_with(|| 
Value::String(generated_id)); + if self.config.primary_key != DEFAULT_PRIMARY_KEY { + document + .entry(DEFAULT_PRIMARY_KEY.to_string()) + .or_insert_with(|| Value::String(generated_id)); + } insert_metadata_field( &mut document, "iggy_message_id", @@ -352,7 +351,7 @@ impl MeilisearchSink { } } - Ok(Some(Value::Object(document))) + Ok(Value::Object(document)) } fn document_from_json_value(value: Value) -> Map { @@ -483,20 +482,28 @@ impl MeilisearchSink { &self, client: &Client, task: TaskInfo, - max_attempts: u32, + max_retries: u32, ) -> Result { let task_uid = task.get_task_uid(); let started = Instant::now(); loop { - if started.elapsed() >= self.config.task_timeout { + let remaining = self.config.task_timeout.saturating_sub(started.elapsed()); + if remaining.is_zero() { break; } - let status = self - .retry_sdk_operation_with_attempts("get task status", max_attempts, || { + + let status = match tokio::time::timeout( + remaining, + self.retry_sdk_operation_with_retries("get task status", max_retries, || { client.get_task(task.clone()) - }) - .await?; + }), + ) + .await + { + Ok(status) => status?, + Err(_) => break, + }; if status.is_success() || status.is_failure() { return Ok(status); @@ -524,7 +531,7 @@ impl MeilisearchSink { Op: FnMut() -> Fut, Fut: Future>, { - self.retry_sdk_operation_with_attempts(operation, self.config.max_retries, operation_fn) + self.retry_sdk_operation_with_retries(operation, self.config.max_retries, operation_fn) .await } @@ -537,61 +544,57 @@ impl MeilisearchSink { Op: FnMut() -> Fut, Fut: Future>, { - self.retry_sdk_operation_with_attempts( - operation, - self.config.max_open_retries, - operation_fn, - ) - .await + self.retry_sdk_operation_with_retries(operation, self.config.max_open_retries, operation_fn) + .await } - async fn retry_sdk_operation_with_attempts( + async fn retry_sdk_operation_with_retries( &self, operation: &str, - max_attempts: u32, + max_retries: u32, mut operation_fn: Op, ) -> Result where Op: FnMut() -> Fut, 
Fut: Future>, { - let mut attempt = 0u32; + let mut retries = 0u32; loop { let result = tokio::time::timeout(self.config.timeout, operation_fn()).await; match result { Ok(Ok(value)) => return Ok(value), Ok(Err(error)) => { - attempt += 1; - let should_retry = attempt < max_attempts && is_transient_sdk_error(&error); + let should_retry = retries < max_retries && is_transient_sdk_error(&error); if !should_retry { return Err(map_sdk_error(error)); } + retries += 1; let delay = jitter(exponential_backoff( self.config.retry_delay, - attempt, + retries, self.config.max_retry_delay, )); warn!( - "Meilisearch {operation} failed (attempt {attempt}/{max_attempts}): {error}. Retrying in {delay:?}..." + "Meilisearch {operation} failed (retry {retries}/{max_retries}): {error}. Retrying in {delay:?}..." ); sleep(delay).await; } Err(_) => { - attempt += 1; - if attempt >= max_attempts { + if retries >= max_retries { return Err(Error::HttpRequestFailed(format!( "Meilisearch {operation} timed out after {:?}", self.config.timeout ))); } + retries += 1; let delay = jitter(exponential_backoff( self.config.retry_delay, - attempt, + retries, self.config.max_retry_delay, )); warn!( - "Meilisearch {operation} timed out after {:?} (attempt {attempt}/{max_attempts}). Retrying in {delay:?}...", + "Meilisearch {operation} timed out after {:?} (retry {retries}/{max_retries}). 
Retrying in {delay:?}...", self.config.timeout ); sleep(delay).await; @@ -661,8 +664,7 @@ impl Sink for MeilisearchSink { let mut invalid_records = 0usize; for message in messages { match self.prepare_document(topic_metadata, &messages_metadata, message) { - Ok(Some(document)) => documents.push(document), - Ok(None) => {} + Ok(document) => documents.push(document), Err(Error::InvalidRecordValue(reason)) => { invalid_records += 1; warn!( @@ -697,9 +699,6 @@ impl Sink for MeilisearchSink { Err(partial_error) => { let mut state = self.state.lock().await; state.documents_enqueued += partial_error.accepted; - if self.config.wait_for_tasks { - state.documents_indexed += partial_error.accepted; - } state.errors_count += 1; drop(state); Err(partial_error.error) @@ -735,7 +734,7 @@ fn generated_document_id( topic_metadata.topic.as_str(), messages_metadata.partition_id, message.offset, - message.id + message.id.to_string() ]); let encoded = serde_json::to_vec(&components) .map(|bytes| general_purpose::URL_SAFE_NO_PAD.encode(bytes)) @@ -941,6 +940,16 @@ mod tests { assert_ne!(first, second); } + #[test] + fn generated_ids_support_u128_message_ids() { + let mut message = message(Payload::Text("x".to_string())); + message.id = u128::MAX; + + let id = generated_document_id(&topic_metadata(), &messages_metadata(), &message); + + assert!(id.starts_with("iggy_")); + } + #[test] fn injects_default_primary_key_and_metadata() { let sink = sink_with_config(base_config()); @@ -952,8 +961,7 @@ mod tests { let document = sink .prepare_document(&topic_metadata(), &messages_metadata(), message) - .expect("prepare document") - .expect("document"); + .expect("prepare document"); assert_eq!(document["name"], "Alice"); assert_eq!(document["iggy_id"], expected_id); @@ -976,8 +984,7 @@ mod tests { let document = sink .prepare_document(&topic_metadata(), &messages_metadata(), message) - .expect("prepare document") - .expect("document"); + .expect("prepare document"); assert_eq!(document["id"], 
"existing"); assert_eq!(document["iggy_id"], expected_id); @@ -994,8 +1001,7 @@ mod tests { let document = sink .prepare_document(&topic_metadata(), &messages_metadata(), message(payload)) - .expect("prepare document") - .expect("document"); + .expect("prepare document"); assert_eq!(document["iggy_offset"], 999); assert_eq!(document["iggy_stream"], "user-stream"); @@ -1012,8 +1018,7 @@ mod tests { let document = sink .prepare_document(&topic_metadata(), &messages_metadata(), message(payload)) - .expect("prepare document") - .expect("document"); + .expect("prepare document"); assert_eq!(document["name"], "Alice"); assert!(document["iggy_id"].as_str().is_some()); @@ -1029,8 +1034,7 @@ mod tests { let document = sink .prepare_document(&topic_metadata(), &messages_metadata(), message) - .expect("prepare document") - .expect("document"); + .expect("prepare document"); assert_eq!(document["value"], json!(["a", "b"])); assert_eq!(document["iggy_id"], expected_id); @@ -1046,8 +1050,7 @@ mod tests { &messages_metadata(), message(Payload::Raw(vec![0, 1, 2, 3])), ) - .expect("prepare document") - .expect("document"); + .expect("prepare document"); assert_eq!(document["data"], "AAECAw=="); assert_eq!(document["data_encoding"], ENCODING_BASE64); From 4284fe63ebe010b85b73232953da56f78f2d840e Mon Sep 17 00:00:00 2001 From: Diaconu Radu-Mihai <52667211+countradooku@users.noreply.github.com> Date: Fri, 19 Jun 2026 12:06:49 +0300 Subject: [PATCH 4/7] fix(connectors): address meilisearch sink follow-up review --- Cargo.lock | 2 +- .../sinks/meilisearch_sink/Cargo.toml | 2 +- .../sinks/meilisearch_sink/README.md | 15 +- .../sinks/meilisearch_sink/config.toml | 2 +- .../sinks/meilisearch_sink/src/lib.rs | 142 ++++++++++++++---- .../meilisearch/meilisearch_sink.rs | 2 +- 6 files changed, 122 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 745c557274..73bde7e40e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6934,13 +6934,13 @@ dependencies = [ "iggy_common", 
"iggy_connector_sdk", "meilisearch-sdk", - "reqwest 0.13.4", "secrecy", "serde", "serde_json", "simd-json", "tokio", "tracing", + "url", ] [[package]] diff --git a/core/connectors/sinks/meilisearch_sink/Cargo.toml b/core/connectors/sinks/meilisearch_sink/Cargo.toml index b7e1a66a31..deeb19f454 100644 --- a/core/connectors/sinks/meilisearch_sink/Cargo.toml +++ b/core/connectors/sinks/meilisearch_sink/Cargo.toml @@ -38,10 +38,10 @@ base64 = { workspace = true } iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true } meilisearch-sdk = { workspace = true } -reqwest = { workspace = true } secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } simd-json = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +url = { workspace = true } diff --git a/core/connectors/sinks/meilisearch_sink/README.md b/core/connectors/sinks/meilisearch_sink/README.md index 62ad6d8abb..a3247ea123 100644 --- a/core/connectors/sinks/meilisearch_sink/README.md +++ b/core/connectors/sinks/meilisearch_sink/README.md @@ -5,7 +5,7 @@ Meilisearch index through the official Rust SDK. ## Configuration -- `url`: Meilisearch base URL. +- `url`: Meilisearch base URL. Paths, query strings, and fragments are ignored. - `index`: Target index UID. - `api_key`: Optional Meilisearch API key sent as `Authorization: Bearer`. - `primary_key`: Index primary key field. Defaults to `iggy_id`. @@ -14,13 +14,13 @@ Meilisearch index through the official Rust SDK. - `include_metadata`: Add Iggy metadata fields to each document. Defaults to `true`. - `batch_size`: Maximum documents per Meilisearch document request. Defaults to `1000`. - `timeout`: Request timeout as a humantime string, for example `30s`. Defaults to `30s`. -- `wait_for_tasks`: Poll Meilisearch tasks until terminal state before returning from `consume()`. Defaults to `true`. 
+- `wait_for_tasks`: Poll Meilisearch tasks until terminal state before returning from `consume()`. Defaults to `true`. Setting this to `false` makes document indexing fire-and-forget, so asynchronous Meilisearch task failures are not observed by the connector. - `task_timeout`: Maximum time to wait for each Meilisearch task. Defaults to `30s`. - `task_poll_interval`: Delay between task polls. Defaults to `100ms`. - `max_retries`: Maximum transient retries after the initial request. Defaults to `3`. - `retry_delay`: Initial transient retry delay. Defaults to `500ms`. - `max_retry_delay`: Maximum transient retry delay. Defaults to `5s`. -- `max_open_retries`: Maximum transient retries after the initial request while opening the index. Defaults to `5`. +- `max_open_retries`: Maximum transient retries after the initial request while opening the index. Defaults to `5`. This also applies to `get_task` polls while waiting for index creation during `open()`. ## Behavior @@ -42,6 +42,9 @@ preserved. If `primary_key` is set to a field other than `iggy_id`, the connector also inserts `iggy_id` as stable Iggy metadata when absent. `wait_for_tasks=false` only skips waiting for document indexing tasks during -`consume()`. If `create_index_if_not_exists=true` and the connector creates the -index during `open()`, it still waits for that index-creation task so the first -batch cannot race the index creation. +`consume()`. In that mode, successful submission lets the runtime commit the +consumer offset before Meilisearch has confirmed indexing, so later task +failures are not retried, logged, or counted by this connector. If +`create_index_if_not_exists=true` and the connector creates the index during +`open()`, it still waits for that index-creation task so the first batch cannot +race the index creation. 
diff --git a/core/connectors/sinks/meilisearch_sink/config.toml b/core/connectors/sinks/meilisearch_sink/config.toml index 6f29014616..45d62a5999 100644 --- a/core/connectors/sinks/meilisearch_sink/config.toml +++ b/core/connectors/sinks/meilisearch_sink/config.toml @@ -38,6 +38,6 @@ primary_key = "iggy_id" document_action = "replace" create_index_if_not_exists = true include_metadata = true -batch_size = 100 +batch_size = 1000 timeout = "30s" wait_for_tasks = true diff --git a/core/connectors/sinks/meilisearch_sink/src/lib.rs b/core/connectors/sinks/meilisearch_sink/src/lib.rs index 3b5b0fbba9..220c07a8ff 100644 --- a/core/connectors/sinks/meilisearch_sink/src/lib.rs +++ b/core/connectors/sinks/meilisearch_sink/src/lib.rs @@ -34,7 +34,6 @@ use meilisearch_sdk::{ task_info::TaskInfo, tasks::Task, }; -use reqwest::Url; use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value, json}; @@ -43,7 +42,8 @@ use tokio::{ sync::Mutex, time::{Instant, sleep}, }; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; +use url::Url; sink_connector!(MeilisearchSink); @@ -65,7 +65,7 @@ const ENCODING_BASE64: &str = "base64"; struct State { invocations_count: usize, documents_enqueued: usize, - documents_indexed: usize, + documents_confirmed: usize, errors_count: usize, } @@ -181,7 +181,7 @@ impl MeilisearchSink { state: Mutex::new(State { invocations_count: 0, documents_enqueued: 0, - documents_indexed: 0, + documents_confirmed: 0, errors_count: 0, }), } @@ -257,9 +257,8 @@ impl MeilisearchSink { messages_metadata: &MessagesMetadata, message: ConsumedMessage, ) -> Result { - let generated_id = generated_document_id(topic_metadata, messages_metadata, &message); let ConsumedMessage { - id, + id: message_id, offset, checksum, timestamp, @@ -301,20 +300,39 @@ impl MeilisearchSink { } }; - document - .entry(self.config.primary_key.clone()) - .or_insert_with(|| Value::String(generated_id.clone())); + let mut 
generated_id = None; + if !document.contains_key(self.config.primary_key.as_str()) { + let value = generated_document_id_from_parts( + topic_metadata, + messages_metadata, + offset, + message_id, + ); + document.insert( + self.config.primary_key.clone(), + Value::String(value.clone()), + ); + generated_id = Some(value); + } if self.config.include_metadata { - if self.config.primary_key != DEFAULT_PRIMARY_KEY { - document - .entry(DEFAULT_PRIMARY_KEY.to_string()) - .or_insert_with(|| Value::String(generated_id)); + if self.config.primary_key != DEFAULT_PRIMARY_KEY + && !document.contains_key(DEFAULT_PRIMARY_KEY) + { + let id = generated_id.get_or_insert_with(|| { + generated_document_id_from_parts( + topic_metadata, + messages_metadata, + offset, + message_id, + ) + }); + document.insert(DEFAULT_PRIMARY_KEY.to_string(), Value::String(id.clone())); } insert_metadata_field( &mut document, "iggy_message_id", - Value::String(id.to_string()), + Value::String(message_id.to_string()), ); insert_metadata_field(&mut document, "iggy_offset", Value::from(offset)); insert_metadata_field( @@ -377,6 +395,7 @@ impl MeilisearchSink { Err(partial_error) => { return Err(PartialIndexError { accepted: accepted + partial_error.accepted, + failed: partial_error.failed, error: partial_error.error, }); } @@ -409,11 +428,16 @@ impl MeilisearchSink { .await } } - .map_err(|error| PartialIndexError { accepted: 0, error })?; + .map_err(|error| PartialIndexError { + accepted: 0, + failed: documents.len(), + error, + })?; self.wait_for_task(client, task) .await .map_err(|error| PartialIndexError { accepted: documents.len(), + failed: documents.len(), error, })?; Ok(documents.len()) @@ -482,7 +506,7 @@ impl MeilisearchSink { &self, client: &Client, task: TaskInfo, - max_retries: u32, + max_get_task_retries: u32, ) -> Result { let task_uid = task.get_task_uid(); let started = Instant::now(); @@ -495,9 +519,11 @@ impl MeilisearchSink { let status = match tokio::time::timeout( remaining, - 
self.retry_sdk_operation_with_retries("get task status", max_retries, || { - client.get_task(task.clone()) - }), + self.retry_sdk_operation_with_retries( + "get task status", + max_get_task_retries, + || client.get_task(TaskUid(task_uid)), + ), ) .await { @@ -558,6 +584,7 @@ impl MeilisearchSink { Op: FnMut() -> Fut, Fut: Future>, { + // One retry budget covers both SDK errors and per-attempt timeouts. let mut retries = 0u32; loop { @@ -688,7 +715,7 @@ impl Sink for MeilisearchSink { let mut state = self.state.lock().await; state.documents_enqueued += accepted; if self.config.wait_for_tasks { - state.documents_indexed += accepted; + state.documents_confirmed += accepted; } info!( "Accepted {} of {} messages into Meilisearch index '{}'", @@ -699,8 +726,16 @@ impl Sink for MeilisearchSink { Err(partial_error) => { let mut state = self.state.lock().await; state.documents_enqueued += partial_error.accepted; - state.errors_count += 1; + state.errors_count += partial_error.failed; drop(state); + error!( + "Failed to index Meilisearch sink batch for connector ID: {}, index: {}, accepted: {}, failed: {}, error: {}", + self.id, + self.config.index, + partial_error.accepted, + partial_error.failed, + partial_error.error + ); Err(partial_error.error) } } @@ -709,11 +744,11 @@ impl Sink for MeilisearchSink { async fn close(&mut self) -> Result<(), Error> { let state = self.state.lock().await; info!( - "Meilisearch sink connector with ID: {} is closing. Stats: {} invocations, {} documents enqueued, {} documents indexed, {} errors", + "Meilisearch sink connector with ID: {} is closing. 
Stats: {} invocations, {} documents enqueued, {} documents confirmed, {} errors", self.id, state.invocations_count, state.documents_enqueued, - state.documents_indexed, + state.documents_confirmed, state.errors_count ); drop(state); @@ -724,17 +759,32 @@ impl Sink for MeilisearchSink { } } +#[cfg(test)] fn generated_document_id( topic_metadata: &TopicMetadata, messages_metadata: &MessagesMetadata, message: &ConsumedMessage, +) -> String { + generated_document_id_from_parts( + topic_metadata, + messages_metadata, + message.offset, + message.id, + ) +} + +fn generated_document_id_from_parts( + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + offset: u64, + id: u128, ) -> String { let components = json!([ topic_metadata.stream.as_str(), topic_metadata.topic.as_str(), messages_metadata.partition_id, - message.offset, - message.id.to_string() + offset, + id.to_string() ]); let encoded = serde_json::to_vec(&components) .map(|bytes| general_purpose::URL_SAFE_NO_PAD.encode(bytes)) @@ -764,7 +814,7 @@ fn sanitize_url_for_log(raw: &str) -> String { if url.password().is_some() { let _ = url.set_password(None); } - url.to_string() + url.to_string().trim_end_matches('/').to_string() } fn normalize_host(raw: &str) -> Result { @@ -782,19 +832,32 @@ fn normalize_host(raw: &str) -> Result { }; let url = Url::parse(&with_scheme) .map_err(|error| Error::Connection(format!("Invalid Meilisearch URL: {error}")))?; - let mut host = url.to_string(); - while host.ends_with('/') { - host.pop(); + if url.path() != "/" || url.query().is_some() || url.fragment().is_some() { + warn!("Ignoring path, query, or fragment from Meilisearch URL"); } - Ok(host) + let mut base_url = url; + base_url.set_path(""); + base_url.set_query(None); + base_url.set_fragment(None); + Ok(base_url.as_str().trim_end_matches('/').to_string()) } #[derive(Debug)] struct PartialIndexError { accepted: usize, + failed: usize, error: Error, } +#[derive(Clone, Copy)] +struct TaskUid(u32); + +impl 
AsRef for TaskUid { + fn as_ref(&self) -> &u32 { + &self.0 + } +} + fn is_index_not_found(error: &MeilisearchSdkError) -> bool { matches!( error, @@ -809,7 +872,9 @@ fn is_transient_sdk_error(error: &MeilisearchSdkError) -> bool { meilisearch_error.error_type == MeilisearchErrorType::Internal } MeilisearchSdkError::MeilisearchCommunication(communication_error) => { - communication_error.status_code == 429 || communication_error.status_code >= 500 + communication_error.status_code == 0 + || communication_error.status_code == 429 + || communication_error.status_code >= 500 } MeilisearchSdkError::HttpError(_) | MeilisearchSdkError::Timeout => true, _ => false, @@ -826,7 +891,10 @@ fn map_sdk_error(error: MeilisearchSdkError) -> Error { } } MeilisearchSdkError::MeilisearchCommunication(communication_error) => { - if communication_error.status_code == 429 || communication_error.status_code >= 500 { + if communication_error.status_code == 0 + || communication_error.status_code == 429 + || communication_error.status_code >= 500 + { Error::HttpRequestFailed(communication_error.to_string()) } else { Error::PermanentHttpError(communication_error.to_string()) @@ -1059,7 +1127,15 @@ mod tests { #[test] fn sanitize_url_should_redact_credentials_without_scheme() { let url = sanitize_url_for_log("user:pass@localhost:7700/indexes"); - assert_eq!(url, "http://localhost:7700/indexes"); + assert_eq!(url, "http://localhost:7700"); + } + + #[test] + fn normalize_host_should_strip_path_query_and_fragment() { + let url = + normalize_host("https://localhost:7700/path?foo=bar#section").expect("normalize host"); + + assert_eq!(url, "https://localhost:7700"); } #[test] diff --git a/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs b/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs index dba06a3f7c..c39aa8cb42 100644 --- a/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs +++ b/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs 
@@ -26,7 +26,7 @@ use integration::iggy_harness; server(connectors_runtime(config_path = "tests/connectors/meilisearch/sink.toml")), seed = seeds::connector_stream )] -async fn meilisearch_sink_indexes_json_messages( +async fn given_json_messages_when_sink_consumes_should_index_documents( harness: &TestHarness, fixture: MeilisearchSinkFixture, ) { From 7f43ab342f078eb5c511d448fc7932fc80685bfb Mon Sep 17 00:00:00 2001 From: Diaconu Radu-Mihai <52667211+countradooku@users.noreply.github.com> Date: Sun, 21 Jun 2026 19:35:53 +0300 Subject: [PATCH 5/7] fix(connectors): harden meilisearch sink retries --- .../sinks/meilisearch_sink/src/lib.rs | 96 +++++++++++++------ 1 file changed, 69 insertions(+), 27 deletions(-) diff --git a/core/connectors/sinks/meilisearch_sink/src/lib.rs b/core/connectors/sinks/meilisearch_sink/src/lib.rs index 220c07a8ff..c67fdc5a48 100644 --- a/core/connectors/sinks/meilisearch_sink/src/lib.rs +++ b/core/connectors/sinks/meilisearch_sink/src/lib.rs @@ -196,17 +196,35 @@ impl MeilisearchSink { } async fn check_connectivity(&self, client: &Client) -> Result<(), Error> { - let health = self - .retry_sdk_open_operation("health check", || client.health()) - .await?; - if health.status == "available" { - return Ok(()); - } + let mut retries = 0u32; - Err(Error::Connection(format!( - "Meilisearch health check returned status '{}'", - health.status - ))) + loop { + let health = self + .retry_sdk_open_operation("health check", || client.health()) + .await?; + if health.status == "available" { + return Ok(()); + } + + if retries >= self.config.max_open_retries { + return Err(Error::Connection(format!( + "Meilisearch health check returned status '{}'", + health.status + ))); + } + + retries += 1; + let delay = jitter(exponential_backoff( + self.config.retry_delay, + retries, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch health check returned status '{}' (retry {}/{}). 
Retrying in {:?}...", + health.status, retries, self.config.max_open_retries, delay + ); + sleep(delay).await; + } } async fn ensure_index_exists(&self, client: &Client) -> Result<(), Error> { @@ -271,9 +289,8 @@ impl MeilisearchSink { Payload::Json(value) => { Self::document_from_json_value(owned_value_to_serde_json(&value)) } - Payload::Raw(bytes) => { - let mut bytes_copy = bytes.clone(); - match simd_json::from_slice::(&mut bytes_copy) { + Payload::Raw(mut bytes) => { + match simd_json::from_slice::(&mut bytes) { Ok(value) => Self::document_from_json_value(owned_value_to_serde_json(&value)), Err(_) => Map::from_iter([ ( @@ -307,7 +324,7 @@ impl MeilisearchSink { messages_metadata, offset, message_id, - ); + )?; document.insert( self.config.primary_key.clone(), Value::String(value.clone()), @@ -319,15 +336,16 @@ impl MeilisearchSink { if self.config.primary_key != DEFAULT_PRIMARY_KEY && !document.contains_key(DEFAULT_PRIMARY_KEY) { - let id = generated_id.get_or_insert_with(|| { - generated_document_id_from_parts( + let id = match &generated_id { + Some(id) => id.clone(), + None => generated_document_id_from_parts( topic_metadata, messages_metadata, offset, message_id, - ) - }); - document.insert(DEFAULT_PRIMARY_KEY.to_string(), Value::String(id.clone())); + )?, + }; + document.insert(DEFAULT_PRIMARY_KEY.to_string(), Value::String(id)); } insert_metadata_field( &mut document, @@ -437,7 +455,7 @@ impl MeilisearchSink { .await .map_err(|error| PartialIndexError { accepted: documents.len(), - failed: documents.len(), + failed: 0, error, })?; Ok(documents.len()) @@ -584,11 +602,19 @@ impl MeilisearchSink { Op: FnMut() -> Fut, Fut: Future>, { - // One retry budget covers both SDK errors and per-attempt timeouts. 
+ let started = Instant::now(); let mut retries = 0u32; loop { - let result = tokio::time::timeout(self.config.timeout, operation_fn()).await; + let remaining = self.config.timeout.saturating_sub(started.elapsed()); + if remaining.is_zero() { + return Err(Error::HttpRequestFailed(format!( + "Meilisearch {operation} timed out after {:?}", + self.config.timeout + ))); + } + + let result = tokio::time::timeout(remaining, operation_fn()).await; match result { Ok(Ok(value)) => return Ok(value), Ok(Err(error)) => { @@ -605,7 +631,11 @@ impl MeilisearchSink { warn!( "Meilisearch {operation} failed (retry {retries}/{max_retries}): {error}. Retrying in {delay:?}..." ); - sleep(delay).await; + let remaining = self.config.timeout.saturating_sub(started.elapsed()); + if remaining.is_zero() { + return Err(map_sdk_error(error)); + } + sleep(cmp::min(delay, remaining)).await; } Err(_) => { if retries >= max_retries { @@ -624,7 +654,14 @@ impl MeilisearchSink { "Meilisearch {operation} timed out after {:?} (retry {retries}/{max_retries}). 
Retrying in {delay:?}...", self.config.timeout ); - sleep(delay).await; + let remaining = self.config.timeout.saturating_sub(started.elapsed()); + if remaining.is_zero() { + return Err(Error::HttpRequestFailed(format!( + "Meilisearch {operation} timed out after {:?}", + self.config.timeout + ))); + } + sleep(cmp::min(delay, remaining)).await; } } } @@ -771,6 +808,7 @@ fn generated_document_id( message.offset, message.id, ) + .expect("test generated ID components should serialize") } fn generated_document_id_from_parts( @@ -778,7 +816,7 @@ fn generated_document_id_from_parts( messages_metadata: &MessagesMetadata, offset: u64, id: u128, -) -> String { +) -> Result { let components = json!([ topic_metadata.stream.as_str(), topic_metadata.topic.as_str(), @@ -788,8 +826,12 @@ fn generated_document_id_from_parts( ]); let encoded = serde_json::to_vec(&components) .map(|bytes| general_purpose::URL_SAFE_NO_PAD.encode(bytes)) - .expect("generated ID components are always JSON-serializable"); - format!("iggy_{encoded}") + .map_err(|error| { + Error::Serialization(format!( + "Failed to serialize generated document ID: {error}" + )) + })?; + Ok(format!("iggy_{encoded}")) } fn insert_metadata_field(object: &mut Map, field: &str, value: Value) { From 33841decf7e369d5f0ed9e4f8174ad099a9607e7 Mon Sep 17 00:00:00 2001 From: Diaconu Radu-Mihai <52667211+countradooku@users.noreply.github.com> Date: Tue, 23 Jun 2026 08:02:33 +0300 Subject: [PATCH 6/7] fix(connectors): refine meilisearch sink accounting --- .../sinks/meilisearch_sink/src/lib.rs | 137 +++++++++++++----- 1 file changed, 101 insertions(+), 36 deletions(-) diff --git a/core/connectors/sinks/meilisearch_sink/src/lib.rs b/core/connectors/sinks/meilisearch_sink/src/lib.rs index c67fdc5a48..2387a9054c 100644 --- a/core/connectors/sinks/meilisearch_sink/src/lib.rs +++ b/core/connectors/sinks/meilisearch_sink/src/lib.rs @@ -199,31 +199,66 @@ impl MeilisearchSink { let mut retries = 0u32; loop { - let health = self - 
.retry_sdk_open_operation("health check", || client.health()) - .await?; - if health.status == "available" { - return Ok(()); - } - - if retries >= self.config.max_open_retries { - return Err(Error::Connection(format!( - "Meilisearch health check returned status '{}'", - health.status - ))); + let result = tokio::time::timeout(self.config.timeout, client.health()).await; + match result { + Ok(Ok(health)) if health.status == "available" => return Ok(()), + Ok(Ok(health)) => { + if retries >= self.config.max_open_retries { + return Err(Error::Connection(format!( + "Meilisearch health check returned status '{}'", + health.status + ))); + } + retries += 1; + let delay = jitter(exponential_backoff( + self.config.retry_delay, + retries, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch health check returned status '{}' (retry {}/{}). Retrying in {:?}...", + health.status, retries, self.config.max_open_retries, delay + ); + sleep(delay).await; + } + Ok(Err(error)) => { + let should_retry = + retries < self.config.max_open_retries && is_transient_sdk_error(&error); + if !should_retry { + return Err(map_sdk_error(error)); + } + retries += 1; + let delay = jitter(exponential_backoff( + self.config.retry_delay, + retries, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch health check failed (retry {}/{}): {}. Retrying in {:?}...", + retries, self.config.max_open_retries, error, delay + ); + sleep(delay).await; + } + Err(_) => { + if retries >= self.config.max_open_retries { + return Err(Error::HttpRequestFailed(format!( + "Meilisearch health check timed out after {:?}", + self.config.timeout + ))); + } + retries += 1; + let delay = jitter(exponential_backoff( + self.config.retry_delay, + retries, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch health check timed out after {:?} (retry {}/{}). 
Retrying in {:?}...", + self.config.timeout, retries, self.config.max_open_retries, delay + ); + sleep(delay).await; + } } - - retries += 1; - let delay = jitter(exponential_backoff( - self.config.retry_delay, - retries, - self.config.max_retry_delay, - )); - warn!( - "Meilisearch health check returned status '{}' (retry {}/{}). Retrying in {:?}...", - health.status, retries, self.config.max_open_retries, delay - ); - sleep(delay).await; } } @@ -289,8 +324,9 @@ impl MeilisearchSink { Payload::Json(value) => { Self::document_from_json_value(owned_value_to_serde_json(&value)) } - Payload::Raw(mut bytes) => { - match simd_json::from_slice::(&mut bytes) { + Payload::Raw(bytes) => { + let mut bytes_copy = bytes.clone(); + match simd_json::from_slice::(&mut bytes_copy) { Ok(value) => Self::document_from_json_value(owned_value_to_serde_json(&value)), Err(_) => Map::from_iter([ ( @@ -454,8 +490,8 @@ impl MeilisearchSink { self.wait_for_task(client, task) .await .map_err(|error| PartialIndexError { - accepted: documents.len(), - failed: 0, + accepted: 0, + failed: documents.len(), error, })?; Ok(documents.len()) @@ -780,14 +816,25 @@ impl Sink for MeilisearchSink { async fn close(&mut self) -> Result<(), Error> { let state = self.state.lock().await; - info!( - "Meilisearch sink connector with ID: {} is closing. Stats: {} invocations, {} documents enqueued, {} documents confirmed, {} errors", - self.id, - state.invocations_count, - state.documents_enqueued, - state.documents_confirmed, - state.errors_count - ); + if self.config.wait_for_tasks { + info!( + "Meilisearch sink connector with ID: {} is closing. Stats: {} invocations, {} documents enqueued, {} documents confirmed, {} errors", + self.id, + state.invocations_count, + state.documents_enqueued, + state.documents_confirmed, + state.errors_count + ); + } else { + warn!( + "Meilisearch sink connector with ID: {} is closing with wait_for_tasks=false. 
Submitted document tasks may still be in flight or fail after offsets are committed.", + self.id + ); + info!( + "Meilisearch sink connector with ID: {} is closing. Stats: {} invocations, {} documents enqueued, documents confirmed unavailable (wait_for_tasks=false), {} errors", + self.id, state.invocations_count, state.documents_enqueued, state.errors_count + ); + } drop(state); self.client = None; @@ -1166,6 +1213,24 @@ mod tests { assert_eq!(document["data_encoding"], ENCODING_BASE64); } + #[test] + fn raw_payloads_preserve_original_bytes_when_json_parse_mutates_buffer() { + let sink = sink_with_config(base_config()); + let bytes = b"[1,2,3".to_vec(); + let expected = general_purpose::STANDARD.encode(&bytes); + + let document = sink + .prepare_document( + &topic_metadata(), + &messages_metadata(), + message(Payload::Raw(bytes)), + ) + .expect("prepare document"); + + assert_eq!(document["data"], expected); + assert_eq!(document["data_encoding"], ENCODING_BASE64); + } + #[test] fn sanitize_url_should_redact_credentials_without_scheme() { let url = sanitize_url_for_log("user:pass@localhost:7700/indexes"); From ae744d11372afbad00a9deba4e4353b91914cd0d Mon Sep 17 00:00:00 2001 From: Diaconu Radu-Mihai <52667211+countradooku@users.noreply.github.com> Date: Tue, 23 Jun 2026 17:58:16 +0300 Subject: [PATCH 7/7] fix(connectors): warn on meilisearch sink risks --- .../sinks/meilisearch_sink/src/lib.rs | 50 +++++++++++++++++-- .../fixtures/meilisearch/container.rs | 3 +- .../connectors/fixtures/meilisearch/mod.rs | 2 +- .../tests/connectors/fixtures/mod.rs | 2 +- .../meilisearch/meilisearch_sink.rs | 4 +- 5 files changed, 53 insertions(+), 8 deletions(-) diff --git a/core/connectors/sinks/meilisearch_sink/src/lib.rs b/core/connectors/sinks/meilisearch_sink/src/lib.rs index 2387a9054c..859ecec7c8 100644 --- a/core/connectors/sinks/meilisearch_sink/src/lib.rs +++ b/core/connectors/sinks/meilisearch_sink/src/lib.rs @@ -146,9 +146,16 @@ impl From for 
ResolvedMeilisearchSinkConfig { DEFAULT_TASK_POLL_INTERVAL, ); let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); - let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); - let max_retry_delay = + let mut retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let mut max_retry_delay = parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); + if retry_delay > max_retry_delay { + warn!( + "Meilisearch sink retry_delay ({:?}) exceeds max_retry_delay ({:?}). Swapping values.", + retry_delay, max_retry_delay + ); + std::mem::swap(&mut retry_delay, &mut max_retry_delay); + } let max_open_retries = config.max_open_retries.unwrap_or(DEFAULT_MAX_OPEN_RETRIES); Self { @@ -264,8 +271,21 @@ impl MeilisearchSink { async fn ensure_index_exists(&self, client: &Client) -> Result<(), Error> { match self.get_index_if_exists(client).await? { - Some(_) => { + Some(index) => { info!("Meilisearch index '{}' already exists", self.config.index); + if let Some(primary_key) = index.primary_key.as_deref() + && primary_key != self.config.primary_key + { + warn!( + "Meilisearch index '{}' primary key '{}' differs from configured primary key '{}'", + self.config.index, primary_key, self.config.primary_key + ); + } else if index.primary_key.is_none() { + warn!( + "Meilisearch index '{}' does not currently have a primary key. Configured primary key '{}' will be sent with document indexing requests.", + self.config.index, self.config.primary_key + ); + } Ok(()) } None if self.config.create_index_if_not_exists => self.create_index(client).await, @@ -718,6 +738,18 @@ impl Sink for MeilisearchSink { sanitize_url_for_log(&self.config.url), self.config.index ); + if self.config.document_action == MeilisearchDocumentAction::Update { + warn!( + "Meilisearch sink connector with ID: {} is using document_action=update. 
Runtime retries are at-least-once and partial batch success can apply non-idempotent updates more than once.", + self.id + ); + } + if !self.config.wait_for_tasks { + warn!( + "Meilisearch sink connector with ID: {} is opening with wait_for_tasks=false. Submitted document tasks may still be in flight or fail after offsets are committed.", + self.id + ); + } let client = self.create_client()?; self.check_connectivity(&client).await?; @@ -1057,6 +1089,18 @@ mod tests { } } + #[test] + fn swaps_retry_delays_when_retry_delay_exceeds_max_retry_delay() { + let mut config = base_config(); + config.retry_delay = Some("10s".to_string()); + config.max_retry_delay = Some("1s".to_string()); + + let sink = sink_with_config(config); + + assert_eq!(sink.config.retry_delay, Duration::from_secs(1)); + assert_eq!(sink.config.max_retry_delay, Duration::from_secs(10)); + } + #[test] fn generated_ids_use_meilisearch_safe_characters() { let id = generated_document_id( diff --git a/core/integration/tests/connectors/fixtures/meilisearch/container.rs b/core/integration/tests/connectors/fixtures/meilisearch/container.rs index ab2d34f9aa..55cadbe21f 100644 --- a/core/integration/tests/connectors/fixtures/meilisearch/container.rs +++ b/core/integration/tests/connectors/fixtures/meilisearch/container.rs @@ -154,13 +154,14 @@ pub trait MeilisearchOps: Sync { fn wait_for_documents( &self, + index_name: &str, expected_count: usize, ) -> impl std::future::Future, TestBinaryError>> + Send { async move { let mut last_count = 0usize; for _ in 0..POLL_ATTEMPTS { - if let Ok(documents) = self.list_documents(TEST_INDEX).await { + if let Ok(documents) = self.list_documents(index_name).await { last_count = documents.len(); if documents.len() >= expected_count { return Ok(documents); diff --git a/core/integration/tests/connectors/fixtures/meilisearch/mod.rs b/core/integration/tests/connectors/fixtures/meilisearch/mod.rs index ba21bf1587..09e0b9d91f 100644 --- 
a/core/integration/tests/connectors/fixtures/meilisearch/mod.rs +++ b/core/integration/tests/connectors/fixtures/meilisearch/mod.rs @@ -18,5 +18,5 @@ mod container; mod sink; -pub use container::MeilisearchOps; +pub use container::{MeilisearchOps, TEST_INDEX}; pub use sink::MeilisearchSinkFixture; diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index c07641b94c..113c12d668 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -63,7 +63,7 @@ pub use influxdb::{ InfluxDbSinkNoMetadataFixture, InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture, InfluxDbSourceRawFixture, InfluxDbSourceTextFixture, }; -pub use meilisearch::{MeilisearchOps, MeilisearchSinkFixture}; +pub use meilisearch::{MeilisearchOps, MeilisearchSinkFixture, TEST_INDEX}; pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture, diff --git a/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs b/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs index c39aa8cb42..17a81db446 100644 --- a/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs +++ b/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::connectors::fixtures::{MeilisearchOps, MeilisearchSinkFixture}; +use crate::connectors::fixtures::{MeilisearchOps, MeilisearchSinkFixture, TEST_INDEX}; use bytes::Bytes; use iggy::prelude::{IggyMessage, Partitioning}; use iggy_common::{Identifier, MessageClient}; @@ -62,7 +62,7 @@ async fn given_json_messages_when_sink_consumes_should_index_documents( .expect("send messages"); let documents = fixture - .wait_for_documents(payloads.len()) + .wait_for_documents(TEST_INDEX, payloads.len()) .await .expect("wait for Meilisearch documents");