diff --git a/Cargo.lock b/Cargo.lock index 12382acfca..31f1ef6fbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3200,6 +3200,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "convert_case" version = "0.9.0" @@ -6937,6 +6946,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_meilisearch_source" +version = "0.4.0" +dependencies = [ + "async-trait", + "dashmap", + "iggy_common", + "iggy_connector_sdk", + "meilisearch-sdk", + "once_cell", + "secrecy", + "serde", + "serde_json", + "tokio", + "tracing", + "url", +] + [[package]] name = "iggy_connector_mongodb_sink" version = "0.4.1-edge.1" @@ -7381,6 +7408,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "iso8601" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46" +dependencies = [ + "nom 8.0.0", +] + [[package]] name = "itertools" version = "0.13.0" @@ -8165,6 +8201,49 @@ dependencies = [ "digest 0.11.3", ] +[[package]] +name = "meilisearch-index-setting-macro" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93b5b21df781c820a9cc387b808d4128cbc164dd28d67ac6ed666a00996f8f15" +dependencies = [ + "convert_case 0.8.0", + "proc-macro2", + "quote", + "structmeta", + "syn 2.0.117", +] + +[[package]] +name = "meilisearch-sdk" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e6e3646ba2a9a306296c1edf4a050508a408c1b59ca456d9ad4965ec6e91e9" +dependencies = [ + "async-trait", + "bytes", + "either", + 
"futures-channel", + "futures-core", + "futures-io", + "futures-util", + "iso8601", + "jsonwebtoken", + "log", + "meilisearch-index-setting-macro", + "pin-project-lite", + "reqwest 0.12.28", + "serde", + "serde_json", + "thiserror 2.0.18", + "time", + "tokio", + "uuid", + "wasm-bindgen-futures", + "web-sys", + "yaup", +] + [[package]] name = "memchr" version = "2.8.2" @@ -14615,6 +14694,17 @@ dependencies = [ "time", ] +[[package]] +name = "yaup" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0144f1a16a199846cb21024da74edd930b43443463292f536b7110b4855b5c6" +dependencies = [ + "form_urlencoded", + "serde", + "thiserror 1.0.69", +] + [[package]] name = "yew" version = "0.23.0" diff --git a/Cargo.toml b/Cargo.toml index 4c6777fdea..5ae6bbf0d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/influxdb_source", + "core/connectors/sources/meilisearch_source", "core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", @@ -200,6 +201,11 @@ lending-iterator = "0.1.7" libc = "0.2.186" log = "0.4.33" lz4_flex = "0.13.1" +meilisearch-sdk = { version = "0.33.0", default-features = false, features = [ + "reqwest", + "tls", + "jwt_rust_crypto", +] } message_bus = { path = "core/message_bus" } metadata = { path = "core/metadata" } mimalloc = "0.1" diff --git a/core/connectors/README.md b/core/connectors/README.md index b7d24cfbab..53c87197ef 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -96,6 +96,7 @@ Please refer to the **[Source documentation](https://github.com/apache/iggy/tree ### Available Sources - **Elasticsearch Source** - polls documents from Elasticsearch indices +- **Meilisearch Source** - polls documents from Meilisearch indices - **PostgreSQL Source** - reads rows from PostgreSQL tables with multiple consumption 
strategies (delete after read, mark as processed, timestamp tracking) - **Random Source** - generates random test messages (useful for testing/development) diff --git a/core/connectors/sources/README.md b/core/connectors/sources/README.md index 68d3243387..57916b0bad 100644 --- a/core/connectors/sources/README.md +++ b/core/connectors/sources/README.md @@ -10,6 +10,7 @@ Source connectors are responsible for ingesting data from external sources into | ------ | ----------- | | **elasticsearch_source** | Polls documents from Elasticsearch indices with timestamp-based tracking | | **influxdb_source** | Polls InfluxDB with cursor-based timestamp tracking; supports V2 (Flux, annotated CSV) and V3 (SQL, JSONL) | +| **meilisearch_source** | Polls documents from Meilisearch indices with primary-key cursor tracking | | **postgres_source** | Reads rows from PostgreSQL tables with multiple strategies: delete after read, mark as processed, or timestamp tracking | | **random_source** | Generates random test messages (useful for testing and development) | diff --git a/core/connectors/sources/meilisearch_source/Cargo.toml b/core/connectors/sources/meilisearch_source/Cargo.toml new file mode 100644 index 0000000000..5caba7681a --- /dev/null +++ b/core/connectors/sources/meilisearch_source/Cargo.toml @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_meilisearch_source" +version = "0.4.0" +description = "Iggy Meilisearch source connector" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "search", "meilisearch"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +# dashmap and once_cell are not imported directly in this crate's source, but +# the source_connector! macro (in iggy_connector_sdk::source) expands bare +# `use dashmap::DashMap` and `use once_cell::sync::Lazy` into this crate's +# namespace, so they must be listed here. Remove them only after the SDK macro +# is updated to use `$crate::connector_macro_support::{DashMap, Lazy}` (the +# same fix already applied to sink_connector!). 
+[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +meilisearch-sdk = { workspace = true } +once_cell = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +url = { workspace = true } diff --git a/core/connectors/sources/meilisearch_source/README.md b/core/connectors/sources/meilisearch_source/README.md new file mode 100644 index 0000000000..49783e972f --- /dev/null +++ b/core/connectors/sources/meilisearch_source/README.md @@ -0,0 +1,88 @@ +# Meilisearch Source Connector + +A source connector that polls documents from a Meilisearch index and produces +them as JSON messages into Iggy. + +## Configuration + +- `url`: Meilisearch base URL. +- `index`: Source index UID. +- `api_key`: Optional Meilisearch API key sent as `Authorization: Bearer`. +- `query`: Optional search query. Defaults to an empty query. +- `filter`: Optional Meilisearch filter expression string or nested JSON array. +- `batch_size`: Maximum documents fetched per poll. Defaults to `100`. +- `polling_interval`: Delay between polls as a humantime string. Defaults to `5s`. +- `include_metadata`: Wrap each hit with Meilisearch metadata. Defaults to `false`. +- `timeout`: Request timeout as a humantime string. Defaults to `30s`. +- `max_retries`: Maximum transient retry attempts during polling. Defaults to `3`. +- `retry_delay`: Initial retry delay. Defaults to `500ms`. +- `max_retry_delay`: Maximum retry delay. Defaults to `5s`. +- `max_open_retries`: Maximum transient retry attempts during `open()`. Defaults to `5`. 
+ +## Filter Syntax + +String filters work in regular TOML plugin config: + +```toml +filter = "category = alpha" +``` + +Nested array filters require JSON plugin config because the connector receives +the field as `serde_json::Value`: + +```json +{ + "filter": [["category = alpha", "category = beta"], "enabled = true"] +} +``` + +Top-level filter array entries are combined with `AND`; nested arrays are +combined with `OR`. + +## Behavior + +The connector requires the source index to define a primary key. Each poll sends +a `/search` request sorted by that primary key and stores the last emitted +primary-key value in connector state. This avoids offset pagination skips when +documents are inserted or deleted between polls. + +The primary-key field must be an integer, filterable, and sortable in +Meilisearch, because the connector adds a cursor filter and primary-key sort to +each search request. The connector validates the sortable setting during +`open()`, but Meilisearch settings do not expose document value types, so the +integer-value requirement is validated while polling. Documents with missing or +non-integer primary-key values are skipped with a warning, and the cursor +advances to the last valid integer primary key in the batch. If a batch contains +no valid integer primary key, the cursor is left unchanged and the next poll +repeats the same search request. String primary keys are not supported until the +Meilisearch version used for validation supports greater-than filters on string +attributes. Returned hits are serialized as JSON message payloads. + +Configure the primary-key field as both filterable and sortable before starting +the connector. For example, when the primary key is `id`: + +```bash +curl -X PATCH "$MEILISEARCH_URL/indexes/iggy_messages/settings" \ + -H 'Content-Type: application/json' \ + --data '{ + "filterableAttributes": ["id"], + "sortableAttributes": ["id"] + }' +``` + +Connector state is advanced in memory when a batch is returned from `poll()`. 
+If the runtime fails to send that batch to Iggy, the source trait does not +provide an acknowledgment callback that would let this connector roll the cursor +back before the next poll. This is a known limitation of the current connector +source API. + +When `include_metadata` is enabled, each payload has this shape: + +```json +{ + "document": {}, + "meilisearch": { + "index": "iggy_messages", + "primary_key": "id" + } +} +``` diff --git a/core/connectors/sources/meilisearch_source/config.toml b/core/connectors/sources/meilisearch_source/config.toml new file mode 100644 index 0000000000..d0458b7103 --- /dev/null +++ b/core/connectors/sources/meilisearch_source/config.toml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "source" +key = "meilisearch" +enabled = true +version = 0 +name = "Meilisearch source" +path = "../../target/release/libiggy_connector_meilisearch_source" +plugin_config_format = "json" +verbose = false + +[[streams]] +stream = "test_stream" +topic = "test_topic" +schema = "json" +batch_length = 100 +linger_time = "5ms" + +[plugin_config] +url = "http://localhost:7700" +index = "iggy_messages" +query = "" +# filter = "category = alpha" +batch_size = 100 +polling_interval = "5s" +include_metadata = true +timeout = "30s" +max_retries = 3 +retry_delay = "500ms" +max_retry_delay = "5s" +max_open_retries = 5 diff --git a/core/connectors/sources/meilisearch_source/src/lib.rs b/core/connectors/sources/meilisearch_source/src/lib.rs new file mode 100644 index 0000000000..660a3d6836 --- /dev/null +++ b/core/connectors/sources/meilisearch_source/src/lib.rs @@ -0,0 +1,990 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use async_trait::async_trait; +use iggy_connector_sdk::{ + ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, + retry::{exponential_backoff, jitter, parse_duration}, + source_connector, +}; +use meilisearch_sdk::{ + client::Client, + errors::{ + Error as MeilisearchSdkError, ErrorCode as MeilisearchErrorCode, + ErrorType as MeilisearchErrorType, + }, +}; +use secrecy::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +use std::{future::Future, time::Duration}; +use tokio::{sync::Mutex, time::sleep}; +use tracing::{info, warn}; +use url::Url; + +source_connector!(MeilisearchSource); + +const CONNECTOR_NAME: &str = "Meilisearch source"; +const DEFAULT_BATCH_SIZE: usize = 100; +const DEFAULT_POLLING_INTERVAL: &str = "5s"; +const DEFAULT_INCLUDE_METADATA: bool = false; +const DEFAULT_TIMEOUT: &str = "30s"; +const DEFAULT_RETRY_DELAY: &str = "500ms"; +const DEFAULT_MAX_RETRY_DELAY: &str = "5s"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_MAX_OPEN_RETRIES: u32 = 5; +const PRIMARY_KEY_SORT_DIRECTION: &str = "asc"; + +#[derive(Debug, Serialize, Deserialize)] +pub struct MeilisearchSourceConfig { + pub url: String, + pub index: String, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")] + pub api_key: Option, + pub query: Option, + pub filter: Option, + pub batch_size: Option, + pub polling_interval: Option, + pub include_metadata: Option, + pub timeout: Option, + pub max_retries: Option, + pub retry_delay: Option, + pub max_retry_delay: Option, + pub max_open_retries: Option, +} + +#[derive(Debug)] +pub struct MeilisearchSource { + id: u32, + config: ResolvedMeilisearchSourceConfig, + client: Option, + primary_key: Option, + primary_key_sort: Option, + filter_expression: Option, + state: Mutex, +} + +#[derive(Debug)] +struct ResolvedMeilisearchSourceConfig { + url: String, + index: String, + api_key: Option, + query: String, + filter: Option, + batch_size: 
usize, + polling_interval: Duration, + include_metadata: bool, + timeout: Duration, + max_retries: u32, + retry_delay: Duration, + max_retry_delay: Duration, + max_open_retries: u32, +} + +impl From for ResolvedMeilisearchSourceConfig { + fn from(config: MeilisearchSourceConfig) -> Self { + let batch_size = config.batch_size.unwrap_or(DEFAULT_BATCH_SIZE).max(1); + let polling_interval = + parse_duration(config.polling_interval.as_deref(), DEFAULT_POLLING_INTERVAL); + let include_metadata = config.include_metadata.unwrap_or(DEFAULT_INCLUDE_METADATA); + let timeout = parse_duration(config.timeout.as_deref(), DEFAULT_TIMEOUT); + let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let max_retry_delay = + parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); + let max_open_retries = config.max_open_retries.unwrap_or(DEFAULT_MAX_OPEN_RETRIES); + + Self { + url: config.url, + index: config.index, + api_key: config.api_key, + query: config.query.unwrap_or_default(), + filter: config.filter, + batch_size, + polling_interval, + include_metadata, + timeout, + max_retries, + retry_delay, + max_retry_delay, + max_open_retries, + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +struct State { + last_primary_key: Option, + documents_produced: usize, + poll_count: usize, +} + +impl MeilisearchSource { + pub fn new(id: u32, config: MeilisearchSourceConfig, state: Option) -> Self { + let restored_state = state + .and_then(|state| state.deserialize::(CONNECTOR_NAME, id)) + .inspect(|state| { + info!( + "Restored state for {CONNECTOR_NAME} connector with ID: {id}. 
\ + Last primary key: {:?}, documents produced: {}, poll count: {}", + state.last_primary_key, state.documents_produced, state.poll_count + ); + }); + + Self { + id, + config: config.into(), + client: None, + primary_key: None, + primary_key_sort: None, + filter_expression: None, + state: Mutex::new(restored_state.unwrap_or(State { + last_primary_key: None, + documents_produced: 0, + poll_count: 0, + })), + } + } + + fn serialize_state(&self, state: &State) -> Option { + ConnectorState::serialize(state, CONNECTOR_NAME, self.id) + } + + fn create_client(&self) -> Result { + let host = normalize_host(&self.config.url)?; + let api_key = self + .config + .api_key + .as_ref() + .map(|key| key.expose_secret().to_string()); + + Client::new(host, api_key).map_err(|error| { + Error::Connection(format!("Failed to create Meilisearch client: {error}")) + }) + } + + async fn check_connectivity(&self, client: &Client) -> Result<(), Error> { + let mut retries = 0u32; + + loop { + let health = self + .retry_sdk_open_operation("health check", || client.health()) + .await?; + if health.status == "available" { + return Ok(()); + } + + if retries >= self.config.max_open_retries { + return Err(Error::Connection(format!( + "Meilisearch health check returned status '{}'", + health.status + ))); + } + + retries += 1; + let delay = jitter(exponential_backoff( + self.config.retry_delay, + retries, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch health check returned status '{}' (retry {retries}/{}). 
Retrying in {delay:?}...", + health.status, self.config.max_open_retries + ); + sleep(delay).await; + } + } + + async fn get_primary_key(&self, client: &Client) -> Result { + let primary_key = self + .retry_sdk_open_operation("get primary key", || async { + let mut index = client.get_index(&self.config.index).await?; + index + .get_primary_key() + .await + .map(|primary_key| primary_key.map(str::to_string)) + }) + .await?; + + primary_key.ok_or_else(|| { + Error::InvalidConfigValue(format!( + "Meilisearch index '{}' must define a primary key for stable source polling", + self.config.index + )) + }) + } + + async fn check_sortable_primary_key( + &self, + client: &Client, + primary_key: &str, + ) -> Result<(), Error> { + let settings = self + .retry_sdk_open_operation("get index settings", || { + let index = client.index(&self.config.index); + async move { index.get_settings().await } + }) + .await?; + + let sortable_attributes = settings.sortable_attributes.unwrap_or_default(); + if primary_key_is_sortable(&sortable_attributes, primary_key) { + return Ok(()); + } + + Err(Error::InvalidConfigValue(format!( + "Meilisearch index '{}' must configure primary key '{}' in sortableAttributes for stable source polling", + self.config.index, primary_key + ))) + } + + async fn search_documents(&self, client: &Client) -> Result, Error> { + let last_primary_key = { + let state = self.state.lock().await; + state.last_primary_key.clone() + }; + let primary_key = self.primary_key.as_deref().ok_or_else(|| { + Error::Connection("Meilisearch primary key is not initialized".to_string()) + })?; + let sort = self.primary_key_sort.as_deref().ok_or_else(|| { + Error::Connection("Meilisearch primary key sort is not initialized".to_string()) + })?; + let cursor_filter = cursor_filter_expression(primary_key, last_primary_key.as_ref())?; + let combined_filter = + combine_filter_expressions(self.filter_expression.as_deref(), cursor_filter); + let sort_refs = [sort]; + let index = 
client.index(&self.config.index); + let mut query = index.search(); + query + .with_query(&self.config.query) + .with_limit(self.config.batch_size) + .with_sort(&sort_refs); + + if let Some(filter) = &combined_filter { + query.with_filter(filter); + } + + let results = self + .retry_sdk_operation("search documents", || { + let query = query.clone(); + async move { query.execute::().await } + }) + .await?; + let documents: Vec = results.hits.into_iter().map(|hit| hit.result).collect(); + let (documents, last_document_primary_key) = + valid_documents_and_last_primary_key(documents, primary_key, self.id); + let messages = self.documents_to_messages(documents)?; + + let mut state = self.state.lock().await; + if let Some(primary_key) = last_document_primary_key { + state.last_primary_key = Some(primary_key); + } + state.documents_produced += messages.len(); + state.poll_count += 1; + + Ok(messages) + } + + fn documents_to_messages(&self, documents: Vec) -> Result, Error> { + documents + .into_iter() + .map(|document| { + let payload = if self.config.include_metadata { + json!({ + "document": document, + "meilisearch": { + "index": self.config.index, + "primary_key": self.primary_key.as_deref(), + } + }) + } else { + document + }; + + serde_json::to_vec(&payload) + .map(|payload| ProducedMessage { + id: None, + checksum: None, + timestamp: None, + origin_timestamp: None, + headers: None, + payload, + }) + .map_err(|error| { + Error::Serialization(format!( + "Failed to serialize Meilisearch document: {error}" + )) + }) + }) + .collect() + } + + async fn retry_sdk_operation( + &self, + operation: &str, + operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + self.retry_sdk_operation_with_attempts(operation, self.config.max_retries, operation_fn) + .await + } + + async fn retry_sdk_open_operation( + &self, + operation: &str, + operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + 
self.retry_sdk_operation_with_attempts( + operation, + self.config.max_open_retries, + operation_fn, + ) + .await + } + + async fn retry_sdk_operation_with_attempts( + &self, + operation: &str, + max_retries: u32, + mut operation_fn: Op, + ) -> Result + where + Op: FnMut() -> Fut, + Fut: Future>, + { + let mut retries = 0u32; + + loop { + let result = tokio::time::timeout(self.config.timeout, operation_fn()).await; + match result { + Ok(Ok(value)) => return Ok(value), + Ok(Err(error)) => { + if retries >= max_retries || !is_transient_sdk_error(&error) { + return Err(map_sdk_error(error)); + } + retries += 1; + let delay = jitter(exponential_backoff( + self.config.retry_delay, + retries, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch {operation} failed (retry {retries}/{max_retries}): {error}. Retrying in {delay:?}..." + ); + sleep(delay).await; + } + Err(_) => { + if retries >= max_retries { + return Err(Error::HttpRequestFailed(format!( + "Meilisearch {operation} timed out after {:?}", + self.config.timeout + ))); + } + retries += 1; + let delay = jitter(exponential_backoff( + self.config.retry_delay, + retries, + self.config.max_retry_delay, + )); + warn!( + "Meilisearch {operation} timed out after {:?} (retry {retries}/{max_retries}). 
Retrying in {delay:?}...", + self.config.timeout + ); + sleep(delay).await; + } + } + } + } +} + +#[async_trait] +impl Source for MeilisearchSource { + async fn open(&mut self) -> Result<(), Error> { + let sanitized_url = sanitize_url_for_logging(&self.config.url); + info!( + "Opening Meilisearch source connector with ID: {} for URL: {}, index: {}", + self.id, sanitized_url, self.config.index + ); + + let filter_expression = filter_expression(self.config.filter.as_ref())?; + let client = self.create_client()?; + self.check_connectivity(&client).await?; + let primary_key = self.get_primary_key(&client).await?; + self.check_sortable_primary_key(&client, &primary_key) + .await?; + info!( + "Meilisearch source connector with ID: {} requires integer primary key values for cursor pagination. Index: {}, primary key: {}", + self.id, self.config.index, primary_key + ); + self.primary_key_sort = Some(format!("{primary_key}:{PRIMARY_KEY_SORT_DIRECTION}")); + self.filter_expression = filter_expression; + self.primary_key = Some(primary_key); + self.client = Some(client); + + info!( + "Successfully opened Meilisearch source connector with ID: {}", + self.id + ); + Ok(()) + } + + async fn poll(&self) -> Result { + sleep(self.config.polling_interval).await; + let client = self + .client + .as_ref() + .ok_or_else(|| Error::Connection("Meilisearch client not initialized".to_string()))?; + let messages = self.search_documents(client).await?; + let persisted_state = { + let state = self.state.lock().await; + self.serialize_state(&state) + }; + + Ok(ProducedMessages { + schema: Schema::Json, + messages, + state: persisted_state, + }) + } + + async fn close(&mut self) -> Result<(), Error> { + { + let state = self.state.lock().await; + info!( + "Meilisearch source connector with ID: {} is closing. 
Stats: {} documents produced, {} polls executed", + self.id, state.documents_produced, state.poll_count + ); + } + + self.client = None; + info!( + "Meilisearch source connector with ID: {} is closed.", + self.id + ); + Ok(()) + } +} + +fn normalize_host(host: &str) -> Result { + let trimmed = host.trim(); + if trimmed.is_empty() { + return Err(Error::Connection( + "Invalid Meilisearch URL: host cannot be empty".to_string(), + )); + } + + let with_scheme = if trimmed.starts_with("http://") || trimmed.starts_with("https://") { + trimmed.to_string() + } else { + format!("http://{trimmed}") + }; + + let mut url = Url::parse(&with_scheme) + .map_err(|error| Error::Connection(format!("Invalid Meilisearch URL: {error}")))?; + if url.host_str().is_none() { + return Err(Error::Connection( + "Invalid Meilisearch URL: host cannot be empty".to_string(), + )); + } + if !matches!(url.scheme(), "http" | "https") { + return Err(Error::Connection(format!( + "Invalid Meilisearch URL scheme '{}': expected http or https", + url.scheme() + ))); + } + if !matches!(url.path(), "" | "/") || url.query().is_some() || url.fragment().is_some() { + return Err(Error::Connection( + "Invalid Meilisearch URL: path, query, and fragment components are not supported" + .to_string(), + )); + } + + url.set_path(""); + url.set_query(None); + url.set_fragment(None); + let mut normalized = url.to_string(); + while normalized.ends_with('/') { + normalized.pop(); + } + Ok(normalized) +} + +fn sanitize_url_for_logging(host: &str) -> String { + let Ok(normalized) = normalize_host(host) else { + return "".to_string(); + }; + let Some((scheme, rest)) = normalized.split_once("://") else { + return normalized; + }; + let authority_end = rest.find('/').unwrap_or(rest.len()); + let (authority, path) = rest.split_at(authority_end); + if let Some((_, host)) = authority.rsplit_once('@') { + format!("{scheme}://@{host}{path}") + } else { + normalized + } +} + +fn filter_array_expression(filters: &[Value], nested: 
bool) -> Result, Error> { + let separator = if nested { " OR " } else { " AND " }; + let mut expressions = Vec::with_capacity(filters.len()); + + for filter in filters { + match filter { + Value::String(filter) if !filter.is_empty() => expressions.push(filter.clone()), + Value::Array(filters) => { + if let Some(filter) = filter_array_expression(filters, true)? { + expressions.push(format!("({filter})")); + } + } + _ => { + return Err(Error::InvalidConfigValue( + "Meilisearch filter arrays must contain only strings or nested arrays" + .to_string(), + )); + } + } + } + + if expressions.is_empty() { + Ok(None) + } else { + Ok(Some(expressions.join(separator))) + } +} + +fn filter_expression(filter: Option<&Value>) -> Result, Error> { + let Some(filter) = filter.filter(|value| !value.is_null()) else { + return Ok(None); + }; + + match filter { + Value::String(filter) if !filter.is_empty() => Ok(Some(filter.clone())), + Value::Array(filters) => filter_array_expression(filters, false), + _ => Err(Error::InvalidConfigValue( + "Meilisearch filter must be a string or an array of strings/arrays".to_string(), + )), + } +} + +fn combine_filter_expressions( + user_filter: Option<&str>, + cursor_filter: Option, +) -> Option { + match (user_filter, cursor_filter) { + (Some(user_filter), Some(cursor_filter)) => { + Some(format!("({user_filter}) AND ({cursor_filter})")) + } + (Some(user_filter), None) => Some(user_filter.to_string()), + (None, Some(cursor_filter)) => Some(cursor_filter), + (None, None) => None, + } +} + +fn cursor_filter_expression( + primary_key: &str, + last_primary_key: Option<&Value>, +) -> Result, Error> { + last_primary_key + .map(|value| { + primary_key_filter_literal(value).map(|literal| format!("{primary_key} > {literal}")) + }) + .transpose() +} + +fn primary_key_filter_literal(value: &Value) -> Result { + match value { + Value::Number(number) if number.is_i64() || number.is_u64() => serde_json::to_string(value) + .map_err(|error| { + 
Error::Serialization(format!( + "Failed to serialize Meilisearch primary key: {error}" + )) + }), + Value::Number(_) => Err(Error::InvalidConfigValue( + "Meilisearch source primary key values must be integers for cursor pagination" + .to_string(), + )), + _ => Err(Error::InvalidConfigValue( + "Meilisearch source primary key values must be numbers for cursor pagination" + .to_string(), + )), + } +} + +fn document_primary_key(document: &Value, primary_key: &str) -> Result { + let value = document.get(primary_key).ok_or_else(|| { + Error::InvalidConfigValue(format!( + "Meilisearch document is missing primary key '{primary_key}'" + )) + })?; + primary_key_filter_literal(value)?; + Ok(value.clone()) +} + +fn primary_key_is_sortable(sortable_attributes: &[String], primary_key: &str) -> bool { + sortable_attributes + .iter() + .any(|attribute| attribute == primary_key) +} + +fn valid_documents_and_last_primary_key( + documents: Vec, + primary_key: &str, + connector_id: u32, +) -> (Vec, Option) { + let mut valid_documents = Vec::with_capacity(documents.len()); + let mut last_primary_key = None; + + for document in documents { + match document_primary_key(&document, primary_key) { + Ok(primary_key_value) => { + last_primary_key = Some(primary_key_value); + valid_documents.push(document); + } + Err(error) => warn!( + "Skipping Meilisearch document for source connector with ID: {connector_id}. 
Invalid primary key '{primary_key}': {error}" + ), + } + } + + (valid_documents, last_primary_key) +} + +fn map_sdk_error(error: MeilisearchSdkError) -> Error { + match error { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + if meilisearch_error.error_type == MeilisearchErrorType::Internal { + Error::HttpRequestFailed(meilisearch_error.to_string()) + } else if meilisearch_error.error_code == MeilisearchErrorCode::IndexNotFound { + Error::InvalidConfigValue(meilisearch_error.to_string()) + } else { + Error::PermanentHttpError(meilisearch_error.to_string()) + } + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + if communication_error.status_code == 0 + || communication_error.status_code == 429 + || communication_error.status_code >= 500 + { + Error::HttpRequestFailed(communication_error.to_string()) + } else { + Error::PermanentHttpError(communication_error.to_string()) + } + } + MeilisearchSdkError::ParseError(error) => { + Error::Serialization(format!("Invalid Meilisearch response: {error}")) + } + MeilisearchSdkError::Timeout => { + Error::HttpRequestFailed("Meilisearch request timed out".to_string()) + } + MeilisearchSdkError::HttpError(error) => Error::HttpRequestFailed(error.to_string()), + other => Error::HttpRequestFailed(other.to_string()), + } +} + +fn is_transient_sdk_error(error: &MeilisearchSdkError) -> bool { + match error { + MeilisearchSdkError::Meilisearch(meilisearch_error) => { + meilisearch_error.error_type == MeilisearchErrorType::Internal + } + MeilisearchSdkError::MeilisearchCommunication(communication_error) => { + communication_error.status_code == 0 + || communication_error.status_code == 429 + || communication_error.status_code >= 500 + } + MeilisearchSdkError::HttpError(_) | MeilisearchSdkError::Timeout => true, + _ => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn config() -> MeilisearchSourceConfig { + MeilisearchSourceConfig { + url: "localhost:7700".to_string(), + index: 
"iggy_messages".to_string(), + api_key: None, + query: None, + filter: None, + batch_size: Some(10), + polling_interval: Some("1ms".to_string()), + include_metadata: None, + timeout: None, + max_retries: None, + retry_delay: None, + max_retry_delay: None, + max_open_retries: None, + } + } + + #[test] + fn given_host_without_scheme_should_normalize_url() { + let url = normalize_host("localhost:7700").unwrap(); + assert_eq!(url, "http://localhost:7700"); + } + + #[test] + fn given_persisted_state_should_restore_last_primary_key() { + let state = State { + last_primary_key: Some(json!(42)), + documents_produced: 42, + poll_count: 7, + }; + let connector_state = ConnectorState::serialize(&state, CONNECTOR_NAME, 1).unwrap(); + let source = MeilisearchSource::new(1, config(), Some(connector_state)); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let restored = source.state.lock().await; + assert_eq!(restored.last_primary_key, Some(json!(42))); + assert_eq!(restored.documents_produced, 42); + assert_eq!(restored.poll_count, 7); + }); + } + + #[test] + fn given_no_state_should_start_fresh() { + let source = MeilisearchSource::new(1, config(), None); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let state = source.state.lock().await; + assert_eq!(state.last_primary_key, None); + assert_eq!(state.documents_produced, 0); + assert_eq!(state.poll_count, 0); + }); + } + + #[test] + fn given_invalid_state_should_start_fresh() { + let invalid_state = ConnectorState(b"not valid msgpack".to_vec()); + let source = MeilisearchSource::new(1, config(), Some(invalid_state)); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let state = source.state.lock().await; + assert_eq!(state.last_primary_key, None); + assert_eq!(state.documents_produced, 0); + assert_eq!(state.poll_count, 0); + }); + } + + #[test] + fn state_should_be_serializable_and_deserializable() { + let 
original = State { + last_primary_key: Some(json!(9007199254740993_u64)), + documents_produced: 123, + poll_count: 11, + }; + + let connector_state = ConnectorState::serialize(&original, CONNECTOR_NAME, 1) + .expect("Failed to serialize state"); + let restored = connector_state + .deserialize::(CONNECTOR_NAME, 1) + .expect("Failed to deserialize state"); + + assert_eq!(original, restored); + } + + #[test] + fn filter_expression_should_accept_string_filter() { + let mut config = config(); + config.filter = Some(json!("status = active")); + + assert_eq!( + filter_expression(config.filter.as_ref()).unwrap(), + Some("status = active".to_string()) + ); + } + + #[test] + fn filter_expression_should_accept_nested_filter_arrays() { + let mut config = config(); + config.filter = Some(json!([ + ["genres = horror", "genres = thriller"], + "director = 'Jordan Peele'" + ])); + + assert_eq!( + filter_expression(config.filter.as_ref()).unwrap(), + Some( + "(genres = horror OR genres = thriller) AND director = 'Jordan Peele'".to_string() + ) + ); + } + + #[test] + fn filter_expression_should_reject_object_filter() { + let error = filter_expression(Some(&json!({"status": "active"}))) + .expect_err("object filters should be rejected"); + + assert!(matches!(error, Error::InvalidConfigValue(_))); + } + + #[test] + fn cursor_filter_should_compare_against_numeric_last_primary_key() { + assert_eq!( + cursor_filter_expression("id", Some(&json!(42))).unwrap(), + Some("id > 42".to_string()) + ); + } + + #[test] + fn cursor_filter_should_preserve_large_integer_last_primary_key() { + assert_eq!( + cursor_filter_expression("id", Some(&json!(9007199254740993_u64))).unwrap(), + Some("id > 9007199254740993".to_string()) + ); + } + + #[test] + fn cursor_filter_should_reject_float_last_primary_key() { + let error = cursor_filter_expression("id", Some(&json!(1.5))) + .expect_err("float primary key should be rejected"); + + assert!(matches!(error, Error::InvalidConfigValue(_))); + } + + #[test] + fn 
cursor_filter_should_reject_string_last_primary_key() { + let error = cursor_filter_expression("id", Some(&json!("movie-1"))) + .expect_err("string primary key should be rejected"); + + assert!(matches!(error, Error::InvalidConfigValue(_))); + } + + #[test] + fn normalize_host_should_trim_before_checking_scheme() { + let url = normalize_host(" https://localhost:7700/ ").unwrap(); + assert_eq!(url, "https://localhost:7700"); + } + + #[test] + fn normalize_host_should_reject_path_components() { + let error = normalize_host("https://localhost:7700/v1") + .expect_err("path components should be rejected"); + + assert!(matches!(error, Error::Connection(_))); + } + + #[test] + fn normalize_host_should_reject_query_components() { + let error = normalize_host("https://localhost:7700?tenant=a") + .expect_err("query should be rejected"); + + assert!(matches!(error, Error::Connection(_))); + } + + #[test] + fn sanitize_url_should_redact_credentials() { + let url = sanitize_url_for_logging("https://user:pass@localhost:7700"); + assert_eq!(url, "https://@localhost:7700"); + } + + #[test] + fn sanitize_url_should_redact_credentials_without_scheme() { + let url = sanitize_url_for_logging("user:pass@localhost:7700"); + assert_eq!(url, "http://@localhost:7700"); + } + + #[test] + fn primary_key_is_sortable_should_match_exact_attribute() { + let sortable_attributes = vec!["created_at".to_string(), "id".to_string()]; + + assert!(primary_key_is_sortable(&sortable_attributes, "id")); + assert!(!primary_key_is_sortable(&sortable_attributes, "user_id")); + } + + #[test] + fn valid_documents_should_skip_missing_middle_primary_key() { + let documents = vec![ + json!({"id": 1, "name": "first"}), + json!({"name": "missing"}), + json!({"id": 3, "name": "third"}), + ]; + + let (valid_documents, last_primary_key) = + valid_documents_and_last_primary_key(documents, "id", 1); + + assert_eq!(valid_documents.len(), 2); + assert_eq!(valid_documents[0]["name"], "first"); + 
assert_eq!(valid_documents[1]["name"], "third"); + assert_eq!(last_primary_key, Some(json!(3))); + } + + #[test] + fn valid_documents_should_skip_non_integer_middle_primary_key() { + let documents = vec![ + json!({"id": 1, "name": "first"}), + json!({"id": 1.5, "name": "float"}), + json!({"id": 3, "name": "third"}), + ]; + + let (valid_documents, last_primary_key) = + valid_documents_and_last_primary_key(documents, "id", 1); + + assert_eq!(valid_documents.len(), 2); + assert_eq!(valid_documents[0]["name"], "first"); + assert_eq!(valid_documents[1]["name"], "third"); + assert_eq!(last_primary_key, Some(json!(3))); + } + + #[test] + fn valid_documents_should_return_no_cursor_for_all_invalid_primary_keys() { + let documents = vec![ + json!({"id": 1.5, "name": "float"}), + json!({"name": "missing"}), + ]; + + let (valid_documents, last_primary_key) = + valid_documents_and_last_primary_key(documents, "id", 1); + + assert!(valid_documents.is_empty()); + assert_eq!(last_primary_key, None); + } + + #[test] + fn documents_should_be_wrapped_when_metadata_is_enabled() { + let mut config = config(); + config.include_metadata = Some(true); + let mut source = MeilisearchSource::new(1, config, None); + source.primary_key = Some("id".to_string()); + let messages = source + .documents_to_messages(vec![json!({"id": 1, "title": "hello"})]) + .unwrap(); + + let payload: Value = serde_json::from_slice(&messages[0].payload).unwrap(); + assert_eq!( + payload, + json!({ + "document": {"id": 1, "title": "hello"}, + "meilisearch": { + "index": "iggy_messages", + "primary_key": "id", + } + }) + ); + } +} diff --git a/core/integration/tests/connectors/fixtures/meilisearch/container.rs b/core/integration/tests/connectors/fixtures/meilisearch/container.rs new file mode 100644 index 0000000000..88514d988f --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/container.rs @@ -0,0 +1,329 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::connectors::fixtures; +use integration::harness::TestBinaryError; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use reqwest_retry::RetryTransientMiddleware; +use reqwest_retry::policies::ExponentialBackoff; +use serde::Deserialize; +use std::time::Duration; +use testcontainers_modules::testcontainers::core::wait::HttpWaitStrategy; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tokio::time::sleep; +use tracing::info; + +const MEILISEARCH_IMAGE: &str = "getmeili/meilisearch"; +const MEILISEARCH_TAG: &str = "v1.13"; +const MEILISEARCH_PORT: u16 = 7700; +const MEILISEARCH_HEALTH_ENDPOINT: &str = "/health"; +pub const TEST_INDEX: &str = "iggy_messages"; +const DOCUMENT_LIST_LIMIT: usize = 10000; +const POLL_ATTEMPTS: usize = 100; +const POLL_INTERVAL_MS: u64 = 50; + +#[derive(Debug, Deserialize)] +pub struct MeilisearchDocumentsResponse { + pub results: Vec, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct MeilisearchTaskResponse { + task_uid: usize, +} + +#[derive(Debug, Deserialize)] +struct 
MeilisearchTaskStatus { + status: String, + #[allow(dead_code)] + error: Option, +} + +pub struct MeilisearchContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub base_url: String, +} + +impl MeilisearchContainer { + pub async fn start() -> Result { + let container = GenericImage::new(MEILISEARCH_IMAGE, MEILISEARCH_TAG) + .with_exposed_port(MEILISEARCH_PORT.tcp()) + .with_wait_for(WaitFor::http( + HttpWaitStrategy::new(MEILISEARCH_HEALTH_ENDPOINT) + .with_port(MEILISEARCH_PORT.tcp()) + .with_expected_status_code(200u16), + )) + .with_container_name(fixtures::unique_container_name("meilisearch")) + .with_env_var("MEILI_ENV", "development") + .with_mapped_port(0, MEILISEARCH_PORT.tcp()) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: format!("Failed to start container: {e}"), + })?; + + info!("Started Meilisearch container"); + + let mapped_port = container + .ports() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: format!("Failed to get ports: {e}"), + })? 
+ .map_to_host_port_ipv4(MEILISEARCH_PORT) + .ok_or_else(|| TestBinaryError::FixtureSetup { + fixture_type: "MeilisearchContainer".to_string(), + message: "No mapping for Meilisearch port".to_string(), + })?; + + let base_url = format!("http://localhost:{mapped_port}"); + info!("Meilisearch container available at {base_url}"); + + Ok(Self { + container, + base_url, + }) + } +} + +pub fn create_http_client() -> HttpClient { + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to build HTTP client"); + reqwest_middleware::ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build() +} + +pub trait MeilisearchOps: Sync { + fn container(&self) -> &MeilisearchContainer; + fn http_client(&self) -> &HttpClient; + + fn create_source_index( + &self, + ) -> impl std::future::Future> + Send { + async move { + let url = format!("{}/indexes", self.container().base_url); + let response = self + .http_client() + .post(&url) + .json(&serde_json::json!({ + "uid": TEST_INDEX, + "primaryKey": "id", + })) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to create Meilisearch source index: {e}"), + })?; + let task = parse_task_response(response, "create Meilisearch source index").await?; + self.wait_for_task(task.task_uid).await?; + + let url = format!( + "{}/indexes/{}/settings", + self.container().base_url, + TEST_INDEX + ); + let response = self + .http_client() + .patch(&url) + .json(&serde_json::json!({ + "filterableAttributes": ["id"], + "sortableAttributes": ["id"], + })) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to configure Meilisearch source index: {e}"), + })?; + let task = parse_task_response(response, "configure Meilisearch source index").await?; + self.wait_for_task(task.task_uid).await + } + } + + fn 
wait_for_task( + &self, + task_uid: usize, + ) -> impl std::future::Future> + Send { + async move { + let url = format!("{}/tasks/{task_uid}", self.container().base_url); + for _ in 0..POLL_ATTEMPTS { + let response = self.http_client().get(&url).send().await.map_err(|e| { + TestBinaryError::InvalidState { + message: format!("Failed to fetch Meilisearch task {task_uid}: {e}"), + } + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!( + "Failed to fetch Meilisearch task {task_uid}: status={status}, body={body}" + ), + }); + } + + let task = response + .json::() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to parse Meilisearch task response: {e}"), + })?; + match task.status.as_str() { + "succeeded" => return Ok(()), + "failed" | "canceled" => { + return Err(TestBinaryError::InvalidState { + message: format!( + "Meilisearch task {task_uid} ended with status '{}': {:?}", + task.status, task.error + ), + }); + } + _ => sleep(Duration::from_millis(POLL_INTERVAL_MS)).await, + } + } + + Err(TestBinaryError::InvalidState { + message: format!( + "Meilisearch task {task_uid} did not complete after {POLL_ATTEMPTS} attempts" + ), + }) + } + } + + fn list_documents( + &self, + index_name: &str, + ) -> impl std::future::Future, TestBinaryError>> + Send + { + async move { + let url = format!( + "{}/indexes/{}/documents", + self.container().base_url, + index_name + ); + let response = self + .http_client() + .get(&url) + .query(&[("limit", DOCUMENT_LIST_LIMIT)]) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to list Meilisearch documents: {e}"), + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!( + 
"Failed to list Meilisearch documents: status={status}, body={body}" + ), + }); + } + + response + .json::() + .await + .map(|documents| documents.results) + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to parse Meilisearch documents response: {e}"), + }) + } + } + + fn wait_for_documents( + &self, + expected_count: usize, + ) -> impl std::future::Future, TestBinaryError>> + Send + { + async move { + let mut last_count = 0usize; + for _ in 0..POLL_ATTEMPTS { + if let Ok(documents) = self.list_documents(TEST_INDEX).await { + last_count = documents.len(); + if documents.len() >= expected_count { + return Ok(documents); + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + Err(TestBinaryError::InvalidState { + message: format!( + "Expected {expected_count} Meilisearch documents, found {last_count} after {POLL_ATTEMPTS} attempts" + ), + }) + } + } + + fn index_documents( + &self, + index_name: &str, + documents: Vec, + ) -> impl std::future::Future> + Send { + async move { + let url = format!( + "{}/indexes/{}/documents", + self.container().base_url, + index_name + ); + let response = self + .http_client() + .post(&url) + .query(&[("primaryKey", "id")]) + .json(&documents) + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to index Meilisearch documents: {e}"), + })?; + + let task = parse_task_response(response, "index Meilisearch documents").await?; + self.wait_for_task(task.task_uid).await + } + } +} + +async fn parse_task_response( + response: reqwest::Response, + operation: &str, +) -> Result { + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!("Failed to {operation}: status={status}, body={body}"), + }); + } + + response + .json::() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to parse Meilisearch task 
response: {e}"), + }) +} diff --git a/core/integration/tests/connectors/fixtures/meilisearch/mod.rs b/core/integration/tests/connectors/fixtures/meilisearch/mod.rs new file mode 100644 index 0000000000..c221483d8c --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod container; +mod source; + +pub use container::MeilisearchOps; +pub use source::MeilisearchSourceFixture; diff --git a/core/integration/tests/connectors/fixtures/meilisearch/source.rs b/core/integration/tests/connectors/fixtures/meilisearch/source.rs new file mode 100644 index 0000000000..aa3fa761f5 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/meilisearch/source.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::container::{MeilisearchContainer, MeilisearchOps, TEST_INDEX, create_http_client}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture, seeds}; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use std::collections::HashMap; + +const ENV_SOURCE_URL: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PLUGIN_CONFIG_URL"; +const ENV_SOURCE_INDEX: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PLUGIN_CONFIG_INDEX"; +const ENV_SOURCE_INCLUDE_METADATA: &str = + "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PLUGIN_CONFIG_INCLUDE_METADATA"; +const ENV_SOURCE_POLLING_INTERVAL: &str = + "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PLUGIN_CONFIG_POLLING_INTERVAL"; +const ENV_SOURCE_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_STREAMS_0_STREAM"; +const ENV_SOURCE_STREAMS_0_TOPIC: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_STREAMS_0_TOPIC"; +const ENV_SOURCE_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_STREAMS_0_SCHEMA"; +const ENV_SOURCE_PATH: &str = "IGGY_CONNECTORS_SOURCE_MEILISEARCH_PATH"; + +pub struct MeilisearchSourceFixture { + container: MeilisearchContainer, + http_client: HttpClient, +} + +impl MeilisearchOps for MeilisearchSourceFixture { + fn container(&self) -> &MeilisearchContainer { + &self.container + } + + fn http_client(&self) -> &HttpClient { + &self.http_client + } +} + +#[async_trait] +impl TestFixture for MeilisearchSourceFixture { + async fn setup() -> Result { + let container = MeilisearchContainer::start().await?; + let http_client = create_http_client(); + let fixture = 
Self { + container, + http_client, + }; + fixture.create_source_index().await?; + + Ok(fixture) + } + + fn connectors_runtime_envs(&self) -> HashMap { + HashMap::from([ + (ENV_SOURCE_URL.to_string(), self.container.base_url.clone()), + (ENV_SOURCE_INDEX.to_string(), TEST_INDEX.to_string()), + (ENV_SOURCE_INCLUDE_METADATA.to_string(), "false".to_string()), + (ENV_SOURCE_POLLING_INTERVAL.to_string(), "25ms".to_string()), + ( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + seeds::names::STREAM.to_string(), + ), + ( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + seeds::names::TOPIC.to_string(), + ), + (ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()), + ( + ENV_SOURCE_PATH.to_string(), + "../../target/debug/libiggy_connector_meilisearch_source".to_string(), + ), + ]) + } +} diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 0b2f264d03..25faac16a4 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -23,6 +23,7 @@ mod elasticsearch; mod http; mod iceberg; mod influxdb; +mod meilisearch; mod mongodb; mod postgres; mod quickwit; @@ -62,6 +63,7 @@ pub use influxdb::{ InfluxDbSinkNoMetadataFixture, InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture, InfluxDbSourceRawFixture, InfluxDbSourceTextFixture, }; +pub use meilisearch::{MeilisearchOps, MeilisearchSourceFixture}; pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture, diff --git a/core/integration/tests/connectors/meilisearch/meilisearch_source.rs b/core/integration/tests/connectors/meilisearch/meilisearch_source.rs new file mode 100644 index 0000000000..06bbbca5db --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/meilisearch_source.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::connectors::fixtures::{MeilisearchOps, MeilisearchSourceFixture}; +use iggy_common::MessageClient; +use iggy_common::{Consumer, Identifier, PollingStrategy}; +use integration::harness::seeds; +use integration::iggy_harness; +use std::time::Duration; +use tokio::time::sleep; + +const TEST_MESSAGE_COUNT: usize = 2; +const POLL_ATTEMPTS: usize = 100; +const POLL_INTERVAL_MS: u64 = 50; +const SOURCE_INDEX: &str = "iggy_messages"; + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/meilisearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_index_documents_when_source_polls_should_produce_messages( + harness: &TestHarness, + fixture: MeilisearchSourceFixture, +) { + let client = harness.root_client().await.unwrap(); + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let documents = vec![ + serde_json::json!({"id": 1, "name": "first", "category": "alpha"}), + serde_json::json!({"id": 2, "name": "second", "category": "beta"}), + ]; + + fixture + .index_documents(SOURCE_INDEX, documents) + .await + 
.expect("index Meilisearch documents"); + fixture + .wait_for_documents(TEST_MESSAGE_COUNT) + .await + .expect("wait for Meilisearch documents"); + + let received = poll_documents( + &client, + &stream_id, + &topic_id, + &consumer_id, + TEST_MESSAGE_COUNT, + ) + .await; + + assert_eq!(received.len(), TEST_MESSAGE_COUNT); + assert!(received.iter().any(|document| document["name"] == "first")); + assert!(received.iter().any(|document| document["name"] == "second")); + + sleep(Duration::from_millis(POLL_INTERVAL_MS * 4)).await; + let duplicate_poll = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id), + &PollingStrategy::next(), + 10, + true, + ) + .await + .expect("poll Meilisearch source messages after cursor advance"); + assert!( + duplicate_poll.messages.is_empty(), + "second poll should not receive duplicate Meilisearch documents" + ); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/meilisearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_persisted_state_when_connector_restarts_should_resume_after_last_primary_key( + harness: &mut TestHarness, + fixture: MeilisearchSourceFixture, +) { + let client = harness.root_client().await.unwrap(); + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "state_test_consumer".try_into().unwrap(); + + fixture + .index_documents( + SOURCE_INDEX, + vec![ + serde_json::json!({"id": 1, "name": "first", "category": "alpha"}), + serde_json::json!({"id": 2, "name": "second", "category": "beta"}), + ], + ) + .await + .expect("index first Meilisearch document batch"); + fixture + .wait_for_documents(TEST_MESSAGE_COUNT) + .await + .expect("wait for first Meilisearch document batch"); + + let received_before = poll_documents( + &client, + &stream_id, + &topic_id, + &consumer_id, + TEST_MESSAGE_COUNT, + ) + .await; + 
assert_eq!(received_before.len(), TEST_MESSAGE_COUNT); + + harness + .server_mut() + .stop_dependents() + .expect("stop Meilisearch source connector"); + + fixture + .index_documents( + SOURCE_INDEX, + vec![ + serde_json::json!({"id": 3, "name": "third", "category": "gamma"}), + serde_json::json!({"id": 4, "name": "fourth", "category": "delta"}), + ], + ) + .await + .expect("index second Meilisearch document batch"); + fixture + .wait_for_documents(TEST_MESSAGE_COUNT * 2) + .await + .expect("wait for second Meilisearch document batch"); + + harness + .server_mut() + .start_dependents() + .await + .expect("restart Meilisearch source connector"); + sleep(Duration::from_millis(100)).await; + + let received_after = poll_documents( + &client, + &stream_id, + &topic_id, + &consumer_id, + TEST_MESSAGE_COUNT, + ) + .await; + + assert_eq!(received_after.len(), TEST_MESSAGE_COUNT); + for document in received_after { + let id = document.get("id").and_then(|value| value.as_i64()); + assert!( + id.is_some_and(|id| id > TEST_MESSAGE_COUNT as i64), + "after restart, received first-batch document: {document}" + ); + } +} + +async fn poll_documents( + client: &C, + stream_id: &Identifier, + topic_id: &Identifier, + consumer_id: &Identifier, + expected_count: usize, +) -> Vec +where + C: MessageClient + Sync, +{ + let mut received = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + stream_id, + topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + let json = serde_json::from_slice::(&msg.payload) + .expect("Meilisearch source payload should be valid JSON"); + received.push(json); + } + if received.len() >= expected_count { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + received +} diff --git a/core/integration/tests/connectors/meilisearch/mod.rs b/core/integration/tests/connectors/meilisearch/mod.rs new file mode 
100644 index 0000000000..0134180f16 --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod meilisearch_source; diff --git a/core/integration/tests/connectors/meilisearch/source.toml b/core/integration/tests/connectors/meilisearch/source.toml new file mode 100644 index 0000000000..0b6a2e442b --- /dev/null +++ b/core/integration/tests/connectors/meilisearch/source.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sources/meilisearch_source" diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index fc624f897f..b5afedcbf6 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -24,6 +24,7 @@ mod http; mod http_config_provider; mod iceberg; mod influxdb; +mod meilisearch; mod mongodb; mod postgres; mod quickwit;