diff --git a/Cargo.lock b/Cargo.lock index 5dcfa64639..323fde1a06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6932,6 +6932,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_opensearch_source" +version = "0.4.1-edge.1" +dependencies = [ + "async-trait", + "axum", + "dashmap", + "iggy_common", + "iggy_connector_sdk", + "once_cell", + "opensearch", + "rmp-serde", + "secrecy", + "serde", + "serde_json", + "tempfile", + "tokio", + "tracing", +] + [[package]] name = "iggy_connector_postgres_sink" version = "0.4.1-edge.1" @@ -8956,6 +8976,26 @@ dependencies = [ "uuid", ] +[[package]] +name = "opensearch" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af6815a23449a0860c8fe049a828c3589d3ad56d3b5875d0d1f340d1291871e" +dependencies = [ + "base64", + "bytes", + "dyn-clone", + "lazy_static", + "percent-encoding", + "reqwest 0.13.4", + "rustc_version", + "serde", + "serde_json", + "serde_with", + "url", + "void", +] + [[package]] name = "openssl-probe" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 2e2212f49a..6579c56372 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/influxdb_source", + "core/connectors/sources/opensearch_source", "core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", @@ -212,6 +213,7 @@ nonzero_lit = "0.1.2" notify = "8.2.0" octocrab = "0.51.0" once_cell = "1.21.4" +opensearch = { version = "2.4.0", features = ["rustls-tls"], default-features = false } opentelemetry = { version = "0.32.0", features = ["trace", "logs"] } opentelemetry-appender-tracing = { version = "0.32.0", features = ["log"] } opentelemetry-otlp = { version = "0.32.0", features = [ diff --git a/core/connectors/runtime/example_config/connectors/opensearch_source.toml 
b/core/connectors/runtime/example_config/connectors/opensearch_source.toml new file mode 100644 index 0000000000..8906cc7ede --- /dev/null +++ b/core/connectors/runtime/example_config/connectors/opensearch_source.toml @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "opensearch" +enabled = true +version = 0 +name = "OpenSearch source" +path = "/target/release/libiggy_connector_opensearch_source" +plugin_config_format = "json" + +[[streams]] +stream = "opensearch_stream" +topic = "documents" +schema = "json" +batch_length = 100 +linger_time = "5ms" + +[plugin_config] +url = "http://localhost:9200" +index = "logs-*" +polling_interval = "10s" +batch_size = 100 +timestamp_field = "@timestamp" +# username = "admin" +# password = "replace_with_secret" +# verbose_logging = false + +# HTTP resilience (defaults shown; see docs/RESILIENCE.md). +# max_retries = 3 +# retry_delay = "1s" +# retry_max_delay = "30s" +# max_open_retries = 5 +# open_retry_max_delay = "30s" +# circuit_breaker_threshold = 5 +# circuit_breaker_cool_down = "60s" + +# Optional: restrict which documents are polled (defaults to match_all). 
+# [plugin_config.query] +# match = { "log.level" = "error" } + +# Optional: mirror state to a local JSON file in addition to runtime msgpack state. +# File state is secondary — runtime ConnectorState (msgpack) is authoritative on restart. +# [plugin_config.state] +# enabled = true +# storage_type = "file" +# state_id = "opensearch_logs_connector" +# [plugin_config.state.storage_config] +# base_path = "./connector_states" diff --git a/core/connectors/sources/opensearch_source/Cargo.toml b/core/connectors/sources/opensearch_source/Cargo.toml new file mode 100644 index 0000000000..361065dbd2 --- /dev/null +++ b/core/connectors/sources/opensearch_source/Cargo.toml @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy_connector_opensearch_source" +version = "0.4.1-edge.1" +description = "Iggy OpenSearch source connector" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "opensearch"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +# dashmap and once_cell are not imported directly in this crate's source, but +# the source_connector! macro (in iggy_connector_sdk::source) expands bare +# `use dashmap::DashMap` and `use once_cell::sync::Lazy` into this crate's +# namespace, so they must be listed here. +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +opensearch = { workspace = true } +rmp-serde = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +axum = { workspace = true } +tempfile = { workspace = true } diff --git a/core/connectors/sources/opensearch_source/README.md b/core/connectors/sources/opensearch_source/README.md new file mode 100644 index 0000000000..f17fd779bb --- /dev/null +++ b/core/connectors/sources/opensearch_source/README.md @@ -0,0 +1,296 @@ +# OpenSearch Source Connector + +Polls documents from an OpenSearch index and publishes them to Iggy streams as JSON +messages. Incremental progress is tracked with OpenSearch `search_after` pagination +on `(timestamp_field, _id)`. + +## Architecture + +The connector is a cdylib source plugin loaded by the Iggy connectors runtime via FFI. 
+ +| Layer | Crate / binary | Role | +| ----- | -------------- | ---- | +| Plugin | `iggy_connector_opensearch_source` | Implements `Source` trait; talks to OpenSearch | +| SDK | `iggy_connector_sdk` | `Source`, `ProducedMessage`, `ConnectorState`, `source_connector!` macro | +| Runtime | `iggy-connectors` | Loads the plugin's shared library (cdylib), calls `open` / `poll` / `close`, publishes to Iggy, saves `ConnectorState` | +| Server | `iggy-server` | Receives messages on configured streams/topics | + +The connector is read-only. It does not write to OpenSearch. + +## Configuration + +```toml +type = "source" +key = "opensearch" +enabled = true +version = 0 +name = "OpenSearch source" +path = "target/release/libiggy_connector_opensearch_source" +plugin_config_format = "json" + +[[streams]] +stream = "opensearch_stream" +topic = "documents" +schema = "json" +batch_length = 100 +linger_time = "5ms" + +[plugin_config] +url = "http://localhost:9200" +index = "logs-*" +polling_interval = "10s" +batch_size = 100 +timestamp_field = "@timestamp" +query = { match_all = {} } +``` + +### Required fields + +| Field | Type | Description | +| ----- | ---- | ----------- | +| `url` | `String` | OpenSearch HTTP base URL | +| `index` | `String` | Index name or pattern | +| `timestamp_field` | `String` | Document field used for sort order and cursor; must exist on every document | + +### Optional fields + +| Field | Type | Default | Description | +| ----- | ---- | ------- | ----------- | +| `polling_interval` | `String` | `"10s"` | Delay after each completed poll cycle (humantime format). First poll runs immediately. 
| +| `batch_size` | `usize` | `100` | Documents per search request (minimum `1`) | +| `query` | JSON object | `{"match_all": {}}` | OpenSearch query DSL; applied on every poll | +| `username` / `password` | `String` | none | HTTP basic authentication | +| `verbose_logging` | `bool` | `false` | Log per-poll batch counts at `info!` instead of `debug!` | +| `max_retries` | `u32` | `3` | Total HTTP attempts per search during `poll()` | +| `retry_delay` | `String` | `"1s"` | Base backoff between HTTP retries | +| `retry_max_delay` | `String` | `"30s"` | Maximum backoff between HTTP retries | +| `max_open_retries` | `u32` | `5` | Total attempts for index-exists check during `open()` | +| `open_retry_max_delay` | `String` | `"30s"` | Maximum backoff during `open()` probes | +| `circuit_breaker_threshold` | `u32` | `5` | Consecutive poll failures before circuit opens | +| `circuit_breaker_cool_down` | `String` | `"60s"` | Duration to skip polls when circuit is open | + +### File-backed state (optional) + +Runtime state is always returned from `poll()` and persisted by the connectors +runtime. To additionally mirror state to JSON files on disk: + +```toml +[plugin_config.state] +enabled = true +storage_type = "file" +storage_config = { base_path = "./connector_states" } +state_id = "opensearch_logs_connector" +``` + +Only `storage_type = "file"` is implemented. See [State and persistence](#state-and-persistence). + +## How it works + +### Poll cycle + +Each call to `poll()`: + +1. Issues `POST /{index}/_search` with the query below. +2. Maps each hit's `_source` to a JSON `ProducedMessage`. +3. Updates the `search_after` cursor to the sort tuple of the last hit with a valid sort tuple. +4. Returns `ProducedMessages` containing the messages and a serialized `ConnectorState`. +5. Sleeps `polling_interval` before returning. + +The runtime persists `ConnectorState` (msgpack) after each successful `poll()` return. 
+ +### Search request + +```json +{ + "query": "<query>", + "size": "<batch_size>", + "sort": [ + { "<timestamp_field>": { "order": "asc" } }, + { "_id": { "order": "asc" } } + ], + "search_after": ["<last_sort_values>"] +} +``` + +Two sort keys give stable order when timestamps collide. `_id` is the tiebreaker. `search_after` is omitted until a cursor exists. + +### Per-hit processing + +For each hit in the response: + +1. **Missing sort tuple** — skip with `warn!` for that hit; if no hit in the batch has a valid sort tuple, the poll fails with `Error::Storage`. +2. **Missing `_source`** — skip with `warn!` (not published); cursor still advances to that hit's sort position. +3. **Both present** — serialize `_source` as JSON payload. + +The cursor (`search_after`) advances for any hit with a valid sort tuple, including hits skipped for missing `_source`. An empty batch leaves the cursor unchanged. + +### Timestamp parsing + +The `timestamp_field` value in `_source` is parsed to populate `last_poll_timestamp` (informational only; does not affect pagination). + +| `_source` value | Parsing | +| --------------- | ------- | +| RFC 3339 string | `DateTime::parse_from_rfc3339` | +| Integer `> 1e12` | Epoch milliseconds | +| Integer `≤ 1e12` | Epoch seconds | +| Other | Ignored; document still published | + +## State and persistence + +### Internal state fields + +| Field | Purpose | +| ----- | ------- | +| `search_after` | `Option<Vec<Value>>` — OpenSearch sort tuple of the last hit that carried a valid sort tuple; authoritative resume cursor | +| `last_poll_timestamp` | `Option<DateTime<Utc>>` — timestamp of last processed document; informational | +| `total_documents_published` | Cumulative documents emitted to Iggy | +| `poll_count` | Total search requests executed (successful + empty) | +| `error_count` / `last_error` | Search failure tracking | +| `processing_stats` | Bytes processed, empty/successful poll counts, avg latency | + +**Invariant:** `search_after` is the authoritative resume cursor. `last_poll_timestamp` is +informational only and does not affect pagination.
+ +### Dual persistence + +| Mechanism | Format | When written | When read | Failure mode | +| --------- | ------ | ------------ | --------- | ------------ | +| Runtime `ConnectorState` | MessagePack | Every `poll()` return | `new(id, config, Some(state))` | Corrupt → `open()` fails with `InitError` | +| File `SourceState` | JSON | `close()` if `state.enabled` and connector opened successfully | `open()` if `state.enabled` and no runtime state present | Load failure → `open()` fails with `InitError` | + +File path: `{base_path}/{state_id}.json`; defaults: `base_path = "./connector_states"`, +`state_id = "opensearch_source_{id}"`. + +Runtime `ConnectorState` is authoritative. When valid runtime state is restored on +startup, file state is not loaded. File mirror is written atomically (write-tmp → +fdatasync → rename → dir-fsync) on `close()`. + +## Initial load and tuning + +### Cursor behavior by phase + +| Phase | Cursor behavior | +| ----- | --------------- | +| Fresh start (no state) | No `search_after` — reads from start of sort order | +| Steady state | `search_after` advances — only documents after cursor returned | +| Restart with saved state | Cursor restored from `ConnectorState`; resumes without re-reading | + +There is no separate initial-load code path. Every poll uses the same logic. + +### Throughput + +With defaults (`batch_size = 100`, `polling_interval = "10s"`): + +```text +100 docs / 10s ≈ 10 docs/sec +10,000,000 docs ≈ ~11.5 days to catch up +``` + +Aggressive config for large initial loads: + +```toml +[plugin_config] +polling_interval = "100ms" +batch_size = 5000 +timestamp_field = "@timestamp" +``` + +Optional time-window queries for manual partitioning: + +```toml +[plugin_config] +query = { "range" = { "@timestamp" = { "gte" = "2024-01-01", "lt" = "2024-02-01" } } } +``` + +Requirements for correct operation: + +- `timestamp_field` present on every document. +- Index mapping has a date-type field for `timestamp_field`. 
+- `_source` enabled in the index mapping (see Limitations). + +## Error handling + +| Error variant | When raised | +| ------------- | ----------- | +| `InitError` | Corrupt runtime state; missing index at `open()`; file state load failure | +| `InvalidConfigValue` | Malformed `url`; missing `timestamp_field`; `batch_size = 0`; unsupported `storage_type` | +| `Storage` | Network or HTTP errors; client not initialized at `poll()`; batch in which no hit has a valid sort tuple | +| `Serialization` | JSON / MessagePack failures | + +## Resilience + +### Retry policy + +| Category | Conditions | +| -------- | ---------- | +| Transient (retry) | Network errors; HTTP `429`; HTTP `5xx`; honors `Retry-After` header on `429` | +| Permanent (no retry) | `400`, `401`, `403`, `404`; malformed responses; search DSL errors | + +### Circuit breaker + +When open, `poll()` skips the search, logs a warning, sleeps `polling_interval`, and returns an empty batch with no state update, so the cursor does not advance. A poll that still fails after its retries are exhausted counts as one consecutive failure toward `circuit_breaker_threshold`; a successful search resets the count. + +### Delivery semantics + +**At-least-once toward Iggy.** The in-memory cursor advances in `finalize_poll()` before the runtime persists `ConnectorState`. A crash after cursor advance but before the runtime save re-emits the last batch on restart. + +Consumer guidance: dedup on OpenSearch `_id` plus index name (for index patterns), or a business key in `_source`. + +### Backfill + +Pagination is forward-only on `(timestamp_field asc, _id asc)`. Documents indexed with `timestamp_field` values older than the current cursor are not read until state is reset. + +| Mode | Use case | +| ---- | -------- | +| Forward tail (default) | Live streaming; cursor tracks ingest order | +| Bounded backfill | Set a time-window `query` in `plugin_config` | +| Full rescan | Reset runtime `ConnectorState`; duplicates possible | + +## Limitations + +- **Single sequential reader** — one `search_after` cursor, one batch per poll. 
+ No parallel shard/slice workers or dedicated bulk-ingest mode. +- **Same path for initial load and steady state** — a fresh connector walks the + index from the oldest `(timestamp_field, _id)` upward. There is no separate + bootstrap implementation. +- **Throughput tied to `polling_interval` and `batch_size`** — defaults (`10s`, + `100`) yield roughly 10 documents/second. Tens of millions of documents require + tuning both knobs and sufficient OpenSearch / Iggy capacity. +- **`search_after` only** — no Scroll API, point-in-time (PIT), or sliced + parallel export. Offset paging (`from`/`size`) is not used. +- **At-least-once delivery** — no deduplication by `_id`. The in-memory cursor advances + before the runtime persists `ConnectorState`; a crash can re-emit the last batch. + See [Resilience](#resilience). +- **HTTP retry and circuit breaker** — transient `429`/`5xx` and network errors are retried + per `max_retries`; consecutive failures trip a circuit breaker that skips polls until + cool-down. Permanent errors (`4xx` except `429`) are not retried. + See [Resilience](#resilience). +- **Backfill gap** — documents indexed with `timestamp_field` values older than + the current cursor are not read until connector state is reset. +- **Full `_source` only** — entire document JSON is published; no field + projection or schema variants beyond `Schema::Json`. +- **Optional file state** — only `storage_type = "file"` is implemented. File mirror is + written atomically on `close()`, not every poll. Runtime msgpack wins on restart when + both are present. A failed file save on `close()` returns an error. +- **`_source`-disabled documents skipped permanently** — hits returned without `_source` + (e.g., index mapping with `"_source": false`) are skipped with a `warn!`. The cursor + advances past them. If `_source` later becomes available for a document at the same + `(timestamp_field, _id)` sort position, it will not be re-fetched. 
Ensure `_source` is + enabled in the index mapping before using this connector. +- **Missing sort tuple** — individual hits without a sort tuple are skipped. A batch where + **no** hit has a valid sort tuple fails the poll with `Error::Storage`. +- **Single-node transport** — `SingleNodeConnectionPool` to `url`; no cluster + node sniffing. + +## Troubleshooting + +| Symptom | Check | +| ------- | ----- | +| `open()` fails with missing index | Index name, URL, and credentials | +| `open()` fails with `state restore failed` | Delete or repair the connector runtime state file | +| `open()` fails with `file state load failed` | Delete or repair the file state JSON | +| No new documents after restart | `timestamp_field` mapping must match indexed documents | +| Duplicate messages | At-least-once delivery; lower `batch_size` only after confirming sort stability on `(timestamp_field, _id)` | +| Initial load too slow | Increase `batch_size`, decrease `polling_interval` | +| Backfilled docs missing | Timestamps older than cursor are skipped; reset state or adjust query | +| Repeated `warn!` about missing `_source` | Index mapping has `"_source": false`; connector cannot publish those documents | diff --git a/core/connectors/sources/opensearch_source/config.toml b/core/connectors/sources/opensearch_source/config.toml new file mode 100644 index 0000000000..dceed1b064 --- /dev/null +++ b/core/connectors/sources/opensearch_source/config.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "opensearch" +enabled = true +version = 0 +name = "OpenSearch source" +path = "../../target/release/libiggy_connector_opensearch_source" +plugin_config_format = "json" + +[[streams]] +stream = "test_stream" +topic = "test_topic" +schema = "json" +batch_length = 1000 +linger_time = "5ms" + +[plugin_config] +url = "http://localhost:9200" +index = "test_documents" +polling_interval = "100ms" +batch_size = 100 +timestamp_field = "timestamp" diff --git a/core/connectors/sources/opensearch_source/dependencies.md b/core/connectors/sources/opensearch_source/dependencies.md new file mode 100644 index 0000000000..e020a313c9 --- /dev/null +++ b/core/connectors/sources/opensearch_source/dependencies.md @@ -0,0 +1,46 @@ + + +# OpenSearch Source Connector — Direct Runtime Dependencies + +This file lists every direct (non-dev) dependency declared in +`core/connectors/sources/opensearch_source/Cargo.toml`, together with +its workspace-pinned version, license, and the specific role it plays +in this connector. Transitive dependencies are not listed here; refer +to `cargo tree -p iggy_connector_opensearch_source` for the full graph. + +--- + +## Runtime dependencies + +| Crate | Version (workspace) | License | Role in this connector | +| --- | --- | --- | --- | +| `async-trait` | `^0.1.89` | MIT / Apache-2.0 | Proc-macro that enables `async fn` in trait definitions; required by the `Source` trait impl in `lib.rs`. 
| +| `dashmap` | `^6.1.0` | MIT | Concurrent hash map; injected into this crate's namespace by the `source_connector!` macro expansion in the SDK. Not used directly in source files. | +| `humantime` | `^2.3.0` | MIT / Apache-2.0 | Workspace dependency listed for reference only; it is not declared in this crate's `Cargo.toml`. Duration strings are parsed through `iggy_connector_sdk::retry::parse_duration`. | +| `iggy_common` | workspace | Apache-2.0 | Shared Iggy types: `DateTime`, `Utc`, and `serde_secret` for optional basic-auth password serialisation. | +| `iggy_connector_sdk` | workspace | Apache-2.0 | Core connector abstractions: `Source` trait, `ProducedMessage`, `ProducedMessages`, `ConnectorState`, `Schema`, `Error`, `parse_duration`, and the `source_connector!` registration macro. | +| `once_cell` | `^1.21.4` | MIT / Apache-2.0 | `Lazy` global; injected by the `source_connector!` macro expansion in the SDK. Not used directly in source files. | +| `opensearch` | `2.4.0` | Apache-2.0 | Official OpenSearch Rust client for index existence checks and `search` requests with `search_after` pagination. | +| `rmp-serde` | workspace | MIT / Apache-2.0 | Serialises connector runtime state to MessagePack for the connectors runtime state file. | +| `secrecy` | `^0.10` | MIT / Apache-2.0 | `SecretString` wrapper that prevents accidental logging of passwords in config structs. | +| `serde` | workspace | MIT / Apache-2.0 | Derive macros for config and persisted-state de/serialisation. | +| `serde_json` | workspace | MIT / Apache-2.0 | Builds OpenSearch query bodies and serialises document payloads into produced messages. | +| `tokio` | workspace | MIT | Async runtime; `tokio::sync::Mutex` for connector state and `tokio::time::sleep` for poll-interval delays. | +| `tracing` | workspace | MIT | Structured logging macros (`info!`, `warn!`, `error!`) used throughout the poll loop and state paths. 
| diff --git a/core/connectors/sources/opensearch_source/src/http_tests.rs b/core/connectors/sources/opensearch_source/src/http_tests.rs new file mode 100644 index 0000000000..52efad06a0 --- /dev/null +++ b/core/connectors/sources/opensearch_source/src/http_tests.rs @@ -0,0 +1,653 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::state_manager::{SourceState, create_state_storage}; +use crate::{OpenSearchSource, OpenSearchSourceConfig, StateConfig}; +use axum::Router; +use axum::extract::Request; +use axum::http::StatusCode; +use axum::routing::{head, post}; +use iggy_connector_sdk::{Error, Source}; +use secrecy::SecretString; +use serde_json::{Value, json}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::sync::Mutex; + +const TEST_INDEX: &str = "test_documents"; + +async fn start_server(router: Router) -> String { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + tokio::spawn(async move { + axum::serve(listener, router).await.unwrap(); + }); + format!("http://127.0.0.1:{port}") +} + +fn base_config(url: &str) -> OpenSearchSourceConfig { + OpenSearchSourceConfig { + url: url.to_string(), + index: TEST_INDEX.to_string(), + username: None, + password: None, + query: None, + polling_interval: Some("1ms".to_string()), + batch_size: Some(10), + timestamp_field: Some("timestamp".to_string()), + verbose_logging: false, + state: None, + max_retries: Some(1), + retry_delay: Some("1ms".to_string()), + retry_max_delay: Some("10ms".to_string()), + max_open_retries: Some(1), + open_retry_max_delay: Some("10ms".to_string()), + circuit_breaker_threshold: None, + circuit_breaker_cool_down: None, + } +} + +fn search_hit(doc_id: &str, timestamp: &str, extra: Value) -> Value { + let mut source = json!({ + "id": 1, + "timestamp": timestamp, + }); + if let Some(obj) = extra.as_object() { + for (key, value) in obj { + source[key] = value.clone(); + } + } + json!({ + "_id": doc_id, + "_source": source, + "sort": [timestamp, doc_id] + }) +} + +fn search_response(hits: Vec) -> Value { + json!({ + "hits": { + "hits": hits + } + }) +} + +fn mock_router(index_exists: StatusCode, search_fn: F) -> Router +where + F: Fn(Request) -> Fut + Clone + Send + Sync + 'static, + Fut: std::future::Future 
+ Send + 'static, +{ + Router::new() + .route( + "/test_documents/_search", + post(move |request: Request| { + let search_fn = search_fn.clone(); + async move { search_fn(request).await } + }), + ) + .route("/test_documents", head(move || async move { index_exists })) +} + +fn empty_search_router(index_exists: StatusCode) -> Router { + mock_router(index_exists, |_| async move { + (StatusCode::OK, search_response(vec![]).to_string()) + }) +} + +#[tokio::test] +async fn given_index_exists_when_open_should_succeed() { + let app = empty_search_router(StatusCode::OK); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.expect("open should succeed"); + assert!(source.client_initialized()); +} + +#[tokio::test] +async fn given_missing_index_when_open_should_return_init_error() { + let app = empty_search_router(StatusCode::NOT_FOUND); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + let error = source.open().await.expect_err("open should fail"); + assert!(matches!(error, Error::InitError(_))); + let text = error.to_string(); + assert!(text.contains("does not exist")); + assert!(text.contains(TEST_INDEX)); +} + +#[tokio::test] +async fn given_invalid_url_when_open_should_fail() { + let mut config = base_config("http://127.0.0.1:1"); + config.url = "not-a-valid-url".to_string(); + let mut source = OpenSearchSource::new(1, config, None); + let error = source.open().await.expect_err("open should fail"); + assert!(matches!(error, Error::InvalidConfigValue(_))); +} + +#[tokio::test] +async fn given_search_hits_when_poll_should_produce_json_messages() { + let hits = vec![search_hit( + "doc-1", + "2024-01-01T00:00:00Z", + json!({"name": "alpha"}), + )]; + let body = search_response(hits).to_string(); + let app = mock_router(StatusCode::OK, move |_| { + let body = body.clone(); + async move { (StatusCode::OK, body) } + }); + let base = 
start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + + let produced = source.poll().await.expect("poll should succeed"); + assert_eq!(produced.schema, iggy_connector_sdk::Schema::Json); + assert_eq!(produced.messages.len(), 1); + let payload: Value = + serde_json::from_slice(&produced.messages[0].payload).expect("valid json payload"); + assert_eq!(payload["name"], "alpha"); + assert!(produced.state.is_some()); + + let (fetched, polls, _, _) = source.test_metrics().await; + assert_eq!(fetched, 1); + assert_eq!(polls, 1); +} + +#[tokio::test] +async fn given_empty_search_when_poll_should_increment_empty_poll_count() { + let app = empty_search_router(StatusCode::OK); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + + let produced = source.poll().await.expect("poll should succeed"); + assert!(produced.messages.is_empty()); + + let (_, _, _, empty_polls) = source.test_metrics().await; + assert_eq!(empty_polls, 1); +} + +#[tokio::test] +async fn given_search_after_cursor_when_second_poll_should_request_next_page() { + let request_count = Arc::new(AtomicUsize::new(0)); + let captured_bodies: Arc>> = Arc::new(Mutex::new(Vec::new())); + let count = request_count.clone(); + let bodies = captured_bodies.clone(); + + let first_page = search_response(vec![search_hit("doc-1", "2024-01-01T00:00:00Z", json!({}))]); + let second_page = search_response(vec![search_hit("doc-2", "2024-01-02T00:00:00Z", json!({}))]); + + let app = mock_router(StatusCode::OK, move |request: Request| { + let first_page = first_page.clone(); + let second_page = second_page.clone(); + let count = count.clone(); + let bodies = bodies.clone(); + async move { + let bytes = axum::body::to_bytes(request.into_body(), usize::MAX) + .await + .unwrap_or_default(); + if let Ok(body) = serde_json::from_slice::(&bytes) { + 
bodies.lock().await.push(body); + } + let page = count.fetch_add(1, Ordering::SeqCst); + let response = if page == 0 { first_page } else { second_page }; + (StatusCode::OK, response.to_string()) + } + }); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + + let first = source.poll().await.expect("first poll"); + assert_eq!(first.messages.len(), 1); + + let second = source.poll().await.expect("second poll"); + assert_eq!(second.messages.len(), 1); + + let bodies = captured_bodies.lock().await; + assert_eq!(bodies.len(), 2); + assert!(bodies[0].get("search_after").is_none()); + assert!(bodies[1].get("search_after").is_some()); + + let (fetched, _, _, _) = source.test_metrics().await; + assert_eq!(fetched, 2); +} + +#[tokio::test] +async fn given_custom_query_when_search_should_include_query_in_body() { + let captured: Arc>> = Arc::new(Mutex::new(None)); + let cap = captured.clone(); + let app = mock_router(StatusCode::OK, move |request: Request| { + let cap = cap.clone(); + async move { + let bytes = axum::body::to_bytes(request.into_body(), usize::MAX) + .await + .unwrap_or_default(); + if let Ok(body) = serde_json::from_slice::(&bytes) { + *cap.lock().await = Some(body); + } + (StatusCode::OK, search_response(vec![]).to_string()) + } + }); + let base = start_server(app).await; + let mut config = base_config(&base); + config.query = Some(json!({ "term": { "status": "active" } })); + let mut source = OpenSearchSource::new(1, config, None); + source.open().await.unwrap(); + source.poll().await.unwrap(); + + let body = captured.lock().await.clone().expect("search body captured"); + assert_eq!(body["query"]["term"]["status"], "active"); +} + +#[tokio::test] +async fn given_search_failure_when_poll_should_increment_error_count() { + let app = mock_router(StatusCode::OK, |_| async move { + (StatusCode::INTERNAL_SERVER_ERROR, "boom".to_string()) + }); + let base = 
start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + + let error = source.poll().await.expect_err("poll should fail"); + assert!(matches!(error, Error::Storage(_))); + + let (_, _, errors, _) = source.test_metrics().await; + assert_eq!(errors, 1); +} + +#[tokio::test] +async fn given_basic_auth_when_search_should_send_authorization_header() { + let captured: Arc> = Arc::new(Mutex::new(String::new())); + let cap = captured.clone(); + let app = mock_router(StatusCode::OK, move |request: Request| { + let cap = cap.clone(); + async move { + let auth = request + .headers() + .get("authorization") + .and_then(|value| value.to_str().ok()) + .unwrap_or("") + .to_string(); + *cap.lock().await = auth; + (StatusCode::OK, search_response(vec![]).to_string()) + } + }); + let base = start_server(app).await; + let mut config = base_config(&base); + config.username = Some("iggy".to_string()); + config.password = Some(SecretString::from("secret")); + let mut source = OpenSearchSource::new(1, config, None); + source.open().await.unwrap(); + source.poll().await.unwrap(); + + let auth = captured.lock().await.clone(); + assert!(auth.starts_with("Basic ")); +} + +#[tokio::test] +async fn given_batch_where_all_hits_lack_sort_when_poll_should_return_error() { + let hit = json!({ + "_id": "doc-1", + "_source": { "id": 1, "timestamp": "2024-01-01T00:00:00Z" } + }); + let body = search_response(vec![hit]).to_string(); + let app = mock_router(StatusCode::OK, move |_| { + let body = body.clone(); + async move { (StatusCode::OK, body) } + }); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + + let error = source + .poll() + .await + .expect_err("all-no-sort batch must fail"); + assert!( + matches!(error, Error::Storage(_)), + "expected Storage error, got {error:?}" + ); + + let (_, _, errors, _) = source.test_metrics().await; + 
assert_eq!(errors, 1, "error counter must be incremented"); + assert!( + source.test_search_after().await.is_none(), + "cursor must not advance when batch errors" + ); +} + +#[tokio::test] +async fn given_cursor_set_when_empty_poll_should_preserve_cursor() { + let request_count = Arc::new(AtomicUsize::new(0)); + let count = request_count.clone(); + + let first_page = search_response(vec![search_hit("doc-1", "2024-01-01T00:00:00Z", json!({}))]); + let empty_page = search_response(vec![]); + + let app = mock_router(StatusCode::OK, move |_| { + let first_page = first_page.clone(); + let empty_page = empty_page.clone(); + let count = count.clone(); + async move { + let page = count.fetch_add(1, Ordering::SeqCst); + let response = if page == 0 { first_page } else { empty_page }; + (StatusCode::OK, response.to_string()) + } + }); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + + source.poll().await.expect("first poll"); + let cursor_after_first = source.test_search_after().await; + assert!( + cursor_after_first.is_some(), + "cursor must be set after non-empty poll" + ); + + source.poll().await.expect("empty poll"); + let cursor_after_empty = source.test_search_after().await; + assert_eq!( + cursor_after_empty, cursor_after_first, + "empty poll must not reset cursor to None" + ); +} + +#[tokio::test] +async fn given_all_hits_missing_source_when_poll_should_return_error() { + let hit = json!({ + "_id": "doc-1", + "sort": ["2024-01-01T00:00:00Z", "doc-1"] + }); + let body = search_response(vec![hit]).to_string(); + let app = mock_router(StatusCode::OK, move |_| { + let body = body.clone(); + async move { (StatusCode::OK, body) } + }); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + + // All hits have valid sort tuples but none have _source; cursor must not advance. 
+ let error = source + .poll() + .await + .expect_err("all-_source-absent batch must error"); + assert!( + matches!(error, Error::Storage(_)), + "expected Storage error, got {error:?}" + ); + assert!( + source.test_search_after().await.is_none(), + "cursor must not advance when entire batch has no _source" + ); +} + +#[tokio::test] +async fn given_trailing_hit_without_source_when_poll_should_advance_cursor_past_it() { + let request_count = Arc::new(AtomicUsize::new(0)); + let count = request_count.clone(); + + let first_page = search_response(vec![ + search_hit("doc-1", "2024-01-01T00:00:00Z", json!({})), + json!({ + "_id": "doc-2", + "sort": ["2024-01-02T00:00:00Z", "doc-2"] + }), + ]); + let empty_page = search_response(vec![]); + + let app = mock_router(StatusCode::OK, move |_| { + let first_page = first_page.clone(); + let empty_page = empty_page.clone(); + let count = count.clone(); + async move { + let page = count.fetch_add(1, Ordering::SeqCst); + let response = if page == 0 { first_page } else { empty_page }; + (StatusCode::OK, response.to_string()) + } + }); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + + let produced = source.poll().await.expect("first poll"); + assert_eq!( + produced.messages.len(), + 1, + "only doc-1 published; doc-2 has no _source" + ); + + let cursor = source.test_search_after().await; + assert!( + cursor.is_some(), + "cursor must be set after batch with trailing no-_source hit" + ); + let cursor_vals = cursor.unwrap(); + assert_eq!( + cursor_vals[1].as_str(), + Some("doc-2"), + "cursor must point to doc-2 (trailing no-_source), not doc-1" + ); + + // Second poll must get empty page (cursor past doc-2), not re-fetch doc-2. 
+ let produced2 = source.poll().await.expect("second poll"); + assert!( + produced2.messages.is_empty(), + "doc-2 (no _source) must not be re-fetched after cursor advances past it" + ); +} + +#[tokio::test] +async fn given_poll_without_open_should_return_connection_error() { + let source = OpenSearchSource::new(1, base_config("http://127.0.0.1:9"), None); + let error = source.poll().await.expect_err("poll without open"); + assert!(matches!(error, Error::Connection(_))); +} + +#[tokio::test] +async fn given_open_when_close_should_clear_client() { + let app = empty_search_router(StatusCode::OK); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + source.close().await.expect("close should succeed"); + assert!(!source.client_initialized()); +} + +#[tokio::test] +async fn given_enabled_file_state_when_open_close_should_persist_state() { + let temp_dir = tempfile::tempdir().expect("tempdir"); + let base_path = temp_dir.path().to_string_lossy().to_string(); + + let hits = vec![search_hit("doc-1", "2024-01-01T00:00:00Z", json!({}))]; + let body = search_response(hits).to_string(); + let app = mock_router(StatusCode::OK, move |_| { + let body = body.clone(); + async move { (StatusCode::OK, body) } + }); + let base = start_server(app).await; + + let mut config = base_config(&base); + config.state = Some(StateConfig { + enabled: true, + storage_type: Some("file".to_string()), + storage_config: Some(json!({ "base_path": base_path })), + state_id: Some("opensearch_test_state".to_string()), + }); + + let mut source = OpenSearchSource::new(1, config.clone(), None); + source.open().await.unwrap(); + source.poll().await.unwrap(); + source.close().await.unwrap(); + + let mut reloaded = OpenSearchSource::new(2, config, None); + reloaded.open().await.unwrap(); + let (fetched, polls, _, _) = reloaded.test_metrics().await; + assert_eq!(fetched, 1); + assert_eq!(polls, 1); + assert!( + 
reloaded.test_search_after().await.is_some(), + "search_after cursor must be restored from file state" + ); +} + +#[tokio::test] +async fn given_runtime_state_when_open_should_not_load_stale_file_state() { + let temp_dir = tempfile::tempdir().expect("tempdir"); + let base_path = temp_dir.path().to_string_lossy().to_string(); + let state_id = "opensearch_runtime_authoritative"; + + let hits = vec![search_hit("doc-1", "2024-01-01T00:00:00Z", json!({}))]; + let body = search_response(hits).to_string(); + let app = mock_router(StatusCode::OK, move |_| { + let body = body.clone(); + async move { (StatusCode::OK, body) } + }); + let base = start_server(app).await; + + let mut config = base_config(&base); + config.state = Some(StateConfig { + enabled: true, + storage_type: Some("file".to_string()), + storage_config: Some(json!({ "base_path": base_path.clone() })), + state_id: Some(state_id.to_string()), + }); + + let mut source = OpenSearchSource::new(1, config.clone(), None); + source.open().await.unwrap(); + let produced = source.poll().await.expect("poll"); + let runtime_state = produced.state.expect("runtime state persisted"); + source.close().await.unwrap(); + + let stale_state = SourceState { + id: state_id.to_string(), + last_updated: iggy_common::Utc::now(), + version: crate::state_manager::SOURCE_STATE_VERSION, + data: json!({ + "search_after": ["1970-01-01T00:00:00Z", "stale-doc"], + "poll_count": 99 + }), + metadata: None, + }; + let storage = create_state_storage(config.state.as_ref().unwrap()).expect("file storage"); + storage + .save_source_state(&stale_state) + .await + .expect("write stale file"); + + let mut restarted = OpenSearchSource::new(2, config, Some(runtime_state)); + restarted.open().await.unwrap(); + + let (_, polls, _, _) = restarted.test_metrics().await; + assert_eq!( + polls, 1, + "runtime ConnectorState must not be overwritten by stale file" + ); +} + +#[tokio::test] +async fn given_epoch_seconds_timestamp_should_update_last_poll_timestamp() 
{ + let hit = search_hit("doc-1", "2024-01-15T10:00:00Z", json!({})); + let mut hit = hit; + // Override _source.timestamp with epoch-seconds integer to exercise the numeric + // branch of parse_document_timestamp. Sort tuple stays as the RFC3339 string that + // search_hit() placed there — sort is used only for cursor positioning. + hit["_source"]["timestamp"] = json!(1_705_312_200_i64); + let body = search_response(vec![hit]).to_string(); + let app = mock_router(StatusCode::OK, move |_| { + let body = body.clone(); + async move { (StatusCode::OK, body) } + }); + let base = start_server(app).await; + let mut source = OpenSearchSource::new(1, base_config(&base), None); + source.open().await.unwrap(); + source.poll().await.unwrap(); + + let last_ts = source.test_last_poll_timestamp().await; + assert!( + last_ts.is_some(), + "epoch-seconds timestamp must be parsed and stored in state" + ); +} + +#[tokio::test] +async fn given_verbose_logging_when_poll_should_succeed() { + let app = empty_search_router(StatusCode::OK); + let base = start_server(app).await; + let mut config = base_config(&base); + config.verbose_logging = true; + let mut source = OpenSearchSource::new(1, config, None); + source.open().await.unwrap(); + source.poll().await.expect("verbose poll should succeed"); +} + +#[tokio::test] +async fn given_transient_search_errors_when_poll_should_retry_and_succeed() { + let request_count = Arc::new(AtomicUsize::new(0)); + let count = request_count.clone(); + + let app = mock_router(StatusCode::OK, move |_| { + let count = count.clone(); + async move { + let attempt = count.fetch_add(1, Ordering::SeqCst); + if attempt < 2 { + (StatusCode::SERVICE_UNAVAILABLE, "temporary".to_string()) + } else { + (StatusCode::OK, search_response(vec![]).to_string()) + } + } + }); + let base = start_server(app).await; + let mut config = base_config(&base); + config.max_retries = Some(3); + config.retry_delay = Some("1ms".to_string()); + let mut source = OpenSearchSource::new(1, 
config, None); + source.open().await.unwrap(); + + source + .poll() + .await + .expect("poll should succeed after retries"); + assert!( + request_count.load(Ordering::SeqCst) >= 3, + "search should be retried after transient 503" + ); +} + +#[tokio::test] +async fn given_circuit_breaker_open_when_poll_should_return_empty_without_error() { + let app = mock_router(StatusCode::OK, |_| async move { + (StatusCode::INTERNAL_SERVER_ERROR, "boom".to_string()) + }); + let base = start_server(app).await; + let mut config = base_config(&base); + config.circuit_breaker_threshold = Some(1); + config.circuit_breaker_cool_down = Some("60s".to_string()); + let mut source = OpenSearchSource::new(1, config, None); + source.open().await.unwrap(); + + let _ = source.poll().await.expect_err("first poll should fail"); + + let produced = source + .poll() + .await + .expect("open circuit should skip search"); + assert!(produced.messages.is_empty()); + assert!(produced.state.is_none()); +} diff --git a/core/connectors/sources/opensearch_source/src/lib.rs b/core/connectors/sources/opensearch_source/src/lib.rs new file mode 100644 index 0000000000..2a4ffbc365 --- /dev/null +++ b/core/connectors/sources/opensearch_source/src/lib.rs @@ -0,0 +1,1136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use iggy_common::{DateTime, Utc}; +use iggy_connector_sdk::retry::{CircuitBreaker, parse_duration}; +use iggy_connector_sdk::{ + ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, +}; +use opensearch::{ + OpenSearch, SearchParts, + auth::Credentials, + http::{Url, transport::TransportBuilder}, +}; +use secrecy::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +use std::sync::Arc; +use std::time::Duration; +use tokio::{sync::Mutex, time::sleep}; +use tracing::{debug, error, info, warn}; + +mod retry; +mod state_manager; +use crate::retry::{ + DEFAULT_CB_COOL_DOWN, DEFAULT_CB_THRESHOLD, DEFAULT_MAX_OPEN_RETRIES, DEFAULT_MAX_RETRIES, + DEFAULT_OPEN_RETRY_MAX_DELAY, DEFAULT_RETRY_DELAY, DEFAULT_RETRY_MAX_DELAY, RetryBackoff, + is_transient_status, normalized_max_attempts, sleep_before_retry, +}; +use crate::state_manager::{SOURCE_STATE_VERSION, SourceState, validate_state_storage_config}; + +source_connector!(OpenSearchSource); + +const CONNECTOR_NAME: &str = "OpenSearch source"; +const DEFAULT_POLLING_INTERVAL: &str = "10s"; +const DEFAULT_BATCH_SIZE: usize = 100; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct State { + #[serde(default)] + last_poll_timestamp: Option>, + #[serde(default, alias = "total_documents_fetched")] + total_documents_published: usize, + #[serde(default)] + poll_count: usize, + /// OpenSearch `search_after` tuple from the last hit in the previous batch. 
+ #[serde(default)] + search_after: Option>, + #[serde(default)] + error_count: usize, + #[serde(default)] + last_error: Option, + #[serde(default)] + processing_stats: ProcessingStats, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct ProcessingStats { + #[serde(default)] + total_bytes_processed: u64, + /// Running cumulative average over the connector's lifetime, persisted and accumulated + /// across restarts. Reflects long-term throughput baseline, not session-only average. + #[serde(default)] + avg_batch_processing_time_ms: f64, + #[serde(default)] + last_successful_poll: Option>, + #[serde(default)] + empty_polls_count: usize, + #[serde(default)] + successful_polls_count: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateConfig { + #[serde(default)] + pub enabled: bool, + pub storage_type: Option, + pub storage_config: Option, + pub state_id: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OpenSearchSourceConfig { + pub url: String, + pub index: String, + pub username: Option, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")] + pub password: Option, + pub query: Option, + pub polling_interval: Option, + pub batch_size: Option, + pub timestamp_field: Option, + #[serde(default)] + pub verbose_logging: bool, + pub state: Option, + /// Total poll-phase attempt count (not retry count); 1 = no retries, 3 = 2 retries. + pub max_retries: Option, + pub retry_delay: Option, + pub retry_max_delay: Option, + /// Total open-phase attempt count (not retry count); 1 = no retries, 5 = 4 retries. + pub max_open_retries: Option, + pub open_retry_max_delay: Option, + pub circuit_breaker_threshold: Option, + pub circuit_breaker_cool_down: Option, +} + +#[derive(Debug)] +pub struct OpenSearchSource { + id: u32, + config: OpenSearchSourceConfig, + client: Option, + polling_interval: Duration, + /// Pre-built query body (query + size + sort). 
Set once in `open()` after config validation. + /// `None` before `open()` and after a failed `open()`. + search_body_base: Option, + verbose: bool, + max_retries: u32, + retry_delay: Duration, + retry_max_delay: Duration, + max_open_retries: u32, + open_retry_max_delay: Duration, + circuit_breaker: Arc, + state: Mutex, + /// `Some(cause)` when runtime state restore was rejected; `None` means restore succeeded. + state_restore_error: Option, + /// True when `new()` restored a valid runtime `ConnectorState`. File mirror must not override it. + runtime_state_restored: bool, +} + +struct SearchOutcome { + messages: Vec, + search_after: Option>, + last_poll_timestamp: Option>, + batch_bytes: u64, +} + +impl OpenSearchSource { + pub fn new(id: u32, config: OpenSearchSourceConfig, state: Option) -> Self { + let polling_interval = + parse_duration(config.polling_interval.as_deref(), DEFAULT_POLLING_INTERVAL); + let verbose = config.verbose_logging; + let (restored_state, state_restore_error, runtime_state_restored) = + restore_state(id, state); + + let cb_threshold = config + .circuit_breaker_threshold + .unwrap_or(DEFAULT_CB_THRESHOLD); + let cb_cool_down = parse_duration( + config.circuit_breaker_cool_down.as_deref(), + DEFAULT_CB_COOL_DOWN, + ); + let circuit_breaker = Arc::new(CircuitBreaker::new(cb_threshold, cb_cool_down)); + let max_retries = + normalized_max_attempts(config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES)); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let retry_max_delay = + parse_duration(config.retry_max_delay.as_deref(), DEFAULT_RETRY_MAX_DELAY); + let max_open_retries = + normalized_max_attempts(config.max_open_retries.unwrap_or(DEFAULT_MAX_OPEN_RETRIES)); + let open_retry_max_delay = parse_duration( + config.open_retry_max_delay.as_deref(), + DEFAULT_OPEN_RETRY_MAX_DELAY, + ); + + OpenSearchSource { + id, + config, + max_retries, + retry_delay, + retry_max_delay, + max_open_retries, + 
open_retry_max_delay, + circuit_breaker, + client: None, + polling_interval, + search_body_base: None, + verbose, + state: Mutex::new(restored_state), + state_restore_error, + runtime_state_restored, + } + } + + fn serialize_state(&self, state: &State) -> Option { + ConnectorState::serialize(state, CONNECTOR_NAME, self.id) + } + + fn batch_size(&self) -> usize { + self.config.batch_size.unwrap_or(DEFAULT_BATCH_SIZE) + } + + fn timestamp_field(&self) -> &str { + self.config + .timestamp_field + .as_deref() + .expect("timestamp_field validated at open()") + } + + fn get_state_id(&self) -> String { + self.config + .state + .as_ref() + .and_then(|s| s.state_id.clone()) + .unwrap_or_else(|| format!("opensearch_source_{}", self.id)) + } + + pub(crate) async fn internal_state_to_source_state(&self) -> Result { + let state = self.state.lock().await; + let data = serde_json::to_value(&*state).map_err(|error| { + Error::Serialization(format!("Failed to serialize connector state: {error}")) + })?; + + Ok(SourceState { + id: self.get_state_id(), + last_updated: Utc::now(), + version: SOURCE_STATE_VERSION, + data, + metadata: Some(json!({ + "connector_type": "opensearch_source", + "connector_id": self.id, + "index": self.config.index, + "url": self.config.url, + })), + }) + } + + pub(crate) async fn source_state_to_internal_state( + &mut self, + source_state: SourceState, + ) -> Result<(), Error> { + if source_state.version != SOURCE_STATE_VERSION { + return Err(Error::Serialization(format!( + "unsupported file state version {}, expected {SOURCE_STATE_VERSION}", + source_state.version + ))); + } + + let restored: State = serde_json::from_value(source_state.data).map_err(|error| { + Error::Serialization(format!("Failed to deserialize connector state: {error}")) + })?; + + let mut state = self.state.lock().await; + *state = restored; + Ok(()) + } + + async fn create_client(&self) -> Result { + let url = Url::parse(&self.config.url).map_err(|error| { + 
Error::InvalidConfigValue(format!("Invalid OpenSearch URL: {error}")) + })?; + + let conn_pool = opensearch::http::transport::SingleNodeConnectionPool::new(url); + let mut transport_builder = TransportBuilder::new(conn_pool); + + if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) { + let credentials = + Credentials::Basic(username.clone(), password.expose_secret().to_string()); + transport_builder = transport_builder.auth(credentials); + } + + let transport = transport_builder + .build() + .map_err(|error| Error::InitError(format!("Failed to build transport: {error}")))?; + + Ok(OpenSearch::new(transport)) + } + + async fn check_index_exists_with_retry(&self, client: &OpenSearch) -> Result<(), Error> { + let max_attempts = self.max_open_retries; + let mut attempt = 0u32; + + loop { + attempt += 1; + let response = match client + .indices() + .exists(opensearch::indices::IndicesExistsParts::Index(&[&self + .config + .index])) + .send() + .await + { + Ok(response) => response, + Err(error) => { + if attempt < max_attempts { + sleep_before_retry( + "index_exists", + self.id, + attempt, + max_attempts, + // retry_delay is shared with the poll phase (no separate open-phase + // base delay); open_retry_max_delay caps the growth independently. + // Same pattern as the InfluxDB source connector. 
+ &RetryBackoff { + delay: self.retry_delay, + max_delay: self.open_retry_max_delay, + }, + None, + &error.to_string(), + ) + .await; + continue; + } + return Err(Error::InitError(format!( + "Failed to check index existence: {error}" + ))); + } + }; + + if response.status_code().is_success() { + return Ok(()); + } + + let status = response.status_code().as_u16(); + if status == 404 { + return Err(Error::InitError(format!( + "Index '{}' does not exist or is not accessible", + self.config.index + ))); + } + + let retry_after = response + .headers() + .get("retry-after") + .and_then(|value| value.to_str().ok()) + .map(str::to_owned); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "unknown error".to_string()); + + if is_transient_status(status) && attempt < max_attempts { + sleep_before_retry( + "index_exists", + self.id, + attempt, + max_attempts, + &RetryBackoff { + delay: self.retry_delay, + max_delay: self.open_retry_max_delay, + }, + retry_after.as_deref(), + &format!("HTTP {status}: {error_text}"), + ) + .await; + continue; + } + + return Err(Error::InitError(format!( + "Index '{}' check failed: HTTP {status}: {error_text}", + self.config.index + ))); + } + } + + async fn send_search_with_retry( + &self, + client: &OpenSearch, + mut search_body: Value, + ) -> Result { + let max_attempts = self.max_retries; + let mut attempt = 0u32; + + loop { + attempt += 1; + let body = if attempt < max_attempts { + search_body.clone() + } else { + std::mem::take(&mut search_body) + }; + let response = match client + .search(SearchParts::Index(&[&self.config.index])) + .body(body) + .send() + .await + { + Ok(response) => response, + Err(error) => { + if attempt < max_attempts { + sleep_before_retry( + "search", + self.id, + attempt, + max_attempts, + &RetryBackoff { + delay: self.retry_delay, + max_delay: self.retry_max_delay, + }, + None, + &error.to_string(), + ) + .await; + continue; + } + return Err(Error::Storage(format!("Failed to execute search: 
{error}"))); + } + }; + + if response.status_code().is_success() { + return Ok(response); + } + + let status = response.status_code().as_u16(); + let retry_after = response + .headers() + .get("retry-after") + .and_then(|value| value.to_str().ok()) + .map(str::to_owned); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "unknown error".to_string()); + + if is_transient_status(status) && attempt < max_attempts { + sleep_before_retry( + "search", + self.id, + attempt, + max_attempts, + &RetryBackoff { + delay: self.retry_delay, + max_delay: self.retry_max_delay, + }, + retry_after.as_deref(), + &format!("HTTP {status}: {error_text}"), + ) + .await; + continue; + } + + return Err(Error::Storage(format!( + "Search request failed: {error_text}" + ))); + } + } + + async fn search_documents(&self, client: &OpenSearch) -> Result { + let state = self.state.lock().await; + let search_after = state.search_after.clone(); + drop(state); + + let timestamp_field = self.timestamp_field(); + + let mut search_body = self + .search_body_base + .as_ref() + .ok_or_else(|| { + Error::Connection("connector not initialized; call open() first".to_string()) + })? 
+ .clone(); + + if let Some(cursor) = search_after { + search_body["search_after"] = json!(cursor); + } + + let response = self.send_search_with_retry(client, search_body).await?; + + let mut response_body: Value = response + .json() + .await + .map_err(|e| Error::Storage(format!("Failed to parse search response: {e}")))?; + + let hits: Vec = response_body + .get_mut("hits") + .and_then(|h| h.get_mut("hits")) + .and_then(|arr| arr.as_array_mut()) + .map(std::mem::take) + .unwrap_or_default(); + + let mut messages = Vec::with_capacity(hits.len()); + let mut batch_bytes = 0u64; + let mut last_sort = None; + let mut last_poll_timestamp = None; + + for hit in &hits { + let Some(sort) = hit + .get("sort") + .and_then(|s| s.as_array()) + .filter(|a| !a.is_empty()) + else { + warn!( + connector_id = self.id, + hit_id = hit.get("_id").and_then(|value| value.as_str()), + "Skipping OpenSearch hit without sort tuple; document will not be published" + ); + continue; + }; + + last_sort = Some(sort); + + let Some(source) = hit.get("_source") else { + warn!( + connector_id = self.id, + hit_id = hit.get("_id").and_then(|v| v.as_str()), + "Skipping OpenSearch hit without _source; document will not be published" + ); + continue; + }; + + if let Some(timestamp_value) = source.get(timestamp_field) + && let Some(timestamp_utc) = parse_document_timestamp(timestamp_value) + { + last_poll_timestamp = Some(timestamp_utc); + } + + let payload = serde_json::to_vec(source) + .map_err(|e| Error::Serialization(format!("Failed to serialize document: {e}")))?; + batch_bytes += payload.len() as u64; + + messages.push(ProducedMessage { + id: None, + headers: None, + checksum: None, + timestamp: None, + origin_timestamp: None, + payload, + }); + } + + if !hits.is_empty() && last_sort.is_none() { + return Err(Error::Storage(format!( + "OpenSearch returned {} hit(s) but none had a sort tuple; \ + index may be missing the sort field or using an incompatible mapping", + hits.len() + ))); + } + + // 
Guard: cursor would advance past the entire batch but nothing would be published. + // This happens when _source is disabled on the index (all sort-bearing hits lack _source). + if !hits.is_empty() && messages.is_empty() && last_sort.is_some() { + return Err(Error::Storage(format!( + "OpenSearch returned {} hit(s) with valid sort tuples but all were missing \ + _source; index may have _source disabled. Refusing to advance cursor \ + without publishing any messages.", + hits.len() + ))); + } + + Ok(SearchOutcome { + messages, + search_after: last_sort.map(ToOwned::to_owned), + last_poll_timestamp, + batch_bytes, + }) + } + + async fn finalize_poll( + &self, + outcome: SearchOutcome, + processing_time_ms: f64, + ) -> (Vec, Option) { + let mut state = self.state.lock().await; + state.total_documents_published += outcome.messages.len(); + state.poll_count += 1; + if let Some(cursor) = outcome.search_after { + state.search_after = Some(cursor); + } + if let Some(timestamp) = outcome.last_poll_timestamp { + state.last_poll_timestamp = Some(timestamp); + } + state.processing_stats.total_bytes_processed += outcome.batch_bytes; + + if outcome.messages.is_empty() { + state.processing_stats.empty_polls_count += 1; + } else { + state.processing_stats.successful_polls_count += 1; + state.processing_stats.last_successful_poll = Some(Utc::now()); + } + + let total_polls = state.processing_stats.successful_polls_count + + state.processing_stats.empty_polls_count; + // total_polls >= 1 because one of the two counters was just incremented above, + // but use saturating_sub to make the invariant machine-checked. 
+        state.processing_stats.avg_batch_processing_time_ms =
+            (state.processing_stats.avg_batch_processing_time_ms
+                * total_polls.saturating_sub(1) as f64
+                + processing_time_ms)
+                / total_polls as f64;
+
+        let produced_count = outcome.messages.len();
+        let total_documents_published = state.total_documents_published;
+        // Clone the state and release the lock before serializing the snapshot.
+        let state_snapshot = state.clone();
+        let messages = outcome.messages;
+        drop(state);
+        let persisted_state = self.serialize_state(&state_snapshot);
+
+        // Same message on both branches; `verbose` only selects the log level.
+        if self.verbose {
+            info!(
+                "OpenSearch source connector ID: {} produced {produced_count} messages. \
+                 Total published: {total_documents_published}",
+                self.id
+            );
+        } else {
+            debug!(
+                "OpenSearch source connector ID: {} produced {produced_count} messages. \
+                 Total published: {total_documents_published}",
+                self.id
+            );
+        }
+
+        (messages, persisted_state)
+    }
+
+    /// Test helper: `true` while a client is held (`open()` stores one, `close()` clears it).
+    #[cfg(test)]
+    fn client_initialized(&self) -> bool {
+        self.client.is_some()
+    }
+
+    /// Test helper: returns `(total_documents_published, poll_count, error_count,
+    /// empty_polls_count)`, read under the state lock.
+    #[cfg(test)]
+    async fn test_metrics(&self) -> (usize, usize, usize, usize) {
+        let state = self.state.lock().await;
+        (
+            state.total_documents_published,
+            state.poll_count,
+            state.error_count,
+            state.processing_stats.empty_polls_count,
+        )
+    }
+
+    /// Test helper: clone of the stored `search_after` cursor.
+    #[cfg(test)]
+    async fn test_search_after(&self) -> Option> {
+        self.state.lock().await.search_after.clone()
+    }
+
+    /// Test helper: the stored `last_poll_timestamp`.
+    #[cfg(test)]
+    async fn test_last_poll_timestamp(&self) -> Option> {
+        self.state.lock().await.last_poll_timestamp
+    }
+
+    /// Failure path of `poll()`: records a circuit-breaker failure, increments
+    /// `error_count`, stores the error text in `last_error`, logs it, sleeps for the
+    /// polling interval and then hands the same error back as `Err`.
+    async fn handle_poll_error(&self, error: Error) -> Result {
+        self.circuit_breaker.record_failure().await;
+        let mut state = self.state.lock().await;
+        // error_count and last_error accumulate in State and are captured by the next
+        // finalize_poll call (success path). Both runtime ConnectorState and file state
+        // preserve these values across restarts.
+        state.error_count += 1;
+        state.last_error = Some(error.to_string());
+        // Release the state lock before logging and sleeping.
+        drop(state);
+        error!(
+            "{CONNECTOR_NAME} connector ID: {} poll failed: {error}",
+            self.id
+        );
+        sleep(self.polling_interval).await;
+        Err(error)
+    }
+}
+
+/// Rebuilds the in-memory `State` from an optional runtime `ConnectorState`.
+///
+/// Returns `(state, restore_error, runtime_state_restored)`:
+/// - no persisted state: `(State::default(), None, false)`;
+/// - persisted state deserialized: `(restored, None, true)`;
+/// - persisted state present but not deserializable: `(State::default(), Some(cause), false)`.
+///   `open()` turns a `Some(cause)` into an `InitError`, so the default state is never polled with.
+fn restore_state(id: u32, state: Option) -> (State, Option, bool) {
+    let Some(connector_state) = state else {
+        return (State::default(), None, false);
+    };
+
+    match connector_state.deserialize::(CONNECTOR_NAME, id) {
+        Some(restored) => {
+            info!(
+                "Restored state for {CONNECTOR_NAME} connector with ID: {id}. \
+                 Documents published: {}, poll count: {}",
+                restored.total_documents_published, restored.poll_count
+            );
+            (restored, None, true)
+        }
+        None => {
+            let cause = "persisted state exists but could not be deserialized. \
+                         Refusing to start to prevent silent cursor reset."
+                .to_string();
+            error!("{CONNECTOR_NAME} ID {id}: {cause}");
+            (State::default(), Some(cause), false)
+        }
+    }
+}
+
+/// Checks the configuration before `open()` touches the network.
+///
+/// Returns `Error::InvalidConfigValue` when `timestamp_field` is missing or empty, or when
+/// `batch_size` is explicitly `0`. When file state is enabled, the result of
+/// `validate_state_storage_config` is propagated as well.
+fn validate_open_config(config: &OpenSearchSourceConfig) -> Result<(), Error> {
+    if config.timestamp_field.as_deref().is_none_or(str::is_empty) {
+        return Err(Error::InvalidConfigValue(
+            "timestamp_field is required for incremental OpenSearch polling".to_string(),
+        ));
+    }
+
+    // An unset batch_size is fine (a default applies); only an explicit zero is rejected.
+    if matches!(config.batch_size, Some(0)) {
+        return Err(Error::InvalidConfigValue(
+            "batch_size must be at least 1".to_string(),
+        ));
+    }
+
+    if let Some(state) = &config.state
+        && state.enabled
+    {
+        validate_state_storage_config(state)?;
+    }
+
+    Ok(())
+}
+
+/// Converts a document's timestamp value to UTC.
+///
+/// Strings are parsed as RFC 3339. Numbers are read through `as_i64()` and interpreted as
+/// epoch seconds or epoch milliseconds depending on magnitude. Any other JSON type, an
+/// unparsable string, or a number for which `as_i64()` returns `None` yields `None`.
+fn parse_document_timestamp(value: &Value) -> Option> {
+    match value {
+        Value::String(text) => DateTime::parse_from_rfc3339(text)
+            .ok()
+            .map(|timestamp| timestamp.with_timezone(&Utc)),
+        Value::Number(number) => {
+            let raw = number.as_i64()?;
+            // Values whose magnitude exceeds 1e12 are taken to be milliseconds already
+            // (Unix epoch seconds won't reach 1e12 until year 33658). Values within
+            // -1e12..=1e12 are treated as seconds and multiplied by 1000.
+ let millis = if !(-1_000_000_000_000..=1_000_000_000_000).contains(&raw) { + raw + } else { + raw.saturating_mul(1_000) + }; + DateTime::from_timestamp_millis(millis).map(|timestamp| timestamp.with_timezone(&Utc)) + } + _ => None, + } +} + +#[async_trait] +impl Source for OpenSearchSource { + async fn open(&mut self) -> Result<(), Error> { + if let Some(ref cause) = self.state_restore_error { + return Err(Error::InitError(format!("state restore failed: {cause}"))); + } + + validate_open_config(&self.config)?; + + let timestamp_field = self.timestamp_field(); + let batch_size = self.batch_size(); + self.search_body_base = Some(json!({ + "query": self.config.query.clone().unwrap_or_else(|| json!({ "match_all": {} })), + "size": batch_size, + "sort": [ + { timestamp_field: { "order": "asc" } }, + { "_id": { "order": "asc" } } + ] + })); + + info!( + "Opening OpenSearch source connector with ID: {} for URL: {}, index: {}", + self.id, self.config.url, self.config.index + ); + + let client = self.create_client().await?; + + self.check_index_exists_with_retry(&client).await?; + + self.client = Some(client); + + if self + .config + .state + .as_ref() + .map(|s| s.enabled) + .unwrap_or(false) + { + if self.runtime_state_restored { + info!( + "Skipping file state load for OpenSearch source connector with ID: {} \ + because runtime ConnectorState is authoritative", + self.id + ); + } else { + self.load_state().await.map_err(|error| { + Error::InitError(format!("file state load failed: {error}")) + })?; + } + } + + info!( + "Successfully opened OpenSearch source connector with ID: {}", + self.id + ); + Ok(()) + } + + async fn poll(&self) -> Result { + if self.circuit_breaker.is_open().await { + warn!( + "{CONNECTOR_NAME} connector ID: {} — circuit breaker is OPEN. 
Skipping poll.", + self.id + ); + sleep(self.polling_interval).await; + return Ok(ProducedMessages { + schema: Schema::Json, + messages: vec![], + state: None, + }); + } + + let start_time = std::time::Instant::now(); + + let client = self + .client + .as_ref() + .ok_or_else(|| Error::Connection("OpenSearch client not initialized".to_string()))?; + + match self.search_documents(client).await { + Ok(outcome) => { + self.circuit_breaker.record_success(); + let processing_time = start_time.elapsed().as_millis() as f64; + let (messages, persisted_state) = + self.finalize_poll(outcome, processing_time).await; + sleep(self.polling_interval).await; + Ok(ProducedMessages { + schema: Schema::Json, + messages, + state: persisted_state, + }) + } + Err(error) => self.handle_poll_error(error).await, + } + } + + async fn close(&mut self) -> Result<(), Error> { + let state = self.state.lock().await; + info!( + "OpenSearch source connector with ID: {} is closing. Stats: {} total documents published, {} polls executed, {} errors", + self.id, state.total_documents_published, state.poll_count, state.error_count + ); + drop(state); + + if self.client.is_some() + && self + .config + .state + .as_ref() + .map(|s| s.enabled) + .unwrap_or(false) + { + self.save_state().await?; + } + + self.client = None; + info!( + "OpenSearch source connector with ID: {} is closed.", + self.id + ); + Ok(()) + } +} + +#[cfg(test)] +mod http_tests; + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config() -> OpenSearchSourceConfig { + OpenSearchSourceConfig { + url: "http://localhost:9200".to_string(), + index: "test_documents".to_string(), + username: None, + password: None, + query: None, + polling_interval: Some("100ms".to_string()), + batch_size: Some(10), + timestamp_field: Some("timestamp".to_string()), + verbose_logging: false, + state: None, + max_retries: None, + retry_delay: None, + retry_max_delay: None, + max_open_retries: None, + open_retry_max_delay: None, + circuit_breaker_threshold: 
None, + circuit_breaker_cool_down: None, + } + } + + fn test_state() -> State { + State { + last_poll_timestamp: None, + total_documents_published: 500, + poll_count: 7, + search_after: Some(vec![json!("2024-01-01T00:00:00Z"), json!("doc_42")]), + error_count: 1, + last_error: Some("connection reset".to_string()), + processing_stats: ProcessingStats { + total_bytes_processed: 1024, + avg_batch_processing_time_ms: 12.5, + last_successful_poll: None, + empty_polls_count: 2, + successful_polls_count: 5, + }, + } + } + + #[test] + fn given_persisted_runtime_state_when_new_should_restore_counts() { + let state = test_state(); + let serialized = rmp_serde::to_vec(&state).expect("Failed to serialize state"); + let connector_state = ConnectorState(serialized); + + let source = OpenSearchSource::new(1, test_config(), Some(connector_state)); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let restored = source.state.lock().await; + assert_eq!(restored.total_documents_published, 500); + assert_eq!(restored.poll_count, 7); + assert_eq!( + restored.search_after, + Some(vec![json!("2024-01-01T00:00:00Z"), json!("doc_42")]) + ); + assert!(source.state_restore_error.is_none()); + assert!(source.runtime_state_restored); + }); + } + + #[test] + fn given_no_runtime_state_when_new_should_start_fresh() { + let source = OpenSearchSource::new(1, test_config(), None); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let state = source.state.lock().await; + assert_eq!(state.total_documents_published, 0); + assert_eq!(state.poll_count, 0); + assert_eq!(state.search_after, None); + assert!(source.state_restore_error.is_none()); + assert!(!source.runtime_state_restored); + }); + } + + #[test] + fn given_invalid_runtime_state_when_new_should_set_restore_error() { + let invalid_state = ConnectorState(b"not valid msgpack".to_vec()); + let source = OpenSearchSource::new(1, test_config(), Some(invalid_state)); + + 
assert!(source.state_restore_error.is_some()); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let state = source.state.lock().await; + assert_eq!(state.total_documents_published, 0); + assert_eq!(state.poll_count, 0); + }); + } + + #[test] + fn given_invalid_state_when_open_should_fail() { + let invalid_state = ConnectorState(b"not valid msgpack".to_vec()); + let mut source = OpenSearchSource::new(1, test_config(), Some(invalid_state)); + let runtime = tokio::runtime::Runtime::new().unwrap(); + let result = runtime.block_on(source.open()); + assert!( + matches!(result, Err(Error::InitError(_))), + "open() must fail with InitError on restore failure" + ); + } + + #[test] + fn given_missing_timestamp_field_when_validate_should_fail() { + let mut config = test_config(); + config.timestamp_field = None; + let error = validate_open_config(&config).expect_err("missing timestamp_field"); + assert!(matches!(error, Error::InvalidConfigValue(_))); + } + + #[test] + fn given_empty_timestamp_field_when_validate_should_fail() { + let mut config = test_config(); + config.timestamp_field = Some(String::new()); + let error = validate_open_config(&config).expect_err("empty timestamp_field"); + assert!(matches!(error, Error::InvalidConfigValue(_))); + } + + #[test] + fn given_unparsable_timestamp_when_parsed_should_return_none() { + let value = json!("not-a-timestamp"); + assert!(parse_document_timestamp(&value).is_none()); + } + + #[test] + fn given_source_state_json_when_apply_should_restore_metrics() { + use crate::state_manager::{SOURCE_STATE_VERSION, SourceState}; + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut source = OpenSearchSource::new(1, test_config(), None); + let source_state = SourceState { + id: "opensearch_source_1".to_string(), + last_updated: Utc::now(), + version: SOURCE_STATE_VERSION, + data: json!({ + "total_documents_published": 9, + "poll_count": 4, + "search_after": 
["2024-02-01T00:00:00Z", "doc-9"], + "error_count": 2, + "last_error": "timeout", + "processing_stats": { + "total_bytes_processed": 100, + "avg_batch_processing_time_ms": 1.5, + "last_successful_poll": null, + "empty_polls_count": 1, + "successful_polls_count": 3 + } + }), + metadata: None, + }; + + source + .source_state_to_internal_state(source_state) + .await + .expect("apply source state"); + + let state = source.state.lock().await; + assert_eq!(state.total_documents_published, 9); + assert_eq!(state.poll_count, 4); + assert_eq!( + state.search_after, + Some(vec![json!("2024-02-01T00:00:00Z"), json!("doc-9")]) + ); + assert_eq!(state.error_count, 2); + }); + } + + #[test] + fn given_unsupported_file_state_version_when_applied_should_fail() { + use crate::state_manager::SourceState; + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut source = OpenSearchSource::new(1, test_config(), None); + let source_state = SourceState { + id: "opensearch_source_1".to_string(), + last_updated: Utc::now(), + version: 99, + data: json!({ "poll_count": 1 }), + metadata: None, + }; + + let error = source + .source_state_to_internal_state(source_state) + .await + .expect_err("unsupported version"); + assert!(matches!(error, Error::Serialization(_))); + }); + } + + #[test] + fn given_invalid_url_when_open_should_return_invalid_config() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut config = test_config(); + config.url = "not-a-url".to_string(); + let mut source = OpenSearchSource::new(1, config, None); + let error = source.open().await.expect_err("invalid url"); + assert!(matches!(error, Error::InvalidConfigValue(_))); + }); + } + + #[test] + fn given_internal_state_when_export_should_round_trip_source_state() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let source = OpenSearchSource::new(1, test_config(), None); + { + let mut runtime_state = 
source.state.lock().await; + *runtime_state = test_state(); + } + let exported = source + .internal_state_to_source_state() + .await + .expect("export state"); + assert_eq!(exported.id, "opensearch_source_1"); + assert_eq!(exported.data["total_documents_published"], 500); + assert_eq!( + exported.metadata.as_ref().unwrap()["index"], + "test_documents" + ); + }); + } + + #[test] + fn given_zero_batch_size_when_validate_should_fail() { + let mut config = test_config(); + config.batch_size = Some(0); + let error = validate_open_config(&config).expect_err("zero batch_size"); + assert!(matches!(error, Error::InvalidConfigValue(_))); + } + + #[test] + fn given_rfc3339_timestamp_when_parsed_should_succeed() { + let value = json!("2024-01-15T10:30:00Z"); + assert!(parse_document_timestamp(&value).is_some()); + } + + #[test] + fn given_epoch_millis_timestamp_when_parsed_should_succeed() { + let value = json!(1_705_312_200_000_i64); + assert!(parse_document_timestamp(&value).is_some()); + } + + #[test] + fn given_state_when_serialized_should_round_trip() { + let original = test_state(); + + let serialized = rmp_serde::to_vec(&original).expect("Failed to serialize"); + let deserialized: State = + rmp_serde::from_slice(&serialized).expect("Failed to deserialize"); + + assert_eq!( + original.total_documents_published, + deserialized.total_documents_published + ); + assert_eq!(original.poll_count, deserialized.poll_count); + assert_eq!(original.search_after, deserialized.search_after); + assert_eq!(original.error_count, deserialized.error_count); + } + + #[test] + fn given_state_when_serialized_should_produce_connector_state() { + let source = OpenSearchSource::new(1, test_config(), None); + let state = test_state(); + + let connector_state = source.serialize_state(&state); + assert!(connector_state.is_some()); + + let restored: State = connector_state + .unwrap() + .deserialize(CONNECTOR_NAME, 1) + .expect("Failed to deserialize state"); + 
assert_eq!(restored.total_documents_published, 500); + } +} diff --git a/core/connectors/sources/opensearch_source/src/retry.rs b/core/connectors/sources/opensearch_source/src/retry.rs new file mode 100644 index 0000000000..587b3dfbb2 --- /dev/null +++ b/core/connectors/sources/opensearch_source/src/retry.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy_connector_sdk::retry::{exponential_backoff, jitter, parse_retry_after}; +use std::time::Duration; +use tokio::time::sleep; +use tracing::warn; + +pub(crate) const DEFAULT_MAX_RETRIES: u32 = 3; +pub(crate) const DEFAULT_RETRY_DELAY: &str = "1s"; +pub(crate) const DEFAULT_RETRY_MAX_DELAY: &str = "30s"; +pub(crate) const DEFAULT_MAX_OPEN_RETRIES: u32 = 5; +pub(crate) const DEFAULT_OPEN_RETRY_MAX_DELAY: &str = "30s"; +pub(crate) const DEFAULT_CB_THRESHOLD: u32 = 5; +pub(crate) const DEFAULT_CB_COOL_DOWN: &str = "60s"; + +/// Total attempt count (minimum 1), consistent with InfluxDB connector config. +pub(crate) fn normalized_max_attempts(max_retries: u32) -> u32 { + max_retries.max(1) +} + +/// Returns true for HTTP status codes worth retrying: 429 and 5xx. 
+pub(crate) fn is_transient_status(status: u16) -> bool { + status == 429 || (500..600).contains(&status) +} + +pub(crate) struct RetryBackoff { + pub delay: Duration, + pub max_delay: Duration, +} + +pub(crate) async fn sleep_before_retry( + operation: &str, + connector_id: u32, + attempt: u32, + max_attempts: u32, + backoff: &RetryBackoff, + retry_after: Option<&str>, + reason: &str, +) { + let delay = retry_after.and_then(parse_retry_after).unwrap_or_else(|| { + jitter(exponential_backoff( + backoff.delay, + attempt, + backoff.max_delay, + )) + }); + warn!( + connector_id, + operation, + attempt, + max_attempts, + delay_ms = delay.as_millis(), + reason, + "OpenSearch request failed; retrying" + ); + sleep(delay).await; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn given_429_or_5xx_when_check_transient_should_return_true() { + assert!(is_transient_status(429)); + assert!(is_transient_status(503)); + assert!(!is_transient_status(404)); + assert!(!is_transient_status(400)); + } + + #[test] + fn given_zero_max_retries_when_normalize_should_return_one() { + assert_eq!(normalized_max_attempts(0), 1); + assert_eq!(normalized_max_attempts(3), 3); + } +} diff --git a/core/connectors/sources/opensearch_source/src/state_manager.rs b/core/connectors/sources/opensearch_source/src/state_manager.rs new file mode 100644 index 0000000000..a11bb91211 --- /dev/null +++ b/core/connectors/sources/opensearch_source/src/state_manager.rs @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{OpenSearchSource, StateConfig}; +use async_trait::async_trait; +use iggy_common::{DateTime, Utc}; +use iggy_connector_sdk::Error; +use serde::{Deserialize, Serialize}; +use std::io::ErrorKind; +use std::sync::Arc; +use tracing::info; + +pub(crate) const SOURCE_STATE_VERSION: u32 = 1; + +impl OpenSearchSource { + pub(super) async fn save_state(&self) -> Result<(), Error> { + let Some(state_config) = self.config.state.as_ref().filter(|s| s.enabled) else { + return Ok(()); + }; + let storage = create_state_storage(state_config)?; + + let source_state = self.internal_state_to_source_state().await?; + storage.save_source_state(&source_state).await?; + + info!( + "Saved state for OpenSearch source connector with ID: {}", + self.id + ); + Ok(()) + } + + pub(super) async fn load_state(&mut self) -> Result<(), Error> { + let Some(state_config) = self.config.state.as_ref().filter(|s| s.enabled) else { + return Ok(()); + }; + let storage = create_state_storage(state_config)?; + + let state_id = self.get_state_id(); + if let Some(source_state) = storage.load_source_state(&state_id).await? 
{ + self.source_state_to_internal_state(source_state).await?; + + let (last_poll_timestamp, total_documents_published, poll_count) = { + let state = self.state.lock().await; + ( + state.last_poll_timestamp, + state.total_documents_published, + state.poll_count, + ) + }; + info!( + "Loaded state for OpenSearch source connector with ID: {} - last poll: {:?}, total docs: {}, polls: {}", + self.id, last_poll_timestamp, total_documents_published, poll_count + ); + } else { + info!( + "No existing state found for OpenSearch source connector with ID: {}, starting fresh", + self.id + ); + } + + Ok(()) + } +} + +pub(crate) fn validate_state_storage_config(config: &StateConfig) -> Result<(), Error> { + match config.storage_type.as_deref() { + Some("file") | None => Ok(()), + Some(storage_type) => Err(Error::InvalidConfigValue(format!( + "state storage_type {storage_type:?} is not supported; only \"file\" is implemented" + ))), + } +} + +pub(crate) fn create_state_storage(config: &StateConfig) -> Result, Error> { + validate_state_storage_config(config)?; + + let base_path = config + .storage_config + .as_ref() + .and_then(|c| c.get("base_path")) + .and_then(|p| p.as_str()) + .unwrap_or("./connector_states"); + + Ok(Arc::new(FileStateStorage::new(base_path))) +} + +/// Optional file-backed mirror of connector state. Runtime `ConnectorState` msgpack is authoritative. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct SourceState { + pub id: String, + pub last_updated: DateTime, + #[serde(default)] + pub version: u32, + pub data: serde_json::Value, + pub metadata: Option, +} + +#[async_trait] +pub(crate) trait StateStorage: Send + Sync { + async fn save_source_state(&self, state: &SourceState) -> Result<(), Error>; + + async fn load_source_state(&self, id: &str) -> Result, Error>; +} + +pub(crate) struct FileStateStorage { + base_path: std::path::PathBuf, +} + +impl FileStateStorage { + pub(crate) fn new>(base_path: P) -> Self { + Self { + base_path: base_path.as_ref().to_path_buf(), + } + } + + fn get_state_path(&self, id: &str) -> std::path::PathBuf { + self.base_path.join(format!("{id}.json")) + } +} + +#[async_trait] +impl StateStorage for FileStateStorage { + async fn save_source_state(&self, state: &SourceState) -> Result<(), Error> { + use tokio::fs::{self, OpenOptions}; + use tokio::io::AsyncWriteExt; + + fs::create_dir_all(&self.base_path) + .await + .map_err(|e| Error::Storage(format!("Failed to create state directory: {e}")))?; + + let path = self.get_state_path(&state.id); + // PID in the suffix avoids races when two processes write the same state_id. 
+ let tmp_path = path.with_extension(format!("json.{}.tmp", std::process::id())); + let json = serde_json::to_string(state) + .map_err(|e| Error::Serialization(format!("Failed to serialize source state: {e}")))?; + + let mut tmp_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&tmp_path) + .await + .map_err(|e| Error::Storage(format!("Failed to open state temp file: {e}")))?; + if let Err(e) = tmp_file.write_all(json.as_bytes()).await { + let _ = fs::remove_file(&tmp_path).await; + return Err(Error::Storage(format!( + "Failed to write state temp file: {e}" + ))); + } + if let Err(e) = tmp_file.sync_all().await { + let _ = fs::remove_file(&tmp_path).await; + return Err(Error::Storage(format!( + "Failed to sync state temp file: {e}" + ))); + } + drop(tmp_file); + + if let Err(e) = fs::rename(&tmp_path, &path).await { + let _ = fs::remove_file(&tmp_path).await; + return Err(Error::Storage(format!("Failed to rename state file: {e}"))); + } + + // Flush the parent directory entry so the rename is durable on crash. 
+ if let Some(parent) = path.parent() { + let dir = tokio::fs::File::open(parent) + .await + .map_err(|e| Error::Storage(format!("Failed to open state directory: {e}")))?; + dir.sync_all() + .await + .map_err(|e| Error::Storage(format!("Failed to sync state directory: {e}")))?; + } + + Ok(()) + } + + async fn load_source_state(&self, id: &str) -> Result, Error> { + use tokio::fs; + + let path = self.get_state_path(id); + let content = match fs::read_to_string(&path).await { + Ok(content) => content, + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(None), + Err(error) => { + return Err(Error::Storage(format!( + "Failed to read state file: {error}" + ))); + } + }; + + let state: SourceState = serde_json::from_str(&content).map_err(|e| { + Error::Serialization(format!("Failed to deserialize source state: {e}")) + })?; + + Ok(Some(state)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use iggy_common::Utc; + use serde_json::json; + use tempfile::TempDir; + + fn file_state_config(base_path: &str) -> StateConfig { + StateConfig { + enabled: true, + storage_type: Some("file".to_string()), + storage_config: Some(json!({ "base_path": base_path })), + state_id: Some("opensearch_unit_state".to_string()), + } + } + + #[test] + fn given_unknown_storage_type_should_fail() { + let config = StateConfig { + enabled: true, + storage_type: Some("s3".to_string()), + storage_config: None, + state_id: None, + }; + let error = validate_state_storage_config(&config); + assert!(matches!(error, Err(Error::InvalidConfigValue(_)))); + } + + #[test] + fn given_file_storage_should_save_and_load_source_state() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let temp_dir = TempDir::new().expect("tempdir"); + let config = file_state_config(&temp_dir.path().to_string_lossy()); + let storage = create_state_storage(&config).expect("file storage"); + + let source_state = SourceState { + id: "opensearch_unit_state".to_string(), + 
last_updated: Utc::now(), + version: SOURCE_STATE_VERSION, + data: json!({ + "total_documents_published": 7, + "poll_count": 2, + "search_after": ["2024-01-01T00:00:00Z", "doc-7"] + }), + metadata: None, + }; + + storage + .save_source_state(&source_state) + .await + .expect("save state"); + let loaded = storage + .load_source_state("opensearch_unit_state") + .await + .expect("load state") + .expect("state file should exist"); + assert_eq!(loaded.data["total_documents_published"], 7); + assert_eq!(loaded.data["poll_count"], 2); + }); + } + + #[test] + fn given_missing_state_file_when_load_should_return_none() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let temp_dir = TempDir::new().expect("tempdir"); + let config = file_state_config(&temp_dir.path().to_string_lossy()); + let storage = create_state_storage(&config).expect("file storage"); + let loaded = storage + .load_source_state("missing_state_id") + .await + .expect("load should not error"); + assert!(loaded.is_none()); + }); + } + + #[test] + fn given_default_storage_type_should_use_file_backend() { + let temp_dir = TempDir::new().expect("tempdir"); + let config = StateConfig { + enabled: true, + storage_type: None, + storage_config: Some( + json!({ "base_path": temp_dir.path().to_string_lossy().as_ref() }), + ), + state_id: None, + }; + let storage = create_state_storage(&config).expect("default file storage"); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let source_state = SourceState { + id: "opensearch_default".to_string(), + last_updated: Utc::now(), + version: SOURCE_STATE_VERSION, + data: json!({ "poll_count": 1 }), + metadata: None, + }; + storage + .save_source_state(&source_state) + .await + .expect("save"); + }); + } +} diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 0b2f264d03..d415b78073 100644 --- 
a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -24,6 +24,7 @@ mod http; mod iceberg; mod influxdb; mod mongodb; +mod opensearch; mod postgres; mod quickwit; mod wiremock; @@ -66,6 +67,11 @@ pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture, }; +pub use opensearch::{ + OpenSearchSourceCircuitBreakerFixture, OpenSearchSourceMissingIndexFixture, + OpenSearchSourcePreCreatedFixture, OpenSearchSourceSmallBatchFixture, + OpenSearchSourceTransientErrorFixture, OpenSearchSourceTypedFieldsFixture, +}; pub use postgres::{ PostgresOps, PostgresSinkByteaFixture, PostgresSinkFixture, PostgresSinkJsonFixture, PostgresSourceByteaFixture, PostgresSourceDeleteFixture, PostgresSourceJsonFixture, diff --git a/core/integration/tests/connectors/fixtures/opensearch/container.rs b/core/integration/tests/connectors/fixtures/opensearch/container.rs new file mode 100644 index 0000000000..c4518d8a5a --- /dev/null +++ b/core/integration/tests/connectors/fixtures/opensearch/container.rs @@ -0,0 +1,405 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::connectors::fixtures; +use integration::harness::TestBinaryError; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use reqwest_retry::RetryTransientMiddleware; +use reqwest_retry::policies::ExponentialBackoff; +use serde::Deserialize; +use testcontainers_modules::testcontainers::core::wait::HttpWaitStrategy; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tracing::info; + +const OPENSEARCH_IMAGE: &str = "docker.io/opensearchproject/opensearch"; +const OPENSEARCH_TAG: &str = "2.19.1"; +const OPENSEARCH_PORT: u16 = 9200; +const OPENSEARCH_HEALTH_ENDPOINT: &str = "/_cluster/health"; + +pub const DEFAULT_TEST_STREAM: &str = "test_stream"; +pub const DEFAULT_TEST_TOPIC: &str = "test_topic"; + +pub const ENV_SOURCE_URL: &str = "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_URL"; +pub const ENV_SOURCE_INDEX: &str = "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_INDEX"; +pub const ENV_SOURCE_POLLING_INTERVAL: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_POLLING_INTERVAL"; +pub const ENV_SOURCE_BATCH_SIZE: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_BATCH_SIZE"; +pub const ENV_SOURCE_TIMESTAMP_FIELD: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_TIMESTAMP_FIELD"; +pub const ENV_SOURCE_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SOURCE_OPENSEARCH_STREAMS_0_STREAM"; +pub const ENV_SOURCE_STREAMS_0_TOPIC: &str = "IGGY_CONNECTORS_SOURCE_OPENSEARCH_STREAMS_0_TOPIC"; +pub const ENV_SOURCE_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SOURCE_OPENSEARCH_STREAMS_0_SCHEMA"; +pub const ENV_SOURCE_PATH: &str = "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PATH"; +pub const ENV_SOURCE_MAX_RETRIES: &str = + 
"IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_MAX_RETRIES"; +pub const ENV_SOURCE_RETRY_DELAY: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_RETRY_DELAY"; +pub const ENV_SOURCE_RETRY_MAX_DELAY: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_RETRY_MAX_DELAY"; +pub const ENV_SOURCE_MAX_OPEN_RETRIES: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_MAX_OPEN_RETRIES"; +pub const ENV_SOURCE_OPEN_RETRY_MAX_DELAY: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_OPEN_RETRY_MAX_DELAY"; +pub const ENV_SOURCE_CIRCUIT_BREAKER_THRESHOLD: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_CIRCUIT_BREAKER_THRESHOLD"; +pub const ENV_SOURCE_CIRCUIT_BREAKER_COOL_DOWN: &str = + "IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_CIRCUIT_BREAKER_COOL_DOWN"; + +#[allow(dead_code)] +#[derive(Debug, Deserialize)] +pub struct OpenSearchSearchResponse { + pub hits: OpenSearchHits, +} + +#[allow(dead_code)] +#[derive(Debug, Deserialize)] +pub struct OpenSearchHits { + pub total: OpenSearchTotal, + pub hits: Vec, +} + +#[allow(dead_code)] +#[derive(Debug, Deserialize)] +pub struct OpenSearchTotal { + pub value: usize, +} + +#[allow(dead_code)] +#[derive(Debug, Deserialize)] +pub struct OpenSearchHit { + #[serde(rename = "_source")] + pub source: serde_json::Value, +} + +pub struct OpenSearchContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub base_url: String, +} + +impl OpenSearchContainer { + pub async fn start() -> Result { + let container = GenericImage::new(OPENSEARCH_IMAGE, OPENSEARCH_TAG) + .with_exposed_port(OPENSEARCH_PORT.tcp()) + .with_wait_for(WaitFor::http( + HttpWaitStrategy::new(OPENSEARCH_HEALTH_ENDPOINT) + .with_port(OPENSEARCH_PORT.tcp()) + .with_expected_status_code(200u16), + )) + .with_startup_timeout(std::time::Duration::from_secs(120)) + .with_env_var("discovery.type", "single-node") + .with_env_var("plugins.security.disabled", "true") + .with_env_var("DISABLE_INSTALL_DEMO_CONFIG", "true") + 
.with_env_var("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "iggy-test-password1!") + .with_env_var("OPENSEARCH_JAVA_OPTS", "-Xms512m -Xmx512m") + .with_mapped_port(0, OPENSEARCH_PORT.tcp()) + .with_container_name(fixtures::unique_container_name("opensearch")) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchContainer".to_string(), + message: format!("Failed to start container: {e}"), + })?; + + info!("Started OpenSearch container"); + + let mapped_port = container + .ports() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchContainer".to_string(), + message: format!("Failed to get ports: {e}"), + })? + .map_to_host_port_ipv4(OPENSEARCH_PORT) + .ok_or_else(|| TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchContainer".to_string(), + message: "No mapping for OpenSearch port".to_string(), + })?; + + let base_url = format!("http://localhost:{mapped_port}"); + info!("OpenSearch container available at {base_url}"); + + Ok(Self { + container, + base_url, + }) + } +} + +pub fn create_http_client() -> HttpClient { + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .expect("Failed to build HTTP client"); + reqwest_middleware::ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build() +} + +pub trait OpenSearchOps: Sync { + fn container(&self) -> &OpenSearchContainer; + fn http_client(&self) -> &HttpClient; + + fn create_index( + &self, + index_name: &str, + ) -> impl std::future::Future> + Send { + async move { + let url = format!("{}/{}", self.container().base_url, index_name); + let mapping = serde_json::json!({ + "mappings": { + "properties": { + "id": { "type": "integer" }, + "name": { "type": "keyword" }, + "value": { "type": "integer" }, + "timestamp": { "type": "date" } + } + } + }); + + let response = self + 
.http_client() + .put(&url) + .header("Content-Type", "application/json") + .json(&mapping) + .send() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchOps".to_string(), + message: format!("Failed to create index: {e}"), + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchOps".to_string(), + message: format!("Failed to create index: status={status}, body={body}"), + }); + } + + info!("Created OpenSearch index: {index_name}"); + Ok(()) + } + } + + fn create_typed_fields_index( + &self, + index_name: &str, + ) -> impl std::future::Future> + Send { + async move { + let url = format!("{}/{}", self.container().base_url, index_name); + let mapping = serde_json::json!({ + "mappings": { + "properties": { + "id": { "type": "integer" }, + "title": { "type": "text" }, + "status": { "type": "keyword" }, + "count": { "type": "long" }, + "score": { "type": "float" }, + "ratio": { "type": "double" }, + "active": { "type": "boolean" }, + "timestamp": { "type": "date" }, + "client_ip": { "type": "ip" }, + "location": { "type": "geo_point" }, + "tags": { "type": "keyword" }, + "optional_note": { "type": "keyword" } + } + } + }); + + let response = self + .http_client() + .put(&url) + .header("Content-Type", "application/json") + .json(&mapping) + .send() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchOps".to_string(), + message: format!("Failed to create typed index: {e}"), + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchOps".to_string(), + message: format!("Failed to create typed index: status={status}, body={body}"), + }); + } + + info!("Created typed OpenSearch index: {index_name}"); + Ok(()) + } + 
} + + fn index_document( + &self, + index_name: &str, + doc_id: &str, + document: &serde_json::Value, + ) -> impl std::future::Future> + Send { + async move { + let url = format!( + "{}/{}/_doc/{}", + self.container().base_url, + index_name, + doc_id + ); + + let response = self + .http_client() + .put(&url) + .header("Content-Type", "application/json") + .json(document) + .send() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchOps".to_string(), + message: format!("Failed to index document: {e}"), + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::FixtureSetup { + fixture_type: "OpenSearchOps".to_string(), + message: format!("Failed to index document: status={status}, body={body}"), + }); + } + + Ok(()) + } + } + + fn refresh_index( + &self, + index_name: &str, + ) -> impl std::future::Future> + Send { + async move { + let url = format!("{}/{}/_refresh", self.container().base_url, index_name); + + let response = self.http_client().post(&url).send().await.map_err(|e| { + TestBinaryError::InvalidState { + message: format!("Failed to refresh index: {e}"), + } + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!("Failed to refresh index: status={status}, body={body}"), + }); + } + + info!("Refreshed OpenSearch index: {index_name}"); + Ok(()) + } + } + + #[allow(dead_code)] + fn search_all( + &self, + index_name: &str, + ) -> impl std::future::Future> + Send + { + async move { + let url = format!("{}/{}/_search", self.container().base_url, index_name); + let query = serde_json::json!({ + "query": { "match_all": {} }, + "size": 1000, + "_source": true + }); + + let response = self + .http_client() + .post(&url) + .header("Content-Type", "application/json") + .json(&query) + 
.send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to search index: {e}"), + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!("Failed to search index: status={status}, body={body}"), + }); + } + + let text = response + .text() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to get response text: {e}"), + })?; + + info!("OpenSearch search response: {text}"); + + serde_json::from_str::(&text).map_err(|e| { + TestBinaryError::InvalidState { + message: format!("Failed to parse search response: {e}, body: {text}"), + } + }) + } + } + + fn count_documents( + &self, + index_name: &str, + ) -> impl std::future::Future> + Send { + async move { + let url = format!("{}/{}/_count", self.container().base_url, index_name); + + let response = self.http_client().get(&url).send().await.map_err(|e| { + TestBinaryError::InvalidState { + message: format!("Failed to count documents: {e}"), + } + })?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(TestBinaryError::InvalidState { + message: format!("Failed to count documents: status={status}, body={body}"), + }); + } + + #[derive(Deserialize)] + struct CountResponse { + count: usize, + } + + let count_response = response.json::().await.map_err(|e| { + TestBinaryError::InvalidState { + message: format!("Failed to parse count response: {e}"), + } + })?; + + Ok(count_response.count) + } + } +} diff --git a/core/integration/tests/connectors/fixtures/opensearch/mod.rs b/core/integration/tests/connectors/fixtures/opensearch/mod.rs new file mode 100644 index 0000000000..c943aa06b2 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/opensearch/mod.rs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod container; +pub mod resilience; +pub mod source; + +pub use resilience::{ + OpenSearchSourceCircuitBreakerFixture, OpenSearchSourceTransientErrorFixture, +}; +pub use source::{ + OpenSearchSourceMissingIndexFixture, OpenSearchSourcePreCreatedFixture, + OpenSearchSourceSmallBatchFixture, OpenSearchSourceTypedFieldsFixture, +}; diff --git a/core/integration/tests/connectors/fixtures/opensearch/resilience.rs b/core/integration/tests/connectors/fixtures/opensearch/resilience.rs new file mode 100644 index 0000000000..694b27ab81 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/opensearch/resilience.rs @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::container::{ + DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SOURCE_BATCH_SIZE, + ENV_SOURCE_CIRCUIT_BREAKER_COOL_DOWN, ENV_SOURCE_CIRCUIT_BREAKER_THRESHOLD, ENV_SOURCE_INDEX, + ENV_SOURCE_MAX_OPEN_RETRIES, ENV_SOURCE_MAX_RETRIES, ENV_SOURCE_OPEN_RETRY_MAX_DELAY, + ENV_SOURCE_PATH, ENV_SOURCE_POLLING_INTERVAL, ENV_SOURCE_RETRY_DELAY, + ENV_SOURCE_RETRY_MAX_DELAY, ENV_SOURCE_STREAMS_0_SCHEMA, ENV_SOURCE_STREAMS_0_STREAM, + ENV_SOURCE_STREAMS_0_TOPIC, ENV_SOURCE_TIMESTAMP_FIELD, ENV_SOURCE_URL, +}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture}; +use std::collections::HashMap; +use wiremock::matchers::{method, path_regex}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +pub const RESILIENCE_INDEX: &str = "resilience_test"; + +fn index_path_pattern() -> &'static str { + r"/resilience_test$" +} + +fn search_path_pattern() -> &'static str { + r"/resilience_test/_search" +} + +fn search_success_body() -> serde_json::Value { + serde_json::json!({ + "hits": { + "hits": [{ + "_id": "doc-retry-1", + "_source": { + "id": 42, + "name": "retry_doc", + "value": 100, + "timestamp": "2024-01-15T12:00:00.000Z" + }, + "sort": ["2024-01-15T12:00:00.000Z", "doc-retry-1"] + }] + } + }) +} + +fn resilience_base_envs(mock_uri: &str) -> HashMap { + let mut envs = HashMap::new(); + envs.insert(ENV_SOURCE_URL.to_string(), mock_uri.to_string()); + envs.insert(ENV_SOURCE_INDEX.to_string(), RESILIENCE_INDEX.to_string()); + envs.insert(ENV_SOURCE_POLLING_INTERVAL.to_string(), "100ms".to_string()); + 
envs.insert(ENV_SOURCE_BATCH_SIZE.to_string(), "10".to_string()); + envs.insert( + ENV_SOURCE_TIMESTAMP_FIELD.to_string(), + "timestamp".to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + "../../target/debug/libiggy_connector_opensearch_source".to_string(), + ); + envs.insert(ENV_SOURCE_RETRY_DELAY.to_string(), "50ms".to_string()); + envs.insert(ENV_SOURCE_RETRY_MAX_DELAY.to_string(), "200ms".to_string()); + envs.insert( + ENV_SOURCE_OPEN_RETRY_MAX_DELAY.to_string(), + "200ms".to_string(), + ); + envs.insert(ENV_SOURCE_MAX_OPEN_RETRIES.to_string(), "2".to_string()); + envs +} + +async fn mount_index_exists(mock_server: &MockServer) { + Mock::given(method("HEAD")) + .and(path_regex(index_path_pattern())) + .respond_with(ResponseTemplate::new(200)) + .mount(mock_server) + .await; +} + +async fn mount_transient_search_mocks(mock_server: &MockServer) { + Mock::given(method("POST")) + .and(path_regex(search_path_pattern())) + .respond_with(ResponseTemplate::new(503).set_body_string("temporary")) + .up_to_n_times(2) + .with_priority(1) + .mount(mock_server) + .await; + + Mock::given(method("POST")) + .and(path_regex(search_path_pattern())) + .respond_with(ResponseTemplate::new(200).set_body_json(search_success_body())) + .with_priority(2) + .mount(mock_server) + .await; +} + +async fn mount_persistent_search_failure(mock_server: &MockServer) { + Mock::given(method("POST")) + .and(path_regex(search_path_pattern())) + .respond_with(ResponseTemplate::new(500).set_body_string("persistent")) + .mount(mock_server) + .await; +} + +/// Wiremock-backed fixture: two transient `503` search responses, then one hit. 
+pub struct OpenSearchSourceTransientErrorFixture { + mock_server: MockServer, +} + +impl OpenSearchSourceTransientErrorFixture { + pub async fn search_request_count(&self) -> usize { + self.mock_server + .received_requests() + .await + .unwrap_or_default() + .iter() + .filter(|request| { + request.url.path().contains(search_path_pattern()) + && request.method.as_str() == "POST" + }) + .count() + } +} + +#[async_trait] +impl TestFixture for OpenSearchSourceTransientErrorFixture { + async fn setup() -> Result { + let mock_server = MockServer::start().await; + mount_index_exists(&mock_server).await; + mount_transient_search_mocks(&mock_server).await; + + Ok(Self { mock_server }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = resilience_base_envs(&self.mock_server.uri()); + envs.insert(ENV_SOURCE_MAX_RETRIES.to_string(), "5".to_string()); + envs + } +} + +/// Wiremock-backed fixture: every search returns `500` with a low circuit-breaker threshold. +pub struct OpenSearchSourceCircuitBreakerFixture { + mock_server: MockServer, +} + +impl OpenSearchSourceCircuitBreakerFixture { + pub async fn search_request_count(&self) -> usize { + self.mock_server + .received_requests() + .await + .unwrap_or_default() + .iter() + .filter(|request| { + request.url.path().contains(search_path_pattern()) + && request.method.as_str() == "POST" + }) + .count() + } +} + +#[async_trait] +impl TestFixture for OpenSearchSourceCircuitBreakerFixture { + async fn setup() -> Result { + let mock_server = MockServer::start().await; + mount_index_exists(&mock_server).await; + mount_persistent_search_failure(&mock_server).await; + + Ok(Self { mock_server }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = resilience_base_envs(&self.mock_server.uri()); + envs.insert( + ENV_SOURCE_CIRCUIT_BREAKER_THRESHOLD.to_string(), + "3".to_string(), + ); + envs.insert( + ENV_SOURCE_CIRCUIT_BREAKER_COOL_DOWN.to_string(), + "500ms".to_string(), + ); + envs + } +} diff 
--git a/core/integration/tests/connectors/fixtures/opensearch/source.rs b/core/integration/tests/connectors/fixtures/opensearch/source.rs new file mode 100644 index 0000000000..b2f2cfffcb --- /dev/null +++ b/core/integration/tests/connectors/fixtures/opensearch/source.rs @@ -0,0 +1,289 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::container::{ + DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SOURCE_BATCH_SIZE, ENV_SOURCE_INDEX, + ENV_SOURCE_PATH, ENV_SOURCE_POLLING_INTERVAL, ENV_SOURCE_STREAMS_0_SCHEMA, + ENV_SOURCE_STREAMS_0_STREAM, ENV_SOURCE_STREAMS_0_TOPIC, ENV_SOURCE_TIMESTAMP_FIELD, + ENV_SOURCE_URL, OpenSearchContainer, OpenSearchOps, create_http_client, +}; +use async_trait::async_trait; +use iggy_common::IggyTimestamp; +use integration::harness::{TestBinaryError, TestFixture}; +use reqwest_middleware::ClientWithMiddleware as HttpClient; +use std::collections::HashMap; +use uuid::Uuid; + +const TEST_INDEX_PREFIX: &str = "test_documents"; + +/// OpenSearch source fixture for basic document polling. +pub struct OpenSearchSourceFixture { + container: OpenSearchContainer, + http_client: HttpClient, + // Unique per fixture so parallel tests never collide on the same index. 
+ index: String, +} + +impl OpenSearchOps for OpenSearchSourceFixture { + fn container(&self) -> &OpenSearchContainer { + &self.container + } + + fn http_client(&self) -> &HttpClient { + &self.http_client + } +} + +impl OpenSearchSourceFixture { + #[allow(dead_code)] + pub fn index_name(&self) -> &str { + &self.index + } + + pub async fn setup_index(&self) -> Result<(), TestBinaryError> { + self.create_index(&self.index).await + } + + pub async fn insert_document( + &self, + doc_id: i32, + name: &str, + value: i32, + ) -> Result<(), TestBinaryError> { + let timestamp = IggyTimestamp::from( + IggyTimestamp::now().as_micros() + u64::from(doc_id as u32) * 1_000, + ) + .to_rfc3339_string(); + let document = serde_json::json!({ + "id": doc_id, + "name": name, + "value": value, + "timestamp": timestamp + }); + self.index_document(&self.index, &doc_id.to_string(), &document) + .await + } + + pub async fn insert_documents(&self, count: usize) -> Result<(), TestBinaryError> { + for i in 1..=count { + self.insert_document(i as i32, &format!("doc_{i}"), (i * 10) as i32) + .await?; + } + self.refresh_index().await?; + Ok(()) + } + + pub async fn get_document_count(&self) -> Result { + self.count_documents(&self.index).await + } + + pub async fn refresh_index(&self) -> Result<(), TestBinaryError> { + OpenSearchOps::refresh_index(self, &self.index).await + } + + pub async fn insert_typed_sample_document(&self) -> Result<(), TestBinaryError> { + let timestamp = IggyTimestamp::now().to_rfc3339_string(); + let document = serde_json::json!({ + "id": 1, + "title": "OpenSearch typed field coverage", + "status": "active", + "count": 9_223_372_036_854_775_807_i64, + "score": 98.6_f32, + "ratio": 0.125_f64, + "active": true, + "timestamp": timestamp, + "client_ip": "192.168.1.42", + "location": { "lat": 40.12, "lon": -71.34 }, + "tags": ["integration", "opensearch"], + "optional_note": null + }); + self.index_document(&self.index, "typed-1", &document) + .await?; + 
self.refresh_index().await + } +} + +#[async_trait] +impl TestFixture for OpenSearchSourceFixture { + async fn setup() -> Result { + let container = OpenSearchContainer::start().await?; + let http_client = create_http_client(); + let index = format!("{TEST_INDEX_PREFIX}_{}", Uuid::new_v4().simple()); + + // Container startup already waits for /_cluster/health to return 200 + // via HttpWaitStrategy, so no additional health check is needed. + Ok(Self { + container, + http_client, + index, + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert(ENV_SOURCE_URL.to_string(), self.container.base_url.clone()); + envs.insert(ENV_SOURCE_INDEX.to_string(), self.index.clone()); + envs.insert(ENV_SOURCE_POLLING_INTERVAL.to_string(), "100ms".to_string()); + envs.insert(ENV_SOURCE_BATCH_SIZE.to_string(), "100".to_string()); + envs.insert( + ENV_SOURCE_TIMESTAMP_FIELD.to_string(), + "timestamp".to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + "../../target/debug/libiggy_connector_opensearch_source".to_string(), + ); + envs + } +} + +/// OpenSearch source fixture with typed-field index mapping. 
+pub struct OpenSearchSourceTypedFieldsFixture { + inner: OpenSearchSourceFixture, +} + +impl std::ops::Deref for OpenSearchSourceTypedFieldsFixture { + type Target = OpenSearchSourceFixture; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl OpenSearchOps for OpenSearchSourceTypedFieldsFixture { + fn container(&self) -> &OpenSearchContainer { + &self.inner.container + } + + fn http_client(&self) -> &HttpClient { + &self.inner.http_client + } +} + +#[async_trait] +impl TestFixture for OpenSearchSourceTypedFieldsFixture { + async fn setup() -> Result { + let inner = OpenSearchSourceFixture::setup().await?; + inner.create_typed_fields_index(&inner.index).await?; + Ok(Self { inner }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + self.inner.connectors_runtime_envs() + } +} + +/// OpenSearch source fixture with pre-created index. +pub struct OpenSearchSourcePreCreatedFixture { + inner: OpenSearchSourceFixture, +} + +impl std::ops::Deref for OpenSearchSourcePreCreatedFixture { + type Target = OpenSearchSourceFixture; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl OpenSearchOps for OpenSearchSourcePreCreatedFixture { + fn container(&self) -> &OpenSearchContainer { + &self.inner.container + } + + fn http_client(&self) -> &HttpClient { + &self.inner.http_client + } +} + +#[async_trait] +impl TestFixture for OpenSearchSourcePreCreatedFixture { + async fn setup() -> Result { + let inner = OpenSearchSourceFixture::setup().await?; + + inner.setup_index().await?; + + Ok(Self { inner }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + self.inner.connectors_runtime_envs() + } +} + +/// OpenSearch source fixture with pre-created index and a small `batch_size` for +/// pagination integration tests. 
+pub struct OpenSearchSourceSmallBatchFixture { + inner: OpenSearchSourceFixture, +} + +impl std::ops::Deref for OpenSearchSourceSmallBatchFixture { + type Target = OpenSearchSourceFixture; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl OpenSearchOps for OpenSearchSourceSmallBatchFixture { + fn container(&self) -> &OpenSearchContainer { + &self.inner.container + } + + fn http_client(&self) -> &HttpClient { + &self.inner.http_client + } +} + +#[async_trait] +impl TestFixture for OpenSearchSourceSmallBatchFixture { + async fn setup() -> Result { + let inner = OpenSearchSourceFixture::setup().await?; + inner.setup_index().await?; + Ok(Self { inner }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = self.inner.connectors_runtime_envs(); + envs.insert(ENV_SOURCE_BATCH_SIZE.to_string(), "2".to_string()); + envs + } +} + +/// OpenSearch source fixture pointing at an index that is never created, +/// for exercising the connector's "missing index" failure path. 
+pub struct OpenSearchSourceMissingIndexFixture { + inner: OpenSearchSourceFixture, +} + +#[async_trait] +impl TestFixture for OpenSearchSourceMissingIndexFixture { + async fn setup() -> Result { + let inner = OpenSearchSourceFixture::setup().await?; + Ok(Self { inner }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + self.inner.connectors_runtime_envs() + } +} diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index fc624f897f..594d88f965 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -25,6 +25,7 @@ mod http_config_provider; mod iceberg; mod influxdb; mod mongodb; +mod opensearch; mod postgres; mod quickwit; mod random; diff --git a/core/integration/tests/connectors/opensearch/docker-compose.yml b/core/integration/tests/connectors/opensearch/docker-compose.yml new file mode 100644 index 0000000000..e765a8b006 --- /dev/null +++ b/core/integration/tests/connectors/opensearch/docker-compose.yml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# docker-compose for OpenSearch connector integration tests. 
+# +# File location: +# core/integration/tests/connectors/opensearch/docker-compose.yml +# +# Used as a fallback when running tests without a Docker daemon that supports +# testcontainers auto-launch (e.g. some CI environments). Start manually with: +# docker compose -f core/integration/tests/connectors/opensearch/docker-compose.yml up -d +# then run (set IGGY_CONNECTORS_SOURCE_OPENSEARCH_PLUGIN_CONFIG_URL=http://localhost:9200): +# cargo test -p integration -- connectors::opensearch + +services: + opensearch: + image: opensearchproject/opensearch:2.19.1 + container_name: iggy-opensearch-manual + ports: + - "9200:9200" + environment: + discovery.type: single-node + plugins.security.disabled: "true" + DISABLE_INSTALL_DEMO_CONFIG: "true" + OPENSEARCH_INITIAL_ADMIN_PASSWORD: "iggy-test-password1!" + OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m" + healthcheck: + test: ["CMD-SHELL", "curl -sf http://localhost:9200/_cluster/health || exit 1"] + interval: 10s + timeout: 5s + retries: 12 + start_period: 60s diff --git a/core/integration/tests/connectors/opensearch/mod.rs b/core/integration/tests/connectors/opensearch/mod.rs new file mode 100644 index 0000000000..394a024035 --- /dev/null +++ b/core/integration/tests/connectors/opensearch/mod.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +mod opensearch_source; +mod opensearch_source_resilience; +mod opensearch_source_types; + +const TEST_MESSAGE_COUNT: usize = 3; +const POLL_ATTEMPTS: usize = 100; +const POLL_INTERVAL_MS: u64 = 50; diff --git a/core/integration/tests/connectors/opensearch/opensearch_source.rs b/core/integration/tests/connectors/opensearch/opensearch_source.rs new file mode 100644 index 0000000000..b385e155af --- /dev/null +++ b/core/integration/tests/connectors/opensearch/opensearch_source.rs @@ -0,0 +1,478 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use super::{POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT}; +use crate::connectors::fixtures::{ + OpenSearchSourceMissingIndexFixture, OpenSearchSourcePreCreatedFixture, + OpenSearchSourceSmallBatchFixture, +}; +use iggy_common::MessageClient; +use iggy_common::{Consumer, Identifier, PollingStrategy}; +use iggy_connector_sdk::api::{ConnectorStatus, SourceInfoResponse}; +use integration::harness::seeds; +use integration::iggy_harness; +use reqwest::Client; +use std::collections::HashSet; +use std::time::Duration; +use tokio::time::sleep; + +fn document_ids(messages: &[serde_json::Value]) -> HashSet { + messages + .iter() + .filter_map(|record| record.get("id").and_then(|value| value.as_i64())) + .collect() +} + +fn assert_contains_document_ids(messages: &[serde_json::Value], expected_ids: &[i64]) { + let ids = document_ids(messages); + for expected_id in expected_ids { + assert!( + ids.contains(expected_id), + "expected document id {expected_id}, got ids {ids:?}" + ); + } +} + +async fn poll_all_messages_from_offset_zero( + client: &impl MessageClient, + consumer_id: &Identifier, + min_messages: usize, +) -> Vec { + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let mut received = Vec::new(); + + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::offset(0), + 100, + false, + ) + .await + { + received.clear(); + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= min_messages { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + received +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn 
given_documents_in_index_when_connector_polls_should_produce_messages( + harness: &TestHarness, + fixture: OpenSearchSourcePreCreatedFixture, +) { + let client = harness.root_client().await.unwrap(); + + fixture + .insert_documents(TEST_MESSAGE_COUNT) + .await + .expect("Failed to insert documents"); + + let doc_count = fixture + .get_document_count() + .await + .expect("Failed to get document count"); + assert_eq!( + doc_count, TEST_MESSAGE_COUNT, + "Expected {TEST_MESSAGE_COUNT} documents in OpenSearch" + ); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= TEST_MESSAGE_COUNT, + "Expected at least {TEST_MESSAGE_COUNT} messages, got {}", + received.len() + ); + + let expected_ids: Vec = (1..=TEST_MESSAGE_COUNT as i64).collect(); + assert_contains_document_ids(&received, &expected_ids); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_more_documents_than_batch_size_when_connector_polls_should_fetch_all( + harness: &TestHarness, + fixture: OpenSearchSourceSmallBatchFixture, +) { + const DOC_COUNT: usize = 6; + + let client = harness.root_client().await.unwrap(); + + fixture + .insert_documents(DOC_COUNT) + .await + .expect("Failed to insert documents"); + + let stream_id: 
Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "pagination_consumer".try_into().unwrap(); + + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= DOC_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= DOC_COUNT, + "Expected at least {DOC_COUNT} messages across search_after pages, got {}", + received.len() + ); + + let expected_ids: Vec = (1..=DOC_COUNT as i64).collect(); + assert_contains_document_ids(&received, &expected_ids); + + // Wait for at least one empty poll to fire (connector catches up to end of index). + // A cursor-reset bug would cause the connector to re-fetch all docs on the next empty poll. 
+ sleep(Duration::from_millis(POLL_INTERVAL_MS * 5)).await; + + let audit_consumer: Identifier = "pagination_audit".try_into().unwrap(); + let all_on_stream = + poll_all_messages_from_offset_zero(&client, &audit_consumer, DOC_COUNT).await; + + let all_ids: Vec = all_on_stream + .iter() + .filter_map(|record| record.get("id").and_then(|v| v.as_i64())) + .collect(); + let unique_count = document_ids(&all_on_stream).len(); + assert_eq!( + all_ids.len(), + unique_count, + "stream contains duplicate document IDs after empty poll; cursor was reset" + ); + assert_eq!( + unique_count, DOC_COUNT, + "expected exactly {DOC_COUNT} unique documents on stream, got {unique_count}" + ); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_empty_index_when_connector_polls_should_not_fail( + harness: &TestHarness, + fixture: OpenSearchSourcePreCreatedFixture, +) { + let client = harness.root_client().await.unwrap(); + + let doc_count = fixture + .get_document_count() + .await + .expect("Failed to get document count"); + assert_eq!(doc_count, 0, "Expected empty index"); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + sleep(Duration::from_millis(100)).await; + + let polled = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id), + &PollingStrategy::next(), + 10, + false, + ) + .await; + + assert!( + polled.is_ok(), + "Should be able to poll from topic even with empty source" + ); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_bulk_documents_when_connector_polls_should_produce_all_messages( + harness: &TestHarness, + fixture: OpenSearchSourcePreCreatedFixture, +) { 
+ let client = harness.root_client().await.unwrap(); + let bulk_count = 10; + + fixture + .insert_documents(bulk_count) + .await + .expect("Failed to insert documents"); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 100, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= bulk_count { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= bulk_count, + "Expected at least {bulk_count} messages, got {}", + received.len() + ); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_runtime_state_when_connector_restarts_should_resume_after_cursor( + harness: &mut TestHarness, + fixture: OpenSearchSourcePreCreatedFixture, +) { + fixture + .insert_documents(TEST_MESSAGE_COUNT) + .await + .expect("Failed to insert first batch"); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "state_test_consumer".try_into().unwrap(); + + let client = harness.root_client().await.unwrap(); + let received_before = { + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = 
serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + received + }; + assert_eq!(received_before.len(), TEST_MESSAGE_COUNT); + + harness + .server_mut() + .stop_dependents() + .expect("Failed to stop connectors"); + + let second_batch_start_id = (TEST_MESSAGE_COUNT + 1) as i32; + for i in 0..TEST_MESSAGE_COUNT { + fixture + .insert_document( + second_batch_start_id + i as i32, + &format!("doc_batch2_{i}"), + (TEST_MESSAGE_COUNT + i) as i32 * 10, + ) + .await + .expect("Failed to insert document"); + } + fixture + .refresh_index() + .await + .expect("Failed to refresh index"); + + harness + .server_mut() + .start_dependents() + .await + .expect("Failed to restart connectors"); + sleep(Duration::from_millis(100)).await; + + let audit_consumer: Identifier = "state_audit_consumer".try_into().unwrap(); + let all_messages = + poll_all_messages_from_offset_zero(&client, &audit_consumer, TEST_MESSAGE_COUNT * 2).await; + + let batch1_ids: HashSet = (1..=TEST_MESSAGE_COUNT as i64).collect(); + let batch1_occurrences = all_messages + .iter() + .filter_map(|record| record.get("id").and_then(|value| value.as_i64())) + .filter(|id| batch1_ids.contains(id)) + .count(); + assert_eq!( + batch1_occurrences, TEST_MESSAGE_COUNT, + "batch 1 IDs must appear exactly once on the stream; duplicates mean cursor reset" + ); + + let batch2_ids: HashSet = + ((TEST_MESSAGE_COUNT + 1) as i64..=(TEST_MESSAGE_COUNT * 2) as i64).collect(); + let batch2_seen: HashSet = all_messages + .iter() + .filter_map(|record| record.get("id").and_then(|value| value.as_i64())) + .filter(|id| batch2_ids.contains(id)) + .collect(); + assert_eq!( + batch2_seen.len(), + TEST_MESSAGE_COUNT, + "batch 2 IDs must be present after restart, got {batch2_seen:?}" + ); +} + +async fn fetch_sources(http_client: &Client, api_address: &str) -> Vec { + let response = http_client + 
.get(format!("{api_address}/sources")) + .send() + .await + .expect("Failed to query /sources"); + assert_eq!(response.status(), 200); + response.json().await.expect("Failed to parse sources") +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_missing_index_when_connector_opens_should_report_error( + harness: &TestHarness, + _fixture: OpenSearchSourceMissingIndexFixture, +) { + let api_address = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let http_client = Client::new(); + + let mut sources = fetch_sources(&http_client, &api_address).await; + for _ in 0..POLL_ATTEMPTS { + if sources + .iter() + .any(|source| source.status == ConnectorStatus::Error) + { + break; + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + sources = fetch_sources(&http_client, &api_address).await; + } + + assert_eq!(sources.len(), 1, "Expected a single configured source"); + let source = &sources[0]; + assert_eq!(source.status, ConnectorStatus::Error); + let last_error = source + .last_error + .as_ref() + .expect("Source with missing index should expose a last_error"); + assert!( + last_error.message.contains("Plugin initialization failed"), + "missing index should fail during plugin open, got: {}", + last_error.message + ); +} diff --git a/core/integration/tests/connectors/opensearch/opensearch_source_resilience.rs b/core/integration/tests/connectors/opensearch/opensearch_source_resilience.rs new file mode 100644 index 0000000000..aaf1bbb7b0 --- /dev/null +++ b/core/integration/tests/connectors/opensearch/opensearch_source_resilience.rs @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{POLL_ATTEMPTS, POLL_INTERVAL_MS}; +use crate::connectors::fixtures::{ + OpenSearchSourceCircuitBreakerFixture, OpenSearchSourceTransientErrorFixture, +}; +use iggy_common::MessageClient; +use iggy_common::{Consumer, Identifier, PollingStrategy}; +use iggy_connector_sdk::api::ConnectorStatus; +use integration::harness::seeds; +use integration::iggy_harness; +use reqwest::Client; +use std::time::Duration; +use tokio::time::sleep; + +async fn poll_json_messages( + client: &impl MessageClient, + consumer_id: &str, + min_messages: usize, +) -> Vec { + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer: Identifier = consumer_id.try_into().unwrap(); + let mut received = Vec::new(); + + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= min_messages { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + received +} + +async fn wait_for_source_status( + http_client: &Client, + api_address: &str, + expected: ConnectorStatus, +) -> ConnectorStatus { + let mut 
status = ConnectorStatus::Starting; + for _ in 0..POLL_ATTEMPTS { + let response = http_client + .get(format!("{api_address}/sources")) + .send() + .await + .expect("Failed to query /sources"); + assert_eq!(response.status(), 200); + let sources: Vec = + response.json().await.expect("Failed to parse sources"); + if let Some(source) = sources.first() { + status = source.status; + if status == expected { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + status +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source_resilience.toml")), + seed = seeds::connector_stream +)] +async fn given_transient_search_errors_when_connector_polls_should_retry_and_produce( + harness: &TestHarness, + fixture: OpenSearchSourceTransientErrorFixture, +) { + let client = harness.root_client().await.unwrap(); + let received = poll_json_messages(&client, "resilience_retry_consumer", 1).await; + + assert_eq!( + received.len(), + 1, + "expected one message after transient search errors were retried" + ); + assert_eq!( + received[0].get("id").and_then(|value| value.as_i64()), + Some(42) + ); + assert!( + fixture.search_request_count().await >= 3, + "search should be retried after transient 503 responses" + ); + + let api_address = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let status = + wait_for_source_status(&Client::new(), &api_address, ConnectorStatus::Running).await; + assert_eq!(status, ConnectorStatus::Running); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source_resilience.toml")), + seed = seeds::connector_stream +)] +async fn given_persistent_search_errors_when_connector_polls_should_remain_running_without_messages( + harness: &TestHarness, + fixture: OpenSearchSourceCircuitBreakerFixture, +) { + let client = harness.root_client().await.unwrap(); + let api_address = harness + .connectors_runtime() + 
.expect("connector runtime should be available") + .http_url(); + let http_client = Client::new(); + + sleep(Duration::from_millis(POLL_INTERVAL_MS * 20)).await; + + let status = wait_for_source_status(&http_client, &api_address, ConnectorStatus::Running).await; + assert_eq!( + status, + ConnectorStatus::Running, + "persistent search failures should not move source to Error" + ); + + let received = poll_json_messages(&client, "resilience_cb_consumer", 1).await; + assert!( + received.is_empty(), + "persistent failures should not produce messages, got {}", + received.len() + ); + + assert!( + fixture.search_request_count().await >= 3, + "connector should retry transient HTTP failures before surfacing poll errors" + ); +} diff --git a/core/integration/tests/connectors/opensearch/opensearch_source_types.rs b/core/integration/tests/connectors/opensearch/opensearch_source_types.rs new file mode 100644 index 0000000000..45051a25a0 --- /dev/null +++ b/core/integration/tests/connectors/opensearch/opensearch_source_types.rs @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use super::{POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT}; +use crate::connectors::fixtures::{ + OpenSearchSourcePreCreatedFixture, OpenSearchSourceTypedFieldsFixture, +}; +use iggy_common::MessageClient; +use iggy_common::{Consumer, Identifier, PollingStrategy}; +use integration::harness::seeds; +use integration::iggy_harness; +use serde_json::Value; +use std::collections::HashSet; +use std::time::Duration; +use tokio::time::sleep; + +async fn poll_json_messages( + client: &impl MessageClient, + consumer_id: &Identifier, + limit: u32, +) -> Vec { + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let mut received = Vec::new(); + + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + limit, + true, + ) + .await + { + for message in polled.messages { + if let Ok(json) = serde_json::from_slice::(&message.payload) { + received.push(json); + } + } + if !received.is_empty() { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + received +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_document_in_index_when_connector_polls_should_expose_payload_structure( + harness: &TestHarness, + fixture: OpenSearchSourcePreCreatedFixture, +) { + fixture + .insert_document(1, "structure_doc", 42) + .await + .expect("insert document"); + fixture.refresh_index().await.expect("refresh index"); + + let client = harness.root_client().await.unwrap(); + let consumer_id: Identifier = "payload_structure_consumer".try_into().unwrap(); + let messages = poll_json_messages(&client, &consumer_id, 10).await; + + assert_eq!( + messages.len(), + 1, + "expected one message, got {}", + messages.len() + ); + let record = &messages[0]; + 
assert_eq!(record["id"], 1); + assert_eq!(record["name"], "structure_doc"); + assert_eq!(record["value"], 42); + assert!(record.get("timestamp").is_some(), "missing timestamp field"); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_typed_fields_document_when_connector_polls_should_round_trip_payload( + harness: &TestHarness, + fixture: OpenSearchSourceTypedFieldsFixture, +) { + fixture + .insert_typed_sample_document() + .await + .expect("insert typed document"); + + let client = harness.root_client().await.unwrap(); + let consumer_id: Identifier = "typed_fields_consumer".try_into().unwrap(); + let messages = poll_json_messages(&client, &consumer_id, 10).await; + + assert_eq!(messages.len(), 1, "expected one typed message"); + let record = &messages[0]; + assert_eq!(record["title"], "OpenSearch typed field coverage"); + assert_eq!(record["status"], "active"); + assert_eq!(record["count"].as_i64(), Some(9_223_372_036_854_775_807)); + assert!((record["score"].as_f64().unwrap() - 98.6).abs() < 0.01); + assert!((record["ratio"].as_f64().unwrap() - 0.125).abs() < f64::EPSILON); + assert_eq!(record["active"], true); + assert_eq!(record["client_ip"], "192.168.1.42"); + assert_eq!(record["location"]["lat"], 40.12); + assert_eq!(record["location"]["lon"], -71.34); + assert!(record["tags"].is_array()); + assert!(record["optional_note"].is_null()); + assert!(record.get("timestamp").is_some()); +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/opensearch/source.toml")), + seed = seeds::connector_stream +)] +async fn given_first_batch_polled_when_second_batch_inserted_should_not_duplicate( + harness: &TestHarness, + fixture: OpenSearchSourcePreCreatedFixture, +) { + fixture + .insert_documents(TEST_MESSAGE_COUNT) + .await + .expect("insert first batch"); + + let client = harness.root_client().await.unwrap(); + let consumer_id: 
Identifier = "cursor_consumer".try_into().unwrap(); + + let first_batch = poll_json_messages(&client, &consumer_id, 10).await; + assert_eq!(first_batch.len(), TEST_MESSAGE_COUNT); + let first_ids: HashSet = first_batch + .iter() + .filter_map(|record| record.get("id").and_then(Value::as_i64)) + .collect(); + + let second_batch_start_id = (TEST_MESSAGE_COUNT + 1) as i32; + for offset in 0..TEST_MESSAGE_COUNT { + fixture + .insert_document( + second_batch_start_id + offset as i32, + &format!("batch_two_{offset}"), + (100 + offset) as i32, + ) + .await + .expect("insert second batch document"); + } + fixture.refresh_index().await.expect("refresh index"); + + // Wait for second batch to arrive, collecting only new IDs. + let mut second_batch_seen = false; + for _ in 0..POLL_ATTEMPTS { + let polled = poll_json_messages(&client, &consumer_id, 10).await; + let new_count = polled + .iter() + .filter_map(|r| r.get("id").and_then(Value::as_i64)) + .filter(|id| !first_ids.contains(id)) + .count(); + if new_count >= TEST_MESSAGE_COUNT { + second_batch_seen = true; + break; + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + assert!(second_batch_seen, "second batch never arrived"); + + // Verify the full stream from offset 0 contains exactly 2*TEST_MESSAGE_COUNT unique docs. + // If the cursor reset bug were present, the connector would re-emit first-batch docs and + // the stream would contain duplicates. 
+ let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let audit_consumer: Identifier = "no_dup_audit".try_into().unwrap(); + let mut all_on_stream: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(audit_consumer.clone()), + &PollingStrategy::offset(0), + 100, + false, + ) + .await + { + all_on_stream.clear(); + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice::(&msg.payload) { + all_on_stream.push(json); + } + } + if all_on_stream.len() >= TEST_MESSAGE_COUNT * 2 { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + let all_ids: Vec = all_on_stream + .iter() + .filter_map(|r| r.get("id").and_then(Value::as_i64)) + .collect(); + let unique_ids: HashSet = all_ids.iter().copied().collect(); + assert_eq!( + all_ids.len(), + unique_ids.len(), + "stream has {} total IDs but only {} unique; cursor reset caused re-delivery", + all_ids.len(), + unique_ids.len() + ); + assert_eq!( + unique_ids.len(), + TEST_MESSAGE_COUNT * 2, + "expected {} unique docs on stream (first + second batch), got {}", + TEST_MESSAGE_COUNT * 2, + unique_ids.len() + ); +} diff --git a/core/integration/tests/connectors/opensearch/plugin_config/config.toml b/core/integration/tests/connectors/opensearch/plugin_config/config.toml new file mode 100644 index 0000000000..d463accac6 --- /dev/null +++ b/core/integration/tests/connectors/opensearch/plugin_config/config.toml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "opensearch" +enabled = true +version = 0 +name = "OpenSearch source (resilience)" +path = "../../target/debug/libiggy_connector_opensearch_source" +plugin_config_format = "json" + +[[streams]] +stream = "test_stream" +topic = "test_topic" +schema = "json" +batch_length = 1000 +linger_time = "5ms" + +[plugin_config] +url = "http://localhost:9200" +index = "resilience_test" +polling_interval = "100ms" +batch_size = 10 +timestamp_field = "timestamp" +max_retries = 3 +retry_delay = "50ms" +retry_max_delay = "200ms" +max_open_retries = 3 +open_retry_max_delay = "200ms" +circuit_breaker_threshold = 1 +circuit_breaker_cool_down = "5s" diff --git a/core/integration/tests/connectors/opensearch/source.toml b/core/integration/tests/connectors/opensearch/source.toml new file mode 100644 index 0000000000..f0baa94e43 --- /dev/null +++ b/core/integration/tests/connectors/opensearch/source.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sources/opensearch_source" diff --git a/core/integration/tests/connectors/opensearch/source_resilience.toml b/core/integration/tests/connectors/opensearch/source_resilience.toml new file mode 100644 index 0000000000..8487d4a0f4 --- /dev/null +++ b/core/integration/tests/connectors/opensearch/source_resilience.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "tests/connectors/opensearch/plugin_config"