diff --git a/Cargo.lock b/Cargo.lock index f3564681e4..804bd2b695 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3196,6 +3196,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "convert_case" version = "0.9.0" @@ -6916,6 +6925,43 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_meilisearch_sink" +version = "0.4.0" +dependencies = [ + "async-trait", + "base64", + "dashmap", + "iggy_common", + "iggy_connector_sdk", + "meilisearch-sdk", + "once_cell", + "reqwest 0.13.4", + "secrecy", + "serde", + "serde_json", + "simd-json", + "tokio", + "tracing", +] + +[[package]] +name = "iggy_connector_meilisearch_source" +version = "0.4.0" +dependencies = [ + "async-trait", + "dashmap", + "iggy_common", + "iggy_connector_sdk", + "meilisearch-sdk", + "once_cell", + "secrecy", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "iggy_connector_mongodb_sink" version = "0.4.1-edge.1" @@ -7360,6 +7406,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "iso8601" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46" +dependencies = [ + "nom 8.0.0", +] + [[package]] name = "itertools" version = "0.13.0" @@ -8150,6 +8205,49 @@ dependencies = [ "digest 0.11.3", ] +[[package]] +name = "meilisearch-index-setting-macro" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93b5b21df781c820a9cc387b808d4128cbc164dd28d67ac6ed666a00996f8f15" +dependencies = [ + "convert_case 0.8.0", + "proc-macro2", + "quote", 
+ "structmeta", + "syn 2.0.117", +] + +[[package]] +name = "meilisearch-sdk" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e6e3646ba2a9a306296c1edf4a050508a408c1b59ca456d9ad4965ec6e91e9" +dependencies = [ + "async-trait", + "bytes", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "iso8601", + "jsonwebtoken", + "log", + "meilisearch-index-setting-macro", + "pin-project-lite", + "reqwest 0.12.28", + "serde", + "serde_json", + "thiserror 2.0.18", + "time", + "tokio", + "uuid", + "wasm-bindgen-futures", + "web-sys", + "yaup", +] + [[package]] name = "memchr" version = "2.8.1" @@ -14822,6 +14920,17 @@ dependencies = [ "time", ] +[[package]] +name = "yaup" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0144f1a16a199846cb21024da74edd930b43443463292f536b7110b4855b5c6" +dependencies = [ + "form_urlencoded", + "serde", + "thiserror 1.0.69", +] + [[package]] name = "yew" version = "0.23.0" diff --git a/Cargo.toml b/Cargo.toml index 29828a7696..65cd38ef87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,12 +39,14 @@ members = [ "core/connectors/sinks/http_sink", "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/influxdb_sink", + "core/connectors/sinks/meilisearch_sink", "core/connectors/sinks/mongodb_sink", "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/influxdb_source", + "core/connectors/sources/meilisearch_source", "core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", @@ -194,6 +196,11 @@ lending-iterator = "0.1.7" libc = "0.2.186" log = "0.4.30" lz4_flex = "0.13.1" +meilisearch-sdk = { version = "0.33.0", default-features = false, features = [ + "reqwest", + "tls", + "jwt_rust_crypto", +] } message_bus = { path = "core/message_bus" } 
metadata = { path = "core/metadata" } mimalloc = "0.1" diff --git a/core/connectors/README.md b/core/connectors/README.md index b7d24cfbab..53c87197ef 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -96,6 +96,7 @@ Please refer to the **[Source documentation](https://github.com/apache/iggy/tree ### Available Sources - **Elasticsearch Source** - polls documents from Elasticsearch indices +- **Meilisearch Source** - polls documents from Meilisearch indices - **PostgreSQL Source** - reads rows from PostgreSQL tables with multiple consumption strategies (delete after read, mark as processed, timestamp tracking) - **Random Source** - generates random test messages (useful for testing/development) diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md index 9aaaa2ffaa..dcd6d0e554 100644 --- a/core/connectors/sinks/README.md +++ b/core/connectors/sinks/README.md @@ -12,6 +12,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s | **elasticsearch_sink** | Sends messages to Elasticsearch indices for full-text search and analytics | | **iceberg_sink** | Writes data to Apache Iceberg tables via REST catalog with S3/GCS/Azure storage | | **influxdb_sink** | Writes messages to InfluxDB as line-protocol points; supports both V2 (org/bucket, Flux) and V3 (db, SQL) | +| **meilisearch_sink** | Indexes messages in Meilisearch for full-text search | | **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas | | **quickwit_sink** | Indexes messages in Quickwit search engine for log analytics | | **stdout_sink** | Prints messages to standard output (useful for debugging and development) | diff --git a/core/connectors/sinks/meilisearch_sink/Cargo.toml b/core/connectors/sinks/meilisearch_sink/Cargo.toml new file mode 100644 index 0000000000..8bda76a40d --- /dev/null +++ b/core/connectors/sinks/meilisearch_sink/Cargo.toml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software 
Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_meilisearch_sink" +version = "0.4.0" +description = "Iggy Meilisearch sink connector" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "search", "meilisearch"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +dashmap = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +meilisearch-sdk = { workspace = true } +once_cell = { workspace = true } +reqwest = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/core/connectors/sinks/meilisearch_sink/README.md b/core/connectors/sinks/meilisearch_sink/README.md new file mode 100644 index 
0000000000..7a17d2a41f --- /dev/null +++ b/core/connectors/sinks/meilisearch_sink/README.md @@ -0,0 +1,45 @@ +# Meilisearch Sink Connector + +A sink connector that consumes messages from Iggy streams and writes them to a +Meilisearch index through the official Rust SDK. + +## Configuration + +- `url`: Meilisearch base URL. +- `index`: Target index UID. +- `api_key`: Optional Meilisearch API key sent as `Authorization: Bearer`. +- `primary_key`: Index primary key field. Defaults to `iggy_id`. +- `document_action`: `replace` uses SDK add-or-replace semantics; `update` uses SDK add-or-update semantics. Defaults to `replace`. +- `create_index_if_not_exists`: Create the index during `open()` when missing. Defaults to `true`. +- `include_metadata`: Add Iggy metadata fields to each document. Defaults to `true`. +- `batch_size`: Maximum documents per Meilisearch document request. Defaults to `1000`. +- `timeout`: Request timeout as a humantime string, for example `30s`. Defaults to `30s`. +- `wait_for_tasks`: Poll Meilisearch tasks until terminal state before returning from `consume()`. Defaults to `true`. +- `task_timeout`: Maximum time to wait for each Meilisearch task. Defaults to `30s`. +- `task_poll_interval`: Delay between task polls. Defaults to `100ms`. +- `max_retries`: Maximum attempts, including the first try, for a request that fails transiently. Defaults to `3`. +- `retry_delay`: Initial transient retry delay. Defaults to `500ms`. +- `max_retry_delay`: Maximum transient retry delay. Defaults to `5s`. +- `max_open_retries`: Maximum attempts, including the first try, for transient failures during `open()`. Defaults to `5`. + +## Behavior + +JSON object payloads are indexed as documents. JSON arrays or scalar values are +wrapped in a `value` field because Meilisearch documents must be objects. Raw +payloads are parsed as JSON when possible; otherwise, they are indexed as base64 +data. Text payloads are indexed in a `text` field. Records with unsupported +payload schemas are dropped with a warning; the rest of the batch is indexed. 
+ +When the configured primary key is absent, the connector injects a stable value +derived from the exact Iggy stream, topic, partition, offset, and message ID. +This avoids Meilisearch primary-key inference failures and keeps repeated +delivery idempotent for the same message. + +When `include_metadata` is enabled, metadata fields are only inserted when the +document does not already contain those names. Existing user fields are +preserved. + +`wait_for_tasks=false` only skips waiting for document indexing tasks during +`consume()`. If `create_index_if_not_exists=true` and the connector creates the +index during `open()`, it still waits for that index-creation task so the first +batch cannot race the index creation. diff --git a/core/connectors/sinks/meilisearch_sink/config.toml b/core/connectors/sinks/meilisearch_sink/config.toml new file mode 100644 index 0000000000..6f29014616 --- /dev/null +++ b/core/connectors/sinks/meilisearch_sink/config.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "sink" +key = "meilisearch" +enabled = true +version = 0 +name = "Meilisearch sink" +path = "../../target/release/libiggy_connector_meilisearch_sink" +verbose = false + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "meilisearch_sink_cg" + +[plugin_config] +url = "http://localhost:7700" +index = "iggy_messages" +primary_key = "iggy_id" +document_action = "replace" +create_index_if_not_exists = true +include_metadata = true +batch_size = 100 +timeout = "30s" +wait_for_tasks = true diff --git a/core/connectors/sinks/meilisearch_sink/src/lib.rs b/core/connectors/sinks/meilisearch_sink/src/lib.rs new file mode 100644 index 0000000000..897e873e42 --- /dev/null +++ b/core/connectors/sinks/meilisearch_sink/src/lib.rs @@ -0,0 +1,1074 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use async_trait::async_trait; +use base64::{Engine as _, engine::general_purpose}; +use iggy_common::IggyTimestamp; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, + convert::owned_value_to_serde_json, + retry::{exponential_backoff, jitter, parse_duration}, + sink_connector, +}; +use meilisearch_sdk::{ + client::Client, + errors::{ + Error as MeilisearchSdkError, ErrorCode as MeilisearchErrorCode, + ErrorType as MeilisearchErrorType, + }, + indexes::Index, + task_info::TaskInfo, + tasks::Task, +}; +use reqwest::Url; +use secrecy::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value, json}; +use std::{future::Future, time::Duration}; +use tokio::sync::Mutex; +use tracing::{info, warn}; + +sink_connector!(MeilisearchSink); + +const DEFAULT_PRIMARY_KEY: &str = "iggy_id"; +const DEFAULT_CREATE_INDEX_IF_NOT_EXISTS: bool = true; +const DEFAULT_INCLUDE_METADATA: bool = true; +const DEFAULT_BATCH_SIZE: usize = 1000; +const DEFAULT_TIMEOUT: &str = "30s"; +const DEFAULT_WAIT_FOR_TASKS: bool = true; +const DEFAULT_TASK_TIMEOUT: &str = "30s"; +const DEFAULT_TASK_POLL_INTERVAL: &str = "100ms"; +const DEFAULT_RETRY_DELAY: &str = "500ms"; +const DEFAULT_MAX_RETRY_DELAY: &str = "5s"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_MAX_OPEN_RETRIES: u32 = 5; +const ENCODING_BASE64: &str = "base64"; + +#[derive(Debug)] +struct State { + invocations_count: usize, + documents_enqueued: usize, + documents_indexed: usize, + errors_count: usize, +} + +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MeilisearchDocumentAction { + #[default] + Replace, + Update, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MeilisearchSinkConfig { + pub url: String, + pub index: String, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")] + pub api_key: Option, + pub primary_key: 
Option, + pub document_action: Option, + pub create_index_if_not_exists: Option, + pub include_metadata: Option, + pub batch_size: Option, + pub timeout: Option, + pub wait_for_tasks: Option, + pub task_timeout: Option, + pub task_poll_interval: Option, + pub max_retries: Option, + pub retry_delay: Option, + pub max_retry_delay: Option, + pub max_open_retries: Option, +} + +#[derive(Debug)] +pub struct MeilisearchSink { + id: u32, + config: ResolvedMeilisearchSinkConfig, + client: Option, + state: Mutex, +} + +#[derive(Debug)] +struct ResolvedMeilisearchSinkConfig { + url: String, + index: String, + api_key: Option, + primary_key: String, + document_action: MeilisearchDocumentAction, + create_index_if_not_exists: bool, + include_metadata: bool, + batch_size: usize, + timeout: Duration, + wait_for_tasks: bool, + task_timeout: Duration, + task_poll_interval: Duration, + max_retries: u32, + retry_delay: Duration, + max_retry_delay: Duration, + max_open_retries: u32, +} + +impl From for ResolvedMeilisearchSinkConfig { + fn from(config: MeilisearchSinkConfig) -> Self { + let primary_key = config + .primary_key + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| DEFAULT_PRIMARY_KEY.to_string()); + let document_action = config.document_action.unwrap_or_default(); + let create_index_if_not_exists = config + .create_index_if_not_exists + .unwrap_or(DEFAULT_CREATE_INDEX_IF_NOT_EXISTS); + let include_metadata = config.include_metadata.unwrap_or(DEFAULT_INCLUDE_METADATA); + let batch_size = config.batch_size.unwrap_or(DEFAULT_BATCH_SIZE).max(1); + let timeout = parse_duration(config.timeout.as_deref(), DEFAULT_TIMEOUT); + let wait_for_tasks = config.wait_for_tasks.unwrap_or(DEFAULT_WAIT_FOR_TASKS); + let task_timeout = parse_duration(config.task_timeout.as_deref(), DEFAULT_TASK_TIMEOUT); + let task_poll_interval = parse_duration( + config.task_poll_interval.as_deref(), + DEFAULT_TASK_POLL_INTERVAL, + ); + let max_retries = 
config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES).max(1); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let max_retry_delay = + parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); + let max_open_retries = config + .max_open_retries + .unwrap_or(DEFAULT_MAX_OPEN_RETRIES) + .max(1); + + Self { + url: config.url, + index: config.index, + api_key: config.api_key, + primary_key, + document_action, + create_index_if_not_exists, + include_metadata, + batch_size, + timeout, + wait_for_tasks, + task_timeout, + task_poll_interval, + max_retries, + retry_delay, + max_retry_delay, + max_open_retries, + } + } +} + +impl MeilisearchSink { + pub fn new(id: u32, config: MeilisearchSinkConfig) -> Self { + Self { + id, + config: config.into(), + client: None, + state: Mutex::new(State { + invocations_count: 0, + documents_enqueued: 0, + documents_indexed: 0, + errors_count: 0, + }), + } + } + + fn create_client(&self) -> Result { + let url = normalize_host(&self.config.url)?; + let api_key = self.config.api_key.as_ref().map(|key| key.expose_secret()); + Client::new(url, api_key).map_err(|error| { + Error::Connection(format!("Failed to create Meilisearch client: {error}")) + }) + } + + async fn check_connectivity(&self, client: &Client) -> Result<(), Error> { + let health = self + .retry_sdk_open_operation("health check", || client.health()) + .await?; + if health.status == "available" { + return Ok(()); + } + + Err(Error::Connection(format!( + "Meilisearch health check returned status '{}'", + health.status + ))) + } + + async fn ensure_index_exists(&self, client: &Client) -> Result<(), Error> { + match self.get_index_if_exists(client).await? 
{ + Some(_) => { + info!("Meilisearch index '{}' already exists", self.config.index); + Ok(()) + } + None if self.config.create_index_if_not_exists => self.create_index(client).await, + None => Err(Error::InitError(format!( + "Meilisearch index '{}' does not exist and create_index_if_not_exists=false", + self.config.index + ))), + } + } + + async fn get_index_if_exists(&self, client: &Client) -> Result, Error> { + let max_attempts = self.config.max_open_retries.max(1); + let mut attempt = 0u32; + + loop { + match client.get_index(&self.config.index).await { + Ok(index) => return Ok(Some(index)), + Err(error) if is_index_not_found(&error) => return Ok(None), + Err(error) => { + attempt += 1; + let should_retry = attempt < max_attempts && is_transient_sdk_error(&error); + if !should_retry { + return Err(map_sdk_error(error)); + } + let delay = jitter(exponential_backoff( + self.config.retry_delay, + attempt, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch get index '{}' failed (attempt {attempt}/{max_attempts}): {error}. 
Retrying in {delay:?}...", + self.config.index + ); + tokio::time::sleep(delay).await; + } + } + } + } + + async fn create_index(&self, client: &Client) -> Result<(), Error> { + info!( + "Creating Meilisearch index '{}' with primary key '{}'", + self.config.index, self.config.primary_key + ); + + let task = self + .retry_sdk_open_operation("create index", || { + client.create_index(&self.config.index, Some(&self.config.primary_key)) + }) + .await?; + self.wait_for_index_creation_task(client, task).await?; + + info!("Created Meilisearch index '{}'", self.config.index); + Ok(()) + } + + fn prepare_document( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + message: ConsumedMessage, + ) -> Result, Error> { + let generated_id = generated_document_id(topic_metadata, messages_metadata, &message); + let ConsumedMessage { + id, + offset, + checksum, + timestamp, + origin_timestamp, + headers, + payload, + } = message; + + let mut document = match payload { + Payload::Json(value) => { + Self::document_from_json_value(owned_value_to_serde_json(&value)) + } + Payload::Raw(bytes) => { + let mut bytes_copy = bytes.clone(); + match simd_json::from_slice::(&mut bytes_copy) { + Ok(value) => Self::document_from_json_value(owned_value_to_serde_json(&value)), + Err(_) => json!({ + "data": general_purpose::STANDARD.encode(&bytes), + "data_type": "raw", + "data_encoding": ENCODING_BASE64, + }), + } + } + Payload::Text(text) => json!({ + "text": text, + "data_type": "text", + }), + _ => { + return Err(Error::InvalidRecordValue(format!( + "Unsupported payload format for Meilisearch sink: {}", + messages_metadata.schema + ))); + } + }; + + let object = document + .as_object_mut() + .expect("document_from_json_value always returns an object"); + + object + .entry(self.config.primary_key.clone()) + .or_insert_with(|| Value::String(generated_id.clone())); + + if self.config.include_metadata { + object + .entry(DEFAULT_PRIMARY_KEY.to_string()) + 
.or_insert_with(|| Value::String(generated_id)); + insert_metadata_field(object, "iggy_message_id", Value::String(id.to_string())); + insert_metadata_field(object, "iggy_offset", Value::from(offset)); + insert_metadata_field( + object, + "iggy_stream", + Value::from(topic_metadata.stream.as_str()), + ); + insert_metadata_field( + object, + "iggy_topic", + Value::from(topic_metadata.topic.as_str()), + ); + insert_metadata_field( + object, + "iggy_partition", + Value::from(messages_metadata.partition_id), + ); + insert_metadata_field(object, "iggy_checksum", Value::from(checksum)); + insert_metadata_field(object, "iggy_timestamp", Value::from(timestamp)); + insert_metadata_field( + object, + "iggy_origin_timestamp", + Value::from(origin_timestamp), + ); + insert_metadata_field( + object, + "iggy_ingested_at", + Value::from(IggyTimestamp::now().as_millis() as i64), + ); + if let Some(headers) = &headers + && let Ok(headers_value) = serde_json::to_value(headers) + { + insert_metadata_field(object, "iggy_headers", headers_value); + } + } + + Ok(Some(document)) + } + + fn document_from_json_value(value: Value) -> Value { + match value { + Value::Object(_) => value, + other => { + let mut object = Map::new(); + object.insert("value".to_string(), other); + Value::Object(object) + } + } + } + + async fn index_documents( + &self, + client: &Client, + documents: Vec, + ) -> Result { + let mut accepted = 0usize; + for chunk in documents.chunks(self.config.batch_size) { + match self.index_document_chunk(client, chunk).await { + Ok(indexed) => accepted += indexed, + Err(partial_error) => { + return Err(PartialIndexError { + accepted: accepted + partial_error.accepted, + error: partial_error.error, + }); + } + } + } + Ok(accepted) + } + + async fn index_document_chunk( + &self, + client: &Client, + documents: &[Value], + ) -> Result { + if documents.is_empty() { + return Ok(0); + } + + let index = client.index(&self.config.index); + let task = match self.config.document_action { 
+ MeilisearchDocumentAction::Replace => { + self.retry_sdk_operation("add or replace documents", || { + index.add_or_replace(documents, Some(&self.config.primary_key)) + }) + .await + } + MeilisearchDocumentAction::Update => { + self.retry_sdk_operation("add or update documents", || { + index.add_or_update(documents, Some(&self.config.primary_key)) + }) + .await + } + } + .map_err(|error| PartialIndexError { accepted: 0, error })?; + self.wait_for_task(client, task) + .await + .map_err(|error| PartialIndexError { + accepted: documents.len(), + error, + })?; + Ok(documents.len()) + } + + async fn wait_for_task(&self, client: &Client, task: TaskInfo) -> Result<(), Error> { + if !self.config.wait_for_tasks { + return Ok(()); + } + + self.wait_for_task_completion(client, task).await + } + + async fn wait_for_index_creation_task( + &self, + client: &Client, + task: TaskInfo, + ) -> Result<(), Error> { + let task = self + .wait_for_task_status(client, task, self.config.max_open_retries) + .await?; + + if task.is_success() { + return Ok(()); + } + + if task.is_failure() { + let failure = task.unwrap_failure(); + if failure.error_code == MeilisearchErrorCode::IndexAlreadyExists { + return Ok(()); + } + return Err(Error::PermanentHttpError(format!( + "Meilisearch task failed: {}", + failure + ))); + } + + Err(Error::HttpRequestFailed( + "Meilisearch task did not reach a terminal state".to_string(), + )) + } + + async fn wait_for_task_completion(&self, client: &Client, task: TaskInfo) -> Result<(), Error> { + let task = self + .wait_for_task_status(client, task, self.config.max_retries) + .await?; + + if task.is_success() { + return Ok(()); + } + + if task.is_failure() { + let failure = task.unwrap_failure(); + return Err(Error::PermanentHttpError(format!( + "Meilisearch task failed: {}", + failure + ))); + } + + Err(Error::HttpRequestFailed( + "Meilisearch task did not reach a terminal state".to_string(), + )) + } + + async fn wait_for_task_status( + &self, + client: 
&Client, + task: TaskInfo, + max_attempts: u32, + ) -> Result { + let task_uid = task.get_task_uid(); + // Track wall-clock time so slow or retried status requests count toward task_timeout. + let started_at = tokio::time::Instant::now(); + + while started_at.elapsed() < self.config.task_timeout { + let status = self + .retry_sdk_operation_with_attempts("get task status", max_attempts, || { + client.get_task(task.clone()) + }) + .await?; + + if status.is_success() || status.is_failure() { + return Ok(status); + } + + tokio::time::sleep(self.config.task_poll_interval).await; + } + + Err(Error::HttpRequestFailed(format!( + "Meilisearch task {task_uid} timed out after {:?}", + self.config.task_timeout + ))) + } + + async fn retry_sdk_operation( + &self, + operation: &str, + operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + self.retry_sdk_operation_with_attempts(operation, self.config.max_retries, operation_fn) + .await + } + + async fn retry_sdk_open_operation( + &self, + operation: &str, + operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + self.retry_sdk_operation_with_attempts( + operation, + self.config.max_open_retries, + operation_fn, + ) + .await + } + + async fn retry_sdk_operation_with_attempts( + &self, + operation: &str, + max_attempts: u32, + mut operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + let max_attempts = max_attempts.max(1); + let mut attempt = 0u32; + + loop { + let result = tokio::time::timeout(self.config.timeout, operation_fn()).await; + match result { + Ok(Ok(value)) => return Ok(value), + Ok(Err(error)) => { + attempt += 1; + let should_retry = attempt < max_attempts && is_transient_sdk_error(&error); + if !should_retry { + return Err(map_sdk_error(error)); + } + let delay = jitter(exponential_backoff( + self.config.retry_delay, + attempt, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch {operation} failed (attempt {attempt}/{max_attempts}): {error}. Retrying in {delay:?}..." 
+ ); + tokio::time::sleep(delay).await; + } + Err(_) => { + attempt += 1; + if attempt >= max_attempts { + return Err(Error::HttpRequestFailed(format!( + "Meilisearch {operation} timed out after {:?}", + self.config.timeout + ))); + } + let delay = jitter(exponential_backoff( + self.config.retry_delay, + attempt, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch {operation} timed out after {:?} (attempt {attempt}/{max_attempts}). Retrying in {delay:?}...", + self.config.timeout + ); + tokio::time::sleep(delay).await; + } + } + } + } + + async fn record_errors(&self, errors: usize) { + let mut state = self.state.lock().await; + state.errors_count += errors; + } +} + +#[async_trait] +impl Sink for MeilisearchSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Meilisearch sink connector with ID: {} for URL: {}, index: {}", + self.id, + sanitize_url_for_log(&self.config.url), + self.config.index + ); + + let client = self.create_client()?; + self.check_connectivity(&client).await?; + self.ensure_index_exists(&client).await?; + + self.client = Some(client); + info!( + "Successfully opened Meilisearch sink connector with ID: {}", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let mut state = self.state.lock().await; + state.invocations_count += 1; + let invocation = state.invocations_count; + drop(state); + + info!( + "Meilisearch sink with ID: {} received: {} messages, schema: {}, stream: {}, topic: {}, partition: {}, offset: {}, invocation: {}", + self.id, + messages.len(), + messages_metadata.schema, + topic_metadata.stream, + topic_metadata.topic, + messages_metadata.partition_id, + messages_metadata.current_offset, + invocation + ); + + let client = self + .client + .as_ref() + .ok_or_else(|| Error::Connection("Meilisearch client not initialized".to_string()))?; + + let messages_count = messages.len(); 
+ let mut documents = Vec::with_capacity(messages.len()); + let mut invalid_records = 0usize; + for message in messages { + match self.prepare_document(topic_metadata, &messages_metadata, message) { + Ok(Some(document)) => documents.push(document), + Ok(None) => {} + Err(Error::InvalidRecordValue(reason)) => { + invalid_records += 1; + warn!( + "Dropping invalid Meilisearch sink record for connector ID: {}, reason: {}", + self.id, reason + ); + } + Err(error) => return Err(error), + } + } + if invalid_records > 0 { + self.record_errors(invalid_records).await; + } + + if documents.is_empty() { + return Ok(()); + } + + match self.index_documents(client, documents).await { + Ok(accepted) => { + let mut state = self.state.lock().await; + state.documents_enqueued += accepted; + if self.config.wait_for_tasks { + state.documents_indexed += accepted; + } + info!( + "Accepted {} of {} messages into Meilisearch index '{}'", + accepted, messages_count, self.config.index + ); + Ok(()) + } + Err(partial_error) => { + let mut state = self.state.lock().await; + state.documents_enqueued += partial_error.accepted; + if self.config.wait_for_tasks { + state.documents_indexed += partial_error.accepted; + } + state.errors_count += 1; + drop(state); + Err(partial_error.error) + } + } + } + + async fn close(&mut self) -> Result<(), Error> { + let state = self.state.lock().await; + info!( + "Meilisearch sink connector with ID: {} is closing. 
Stats: {} invocations, {} documents enqueued, {} documents indexed, {} errors", + self.id, + state.invocations_count, + state.documents_enqueued, + state.documents_indexed, + state.errors_count + ); + drop(state); + + self.client = None; + info!("Meilisearch sink connector with ID: {} is closed.", self.id); + Ok(()) + } +} + +fn generated_document_id( + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + message: &ConsumedMessage, +) -> String { + let components = json!([ + topic_metadata.stream.as_str(), + topic_metadata.topic.as_str(), + messages_metadata.partition_id, + message.offset, + message.id + ]); + let encoded = serde_json::to_vec(&components) + .map(|bytes| general_purpose::URL_SAFE_NO_PAD.encode(bytes)) + .expect("generated ID components are always JSON-serializable"); + format!("iggy_{encoded}") +} + +fn insert_metadata_field(object: &mut Map, field: &str, value: Value) { + if object.contains_key(field) { + warn!( + "Document already contains Meilisearch metadata field '{field}', preserving original value" + ); + } else { + object.insert(field.to_string(), value); + } +} + +fn sanitize_url_for_log(raw: &str) -> String { + let normalized = normalize_host(raw).unwrap_or_else(|_| raw.trim().to_string()); + let Ok(mut url) = Url::parse(&normalized) else { + return "".to_string(); + }; + + if !url.username().is_empty() { + let _ = url.set_username(""); + } + if url.password().is_some() { + let _ = url.set_password(None); + } + url.to_string() +} + +fn normalize_host(raw: &str) -> Result { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Err(Error::Connection( + "Invalid Meilisearch URL: host cannot be empty".to_string(), + )); + } + + let with_scheme = if trimmed.starts_with("http://") || trimmed.starts_with("https://") { + trimmed.to_string() + } else { + format!("http://{trimmed}") + }; + let url = Url::parse(&with_scheme) + .map_err(|error| Error::Connection(format!("Invalid Meilisearch URL: {error}")))?; + let mut 
host = url.to_string(); + while host.ends_with('/') { + host.pop(); + } + Ok(host) +} + +#[derive(Debug)] +struct PartialIndexError { + accepted: usize, + error: Error, +} + +fn is_index_not_found(error: &MeilisearchSdkError) -> bool { + matches!( + error, + MeilisearchSdkError::Meilisearch(meilisearch_error) + if meilisearch_error.error_code == MeilisearchErrorCode::IndexNotFound + ) +} + +fn is_transient_sdk_error(error: &MeilisearchSdkError) -> bool { + match error { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + meilisearch_error.error_type == MeilisearchErrorType::Internal + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + communication_error.status_code == 429 || communication_error.status_code >= 500 + } + MeilisearchSdkError::HttpError(_) | MeilisearchSdkError::Timeout => true, + _ => false, + } +} + +struct SinkSdkError(MeilisearchSdkError); + +impl From for SinkSdkError { + fn from(error: MeilisearchSdkError) -> Self { + Self(error) + } +} + +impl From for Error { + fn from(error: SinkSdkError) -> Self { + match error.0 { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + if meilisearch_error.error_type == MeilisearchErrorType::Internal { + Error::HttpRequestFailed(meilisearch_error.to_string()) + } else { + Error::PermanentHttpError(meilisearch_error.to_string()) + } + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + if communication_error.status_code == 429 || communication_error.status_code >= 500 + { + Error::HttpRequestFailed(communication_error.to_string()) + } else { + Error::PermanentHttpError(communication_error.to_string()) + } + } + MeilisearchSdkError::ParseError(error) => { + Error::Serialization(format!("Invalid Meilisearch response: {error}")) + } + MeilisearchSdkError::Timeout => { + Error::HttpRequestFailed("Meilisearch task timed out".to_string()) + } + MeilisearchSdkError::HttpError(error) => Error::HttpRequestFailed(error.to_string()), + other => 
Error::HttpRequestFailed(other.to_string()), + } + } +} + +fn map_sdk_error(error: MeilisearchSdkError) -> Error { + Error::from(SinkSdkError::from(error)) +} + +#[cfg(test)] +mod tests { + use super::*; + use iggy_connector_sdk::Schema; + + fn topic_metadata() -> TopicMetadata { + TopicMetadata { + stream: "orders.stream".to_string(), + topic: "created/topic".to_string(), + } + } + + fn messages_metadata() -> MessagesMetadata { + MessagesMetadata { + partition_id: 7, + current_offset: 10, + schema: Schema::Json, + } + } + + fn message(payload: Payload) -> ConsumedMessage { + ConsumedMessage { + id: 42, + offset: 11, + checksum: 12, + timestamp: 13, + origin_timestamp: 14, + headers: None, + payload, + } + } + + fn sink_with_config(config: MeilisearchSinkConfig) -> MeilisearchSink { + MeilisearchSink::new(1, config) + } + + fn base_config() -> MeilisearchSinkConfig { + MeilisearchSinkConfig { + url: "http://localhost:7700".to_string(), + index: "messages".to_string(), + api_key: None, + primary_key: None, + document_action: None, + create_index_if_not_exists: None, + include_metadata: None, + batch_size: None, + timeout: None, + wait_for_tasks: None, + task_timeout: None, + task_poll_interval: None, + max_retries: None, + retry_delay: None, + max_retry_delay: None, + max_open_retries: None, + } + } + + #[test] + fn generated_ids_use_meilisearch_safe_characters() { + let id = generated_document_id( + &topic_metadata(), + &messages_metadata(), + &message(Payload::Text("x".to_string())), + ); + + assert!(id.starts_with("iggy_")); + assert!( + id.chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') + ); + } + + #[test] + fn generated_ids_do_not_collapse_sanitized_names() { + let first_topic = TopicMetadata { + stream: "orders.stream".to_string(), + topic: "created/topic".to_string(), + }; + let second_topic = TopicMetadata { + stream: "orders/stream".to_string(), + topic: "created.topic".to_string(), + }; + + let first = generated_document_id( + 
&first_topic, + &messages_metadata(), + &message(Payload::Text("x".to_string())), + ); + let second = generated_document_id( + &second_topic, + &messages_metadata(), + &message(Payload::Text("x".to_string())), + ); + + assert_ne!(first, second); + } + + #[test] + fn injects_default_primary_key_and_metadata() { + let sink = sink_with_config(base_config()); + let payload = Payload::Json(simd_json::json!({ + "name": "Alice" + })); + let message = message(payload); + let expected_id = generated_document_id(&topic_metadata(), &messages_metadata(), &message); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["name"], "Alice"); + assert_eq!(document["iggy_id"], expected_id); + assert_eq!(document["iggy_offset"], 11); + assert_eq!(document["iggy_stream"], "orders.stream"); + assert_eq!(document["iggy_topic"], "created/topic"); + } + + #[test] + fn preserves_existing_configured_primary_key() { + let mut config = base_config(); + config.primary_key = Some("id".to_string()); + let sink = sink_with_config(config); + let payload = Payload::Json(simd_json::json!({ + "id": "existing", + "name": "Alice" + })); + let message = message(payload); + let expected_id = generated_document_id(&topic_metadata(), &messages_metadata(), &message); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["id"], "existing"); + assert_eq!(document["iggy_id"], expected_id); + } + + #[test] + fn preserves_existing_metadata_fields() { + let sink = sink_with_config(base_config()); + let payload = Payload::Json(simd_json::json!({ + "name": "Alice", + "iggy_offset": 999, + "iggy_stream": "user-stream" + })); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message(payload)) + .expect("prepare document") + .expect("document"); + + 
assert_eq!(document["iggy_offset"], 999); + assert_eq!(document["iggy_stream"], "user-stream"); + } + + #[test] + fn wraps_non_object_json_payloads() { + let sink = sink_with_config(base_config()); + let message = message(Payload::Json(simd_json::json!(["a", "b"]))); + let expected_id = generated_document_id(&topic_metadata(), &messages_metadata(), &message); + + let document = sink + .prepare_document(&topic_metadata(), &messages_metadata(), message) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["value"], json!(["a", "b"])); + assert_eq!(document["iggy_id"], expected_id); + } + + #[test] + fn raw_payloads_are_base64_encoded_when_not_json() { + let sink = sink_with_config(base_config()); + + let document = sink + .prepare_document( + &topic_metadata(), + &messages_metadata(), + message(Payload::Raw(vec![0, 1, 2, 3])), + ) + .expect("prepare document") + .expect("document"); + + assert_eq!(document["data"], "AAECAw=="); + assert_eq!(document["data_encoding"], ENCODING_BASE64); + } + + #[test] + fn sanitize_url_should_redact_credentials_without_scheme() { + let url = sanitize_url_for_log("user:pass@localhost:7700/indexes"); + assert_eq!(url, "http://localhost:7700/indexes"); + } + + #[test] + fn unsupported_payloads_return_error() { + let sink = sink_with_config(base_config()); + let error = sink + .prepare_document( + &topic_metadata(), + &messages_metadata(), + message(Payload::Avro(vec![1, 2, 3])), + ) + .expect_err("unsupported payload should fail"); + + assert!(matches!(error, Error::InvalidRecordValue(_))); + } +} diff --git a/core/connectors/sources/README.md b/core/connectors/sources/README.md index 68d3243387..57916b0bad 100644 --- a/core/connectors/sources/README.md +++ b/core/connectors/sources/README.md @@ -10,6 +10,7 @@ Source connectors are responsible for ingesting data from external sources into | ------ | ----------- | | **elasticsearch_source** | Polls documents from Elasticsearch indices with timestamp-based tracking 
| | **influxdb_source** | Polls InfluxDB with cursor-based timestamp tracking; supports V2 (Flux, annotated CSV) and V3 (SQL, JSONL) | +| **meilisearch_source** | Polls documents from Meilisearch indices with primary-key cursor tracking | | **postgres_source** | Reads rows from PostgreSQL tables with multiple strategies: delete after read, mark as processed, or timestamp tracking | | **random_source** | Generates random test messages (useful for testing and development) | diff --git a/core/connectors/sources/meilisearch_source/Cargo.toml b/core/connectors/sources/meilisearch_source/Cargo.toml new file mode 100644 index 0000000000..e4688a989f --- /dev/null +++ b/core/connectors/sources/meilisearch_source/Cargo.toml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy_connector_meilisearch_source" +version = "0.4.0" +description = "Iggy Meilisearch source connector" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "search", "meilisearch"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +meilisearch-sdk = { workspace = true } +once_cell = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/core/connectors/sources/meilisearch_source/README.md b/core/connectors/sources/meilisearch_source/README.md new file mode 100644 index 0000000000..927749ff3a --- /dev/null +++ b/core/connectors/sources/meilisearch_source/README.md @@ -0,0 +1,46 @@ +# Meilisearch Source Connector + +A source connector that polls documents from a Meilisearch index and produces +them as JSON messages into Iggy. + +## Configuration + +- `url`: Meilisearch base URL. +- `index`: Source index UID. +- `api_key`: Optional Meilisearch API key sent as `Authorization: Bearer`. +- `query`: Optional search query. Defaults to an empty query. +- `filter`: Optional Meilisearch filter expression or array. +- `batch_size`: Maximum documents fetched per poll. Defaults to `100`. +- `polling_interval`: Delay between polls as a humantime string. Defaults to `5s`. +- `include_metadata`: Wrap each hit with Meilisearch metadata. Defaults to `false`. +- `timeout`: Request timeout as a humantime string. 
Defaults to `30s`. +- `max_retries`: Maximum transient retry attempts during polling. Defaults to `3`. +- `retry_delay`: Initial retry delay. Defaults to `500ms`. +- `max_retry_delay`: Maximum retry delay. Defaults to `5s`. +- `max_open_retries`: Maximum transient retry attempts during `open()`. Defaults to `5`. + +## Behavior + +The connector requires the source index to define a primary key. Each poll sends +a `/search` request sorted by that primary key and stores the last emitted +primary-key value in connector state. This avoids offset pagination skips when +documents are inserted or deleted between polls. + +The primary-key field must be numeric, filterable, and sortable in Meilisearch, +because the connector adds a cursor filter and primary-key sort to each search +request. The numeric primary-key requirement matches the pinned integration test +version, `getmeili/meilisearch:v1.13`, where greater-than filters on string +attributes are not supported. Returned hits are serialized as JSON message +payloads. + +When `include_metadata` is enabled, each payload has this shape: + +```json +{ + "document": {}, + "meilisearch": { + "index": "iggy_messages", + "primary_key": "id" + } +} +``` diff --git a/core/connectors/sources/meilisearch_source/config.toml b/core/connectors/sources/meilisearch_source/config.toml new file mode 100644 index 0000000000..3a79d3118a --- /dev/null +++ b/core/connectors/sources/meilisearch_source/config.toml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "meilisearch" +enabled = true +version = 0 +name = "Meilisearch source" +path = "../../target/release/libiggy_connector_meilisearch_source" +plugin_config_format = "json" +verbose = false + +[[streams]] +stream = "test_stream" +topic = "test_topic" +schema = "json" +batch_length = 100 +linger_time = "5ms" + +[plugin_config] +url = "http://localhost:7700" +index = "iggy_messages" +query = "" +batch_size = 100 +polling_interval = "100ms" +include_metadata = true +timeout = "30s" +max_retries = 3 +retry_delay = "500ms" +max_retry_delay = "5s" +max_open_retries = 5 diff --git a/core/connectors/sources/meilisearch_source/src/lib.rs b/core/connectors/sources/meilisearch_source/src/lib.rs new file mode 100644 index 0000000000..47a58a6b7a --- /dev/null +++ b/core/connectors/sources/meilisearch_source/src/lib.rs @@ -0,0 +1,738 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use iggy_connector_sdk::{ + ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, + retry::{exponential_backoff, jitter, parse_duration}, + source_connector, +}; +use meilisearch_sdk::{ + client::Client, + errors::{ + Error as MeilisearchSdkError, ErrorCode as MeilisearchErrorCode, + ErrorType as MeilisearchErrorType, + }, +}; +use secrecy::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +use std::{future::Future, time::Duration}; +use tokio::{sync::Mutex, time::sleep}; +use tracing::{info, warn}; + +source_connector!(MeilisearchSource); + +const CONNECTOR_NAME: &str = "Meilisearch source"; +const DEFAULT_BATCH_SIZE: usize = 100; +const DEFAULT_POLLING_INTERVAL: &str = "5s"; +const DEFAULT_INCLUDE_METADATA: bool = false; +const DEFAULT_TIMEOUT: &str = "30s"; +const DEFAULT_RETRY_DELAY: &str = "500ms"; +const DEFAULT_MAX_RETRY_DELAY: &str = "5s"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_MAX_OPEN_RETRIES: u32 = 5; +const PRIMARY_KEY_SORT_DIRECTION: &str = "asc"; + +#[derive(Debug, Serialize, Deserialize)] +pub struct MeilisearchSourceConfig { + pub url: String, + pub index: String, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")] + pub api_key: Option, + pub query: Option, + pub filter: Option, + pub batch_size: Option, + pub polling_interval: Option, + pub include_metadata: Option, + pub timeout: Option, + pub max_retries: Option, + pub retry_delay: Option, + pub max_retry_delay: Option, + pub max_open_retries: Option, +} + +#[derive(Debug)] +pub struct MeilisearchSource { + id: u32, + config: ResolvedMeilisearchSourceConfig, + client: Option, + primary_key: Option, + state: Mutex, +} + +#[derive(Debug)] +struct ResolvedMeilisearchSourceConfig { + url: String, + index: String, + api_key: Option, + query: String, 
+ filter: Option, + batch_size: usize, + polling_interval: Duration, + include_metadata: bool, + timeout: Duration, + max_retries: u32, + retry_delay: Duration, + max_retry_delay: Duration, + max_open_retries: u32, +} + +impl From for ResolvedMeilisearchSourceConfig { + fn from(config: MeilisearchSourceConfig) -> Self { + let batch_size = config.batch_size.unwrap_or(DEFAULT_BATCH_SIZE).max(1); + let polling_interval = + parse_duration(config.polling_interval.as_deref(), DEFAULT_POLLING_INTERVAL); + let include_metadata = config.include_metadata.unwrap_or(DEFAULT_INCLUDE_METADATA); + let timeout = parse_duration(config.timeout.as_deref(), DEFAULT_TIMEOUT); + let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES).max(1); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let max_retry_delay = + parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); + let max_open_retries = config + .max_open_retries + .unwrap_or(DEFAULT_MAX_OPEN_RETRIES) + .max(1); + + Self { + url: config.url, + index: config.index, + api_key: config.api_key, + query: config.query.unwrap_or_default(), + filter: config.filter, + batch_size, + polling_interval, + include_metadata, + timeout, + max_retries, + retry_delay, + max_retry_delay, + max_open_retries, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct State { + last_primary_key: Option, + documents_produced: usize, + poll_count: usize, +} + +impl MeilisearchSource { + pub fn new(id: u32, config: MeilisearchSourceConfig, state: Option) -> Self { + let restored_state = state + .and_then(|state| state.deserialize::(CONNECTOR_NAME, id)) + .inspect(|state| { + info!( + "Restored state for {CONNECTOR_NAME} connector with ID: {id}. 
\ + Last primary key: {:?}, documents produced: {}, poll count: {}", + state.last_primary_key, state.documents_produced, state.poll_count + ); + }); + + Self { + id, + config: config.into(), + client: None, + primary_key: None, + state: Mutex::new(restored_state.unwrap_or(State { + last_primary_key: None, + documents_produced: 0, + poll_count: 0, + })), + } + } + + fn serialize_state(&self, state: &State) -> Option { + ConnectorState::serialize(state, CONNECTOR_NAME, self.id) + } + + fn create_client(&self) -> Result { + let host = normalize_host(&self.config.url)?; + let api_key = self + .config + .api_key + .as_ref() + .map(|key| key.expose_secret().to_string()); + + Client::new(host, api_key).map_err(|error| { + Error::Connection(format!("Failed to create Meilisearch client: {error}")) + }) + } + + async fn check_connectivity(&self, client: &Client) -> Result<(), Error> { + let health = self + .retry_sdk_open_operation("health check", || client.health()) + .await?; + if health.status == "available" { + return Ok(()); + } + + Err(Error::Connection(format!( + "Meilisearch health check returned status '{}'", + health.status + ))) + } + + async fn get_primary_key(&self, client: &Client) -> Result { + let primary_key = self + .retry_sdk_open_operation("get primary key", || async { + let mut index = client.get_index(&self.config.index).await?; + index + .get_primary_key() + .await + .map(|primary_key| primary_key.map(str::to_string)) + }) + .await?; + + primary_key.ok_or_else(|| { + Error::InvalidConfigValue(format!( + "Meilisearch index '{}' must define a primary key for stable source polling", + self.config.index + )) + }) + } + + async fn search_documents(&self, client: &Client) -> Result, Error> { + let last_primary_key = { + let state = self.state.lock().await; + state.last_primary_key.clone() + }; + let primary_key = self.primary_key.as_deref().ok_or_else(|| { + Error::Connection("Meilisearch primary key is not initialized".to_string()) + })?; + let 
filter_expression = self.filter_expression()?; + let cursor_filter = cursor_filter_expression(primary_key, last_primary_key.as_ref())?; + let combined_filter = combine_filter_expressions(filter_expression, cursor_filter); + let sort = format!("{primary_key}:{PRIMARY_KEY_SORT_DIRECTION}"); + let sort_refs = [sort.as_str()]; + let index = client.index(&self.config.index); + let mut query = index.search(); + query + .with_query(&self.config.query) + .with_limit(self.config.batch_size) + .with_sort(&sort_refs); + + if let Some(filter) = &combined_filter { + query.with_filter(filter); + } + + let results = self + .retry_sdk_operation("search documents", || { + let query = query.clone(); + async move { query.execute::().await } + }) + .await?; + let documents: Vec = results.hits.into_iter().map(|hit| hit.result).collect(); + let last_document_primary_key = documents + .last() + .map(|document| document_primary_key(document, primary_key)) + .transpose()?; + let messages = self.documents_to_messages(documents)?; + + let mut state = self.state.lock().await; + if let Some(primary_key) = last_document_primary_key { + state.last_primary_key = Some(primary_key); + } + state.documents_produced += messages.len(); + state.poll_count += 1; + + Ok(messages) + } + + fn filter_expression(&self) -> Result, Error> { + let Some(filter) = self.config.filter.as_ref().filter(|value| !value.is_null()) else { + return Ok(None); + }; + + match filter { + Value::String(filter) if !filter.is_empty() => Ok(Some(filter.clone())), + Value::Array(filters) => filter_array_expression(filters, false), + _ => Err(Error::InvalidConfigValue( + "Meilisearch filter must be a string or an array of strings/arrays".to_string(), + )), + } + } + + fn documents_to_messages(&self, documents: Vec) -> Result, Error> { + documents + .into_iter() + .map(|document| { + let payload = if self.config.include_metadata { + json!({ + "document": document, + "meilisearch": { + "index": self.config.index, + "primary_key": 
self.primary_key.as_deref(), + } + }) + } else { + document + }; + + serde_json::to_vec(&payload) + .map(|payload| ProducedMessage { + id: None, + checksum: None, + timestamp: None, + origin_timestamp: None, + headers: None, + payload, + }) + .map_err(|error| { + Error::Serialization(format!( + "Failed to serialize Meilisearch document: {error}" + )) + }) + }) + .collect() + } + + async fn retry_sdk_operation( + &self, + operation: &str, + operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + self.retry_sdk_operation_with_attempts(operation, self.config.max_retries, operation_fn) + .await + } + + async fn retry_sdk_open_operation( + &self, + operation: &str, + operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + self.retry_sdk_operation_with_attempts( + operation, + self.config.max_open_retries, + operation_fn, + ) + .await + } + + async fn retry_sdk_operation_with_attempts( + &self, + operation: &str, + max_attempts: u32, + mut operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + let max_attempts = max_attempts.max(1); + let mut attempt = 0u32; + + loop { + let result = tokio::time::timeout(self.config.timeout, operation_fn()).await; + match result { + Ok(Ok(value)) => return Ok(value), + Ok(Err(error)) => { + attempt += 1; + let should_retry = attempt < max_attempts && is_transient_sdk_error(&error); + if !should_retry { + return Err(map_sdk_error(error)); + } + let delay = jitter(exponential_backoff( + self.config.retry_delay, + attempt, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch {operation} failed (attempt {attempt}/{max_attempts}): {error}. Retrying in {delay:?}..." 
+ ); + tokio::time::sleep(delay).await; + } + Err(_) => { + attempt += 1; + if attempt >= max_attempts { + return Err(Error::HttpRequestFailed(format!( + "Meilisearch {operation} timed out after {:?}", + self.config.timeout + ))); + } + let delay = jitter(exponential_backoff( + self.config.retry_delay, + attempt, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch {operation} timed out after {:?} (attempt {attempt}/{max_attempts}). Retrying in {delay:?}...", + self.config.timeout + ); + tokio::time::sleep(delay).await; + } + } + } + } +} + +#[async_trait] +impl Source for MeilisearchSource { + async fn open(&mut self) -> Result<(), Error> { + let sanitized_url = sanitize_url_for_logging(&self.config.url); + info!( + "Opening Meilisearch source connector with ID: {} for URL: {}, index: {}", + self.id, sanitized_url, self.config.index + ); + + let client = self.create_client()?; + self.check_connectivity(&client).await?; + let primary_key = self.get_primary_key(&client).await?; + self.primary_key = Some(primary_key); + self.client = Some(client); + + info!( + "Successfully opened Meilisearch source connector with ID: {}", + self.id + ); + Ok(()) + } + + async fn poll(&self) -> Result { + sleep(self.config.polling_interval).await; + let client = self + .client + .as_ref() + .ok_or_else(|| Error::Connection("Meilisearch client not initialized".to_string()))?; + let messages = self.search_documents(client).await?; + let persisted_state = { + let state = self.state.lock().await; + self.serialize_state(&state) + }; + + Ok(ProducedMessages { + schema: Schema::Json, + messages, + state: persisted_state, + }) + } + + async fn close(&mut self) -> Result<(), Error> { + let state = self.state.lock().await; + info!( + "Meilisearch source connector with ID: {} is closing. 
Stats: {} documents produced, {} polls executed", + self.id, state.documents_produced, state.poll_count + ); + drop(state); + + self.client = None; + info!( + "Meilisearch source connector with ID: {} is closed.", + self.id + ); + Ok(()) + } +} + +fn normalize_host(host: &str) -> Result { + let trimmed = host.trim(); + if trimmed.is_empty() { + return Err(Error::Connection( + "Invalid Meilisearch URL: host cannot be empty".to_string(), + )); + } + + let with_scheme = if trimmed.starts_with("http://") || trimmed.starts_with("https://") { + trimmed.to_string() + } else { + format!("http://{trimmed}") + }; + let mut host = with_scheme; + while host.ends_with('/') { + host.pop(); + } + Ok(host) +} + +fn sanitize_url_for_logging(host: &str) -> String { + let Ok(normalized) = normalize_host(host) else { + return "".to_string(); + }; + let Some((scheme, rest)) = normalized.split_once("://") else { + return normalized; + }; + let authority_end = rest.find('/').unwrap_or(rest.len()); + let (authority, path) = rest.split_at(authority_end); + if let Some((_, host)) = authority.rsplit_once('@') { + format!("{scheme}://@{host}{path}") + } else { + normalized + } +} + +fn filter_array_expression(filters: &[Value], nested: bool) -> Result, Error> { + let separator = if nested { " OR " } else { " AND " }; + let mut expressions = Vec::with_capacity(filters.len()); + + for filter in filters { + match filter { + Value::String(filter) if !filter.is_empty() => expressions.push(filter.clone()), + Value::Array(filters) => { + if let Some(filter) = filter_array_expression(filters, true)? 
{ + expressions.push(format!("({filter})")); + } + } + _ => { + return Err(Error::InvalidConfigValue( + "Meilisearch filter arrays must contain only strings or nested arrays" + .to_string(), + )); + } + } + } + + if expressions.is_empty() { + Ok(None) + } else { + Ok(Some(expressions.join(separator))) + } +} + +fn combine_filter_expressions( + user_filter: Option, + cursor_filter: Option, +) -> Option { + match (user_filter, cursor_filter) { + (Some(user_filter), Some(cursor_filter)) => { + Some(format!("({user_filter}) AND ({cursor_filter})")) + } + (Some(user_filter), None) => Some(user_filter), + (None, Some(cursor_filter)) => Some(cursor_filter), + (None, None) => None, + } +} + +fn cursor_filter_expression( + primary_key: &str, + last_primary_key: Option<&Value>, +) -> Result, Error> { + last_primary_key + .map(|value| { + primary_key_filter_literal(value).map(|literal| format!("{primary_key} > {literal}")) + }) + .transpose() +} + +fn primary_key_filter_literal(value: &Value) -> Result { + match value { + Value::Number(_) => serde_json::to_string(value).map_err(|error| { + Error::Serialization(format!( + "Failed to serialize Meilisearch primary key: {error}" + )) + }), + _ => Err(Error::InvalidConfigValue( + "Meilisearch source primary key values must be numbers with getmeili/meilisearch:v1.13" + .to_string(), + )), + } +} + +fn document_primary_key(document: &Value, primary_key: &str) -> Result { + let value = document.get(primary_key).ok_or_else(|| { + Error::InvalidConfigValue(format!( + "Meilisearch document is missing primary key '{primary_key}'" + )) + })?; + primary_key_filter_literal(value)?; + Ok(value.clone()) +} + +fn map_sdk_error(error: MeilisearchSdkError) -> Error { + match error { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + if meilisearch_error.error_type == MeilisearchErrorType::Internal { + Error::HttpRequestFailed(meilisearch_error.to_string()) + } else if meilisearch_error.error_code == MeilisearchErrorCode::IndexNotFound 
{ + Error::InvalidConfigValue(meilisearch_error.to_string()) + } else { + Error::PermanentHttpError(meilisearch_error.to_string()) + } + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + if communication_error.status_code == 429 || communication_error.status_code >= 500 { + Error::HttpRequestFailed(communication_error.to_string()) + } else { + Error::PermanentHttpError(communication_error.to_string()) + } + } + MeilisearchSdkError::ParseError(error) => { + Error::Serialization(format!("Invalid Meilisearch response: {error}")) + } + MeilisearchSdkError::Timeout => { + Error::HttpRequestFailed("Meilisearch request timed out".to_string()) + } + MeilisearchSdkError::HttpError(error) => Error::HttpRequestFailed(error.to_string()), + other => Error::HttpRequestFailed(other.to_string()), + } +} + +fn is_transient_sdk_error(error: &MeilisearchSdkError) -> bool { + match error { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + meilisearch_error.error_type == MeilisearchErrorType::Internal + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + communication_error.status_code == 429 || communication_error.status_code >= 500 + } + MeilisearchSdkError::HttpError(_) | MeilisearchSdkError::Timeout => true, + _ => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn config() -> MeilisearchSourceConfig { + MeilisearchSourceConfig { + url: "localhost:7700".to_string(), + index: "iggy_messages".to_string(), + api_key: None, + query: None, + filter: None, + batch_size: Some(10), + polling_interval: Some("1ms".to_string()), + include_metadata: None, + timeout: None, + max_retries: None, + retry_delay: None, + max_retry_delay: None, + max_open_retries: None, + } + } + + #[test] + fn given_host_without_scheme_should_normalize_url() { + let url = normalize_host("localhost:7700").unwrap(); + assert_eq!(url, "http://localhost:7700"); + } + + #[test] + fn given_persisted_state_should_restore_last_primary_key() { + 
let state = State { + last_primary_key: Some(json!(42)), + documents_produced: 42, + poll_count: 7, + }; + let connector_state = ConnectorState::serialize(&state, CONNECTOR_NAME, 1).unwrap(); + let source = MeilisearchSource::new(1, config(), Some(connector_state)); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let restored = source.state.lock().await; + assert_eq!(restored.last_primary_key, Some(json!(42))); + assert_eq!(restored.documents_produced, 42); + assert_eq!(restored.poll_count, 7); + }); + } + + #[test] + fn filter_expression_should_accept_string_filter() { + let mut config = config(); + config.filter = Some(json!("status = active")); + let source = MeilisearchSource::new(1, config, None); + + assert_eq!( + source.filter_expression().unwrap(), + Some("status = active".to_string()) + ); + } + + #[test] + fn filter_expression_should_accept_nested_filter_arrays() { + let mut config = config(); + config.filter = Some(json!([ + ["genres = horror", "genres = thriller"], + "director = 'Jordan Peele'" + ])); + let source = MeilisearchSource::new(1, config, None); + + assert_eq!( + source.filter_expression().unwrap(), + Some( + "(genres = horror OR genres = thriller) AND director = 'Jordan Peele'".to_string() + ) + ); + } + + #[test] + fn cursor_filter_should_compare_against_numeric_last_primary_key() { + assert_eq!( + cursor_filter_expression("id", Some(&json!(42))).unwrap(), + Some("id > 42".to_string()) + ); + } + + #[test] + fn cursor_filter_should_reject_string_last_primary_key() { + let error = cursor_filter_expression("id", Some(&json!("movie-1"))) + .expect_err("string primary key should be rejected"); + + assert!(matches!(error, Error::InvalidConfigValue(_))); + } + + #[test] + fn normalize_host_should_trim_before_checking_scheme() { + let url = normalize_host(" https://localhost:7700/ ").unwrap(); + assert_eq!(url, "https://localhost:7700"); + } + + #[test] + fn sanitize_url_should_redact_credentials() { + let 
url = sanitize_url_for_logging("https://user:pass@localhost:7700/indexes"); + assert_eq!(url, "https://@localhost:7700/indexes"); + } + + #[test] + fn sanitize_url_should_redact_credentials_without_scheme() { + let url = sanitize_url_for_logging("user:pass@localhost:7700/indexes"); + assert_eq!(url, "http://@localhost:7700/indexes"); + } + + #[test] + fn documents_should_be_wrapped_when_metadata_is_enabled() { + let mut config = config(); + config.include_metadata = Some(true); + let mut source = MeilisearchSource::new(1, config, None); + source.primary_key = Some("id".to_string()); + let messages = source + .documents_to_messages(vec![json!({"id": 1, "title": "hello"})]) + .unwrap(); + + let payload: Value = serde_json::from_slice(&messages[0].payload).unwrap(); + assert_eq!( + payload, + json!({ + "document": {"id": 1, "title": "hello"}, + "meilisearch": { + "index": "iggy_messages", + "primary_key": "id", + } + }) + ); + } +} diff --git a/core/integration/tests/connectors/fixtures/meilisearch/container.rs b/core/integration/tests/connectors/fixtures/meilisearch/container.rs new file mode 100644 index 0000000000..f678a83ddf --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/container.rs @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::connectors::fixtures; +use integration::harness::TestBinaryError; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use reqwest_retry::RetryTransientMiddleware; +use reqwest_retry::policies::ExponentialBackoff; +use serde::Deserialize; +use std::time::Duration; +use testcontainers_modules::testcontainers::core::wait::HttpWaitStrategy; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tokio::time::sleep; +use tracing::info; +use uuid::Uuid; + +const MEILISEARCH_IMAGE: &str = "getmeili/meilisearch"; +const MEILISEARCH_TAG: &str = "v1.13"; +const MEILISEARCH_PORT: u16 = 7700; +const MEILISEARCH_HEALTH_ENDPOINT: &str = "/health"; +pub const TEST_INDEX: &str = "iggy_messages"; +const POLL_ATTEMPTS: usize = 100; +const POLL_INTERVAL_MS: u64 = 50; + +#[derive(Debug, Deserialize)] +pub struct MeilisearchDocumentsResponse { + pub results: Vec, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct MeilisearchTaskResponse { + task_uid: usize, +} + +#[derive(Debug, Deserialize)] +struct MeilisearchTaskStatus { + status: String, + #[allow(dead_code)] + error: Option, +} + +pub struct MeilisearchContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub base_url: String, +} + +impl MeilisearchContainer { + pub async fn start() -> Result { + let unique_network = format!("iggy-meilisearch-{}", Uuid::new_v4()); + + let container = GenericImage::new(MEILISEARCH_IMAGE, MEILISEARCH_TAG) + .with_exposed_port(MEILISEARCH_PORT.tcp()) + .with_wait_for(WaitFor::http( + HttpWaitStrategy::new(MEILISEARCH_HEALTH_ENDPOINT) + .with_port(MEILISEARCH_PORT.tcp()) + .with_expected_status_code(200u16), + )) + .with_network(unique_network) + 
.with_container_name(fixtures::unique_container_name("meilisearch")) + .with_env_var("MEILI_ENV", "development") + .with_mapped_port(0, MEILISEARCH_PORT.tcp()) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: format!("Failed to start container: {e}"), + })?; + + info!("Started Meilisearch container"); + + let mapped_port = container + .ports() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: format!("Failed to get ports: {e}"), + })? + .map_to_host_port_ipv4(MEILISEARCH_PORT) + .ok_or_else(|| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: "No mapping for Meilisearch port".to_string(), + })?; + + let base_url = format!("http://localhost:{mapped_port}"); + info!("Meilisearch container available at {base_url}"); + + Ok(Self { + container, + base_url, + }) + } +} + +pub fn create_http_client() -> HttpClient { + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to build HTTP client"); + reqwest_middleware::ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build() +} + +pub trait MeilisearchOps: Sync { + fn container(&self) -> &MeilisearchContainer; + fn http_client(&self) -> &HttpClient; + + fn create_source_index( + &self, + ) -> impl std::future::Future> + Send { + async move { + let url = format!("{}/indexes", self.container().base_url); + let response = self + .http_client() + .post(&url) + .json(&serde_json::json!({ + "uid": TEST_INDEX, + "primaryKey": "id", + })) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to create Meilisearch source index: {e}"), + })?; + let task = parse_task_response(response, "create Meilisearch source index").await?; + 
self.wait_for_task(task.task_uid).await?; + + let url = format!( + "{}/indexes/{}/settings", + self.container().base_url, + TEST_INDEX + ); + let response = self + .http_client() + .patch(&url) + .json(&serde_json::json!({ + "filterableAttributes": ["id"], + "sortableAttributes": ["id"], + })) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to configure Meilisearch source index: {e}"), + })?; + let task = parse_task_response(response, "configure Meilisearch source index").await?; + self.wait_for_task(task.task_uid).await + } + } + + fn wait_for_task( + &self, + task_uid: usize, + ) -> impl std::future::Future> + Send { + async move { + let url = format!("{}/tasks/{task_uid}", self.container().base_url); + for _ in 0..POLL_ATTEMPTS { + let response = self.http_client().get(&url).send().await.map_err(|e| { + TestBinaryError::InvalidState { + message: format!("Failed to fetch Meilisearch task {task_uid}: {e}"), + } + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!( + "Failed to fetch Meilisearch task {task_uid}: status={status}, body={body}" + ), + }); + } + + let task = response + .json::() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to parse Meilisearch task response: {e}"), + })?; + match task.status.as_str() { + "succeeded" => return Ok(()), + "failed" | "canceled" => { + return Err(TestBinaryError::InvalidState { + message: format!( + "Meilisearch task {task_uid} ended with status '{}': {:?}", + task.status, task.error + ), + }); + } + _ => sleep(Duration::from_millis(POLL_INTERVAL_MS)).await, + } + } + + Err(TestBinaryError::InvalidState { + message: format!( + "Meilisearch task {task_uid} did not complete after {POLL_ATTEMPTS} attempts" + ), + }) + } + } + + fn list_documents( + &self, + index_name: &str, + ) -> impl 
std::future::Future, TestBinaryError>> + Send + { + async move { + let url = format!( + "{}/indexes/{}/documents", + self.container().base_url, + index_name + ); + let response = self + .http_client() + .get(&url) + .query(&[("limit", "100")]) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to list Meilisearch documents: {e}"), + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!( + "Failed to list Meilisearch documents: status={status}, body={body}" + ), + }); + } + + response + .json::() + .await + .map(|documents| documents.results) + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to parse Meilisearch documents response: {e}"), + }) + } + } + + fn wait_for_documents( + &self, + expected_count: usize, + ) -> impl std::future::Future, TestBinaryError>> + Send + { + async move { + let mut last_count = 0usize; + for _ in 0..POLL_ATTEMPTS { + if let Ok(documents) = self.list_documents(TEST_INDEX).await { + last_count = documents.len(); + if documents.len() >= expected_count { + return Ok(documents); + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + Err(TestBinaryError::InvalidState { + message: format!( + "Expected {expected_count} Meilisearch documents, found {last_count} after {POLL_ATTEMPTS} attempts" + ), + }) + } + } + + fn index_documents( + &self, + index_name: &str, + documents: Vec, + ) -> impl std::future::Future> + Send { + async move { + let url = format!( + "{}/indexes/{}/documents", + self.container().base_url, + index_name + ); + let response = self + .http_client() + .post(&url) + .query(&[("primaryKey", "id")]) + .json(&documents) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to index Meilisearch documents: {e}"), + })?; + + if response.status().is_success() { + 
return Ok(()); + } + + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + Err(TestBinaryError::InvalidState { + message: format!( + "Failed to index Meilisearch documents: status={status}, body={body}" + ), + }) + } + } +} + +async fn parse_task_response( + response: reqwest::Response, + operation: &str, +) -> Result { + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!("Failed to {operation}: status={status}, body={body}"), + }); + } + + response + .json::() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to parse Meilisearch task response: {e}"), + }) +} diff --git a/core/integration/tests/connectors/fixtures/meilisearch/mod.rs b/core/integration/tests/connectors/fixtures/meilisearch/mod.rs new file mode 100644 index 0000000000..e237206ddd --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/mod.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +mod container; +mod sink; +mod source; + +pub use container::MeilisearchOps; +pub use sink::MeilisearchSinkFixture; +pub use source::MeilisearchSourceFixture; diff --git a/core/integration/tests/connectors/fixtures/meilisearch/sink.rs b/core/integration/tests/connectors/fixtures/meilisearch/sink.rs new file mode 100644 index 0000000000..01db6da3ba --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/sink.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use super::container::{MeilisearchContainer, MeilisearchOps, TEST_INDEX, create_http_client}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture, seeds}; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use std::collections::HashMap; + +const ENV_SINK_URL: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_URL"; +const ENV_SINK_INDEX: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_INDEX"; +const ENV_SINK_PRIMARY_KEY: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_PRIMARY_KEY"; +const ENV_SINK_TASK_POLL_INTERVAL: &str = + "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_TASK_POLL_INTERVAL"; +const ENV_SINK_TASK_TIMEOUT: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PLUGIN_CONFIG_TASK_TIMEOUT"; +const ENV_SINK_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_STREAMS_0_STREAM"; +const ENV_SINK_STREAMS_0_TOPICS: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_STREAMS_0_TOPICS"; +const ENV_SINK_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_STREAMS_0_SCHEMA"; +const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str = + "IGGY_CONNECTORS_SINK_MEILISEARCH_STREAMS_0_CONSUMER_GROUP"; +const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_MEILISEARCH_PATH"; + +pub struct MeilisearchSinkFixture { + container: MeilisearchContainer, + http_client: HttpClient, +} + +impl MeilisearchOps for MeilisearchSinkFixture { + fn container(&self) -> &MeilisearchContainer { + &self.container + } + + fn http_client(&self) -> &HttpClient { + &self.http_client + } +} + +#[async_trait] +impl TestFixture for MeilisearchSinkFixture { + async fn setup() -> Result { + let container = MeilisearchContainer::start().await?; + let http_client = create_http_client(); + + Ok(Self { + container, + http_client, + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + HashMap::from([ + (ENV_SINK_URL.to_string(), self.container.base_url.clone()), + (ENV_SINK_INDEX.to_string(), TEST_INDEX.to_string()), + (ENV_SINK_PRIMARY_KEY.to_string(), 
"iggy_id".to_string()), + (ENV_SINK_TASK_TIMEOUT.to_string(), "10s".to_string()), + (ENV_SINK_TASK_POLL_INTERVAL.to_string(), "25ms".to_string()), + ( + ENV_SINK_STREAMS_0_STREAM.to_string(), + seeds::names::STREAM.to_string(), + ), + ( + ENV_SINK_STREAMS_0_TOPICS.to_string(), + format!("[{}]", seeds::names::TOPIC), + ), + (ENV_SINK_STREAMS_0_SCHEMA.to_string(), "json".to_string()), + ( + ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_string(), + "meilisearch_sink_cg".to_string(), + ), + ( + ENV_SINK_PATH.to_string(), + "../../target/debug/libiggy_connector_meilisearch_sink".to_string(), + ), + ]) + } +} diff --git a/core/integration/tests/connectors/fixtures/meilisearch/source.rs b/core/integration/tests/connectors/fixtures/meilisearch/source.rs new file mode 100644 index 0000000000..aa3fa761f5 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/source.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use super::container::{MeilisearchContainer, MeilisearchOps, TEST_INDEX, create_http_client}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture, seeds}; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use std::collections::HashMap; + +const ENV_SOURCE_URL: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PLUGIN_CONFIG_URL"; +const ENV_SOURCE_INDEX: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PLUGIN_CONFIG_INDEX"; +const ENV_SOURCE_INCLUDE_METADATA: &str = + "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PLUGIN_CONFIG_INCLUDE_METADATA"; +const ENV_SOURCE_POLLING_INTERVAL: &str = + "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PLUGIN_CONFIG_POLLING_INTERVAL"; +const ENV_SOURCE_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_STREAMS_0_STREAM"; +const ENV_SOURCE_STREAMS_0_TOPIC: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_STREAMS_0_TOPIC"; +const ENV_SOURCE_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_STREAMS_0_SCHEMA"; +const ENV_SOURCE_PATH: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PATH"; + +pub struct MeilisearchSourceFixture { + container: MeilisearchContainer, + http_client: HttpClient, +} + +impl MeilisearchOps for MeilisearchSourceFixture { + fn container(&self) -> &MeilisearchContainer { + &self.container + } + + fn http_client(&self) -> &HttpClient { + &self.http_client + } +} + +#[async_trait] +impl TestFixture for MeilisearchSourceFixture { + async fn setup() -> Result { + let container = MeilisearchContainer::start().await?; + let http_client = create_http_client(); + let fixture = Self { + container, + http_client, + }; + fixture.create_source_index().await?; + + Ok(fixture) + } + + fn connectors_runtime_envs(&self) -> HashMap { + HashMap::from([ + (ENV_SOURCE_URL.to_string(), self.container.base_url.clone()), + (ENV_SOURCE_INDEX.to_string(), TEST_INDEX.to_string()), + (ENV_SOURCE_INCLUDE_METADATA.to_string(), "false".to_string()), + (ENV_SOURCE_POLLING_INTERVAL.to_string(), "25ms".to_string()), + ( 
+ ENV_SOURCE_STREAMS_0_STREAM.to_string(), + seeds::names::STREAM.to_string(), + ), + ( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + seeds::names::TOPIC.to_string(), + ), + (ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()), + ( + ENV_SOURCE_PATH.to_string(), + "../../target/debug/libiggy_connector_meilisearch_source".to_string(), + ), + ]) + } +} diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 0b2f264d03..e715f00df5 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -23,6 +23,7 @@ mod elasticsearch; mod http; mod iceberg; mod influxdb; +mod meilisearch; mod mongodb; mod postgres; mod quickwit; @@ -62,6 +63,7 @@ pub use influxdb::{ InfluxDbSinkNoMetadataFixture, InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture, InfluxDbSourceRawFixture, InfluxDbSourceTextFixture, }; +pub use meilisearch::{MeilisearchOps, MeilisearchSinkFixture, MeilisearchSourceFixture}; pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture, diff --git a/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs b/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs new file mode 100644 index 0000000000..dba06a3f7c --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/meilisearch_sink.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::connectors::fixtures::{MeilisearchOps, MeilisearchSinkFixture}; +use bytes::Bytes; +use iggy::prelude::{IggyMessage, Partitioning}; +use iggy_common::{Identifier, MessageClient}; +use integration::harness::seeds; +use integration::iggy_harness; + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/meilisearch/sink.toml")), + seed = seeds::connector_stream +)] +async fn meilisearch_sink_indexes_json_messages( + harness: &TestHarness, + fixture: MeilisearchSinkFixture, +) { + let client = harness.root_client().await.unwrap(); + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let payloads = [ + serde_json::json!({"name": "first", "category": "alpha"}), + serde_json::json!({"name": "second", "category": "beta"}), + ]; + + let mut messages = payloads + .iter() + .enumerate() + .map(|(i, payload)| { + IggyMessage::builder() + .id((i + 1) as u128) + .payload(Bytes::from(serde_json::to_vec(payload).expect("serialize"))) + .build() + .expect("build message") + }) + .collect::>(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("send messages"); + + let documents = fixture + .wait_for_documents(payloads.len()) + .await + .expect("wait for Meilisearch documents"); + + assert_eq!(documents.len(), payloads.len()); + assert!(documents.iter().any(|document| document["name"] == "first")); + assert!( + documents + .iter() + .any(|document| 
document["name"] == "second") + ); + assert!( + documents + .iter() + .all(|document| document["iggy_id"].as_str().is_some()) + ); +} diff --git a/core/integration/tests/connectors/meilisearch/meilisearch_source.rs b/core/integration/tests/connectors/meilisearch/meilisearch_source.rs new file mode 100644 index 0000000000..48aec4e576 --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/meilisearch_source.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::connectors::fixtures::{MeilisearchOps, MeilisearchSourceFixture}; +use iggy_common::MessageClient; +use iggy_common::{Consumer, Identifier, PollingStrategy}; +use integration::harness::seeds; +use integration::iggy_harness; +use std::time::Duration; +use tokio::time::sleep; + +const TEST_MESSAGE_COUNT: usize = 2; +const POLL_ATTEMPTS: usize = 100; +const POLL_INTERVAL_MS: u64 = 50; + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/meilisearch/source.toml")), + seed = seeds::connector_stream +)] +async fn meilisearch_source_produces_index_documents( + harness: &TestHarness, + fixture: MeilisearchSourceFixture, +) { + let client = harness.root_client().await.unwrap(); + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let documents = vec![ + serde_json::json!({"id": 1, "name": "first", "category": "alpha"}), + serde_json::json!({"id": 2, "name": "second", "category": "beta"}), + ]; + + fixture + .index_documents("iggy_messages", documents) + .await + .expect("index Meilisearch documents"); + fixture + .wait_for_documents(TEST_MESSAGE_COUNT) + .await + .expect("wait for Meilisearch documents"); + + let mut received = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice::(&msg.payload) { + received.push(json); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert_eq!(received.len(), TEST_MESSAGE_COUNT); + assert!(received.iter().any(|document| document["name"] == "first")); + assert!(received.iter().any(|document| document["name"] == 
"second")); +} diff --git a/core/integration/tests/connectors/meilisearch/mod.rs b/core/integration/tests/connectors/meilisearch/mod.rs new file mode 100644 index 0000000000..b116d9957b --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod meilisearch_sink; +mod meilisearch_source; diff --git a/core/integration/tests/connectors/meilisearch/sink.toml b/core/integration/tests/connectors/meilisearch/sink.toml new file mode 100644 index 0000000000..0a846ac965 --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/sink.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sinks/meilisearch_sink" diff --git a/core/integration/tests/connectors/meilisearch/source.toml b/core/integration/tests/connectors/meilisearch/source.toml new file mode 100644 index 0000000000..0b6a2e442b --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/source.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[connectors] +config_type = "local" +config_dir = "../connectors/sources/meilisearch_source" diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index fc624f897f..b5afedcbf6 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -24,6 +24,7 @@ mod http; mod http_config_provider; mod iceberg; mod influxdb; +mod meilisearch; mod mongodb; mod postgres; mod quickwit;