Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ members = [
"core/connectors/sinks/stdout_sink",
"core/connectors/sources/elasticsearch_source",
"core/connectors/sources/influxdb_source",
"core/connectors/sources/meilisearch_source",
"core/connectors/sources/postgres_source",
"core/connectors/sources/random_source",
"core/consensus",
Expand Down Expand Up @@ -200,6 +201,11 @@ lending-iterator = "0.1.7"
libc = "0.2.186"
log = "0.4.33"
lz4_flex = "0.13.1"
meilisearch-sdk = { version = "0.33.0", default-features = false, features = [
"reqwest",
"tls",
"jwt_rust_crypto",
] }
message_bus = { path = "core/message_bus" }
metadata = { path = "core/metadata" }
mimalloc = "0.1"
Expand Down
1 change: 1 addition & 0 deletions core/connectors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Please refer to the **[Source documentation](https://github.com/apache/iggy/tree
### Available Sources

- **Elasticsearch Source** - polls documents from Elasticsearch indices
- **Meilisearch Source** - polls documents from Meilisearch indices
- **PostgreSQL Source** - reads rows from PostgreSQL tables with multiple consumption strategies (delete after read, mark as processed, timestamp tracking)
- **Random Source** - generates random test messages (useful for testing/development)

Expand Down
1 change: 1 addition & 0 deletions core/connectors/sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Source connectors are responsible for ingesting data from external sources into
| ------ | ----------- |
| **elasticsearch_source** | Polls documents from Elasticsearch indices with timestamp-based tracking |
| **influxdb_source** | Polls InfluxDB with cursor-based timestamp tracking; supports V2 (Flux, annotated CSV) and V3 (SQL, JSONL) |
| **meilisearch_source** | Polls documents from Meilisearch indices with primary-key cursor tracking |
| **postgres_source** | Reads rows from PostgreSQL tables with multiple strategies: delete after read, mark as processed, or timestamp tracking |
| **random_source** | Generates random test messages (useful for testing and development) |

Expand Down
56 changes: 56 additions & 0 deletions core/connectors/sources/meilisearch_source/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iggy_connector_meilisearch_source"
version = "0.4.0"
description = "Iggy Meilisearch source connector"
edition = "2024"
license = "Apache-2.0"
keywords = ["iggy", "messaging", "streaming", "search", "meilisearch"]
categories = ["command-line-utilities", "database", "network-programming"]
homepage = "https://iggy.apache.org"
documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"
publish = false

# dashmap and once_cell are not imported directly in this crate's source, but
# the source_connector! macro (in iggy_connector_sdk::source) expands bare
# `use dashmap::DashMap` and `use once_cell::sync::Lazy` into this crate's
# namespace, so they must be listed here. Remove them only after the SDK macro
# is updated to use `$crate::connector_macro_support::{DashMap, Lazy}` (the
# same fix already applied to sink_connector!).
[package.metadata.cargo-machete]
ignored = ["dashmap", "once_cell"]

[lib]
crate-type = ["cdylib", "lib"]

[dependencies]
async-trait = { workspace = true }
dashmap = { workspace = true }
iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true }
meilisearch-sdk = { workspace = true }
once_cell = { workspace = true }
secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
88 changes: 88 additions & 0 deletions core/connectors/sources/meilisearch_source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Meilisearch Source Connector

A source connector that polls documents from a Meilisearch index and produces
them as JSON messages into Iggy.

## Configuration

- `url`: Meilisearch base URL.
- `index`: Source index UID.
- `api_key`: Optional Meilisearch API key sent as `Authorization: Bearer`.
- `query`: Optional search query. Defaults to an empty query.
- `filter`: Optional Meilisearch filter expression string or nested JSON array.
- `batch_size`: Maximum documents fetched per poll. Defaults to `100`.
- `polling_interval`: Delay between polls as a humantime string. Defaults to `5s`.
- `include_metadata`: Wrap each hit with Meilisearch metadata. Defaults to `false`.
- `timeout`: Request timeout as a humantime string. Defaults to `30s`.
- `max_retries`: Maximum transient retry attempts during polling. Defaults to `3`.
- `retry_delay`: Initial retry delay. Defaults to `500ms`.
- `max_retry_delay`: Maximum retry delay. Defaults to `5s`.
- `max_open_retries`: Maximum transient retry attempts during `open()`. Defaults to `5`.

## Filter Syntax

String filters work in regular TOML plugin config:

```toml
filter = "category = alpha"
```

Nested array filters require JSON plugin config because the connector receives
the field as `serde_json::Value`:

```json
{
"filter": [["category = alpha", "category = beta"], "enabled = true"]
}
```

Top-level filter array entries are combined with `AND`; nested arrays are
combined with `OR`.

## Behavior

The connector requires the source index to define a primary key. Each poll sends
a `/search` request sorted by that primary key and stores the last emitted
primary-key value in connector state. This avoids offset pagination skips when
documents are inserted or deleted between polls.

Comment thread
countradooku marked this conversation as resolved.
The primary-key field must be an integer, filterable, and sortable in
Meilisearch, because the connector adds a cursor filter and primary-key sort to
each search request. The connector validates the sortable setting during
`open()`, but Meilisearch settings do not expose document value types, so the
integer-value requirement is validated while polling. Documents with missing or
non-integer primary-key values are skipped with a warning, and the cursor
advances to the last valid integer primary key in the batch. String primary keys
are not supported until the Meilisearch version used for validation supports
greater-than filters on string attributes. Returned hits are serialized as JSON
message payloads.

Configure the primary-key field as both filterable and sortable before starting
the connector. For example, when the primary key is `id`:

```bash
curl -X PATCH "$MEILISEARCH_URL/indexes/iggy_messages/settings" \
-H 'Content-Type: application/json' \
--data '{
"filterableAttributes": ["id"],
"sortableAttributes": ["id"]
}'
```

Meilisearch state is advanced in memory when a batch is returned from `poll()`.
If the runtime fails to send that batch to Iggy, the source trait does not
provide an acknowledgment callback that would let this connector roll the cursor
back before the next poll. This is a known limitation of the current connector
source API.

When `include_metadata` is enabled, each payload has this shape:

```json
{
"document": {},
"meilisearch": {
"index": "iggy_messages",
"primary_key": "id"
}
}
```
46 changes: 46 additions & 0 deletions core/connectors/sources/meilisearch_source/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

type = "source"
key = "meilisearch"
enabled = true
version = 0
name = "Meilisearch source"
path = "../../target/release/libiggy_connector_meilisearch_source"
plugin_config_format = "json"
verbose = false

[[streams]]
stream = "test_stream"
topic = "test_topic"
schema = "json"
batch_length = 100
linger_time = "5ms"

[plugin_config]
url = "http://localhost:7700"
index = "iggy_messages"
query = ""
# filter = "category = alpha"
batch_size = 100
Comment thread
countradooku marked this conversation as resolved.
polling_interval = "5s"
include_metadata = true
timeout = "30s"
max_retries = 3
retry_delay = "500ms"
max_retry_delay = "5s"
max_open_retries = 5
Loading
Loading