diff --git a/Cargo.lock b/Cargo.lock index 5dcfa64639..6b87606ff8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6980,10 +6980,9 @@ name = "iggy_connector_quickwit_sink" version = "0.4.1-edge.1" dependencies = [ "async-trait", - "dashmap", "iggy_connector_sdk", - "once_cell", "reqwest 0.13.4", + "reqwest-middleware", "serde", "serde_yaml_ng", "simd-json", diff --git a/core/connectors/runtime/example_config/connectors/quickwit_sink.toml b/core/connectors/runtime/example_config/connectors/quickwit_sink.toml index e4622ffcf8..4966adbbf4 100644 --- a/core/connectors/runtime/example_config/connectors/quickwit_sink.toml +++ b/core/connectors/runtime/example_config/connectors/quickwit_sink.toml @@ -53,6 +53,13 @@ fields = ["email", "created_at"] [plugin_config] url = "http://localhost:7280" +verbose_logging = false +# max_retries = 3 +# retry_delay = "200ms" +# max_retry_delay = "5s" +# max_open_retries = 5 +# open_retry_max_delay = "30s" +# request_timeout = "30s" index = """ version: 0.9 diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml index 8652dc2872..71a9bb3611 100644 --- a/core/connectors/sinks/quickwit_sink/Cargo.toml +++ b/core/connectors/sinks/quickwit_sink/Cargo.toml @@ -29,18 +29,14 @@ repository = "https://github.com/apache/iggy" readme = "../../README.md" publish = false -[package.metadata.cargo-machete] -ignored = ["dashmap", "once_cell"] - [lib] crate-type = ["cdylib", "lib"] [dependencies] async-trait = { workspace = true } -dashmap = { workspace = true } iggy_connector_sdk = { workspace = true } -once_cell = { workspace = true } reqwest = { workspace = true } +reqwest-middleware = { workspace = true } serde = { workspace = true } serde_yaml_ng = { workspace = true } simd-json = { workspace = true } diff --git a/core/connectors/sinks/quickwit_sink/src/lib.rs b/core/connectors/sinks/quickwit_sink/src/lib.rs index 0ee0d8f2b4..b0649be0bc 100644 --- a/core/connectors/sinks/quickwit_sink/src/lib.rs +++ b/core/connectors/sinks/quickwit_sink/src/lib.rs @@ -16,158 +16,264 @@ // under the License. use async_trait::async_trait; +use iggy_connector_sdk::retry::{ + ConnectivityConfig, build_retry_client, check_connectivity_with_retry, parse_duration, +}; use iggy_connector_sdk::{ ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, }; +use reqwest::StatusCode; +use reqwest::Url; +use reqwest_middleware::ClientWithMiddleware; use serde::{Deserialize, Serialize}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; sink_connector!(QuickwitSink); +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "200ms"; +const DEFAULT_MAX_RETRY_DELAY: &str = "5s"; +const DEFAULT_MAX_OPEN_RETRIES: u32 = 5; +const DEFAULT_OPEN_RETRY_MAX_DELAY: &str = "30s"; + #[derive(Debug)] pub struct QuickwitSink { id: u32, config: QuickwitSinkConfig, - client: reqwest::Client, + client: Option, + verbose: bool, index_id: String, } #[derive(Debug, Serialize, Deserialize)] pub struct QuickwitSinkConfig { - url: String, - index: String, + pub url: String, + /// Full Quickwit index config YAML — passed to `POST /api/v1/indexes` on first open. + /// `index_id` is extracted from this YAML to build ingest URLs. + pub index: String, + pub verbose_logging: Option, + pub max_retries: Option, + pub retry_delay: Option, + pub max_retry_delay: Option, + pub max_open_retries: Option, + pub open_retry_max_delay: Option, + pub request_timeout: Option, } -#[derive(Debug, Serialize, Deserialize)] -struct IndexConfig { +#[derive(Debug, Deserialize)] +struct IndexIdExtract { index_id: String, } impl QuickwitSink { pub fn new(id: u32, config: QuickwitSinkConfig) -> Self { - let index_config = - serde_yaml_ng::from_str::(&config.index).expect("Invalid index config."); - QuickwitSink { + let verbose = config.verbose_logging.unwrap_or(false); + Self { id, config, - index_id: index_config.index_id, - client: reqwest::Client::new(), + client: None, + verbose, + index_id: String::new(), } } + fn client(&self) -> Result<&ClientWithMiddleware, Error> { + self.client + .as_ref() + .ok_or_else(|| Error::InitError("Quickwit sink client not initialized".into())) + } + async fn has_index(&self) -> Result { + let client = self.client()?; let url = format!("{}/api/v1/indexes/{}", self.config.url, self.index_id); - let response = self.client.get(&url).send().await.map_err(|error| { - error!( - "Failed to send HTTP request to check if index with ID: {} exists. {error}", - self.index_id - ); - Error::HttpRequestFailed(error.to_string()) - })?; + let response = client + .get(&url) + .send() + .await + .map_err(|e| Error::HttpRequestFailed(e.to_string()))?; let status = response.status(); if status.is_success() { Ok(true) - } else if status == reqwest::StatusCode::NOT_FOUND { + } else if status == StatusCode::NOT_FOUND { Ok(false) } else { Err(Error::HttpRequestFailed(format!( - "Unexpected status code: {status}", + "Unexpected status checking Quickwit index: {status}" ))) } } async fn create_index(&self) -> Result<(), Error> { - info!("Creating index: {}", self.index_id); + info!( + "Creating Quickwit index: {} for connector ID: {}", + self.index_id, self.id + ); + let client = self.client()?; let url = format!("{}/api/v1/indexes", self.config.url); - let response = self - .client + let response = client .post(&url) - .header("content-type", "application/yaml") - .body(self.config.index.to_owned()) + .header("Content-Type", "application/yaml") + .body(self.config.index.clone()) .send() .await - .map_err(|error| { + .map_err(|e| { error!( - "Failed to send HTTP request to create index: {}. {error}", - self.index_id + "Failed to create Quickwit index: {} for connector ID: {}. {e}", + self.index_id, self.id ); - Error::HttpRequestFailed(error.to_string()) + Error::HttpRequestFailed(e.to_string()) })?; - if !response.status().is_success() { - let status = response.status(); + let status = response.status(); + if status.is_success() { + info!( + "Created Quickwit index: {} for connector ID: {}", + self.index_id, self.id + ); + Ok(()) + } else if status == StatusCode::CONFLICT { + // Another instance beat us to it; the index exists, which is what we want. + info!( + "Quickwit index already exists (409): {} for connector ID: {}", + self.index_id, self.id + ); + Ok(()) + } else if status.is_client_error() { let reason = response.text().await.unwrap_or_default(); error!( - "Received an invalid HTTP response when creating index: {}. Status code: {status}, reason: {reason}", - self.index_id + "Permanent error creating Quickwit index: {} for connector ID: {}. status: {status}, reason: {reason}", + self.index_id, self.id ); - return Err(Error::InitError(format!( - "Failed to create index: {}. {reason}", + Err(Error::InitError(format!( + "Failed to create index '{0}': {status} {reason}", self.index_id - ))); + ))) + } else { + let reason = response.text().await.unwrap_or_default(); + Err(Error::InitError(format!( + "Failed to create index '{0}': {status} {reason}", + self.index_id + ))) } - - info!("Created index: {}", self.index_id); - Ok(()) } pub async fn ingest(&self, messages: Vec) -> Result<(), Error> { + let client = self.client()?; let url = format!( "{}/api/v1/{}/ingest?commit=auto", self.config.url, self.index_id ); - info!("Ingesting messages for index: {}...", self.index_id); let messages_count = messages.len(); - let messages = messages + let ndjson = messages .into_iter() .filter_map(|record| simd_json::to_string(&record).ok()) .collect::>() .join("\n"); - let response = self - .client + let response = client .post(&url) - .body(messages) + .header("Content-Type", "application/x-ndjson") + .body(ndjson) .send() .await - .map_err(|error| { + .map_err(|e| { error!( - "Failed to send HTTP request to ingest messages for index: {}. {error}", - self.index_id + "Failed to ingest {messages_count} messages into Quickwit index: {} for connector ID: {}. {e}", + self.index_id, self.id ); - Error::HttpRequestFailed(error.to_string()) + Error::HttpRequestFailed(e.to_string()) })?; - if !response.status().is_success() { - let status = response.status(); + let status = response.status(); + if status.is_success() { + debug!( + "Ingested {messages_count} messages into Quickwit index: {} for connector ID: {}", + self.index_id, self.id + ); + Ok(()) + } else if status.is_client_error() && status != StatusCode::TOO_MANY_REQUESTS { let text = response.text().await.unwrap_or_default(); error!( - "Received an invalid HTTP response when ingesting messages for index: {}. Status code: {status}, reason: {text}", - self.index_id + "Permanent error ingesting into Quickwit index: {} for connector ID: {}. status: {status}, reason: {text}", + self.index_id, self.id ); - return Err(Error::HttpRequestFailed(format!( - "Status code: {status}, reason: {text}" - ))); + Err(Error::PermanentHttpError(format!( + "status: {status}, reason: {text}" + ))) + } else { + let text = response.text().await.unwrap_or_default(); + error!( + "Transient error ingesting into Quickwit index: {} for connector ID: {}. status: {status}, reason: {text}", + self.index_id, self.id + ); + Err(Error::HttpRequestFailed(format!( + "status: {status}, reason: {text}" + ))) } - - info!( - "Ingested {messages_count} messages for index: {}", - self.index_id - ); - Ok(()) } } #[async_trait] impl Sink for QuickwitSink { async fn open(&mut self) -> Result<(), Error> { - info!( - "Opened Quickwit sink connector with ID: {} for URL: {}", - self.id, self.config.url + let parsed: IndexIdExtract = serde_yaml_ng::from_str(&self.config.index) + .map_err(|e| Error::InvalidConfigValue(format!("index: invalid YAML — {e}")))?; + self.index_id = parsed.index_id; + + let retry_delay = parse_duration(self.config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let max_retry_delay = parse_duration( + self.config.max_retry_delay.as_deref(), + DEFAULT_MAX_RETRY_DELAY, + ); + let max_open_retries = self + .config + .max_open_retries + .unwrap_or(DEFAULT_MAX_OPEN_RETRIES); + let open_retry_max_delay = parse_duration( + self.config.open_retry_max_delay.as_deref(), + DEFAULT_OPEN_RETRY_MAX_DELAY, ); + + let request_timeout = parse_duration(self.config.request_timeout.as_deref(), "30s"); + let raw_client = reqwest::Client::builder() + .timeout(request_timeout) + .build() + .map_err(|e| Error::InitError(format!("reqwest client: {e}")))?; + let health_url = Url::parse(&format!("{}/health/livez", self.config.url)) + .map_err(|e| Error::InvalidConfigValue(format!("url: {e}")))?; + + check_connectivity_with_retry( + &raw_client, + health_url, + "Quickwit sink connector", + self.id, + &ConnectivityConfig { + max_open_retries, + open_retry_max_delay, + retry_delay, + }, + ) + .await?; + + self.client = Some(build_retry_client( + raw_client, + self.config + .max_retries + .unwrap_or(DEFAULT_MAX_RETRIES) + .max(1), + retry_delay, + max_retry_delay, + "Quickwit", + )); + if !self.has_index().await? { self.create_index().await?; } + + info!( + "Opened Quickwit sink connector ID: {}, index: {}", + self.id, self.index_id + ); Ok(()) } @@ -177,19 +283,28 @@ impl Sink for QuickwitSink { messages_metadata: MessagesMetadata, messages: Vec, ) -> Result<(), Error> { - info!( - "Quickwit sink with ID: {} received: {} messages, format: {}", - self.id, - messages.len(), - messages_metadata.schema - ); + let total = messages.len(); + if self.verbose { + info!( + "Quickwit sink connector ID: {} received {total} messages, schema: {}", + self.id, messages_metadata.schema + ); + } else { + debug!( + "Quickwit sink connector ID: {} received {total} messages, schema: {}", + self.id, messages_metadata.schema + ); + } - let mut json_payloads = Vec::with_capacity(messages.len()); + let mut json_payloads = Vec::with_capacity(total); for message in messages { match message.payload { Payload::Json(value) => json_payloads.push(value), _ => { - warn!("Unsupported payload format: {}", messages_metadata.schema); + warn!( + "Quickwit sink connector ID: {} unsupported payload schema: {}", + self.id, messages_metadata.schema + ); } } } @@ -198,12 +313,65 @@ impl Sink for QuickwitSink { return Ok(()); } - self.ingest(json_payloads).await?; - Ok(()) + self.ingest(json_payloads).await } async fn close(&mut self) -> Result<(), Error> { - info!("Quickwit sink connector with ID: {} is closed.", self.id); + let _ = self.client.take(); + info!("Closed Quickwit sink connector ID: {}", self.id); Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config() -> QuickwitSinkConfig { + QuickwitSinkConfig { + url: "http://localhost:7280".to_string(), + index: "index_id: test\nversion: 0.8\n".to_string(), + verbose_logging: None, + max_retries: None, + retry_delay: None, + max_retry_delay: None, + max_open_retries: None, + open_retry_max_delay: None, + request_timeout: None, + } + } + + #[test] + fn given_default_config_verbose_should_be_false() { + let sink = QuickwitSink::new(1, test_config()); + assert!(!sink.verbose); + } + + #[test] + fn given_verbose_logging_enabled_should_set_verbose_flag() { + let mut config = test_config(); + config.verbose_logging = Some(true); + let sink = QuickwitSink::new(1, config); + assert!(sink.verbose); + } + + #[test] + fn given_verbose_logging_disabled_should_not_set_verbose_flag() { + let mut config = test_config(); + config.verbose_logging = Some(false); + let sink = QuickwitSink::new(1, config); + assert!(!sink.verbose); + } + + #[test] + fn given_new_sink_client_should_not_be_initialized() { + let sink = QuickwitSink::new(1, test_config()); + assert!(sink.client.is_none()); + } + + #[test] + fn given_new_sink_index_id_should_be_empty() { + let sink = QuickwitSink::new(1, test_config()); + assert!(sink.index_id.is_empty()); + } +}