diff --git a/.github/workflows/_build_rust_artifacts.yml b/.github/workflows/_build_rust_artifacts.yml index db6910e263..c53556d3b2 100644 --- a/.github/workflows/_build_rust_artifacts.yml +++ b/.github/workflows/_build_rust_artifacts.yml @@ -46,7 +46,7 @@ on: connector_plugins: type: string required: false - default: "iggy_connector_elasticsearch_sink,iggy_connector_elasticsearch_source,iggy_connector_iceberg_sink,iggy_connector_postgres_sink,iggy_connector_postgres_source,iggy_connector_quickwit_sink,iggy_connector_random_source,iggy_connector_stdout_sink" + default: "iggy_connector_elasticsearch_sink,iggy_connector_elasticsearch_source,iggy_connector_iceberg_sink,iggy_connector_postgres_sink,iggy_connector_postgres_source,iggy_connector_quickwit_sink,iggy_connector_random_source,iggy_connector_stdout_sink,iggy_connector_surrealdb_sink" description: "Comma-separated list of connector plugin crates to build as shared libraries" outputs: artifact_name: diff --git a/.github/workflows/edge-release.yml b/.github/workflows/edge-release.yml index 7ca84fc5da..146f8b6329 100644 --- a/.github/workflows/edge-release.yml +++ b/.github/workflows/edge-release.yml @@ -109,6 +109,7 @@ jobs: - `iggy_connector_quickwit_sink` - `iggy_connector_random_source` - `iggy_connector_stdout_sink` + - `iggy_connector_surrealdb_sink` ## Downloads diff --git a/Cargo.lock b/Cargo.lock index 5c9905cab9..58d4d805dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7067,6 +7067,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_surrealdb_sink" +version = "0.4.1-edge.1" +dependencies = [ + "async-trait", + "base64", + "bytes", + "iggy_common", + "iggy_connector_sdk", + "reqwest 0.13.4", + "secrecy", + "serde", + "serde_json", + "simd-json", + "tokio", + "tracing", +] + [[package]] name = "iggy_examples" version = "0.0.6" diff --git a/Cargo.toml b/Cargo.toml index 4d196d9632..856d773616 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", "core/connectors/sinks/stdout_sink", + "core/connectors/sinks/surrealdb_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/influxdb_source", "core/connectors/sources/postgres_source", diff --git a/core/connectors/README.md b/core/connectors/README.md index b7d24cfbab..e078b46b77 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -86,6 +86,7 @@ Each sink should have its own, custom configuration, which is passed along with - **PostgreSQL Sink** - stores messages in PostgreSQL database tables - **Quickwit Sink** - indexes messages in Quickwit search engine - **Stdout Sink** - prints messages to standard output (useful for debugging/development) +- **SurrealDB Sink** - writes messages into SurrealDB with deterministic record IDs for idempotent replay ## Source diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md index 367a220287..64b6927c5b 100644 --- a/core/connectors/sinks/README.md +++ b/core/connectors/sinks/README.md @@ -15,6 +15,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s | **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas | | **quickwit_sink** | Indexes messages in Quickwit search engine for log analytics | | **stdout_sink** | Prints messages to standard output (useful for debugging and development) | +| **surrealdb_sink** | Writes messages into SurrealDB with deterministic record IDs for idempotent replay | The sink is represented by the single `Sink` trait, which defines the basic interface for all sink connectors. It provides methods for initializing the sink, writing data to external destination, and closing the sink. diff --git a/core/connectors/sinks/surrealdb_sink/Cargo.toml b/core/connectors/sinks/surrealdb_sink/Cargo.toml new file mode 100644 index 0000000000..1ad27d71d1 --- /dev/null +++ b/core/connectors/sinks/surrealdb_sink/Cargo.toml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_surrealdb_sink" +version = "0.4.1-edge.1" +description = "Iggy SurrealDB sink connector for writing stream messages into SurrealDB" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "surrealdb", "sink"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" +publish = false + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +bytes = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +reqwest = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +simd-json = { workspace = true } diff --git a/core/connectors/sinks/surrealdb_sink/README.md b/core/connectors/sinks/surrealdb_sink/README.md new file mode 100644 index 0000000000..154f357395 --- /dev/null +++ b/core/connectors/sinks/surrealdb_sink/README.md @@ -0,0 +1,99 @@ +# SurrealDB Sink Connector + +Writes Apache Iggy stream messages into SurrealDB over the HTTP API. + +The sink writes one SurrealQL bulk `INSERT IGNORE` per connector batch. Each +record uses a deterministic SurrealDB record id derived from stream, topic, +partition, offset and Iggy message id, so replayed batches are idempotent and +existing records are left untouched. + +Persistent sink failures are at-most-once from the runtime's perspective: +messages may already be committed in Iggy before this connector exhausts its +write attempts, so failed writes are logged but not redelivered. + +## Configuration + +```toml +type = "sink" +key = "surrealdb" +enabled = true +version = 0 +name = "SurrealDB sink" +path = "target/release/libiggy_connector_surrealdb_sink" +plugin_config_format = "toml" + +[[streams]] +stream = "example_stream" +topics = ["example_topic"] +schema = "json" +batch_length = 1000 +poll_interval = "5ms" +consumer_group = "surrealdb_sink_connector" + +[plugin_config] +endpoint = "127.0.0.1:8000" +namespace = "iggy" +database = "connectors" +table = "iggy_messages" +username = "root" +password = "root" +auth_scope = "root" +use_tls = false +auto_define_table = true +define_indexes = true +batch_size = 1000 +payload_format = "auto" +include_metadata = true +include_headers = true +include_checksum = true +include_origin_timestamp = true +query_timeout = "30s" +max_retries = 3 +retry_delay = "100ms" +max_retry_delay = "5s" +verbose_logging = false +``` + +### Plugin Fields + +| Field | Default | Description | +| --- | --- | --- | +| `endpoint` | required | SurrealDB HTTP host and port without scheme, for example `127.0.0.1:8000`. Full `http://` or `https://` URLs are also accepted. | +| `namespace` | required | SurrealDB namespace selected during `open()`. | +| `database` | required | SurrealDB database selected during `open()`. | +| `table` | required | Target table. Must be a safe SurrealQL identifier. | +| `username` / `password` | none | Optional credentials. | +| `auth_scope` | `root` | `root`, `namespace`, `database`, or `none`. | +| `use_tls` | `false` | Uses `https://` when true and `endpoint` has no scheme, `http://` otherwise. | +| `auto_define_table` | `false` | Runs `DEFINE TABLE IF NOT EXISTS SCHEMALESS`. | +| `define_indexes` | `false` | Defines an offset index on stream/topic/partition/offset. Requires `auto_define_table`. | +| `batch_size` | `1000` | Maximum number of records per SurrealDB request. | +| `payload_format` | `auto` | `auto`, `json`, `text`, `base64`, or `binary` (`binary` is an alias for `base64`). | +| `include_metadata` | `true` | Stores stream/topic/partition/offset/timestamps/schema fields. | +| `include_headers` | `true` | Stores Iggy headers as a deterministic object. Raw headers are base64 encoded. | +| `include_checksum` | `true` | Stores `iggy_checksum`. | +| `include_origin_timestamp` | `true` | Stores `iggy_origin_timestamp`. | +| `query_timeout` | `30s` | SurrealDB HTTP request timeout. | +| `max_retries` | `3` | Total attempts for transient write failures. Values below `1` are raised to `1`. | +| `retry_delay` | `100ms` | Base retry delay. | +| `max_retry_delay` | `5s` | Capped exponential retry delay. | +| `verbose_logging` | `false` | Emits per-batch success logs at `info`. | + +## Stored Shape + +With metadata enabled, records contain: + +- `id`: deterministic SurrealDB record id key +- `iggy_message_id`: original Iggy message id as a string +- `iggy_stream`, `iggy_topic`, `iggy_partition_id`, `iggy_offset` +- `iggy_timestamp`, `iggy_origin_timestamp`, `iggy_checksum`, `iggy_schema` +- `iggy_headers` +- `payload` +- `payload_encoding` + +`payload_format = "auto"` stores decoded JSON payloads as queryable SurrealDB +values, text payloads as strings, and binary payloads as base64 strings. + +The `messages_processed` counter reports valid records submitted to SurrealDB. +With `INSERT IGNORE`, duplicates can be ignored by SurrealDB while still being +counted as submitted. diff --git a/core/connectors/sinks/surrealdb_sink/config.toml b/core/connectors/sinks/surrealdb_sink/config.toml new file mode 100644 index 0000000000..e7b0dc3fd0 --- /dev/null +++ b/core/connectors/sinks/surrealdb_sink/config.toml @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "surrealdb" +enabled = true +version = 0 +name = "SurrealDB sink" +path = "target/release/libiggy_connector_surrealdb_sink" +plugin_config_format = "toml" +verbose = false +benchmark = false + +[[streams]] +stream = "example_stream" +topics = ["example_topic"] +schema = "json" +batch_length = 1000 +poll_interval = "5ms" +consumer_group = "surrealdb_sink_connector" + +[plugin_config] +endpoint = "127.0.0.1:8000" +namespace = "iggy" +database = "connectors" +table = "iggy_messages" +username = "root" +password = "root" +auth_scope = "root" +use_tls = false +auto_define_table = true +define_indexes = true +batch_size = 1000 +payload_format = "auto" +include_metadata = true +include_headers = true +include_checksum = true +include_origin_timestamp = true +query_timeout = "30s" +# Total write attempts for transient failures. Minimum is 1. A value of 0 is raised to 1. +max_retries = 3 +retry_delay = "100ms" +max_retry_delay = "5s" +verbose_logging = false diff --git a/core/connectors/sinks/surrealdb_sink/src/lib.rs b/core/connectors/sinks/surrealdb_sink/src/lib.rs new file mode 100644 index 0000000000..4b828439fd --- /dev/null +++ b/core/connectors/sinks/surrealdb_sink/src/lib.rs @@ -0,0 +1,1845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use base64::Engine; +use base64::engine::general_purpose; +use bytes::Bytes; +use iggy_connector_sdk::convert::owned_value_to_serde_json; +use iggy_connector_sdk::retry::{exponential_backoff, jitter, parse_duration}; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use reqwest::{Body, Client as HttpClient, RequestBuilder, StatusCode, Url}; +use secrecy::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value, json}; +use std::fmt; +use std::fmt::Write; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{debug, error, info, warn}; + +sink_connector!(SurrealDbSink); + +const DEFAULT_BATCH_SIZE: usize = 1000; +const DEFAULT_QUERY_TIMEOUT: &str = "30s"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "100ms"; +const DEFAULT_MAX_RETRY_DELAY: &str = "5s"; +const ENCODING_BASE64: &str = "base64"; +const ENCODING_JSON: &str = "json"; +const ENCODING_TEXT: &str = "text"; + +type SurrealDbClient = HttpClient; + +#[derive(Debug)] +pub struct SurrealDbSink { + id: u32, + client: Mutex>, + reconnecting: AtomicBool, + base_url: String, + endpoint: String, + namespace: String, + database: String, + table: String, + username: Option, + password: Option, + auth_scope_config: Option, + auth_scope: AuthScope, + payload_format_config: Option, + payload_format: PayloadFormat, + use_tls: bool, + batch_size: usize, + query_timeout: Duration, + max_retries: u32, + retry_delay: Duration, + max_retry_delay: Duration, + include_metadata: bool, + include_headers: bool, + include_checksum: bool, + include_origin_timestamp: bool, + auto_define_table: bool, + define_indexes: bool, + verbose: bool, + messages_processed: AtomicU64, + insertion_errors: AtomicU64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SurrealDbSinkConfig { + pub endpoint: String, + pub namespace: String, + pub database: String, + pub table: String, + pub username: Option, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")] + pub password: Option, + pub auth_scope: Option, + pub use_tls: Option, + pub auto_define_table: Option, + pub define_indexes: Option, + pub batch_size: Option, + pub payload_format: Option, + pub include_metadata: Option, + pub include_headers: Option, + pub include_checksum: Option, + pub include_origin_timestamp: Option, + pub query_timeout: Option, + pub max_retries: Option, + pub retry_delay: Option, + pub max_retry_delay: Option, + pub verbose_logging: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum AuthScope { + Root, + Namespace, + Database, + None, +} + +impl AuthScope { + fn parse_config(value: Option<&str>) -> Result { + match value { + Some(value) if value.eq_ignore_ascii_case("namespace") => Ok(AuthScope::Namespace), + Some(value) if value.eq_ignore_ascii_case("database") => Ok(AuthScope::Database), + Some(value) if value.eq_ignore_ascii_case("none") => Ok(AuthScope::None), + Some(value) if value.eq_ignore_ascii_case("root") => Ok(AuthScope::Root), + Some(value) => Err(Error::InvalidConfigValue(format!( + "SurrealDB auth_scope must be one of root, namespace, database, or none: {value}" + ))), + None => Ok(AuthScope::Root), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PayloadFormat { + Auto, + Json, + Text, + Base64, +} + +impl PayloadFormat { + fn parse_config(value: Option<&str>) -> Result { + match value { + Some(value) if value.eq_ignore_ascii_case("json") => Ok(PayloadFormat::Json), + Some(value) if value.eq_ignore_ascii_case("text") => Ok(PayloadFormat::Text), + Some(value) if value.eq_ignore_ascii_case("base64") => Ok(PayloadFormat::Base64), + Some(value) if value.eq_ignore_ascii_case("binary") => Ok(PayloadFormat::Base64), + Some(value) if value.eq_ignore_ascii_case("auto") => Ok(PayloadFormat::Auto), + Some(value) => Err(Error::InvalidConfigValue(format!( + "SurrealDB payload_format must be one of auto, json, text, base64, or binary: {value}" + ))), + None => Ok(PayloadFormat::Auto), + } + } +} + +#[derive(Debug)] +struct PayloadDocument { + value: Value, + encoding: &'static str, +} + +#[derive(Debug)] +struct BatchInsertOutcome { + inserted_count: u64, + error_count: u64, + error: Option, +} + +#[derive(Debug, Deserialize)] +struct SurrealSqlStatement { + status: String, + detail: Option, + result: Option, +} + +#[derive(Debug)] +enum SurrealDbRequestError { + Request(reqwest::Error), + HttpStatus { status: StatusCode, body: String }, + Query(String), + Decode(String), +} + +impl fmt::Display for SurrealDbRequestError { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SurrealDbRequestError::Request(error) => write!(formatter, "{error}"), + SurrealDbRequestError::HttpStatus { status, body } => { + write!(formatter, "HTTP status {status}: {body}") + } + SurrealDbRequestError::Query(message) | SurrealDbRequestError::Decode(message) => { + formatter.write_str(message) + } + } + } +} + +impl SurrealDbSink { + pub fn new(id: u32, config: SurrealDbSinkConfig) -> Self { + let endpoint = config.endpoint.clone(); + let namespace = config.namespace.clone(); + let database = config.database.clone(); + let table = config.table.clone(); + let username = config.username.clone(); + let password = config.password.clone(); + let auth_scope_config = config.auth_scope.clone(); + let payload_format_config = config.payload_format.clone(); + let use_tls = config.use_tls.unwrap_or(false); + let base_url = build_base_url(&endpoint, use_tls); + let batch_size = config + .batch_size + .unwrap_or(DEFAULT_BATCH_SIZE as u32) + .max(1) as usize; + let query_timeout = parse_duration(config.query_timeout.as_deref(), DEFAULT_QUERY_TIMEOUT); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let mut max_retry_delay = + parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); + let max_retries = match config.max_retries { + Some(0) => { + warn!("SurrealDB sink ID: {id} max_retries must be at least 1. Using 1 attempt."); + 1 + } + Some(max_retries) => max_retries, + None => DEFAULT_MAX_RETRIES, + }; + if max_retry_delay < retry_delay { + warn!( + "SurrealDB sink ID: {id} max_retry_delay is smaller than retry_delay. Using retry_delay as max_retry_delay." + ); + max_retry_delay = retry_delay; + } + let include_metadata = config.include_metadata.unwrap_or(true); + let include_headers = config.include_headers.unwrap_or(true); + let include_checksum = config.include_checksum.unwrap_or(true); + let include_origin_timestamp = config.include_origin_timestamp.unwrap_or(true); + let auto_define_table = config.auto_define_table.unwrap_or(false); + let define_indexes = config.define_indexes.unwrap_or(false); + let verbose = config.verbose_logging.unwrap_or(false); + + SurrealDbSink { + id, + client: Mutex::new(None), + reconnecting: AtomicBool::new(false), + base_url, + endpoint, + namespace, + database, + table, + username, + password, + auth_scope_config, + auth_scope: AuthScope::Root, + payload_format_config, + payload_format: PayloadFormat::Auto, + use_tls, + batch_size, + query_timeout, + max_retries, + retry_delay, + max_retry_delay, + include_metadata, + include_headers, + include_checksum, + include_origin_timestamp, + auto_define_table, + define_indexes, + verbose, + messages_processed: AtomicU64::new(0), + insertion_errors: AtomicU64::new(0), + } + } +} + +#[async_trait] +impl Sink for SurrealDbSink { + async fn open(&mut self) -> Result<(), Error> { + self.auth_scope = AuthScope::parse_config(self.auth_scope_config.as_deref())?; + self.payload_format = PayloadFormat::parse_config(self.payload_format_config.as_deref())?; + validate_endpoint_config(&self.endpoint, self.use_tls)?; + validate_identifier("namespace", &self.namespace)?; + validate_identifier("database", &self.database)?; + validate_identifier("table", &self.table)?; + + if self.auto_define_table && self.auth_scope != AuthScope::Root { + return Err(Error::InvalidConfigValue( + "SurrealDB auto_define_table requires auth_scope=root because namespace/database DDL is executed" + .to_string(), + )); + } + + if self.define_indexes && !self.auto_define_table { + warn!( + "SurrealDB sink ID: {} define_indexes=true requires auto_define_table=true; index DDL will not run.", + self.id + ); + } + + info!( + "Opening SurrealDB sink connector with ID: {}. Endpoint: {}, namespace: {}, database: {}, table: {}", + self.id, self.base_url, self.namespace, self.database, self.table + ); + + let client = self.connect_and_select().await?; + *self.client.lock().await = Some(client); + info!( + "Opened SurrealDB sink connector ID: {} for table: {}", + self.id, self.table + ); + Ok(()) + } + + async fn consume( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + self.process_messages(topic_metadata, &messages_metadata, messages) + .await + } + + async fn close(&mut self) -> Result<(), Error> { + info!("Closing SurrealDB sink connector with ID: {}", self.id); + self.client.get_mut().take(); + + let messages_processed = self.messages_processed.load(Ordering::Relaxed); + let insertion_errors = self.insertion_errors.load(Ordering::Relaxed); + info!( + "SurrealDB sink ID: {} processed {} messages with {} errors", + self.id, messages_processed, insertion_errors + ); + Ok(()) + } +} + +impl SurrealDbSink { + async fn connect_and_select(&self) -> Result { + let client = self.connect()?; + self.signin_if_configured(&client).await?; + self.health_check(&client).await?; + + if self.auto_define_table { + self.ensure_namespace_database(&client).await?; + self.ensure_table(&client).await?; + } + + Ok(client) + } + + fn connect(&self) -> Result { + HttpClient::builder() + .timeout(self.query_timeout) + .build() + .map_err(|e| Error::InitError(format!("Failed to create SurrealDB HTTP client: {e}"))) + } + + async fn get_client(&self) -> Result { + self.client + .lock() + .await + .clone() + .ok_or_else(|| Error::InitError("SurrealDB sink is not connected".to_string())) + } + + async fn reconnect(&self) -> Result { + if self + .reconnecting + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + debug!( + "Skipping SurrealDB reconnect for connector ID: {} because another reconnect is in progress", + self.id + ); + tokio::time::sleep(self.retry_delay).await; + return Ok(false); + } + + warn!("Reconnecting SurrealDB sink connector ID: {}", self.id); + let result = async { + let client = self.connect_and_select().await?; + *self.client.lock().await = Some(client); + Ok(()) + } + .await; + self.reconnecting.store(false, Ordering::Release); + result.map(|()| true) + } + + async fn signin_if_configured(&self, client: &SurrealDbClient) -> Result<(), Error> { + if self.auth_scope == AuthScope::None { + return Ok(()); + } + + let username = self.username.as_ref().ok_or_else(|| { + Error::InitError( + "SurrealDB username is required when auth_scope is not none".to_string(), + ) + })?; + let password = self.password.as_ref().ok_or_else(|| { + Error::InitError( + "SurrealDB password is required when auth_scope is not none".to_string(), + ) + })?; + let mut payload = Map::new(); + payload.insert("user".to_string(), Value::String(username.clone())); + payload.insert( + "pass".to_string(), + Value::String(password.expose_secret().to_string()), + ); + + if matches!(self.auth_scope, AuthScope::Namespace | AuthScope::Database) { + payload.insert("ns".to_string(), Value::String(self.namespace.clone())); + } + if matches!(self.auth_scope, AuthScope::Database) { + payload.insert("db".to_string(), Value::String(self.database.clone())); + } + + let response = client + .post(format!("{}/signin", self.base_url)) + .json(&Value::Object(payload)) + .send() + .await + .map_err(|e| Error::InitError(format!("Failed to authenticate with SurrealDB: {e}")))?; + let status = response.status(); + if status.is_success() { + return Ok(()); + } + + let body = response + .text() + .await + .unwrap_or_else(|e| format!("failed to read response body: {e}")); + Err(Error::InitError(format!( + "Failed to authenticate with SurrealDB: HTTP status {status}: {body}" + ))) + } + + async fn ensure_table(&self, client: &SurrealDbClient) -> Result<(), Error> { + let table = &self.table; + let mut query = format!("DEFINE TABLE IF NOT EXISTS {table} SCHEMALESS;"); + + if self.define_indexes { + let offset_index = format!("{table}_iggy_offset_idx"); + validate_identifier("index", &offset_index)?; + query.push_str(&format!( + " DEFINE INDEX IF NOT EXISTS {offset_index} ON TABLE {table} FIELDS iggy_stream, iggy_topic, iggy_partition_id, iggy_offset;" + )); + } + + self.execute_sql(client, query) + .await + .map_err(|e| Error::InitError(format!("Failed to define SurrealDB table: {e}")))?; + + Ok(()) + } + + async fn ensure_namespace_database(&self, client: &SurrealDbClient) -> Result<(), Error> { + let query = format!( + "DEFINE NAMESPACE IF NOT EXISTS {}; USE NS {}; DEFINE DATABASE IF NOT EXISTS {};", + self.namespace, self.namespace, self.database + ); + + self.execute_sql_without_scope(client, query) + .await + .map_err(|e| { + Error::InitError(format!( + "Failed to define SurrealDB namespace/database: {e}" + )) + })?; + + Ok(()) + } + + async fn health_check(&self, client: &SurrealDbClient) -> Result<(), Error> { + let response = client + .get(format!("{}/health", self.base_url)) + .send() + .await + .map_err(|e| Error::InitError(format!("SurrealDB health check failed: {e}")))?; + let status = response.status(); + if status.is_success() { + return Ok(()); + } + + let body = response + .text() + .await + .unwrap_or_else(|e| format!("failed to read response body: {e}")); + Err(Error::InitError(format!( + "SurrealDB health check failed: HTTP status {status}: {body}" + ))) + } + + async fn execute_sql( + &self, + client: &SurrealDbClient, + query: impl Into, + ) -> Result, SurrealDbRequestError> { + self.execute_sql_request(client, query, Some((&self.namespace, &self.database))) + .await + } + + async fn execute_sql_without_scope( + &self, + client: &SurrealDbClient, + query: impl Into, + ) -> Result, SurrealDbRequestError> { + self.execute_sql_request(client, query, None).await + } + + async fn execute_sql_request( + &self, + client: &SurrealDbClient, + query: impl Into, + scope: Option<(&str, &str)>, + ) -> Result, SurrealDbRequestError> { + let mut request = self + .apply_auth(client.post(format!("{}/sql", self.base_url))) + .header("Accept", "application/json") + .header("Content-Type", "text/plain") + .body(query); + + if let Some((namespace, database)) = scope { + request = request + .header("Surreal-NS", namespace) + .header("Surreal-DB", database); + } + + let response = request + .send() + .await + .map_err(SurrealDbRequestError::Request)?; + let status = response.status(); + let body = response + .text() + .await + .map_err(SurrealDbRequestError::Request)?; + + if !status.is_success() { + return Err(SurrealDbRequestError::HttpStatus { status, body }); + } + + let statements: Vec = serde_json::from_str(&body).map_err(|e| { + SurrealDbRequestError::Decode(format!( + "Failed to decode SurrealDB SQL response: {e}; response: {body}" + )) + })?; + + if let Some(statement) = statements + .iter() + .find(|statement| !statement.status.eq_ignore_ascii_case("OK")) + { + return Err(SurrealDbRequestError::Query( + statement + .detail + .clone() + .or_else(|| statement.result.as_ref().map(value_to_error_message)) + .unwrap_or_else(|| format!("SurrealDB query status: {}", statement.status)), + )); + } + + Ok(statements) + } + + fn apply_auth(&self, request: RequestBuilder) -> RequestBuilder { + if self.auth_scope == AuthScope::None { + return request; + } + + let Some(username) = self.username.as_ref() else { + return request; + }; + let Some(password) = self.password.as_ref() else { + return request; + }; + + request.basic_auth(username, Some(password.expose_secret())) + } + + async fn process_messages( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let mut successful_inserts = 0u64; + let mut last_error = None; + let record_id_prefix = RecordIdPrefix::new(topic_metadata); + let mut batch = Vec::with_capacity(self.batch_size); + + for message in messages { + batch.push(message); + if batch.len() == self.batch_size { + let batch_len = batch.len(); + let full_batch = std::mem::replace(&mut batch, Vec::with_capacity(self.batch_size)); + let outcome = self + .insert_batch( + full_batch, + &record_id_prefix, + topic_metadata, + messages_metadata, + ) + .await; + successful_inserts += outcome.inserted_count; + + if let Some(batch_error) = outcome.error { + self.insertion_errors + .fetch_add(outcome.error_count, Ordering::Relaxed); + error!( + "Failed to insert SurrealDB batch of {batch_len} messages for connector ID: {}, table: {}, error: {batch_error}", + self.id, self.table + ); + last_error = Some(batch_error); + } + } + } + + if !batch.is_empty() { + let batch_len = batch.len(); + let outcome = self + .insert_batch(batch, &record_id_prefix, topic_metadata, messages_metadata) + .await; + successful_inserts += outcome.inserted_count; + + if let Some(batch_error) = outcome.error { + self.insertion_errors + .fetch_add(outcome.error_count, Ordering::Relaxed); + error!( + "Failed to insert SurrealDB batch of {batch_len} messages for connector ID: {}, table: {}, error: {batch_error}", + self.id, self.table + ); + last_error = Some(batch_error); + } + } + + self.messages_processed + .fetch_add(successful_inserts, Ordering::Relaxed); + + if self.verbose { + info!( + "SurrealDB sink ID: {} wrote {successful_inserts} messages to table '{}'", + self.id, self.table + ); + } else { + debug!( + "SurrealDB sink ID: {} wrote {successful_inserts} messages to table '{}'", + self.id, self.table + ); + } + + if let Some(error) = last_error { + Err(error) + } else { + Ok(()) + } + } + + async fn insert_batch( + &self, + messages: Vec, + record_id_prefix: &RecordIdPrefix, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + ) -> BatchInsertOutcome { + if messages.is_empty() { + return BatchInsertOutcome { + inserted_count: 0, + error_count: 0, + error: None, + }; + } + + let mut records = Vec::with_capacity(messages.len()); + let mut record_error_count = 0u64; + let mut last_record_error = None; + for message in messages { + match self.build_record(record_id_prefix, topic_metadata, messages_metadata, message) { + Ok(record) => records.push(record), + Err(error) => { + record_error_count += 1; + last_record_error = Some(error); + } + } + } + + if records.is_empty() { + return BatchInsertOutcome { + inserted_count: 0, + error_count: record_error_count, + error: last_record_error, + }; + } + + let mut outcome = self.insert_records_with_retry(records).await; + outcome.error_count += record_error_count; + let db_error = outcome.error.take(); + outcome.error = last_record_error.or(db_error); + + outcome + } + + async fn insert_records_with_retry(&self, records: Vec) -> BatchInsertOutcome { + let mut attempts = 0u32; + let query = match build_insert_query(&self.table, &records) { + Ok(query) => query, + Err(error) => { + return BatchInsertOutcome { + inserted_count: 0, + error_count: records.len() as u64, + error: Some(error), + }; + } + }; + let record_count = records.len() as u64; + + loop { + let client = match self.get_client().await { + Ok(client) => client, + Err(error) => { + return BatchInsertOutcome { + inserted_count: 0, + error_count: record_count, + error: Some(error), + }; + } + }; + let result = self.execute_sql(&client, query.clone()).await; + + match result { + Ok(_) => { + return BatchInsertOutcome { + inserted_count: record_count, + error_count: 0, + error: None, + }; + } + Err(error) => { + let transient = is_transient_error(&error); + attempts += 1; + + if !transient || attempts >= self.max_retries { + return BatchInsertOutcome { + inserted_count: 0, + error_count: record_count, + error: Some(Error::CannotStoreData(format!( + "SurrealDB batch insert failed after {attempts} attempts: {error}" + ))), + }; + } + + if transient && is_connection_error(&error) { + match self.reconnect().await { + Ok(true) => {} + Ok(false) => continue, + Err(reconnect_error) => { + return BatchInsertOutcome { + inserted_count: 0, + error_count: record_count, + error: Some(Error::Connection(format!( + "Failed to reconnect to SurrealDB after transient write error: {reconnect_error}" + ))), + }; + } + } + } + + let delay = jitter(exponential_backoff( + self.retry_delay, + attempts.saturating_sub(1), + self.max_retry_delay, + )); + warn!( + "Transient SurrealDB write error for connector ID: {} (attempt {attempts}/{}): {error}. Retrying in {:?}.", + self.id, self.max_retries, delay + ); + tokio::time::sleep(delay).await; + } + } + } + } + + fn build_record( + &self, + record_id_prefix: &RecordIdPrefix, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + message: ConsumedMessage, + ) -> Result { + let mut record = Map::new(); + record.insert( + "id".to_string(), + Value::String(build_record_id( + record_id_prefix, + messages_metadata, + message.id, + message.offset, + )), + ); + record.insert( + "iggy_message_id".to_string(), + Value::String(message.id.to_string()), + ); + + if self.include_metadata { + record.insert( + "iggy_stream".to_string(), + Value::String(topic_metadata.stream.clone()), + ); + record.insert( + "iggy_topic".to_string(), + Value::String(topic_metadata.topic.clone()), + ); + record.insert( + "iggy_partition_id".to_string(), + Value::String(messages_metadata.partition_id.to_string()), + ); + record.insert( + "iggy_offset".to_string(), + Value::String(message.offset.to_string()), + ); + record.insert( + "iggy_timestamp".to_string(), + Value::String(message.timestamp.to_string()), + ); + record.insert( + "iggy_schema".to_string(), + Value::String(messages_metadata.schema.to_string()), + ); + } + + if self.include_checksum { + record.insert( + "iggy_checksum".to_string(), + Value::String(message.checksum.to_string()), + ); + } + + if self.include_origin_timestamp { + record.insert( + "iggy_origin_timestamp".to_string(), + Value::String(message.origin_timestamp.to_string()), + ); + } + + if self.include_headers + && let Some(headers) = &message.headers + && !headers.is_empty() + { + record.insert("iggy_headers".to_string(), encode_headers(headers)?); + } + + let payload = self.build_payload_document(message.payload)?; + record.insert("payload".to_string(), payload.value); + record.insert( + "payload_encoding".to_string(), + Value::String(payload.encoding.to_string()), + ); + + Ok(Value::Object(record)) + } + + fn build_payload_document(&self, payload: Payload) -> Result { + match self.payload_format { + PayloadFormat::Auto => build_auto_payload_document(payload), + PayloadFormat::Json => build_json_payload_document(payload), + PayloadFormat::Text => build_text_payload_document(payload), + PayloadFormat::Base64 => build_base64_payload_document(payload), + } + } +} + +fn build_insert_query(table: &str, records: &[Value]) -> Result { + let mut query = Vec::with_capacity(table.len() + records.len() * 128 + 32); + query.extend_from_slice(b"INSERT IGNORE INTO "); + query.extend_from_slice(table.as_bytes()); + query.push(b' '); + serde_json::to_writer(&mut query, records) + .map_err(|e| Error::InvalidRecordValue(format!("Invalid SurrealDB records: {e}")))?; + query.extend_from_slice(b" RETURN NONE;"); + Ok(Bytes::from(query)) +} + +fn build_auto_payload_document(payload: Payload) -> Result { + match payload { + Payload::Json(value) => Ok(PayloadDocument { + value: owned_value_to_serde_json(&value), + encoding: ENCODING_JSON, + }), + Payload::Text(text) | Payload::Proto(text) => Ok(PayloadDocument { + value: Value::String(text), + encoding: ENCODING_TEXT, + }), + Payload::Raw(_) | Payload::FlatBuffer(_) | Payload::Avro(_) => { + build_base64_payload_document(payload) + } + } +} + +fn build_json_payload_document(payload: Payload) -> Result { + match payload { + Payload::Json(value) => Ok(PayloadDocument { + value: owned_value_to_serde_json(&value), + encoding: ENCODING_JSON, + }), + _ => { + let bytes = payload.try_into_vec()?; + let value = serde_json::from_slice(&bytes) + .map_err(|e| Error::InvalidRecordValue(format!("Invalid JSON payload: {e}")))?; + Ok(PayloadDocument { + value, + encoding: ENCODING_JSON, + }) + } + } +} + +fn build_text_payload_document(payload: Payload) -> Result { + match payload { + Payload::Text(text) | Payload::Proto(text) => Ok(PayloadDocument { + value: Value::String(text), + encoding: ENCODING_TEXT, + }), + _ => { + let bytes = payload.try_into_vec()?; + let text = String::from_utf8(bytes) + .map_err(|e| Error::InvalidRecordValue(format!("Invalid UTF-8 payload: {e}")))?; + Ok(PayloadDocument { + value: Value::String(text), + encoding: ENCODING_TEXT, + }) + } + } +} + +fn build_base64_payload_document(payload: Payload) -> Result { + let bytes = payload.try_into_vec()?; + Ok(PayloadDocument { + value: Value::String(general_purpose::STANDARD.encode(bytes)), + encoding: ENCODING_BASE64, + }) +} + +fn encode_headers( + headers: &std::collections::BTreeMap, +) -> Result { + let mut encoded = Map::new(); + + for (key, value) in headers { + let value = if let Ok(raw) = value.as_raw() { + json!({ + "data": general_purpose::STANDARD.encode(raw), + "iggy_header_encoding": ENCODING_BASE64 + }) + } else { + Value::String(value.to_string_value()) + }; + encoded.insert(key.to_string_value(), value); + } + + Ok(Value::Object(encoded)) +} + +#[derive(Debug)] +struct RecordIdPrefix { + stream: String, + topic: String, +} + +impl RecordIdPrefix { + fn new(topic_metadata: &TopicMetadata) -> Self { + let mut stream = String::with_capacity(topic_metadata.stream.len() * 2); + push_hex_component(&mut stream, topic_metadata.stream.as_bytes()); + let mut topic = String::with_capacity(topic_metadata.topic.len() * 2); + push_hex_component(&mut topic, topic_metadata.topic.as_bytes()); + + Self { stream, topic } + } +} + +fn build_record_id( + record_id_prefix: &RecordIdPrefix, + messages_metadata: &MessagesMetadata, + message_id: u128, + offset: u64, +) -> String { + let mut id = + String::with_capacity(record_id_prefix.stream.len() + record_id_prefix.topic.len() + 72); + id.push('s'); + id.push_str(&record_id_prefix.stream); + id.push_str("_t"); + id.push_str(&record_id_prefix.topic); + id.push_str("_p"); + id.push_str(&messages_metadata.partition_id.to_string()); + id.push_str("_o"); + id.push_str(&offset.to_string()); + id.push_str("_m"); + let _ = write!(&mut id, "{message_id:032x}"); + id +} + +fn push_hex_component(out: &mut String, bytes: &[u8]) { + const HEX: &[u8; 16] = b"0123456789abcdef"; + + for byte in bytes { + out.push(HEX[(byte >> 4) as usize] as char); + out.push(HEX[(byte & 0x0f) as usize] as char); + } +} + +fn validate_identifier(field: &str, value: &str) -> Result<(), Error> { + let mut chars = value.chars(); + let Some(first) = chars.next() else { + return Err(Error::InvalidConfigValue(format!( + "SurrealDB {field} cannot be empty" + ))); + }; + + if !(first == '_' || first.is_ascii_alphabetic()) { + return Err(Error::InvalidConfigValue(format!( + "SurrealDB {field} must start with an ASCII letter or underscore" + ))); + } + + if chars.any(|ch| !(ch == '_' || ch.is_ascii_alphanumeric())) { + return Err(Error::InvalidConfigValue(format!( + "SurrealDB {field} must contain only ASCII letters, digits, and underscores" + ))); + } + + Ok(()) +} + +fn validate_endpoint_config(endpoint: &str, use_tls: bool) -> Result<(), Error> { + let endpoint = endpoint.trim(); + if endpoint.is_empty() { + return Err(Error::InvalidConfigValue( + "SurrealDB endpoint cannot be empty".to_string(), + )); + } + + let has_scheme = endpoint.starts_with("http://") || endpoint.starts_with("https://"); + if use_tls && endpoint.starts_with("http://") { + warn!("SurrealDB use_tls=true is ignored because endpoint has explicit http:// scheme."); + } + + let parsed_endpoint = if has_scheme { + endpoint.to_string() + } else { + let scheme = if use_tls { "https" } else { "http" }; + format!("{scheme}://{endpoint}") + }; + let url = Url::parse(&parsed_endpoint).map_err(|e| { + Error::InvalidConfigValue(format!("Invalid SurrealDB endpoint '{endpoint}': {e}")) + })?; + + if url.host_str().is_none() { + return Err(Error::InvalidConfigValue(format!( + "Invalid SurrealDB endpoint '{endpoint}': host is required" + ))); + } + + if !url.username().is_empty() || url.password().is_some() { + return Err(Error::InvalidConfigValue( + "SurrealDB endpoint must not include embedded credentials; use username/password config fields instead" + .to_string(), + )); + } + + if !matches!(url.path(), "" | "/") || url.query().is_some() || url.fragment().is_some() { + return Err(Error::InvalidConfigValue( + "SurrealDB endpoint must not include a path, query, or fragment".to_string(), + )); + } + + Ok(()) +} + +fn build_base_url(endpoint: &str, use_tls: bool) -> String { + let endpoint = endpoint.trim().trim_end_matches('/'); + let has_scheme = endpoint.starts_with("http://") || endpoint.starts_with("https://"); + let endpoint = if has_scheme { + endpoint.to_string() + } else { + let scheme = if use_tls { "https" } else { "http" }; + format!("{scheme}://{endpoint}") + }; + + if let Ok(mut url) = Url::parse(&endpoint) { + let _ = url.set_username(""); + let _ = url.set_password(None); + url.set_path(""); + url.set_query(None); + url.set_fragment(None); + return url.as_str().trim_end_matches('/').to_string(); + } + + endpoint +} + +fn value_to_error_message(value: &Value) -> String { + value + .as_str() + .map(ToString::to_string) + .unwrap_or_else(|| value.to_string()) +} + +fn is_transient_error(error: &SurrealDbRequestError) -> bool { + is_transaction_conflict(error) + || is_connection_error(error) + || is_timeout_or_service_error(error) +} + +fn is_transaction_conflict(error: &SurrealDbRequestError) -> bool { + let message = error.to_string().to_ascii_lowercase(); + message.contains("transaction conflict") || message.contains("transaction can be retried") +} + +fn is_connection_error(error: &SurrealDbRequestError) -> bool { + let SurrealDbRequestError::Request(error) = error else { + return false; + }; + + let message = error.to_string().to_ascii_lowercase(); + error.is_connect() + || error.is_timeout() + || message.contains("connection") + || message.contains("network") + || message.contains("broken pipe") + || message.contains("reset by peer") +} + +fn is_timeout_or_service_error(error: &SurrealDbRequestError) -> bool { + if let SurrealDbRequestError::Request(error) = error + && error.is_timeout() + { + return true; + } + if let SurrealDbRequestError::HttpStatus { status, .. } = error + && matches!( + *status, + StatusCode::REQUEST_TIMEOUT + | StatusCode::TOO_MANY_REQUESTS + | StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT + ) + { + return true; + } + + let message = error.to_string().to_ascii_lowercase(); + message.contains("timeout") + || message.contains("timed out") + || message.contains("temporarily unavailable") + || message.contains("service unavailable") +} + +#[cfg(test)] +mod tests { + use super::*; + use iggy_common::{HeaderKey, HeaderValue}; + use iggy_connector_sdk::Schema; + use std::collections::BTreeMap; + use std::str::FromStr; + + fn test_config() -> SurrealDbSinkConfig { + SurrealDbSinkConfig { + endpoint: "127.0.0.1:8000".to_string(), + namespace: "iggy".to_string(), + database: "connectors".to_string(), + table: "iggy_messages".to_string(), + username: Some("root".to_string()), + password: Some(SecretString::from("root")), + auth_scope: None, + use_tls: None, + auto_define_table: None, + define_indexes: None, + batch_size: None, + payload_format: None, + include_metadata: None, + include_headers: None, + include_checksum: None, + include_origin_timestamp: None, + query_timeout: None, + max_retries: None, + retry_delay: None, + max_retry_delay: None, + verbose_logging: None, + } + } + + fn test_topic_metadata() -> TopicMetadata { + TopicMetadata { + stream: "test_stream".to_string(), + topic: "test_topic".to_string(), + } + } + + fn test_messages_metadata() -> MessagesMetadata { + MessagesMetadata { + partition_id: 7, + current_offset: 0, + schema: Schema::Json, + } + } + + fn test_message(payload: Payload) -> ConsumedMessage { + ConsumedMessage { + id: 42, + offset: 9, + checksum: 123, + timestamp: 1_700_000_000_000_000, + origin_timestamp: 1_700_000_000_000_001, + headers: None, + payload, + } + } + + fn json_payload(value: serde_json::Value) -> Payload { + let mut bytes = serde_json::to_vec(&value).expect("Failed to serialize JSON"); + Payload::Json(simd_json::to_owned_value(&mut bytes).expect("Failed to parse JSON")) + } + + #[test] + fn given_default_config_should_apply_expected_runtime_values() { + let sink = SurrealDbSink::new(1, test_config()); + + assert_eq!(sink.batch_size, DEFAULT_BATCH_SIZE); + assert_eq!(sink.auth_scope, AuthScope::Root); + assert_eq!(sink.payload_format, PayloadFormat::Auto); + assert_eq!(sink.query_timeout, Duration::from_secs(30)); + assert_eq!(sink.max_retries, DEFAULT_MAX_RETRIES); + assert_eq!(sink.retry_delay, Duration::from_millis(100)); + assert_eq!(sink.max_retry_delay, Duration::from_secs(5)); + assert!(sink.include_metadata); + assert!(sink.include_headers); + assert!(sink.include_checksum); + assert!(sink.include_origin_timestamp); + assert!(!sink.auto_define_table); + assert!(!sink.define_indexes); + } + + #[test] + fn given_config_overrides_should_apply_expected_values() { + let mut config = test_config(); + config.auth_scope = Some("database".to_string()); + config.payload_format = Some("base64".to_string()); + config.batch_size = Some(10); + config.query_timeout = Some("5s".to_string()); + config.max_retries = Some(5); + config.retry_delay = Some("250ms".to_string()); + config.max_retry_delay = Some("2s".to_string()); + config.include_metadata = Some(false); + config.include_headers = Some(false); + config.include_checksum = Some(false); + config.include_origin_timestamp = Some(false); + config.auto_define_table = Some(true); + config.define_indexes = Some(true); + config.verbose_logging = Some(true); + + let sink = SurrealDbSink::new(1, config); + + assert_eq!(sink.auth_scope_config.as_deref(), Some("database")); + assert_eq!(sink.payload_format_config.as_deref(), Some("base64")); + assert_eq!(sink.auth_scope, AuthScope::Root); + assert_eq!(sink.payload_format, PayloadFormat::Auto); + assert_eq!(sink.batch_size, 10); + assert_eq!(sink.query_timeout, Duration::from_secs(5)); + assert_eq!(sink.max_retries, 5); + assert_eq!(sink.retry_delay, Duration::from_millis(250)); + assert_eq!(sink.max_retry_delay, Duration::from_secs(2)); + assert!(!sink.include_metadata); + assert!(!sink.include_headers); + assert!(!sink.include_checksum); + assert!(!sink.include_origin_timestamp); + assert!(sink.auto_define_table); + assert!(sink.define_indexes); + assert!(sink.verbose); + } + + #[test] + fn given_zero_max_retries_should_use_minimum_one_attempt() { + let mut config = test_config(); + config.max_retries = Some(0); + + let mut sink = SurrealDbSink::new(1, config); + sink.payload_format = PayloadFormat::Json; + + assert_eq!(sink.max_retries, 1); + } + + #[test] + fn given_reversed_retry_delays_should_clamp_max_retry_delay() { + let mut config = test_config(); + config.retry_delay = Some("5s".to_string()); + config.max_retry_delay = Some("100ms".to_string()); + + let mut sink = SurrealDbSink::new(1, config); + sink.payload_format = PayloadFormat::Json; + + assert_eq!(sink.retry_delay, Duration::from_secs(5)); + assert_eq!(sink.max_retry_delay, Duration::from_secs(5)); + } + + #[test] + fn given_payload_format_inputs_should_map_expected_variant() { + let cases = [ + (None, PayloadFormat::Auto), + (Some("auto"), PayloadFormat::Auto), + (Some("json"), PayloadFormat::Json), + (Some("text"), PayloadFormat::Text), + (Some("base64"), PayloadFormat::Base64), + (Some("binary"), PayloadFormat::Base64), + ]; + + for (input, expected) in cases { + assert_eq!(PayloadFormat::parse_config(input).unwrap(), expected); + } + + assert!(matches!( + PayloadFormat::parse_config(Some("unknown")), + Err(Error::InvalidConfigValue(_)) + )); + } + + #[test] + fn given_auth_scope_inputs_should_map_expected_variant() { + let cases = [ + (None, AuthScope::Root), + (Some("root"), AuthScope::Root), + (Some("namespace"), AuthScope::Namespace), + (Some("database"), AuthScope::Database), + (Some("none"), AuthScope::None), + ]; + + for (input, expected) in cases { + assert_eq!(AuthScope::parse_config(input).unwrap(), expected); + } + + assert!(matches!( + AuthScope::parse_config(Some("unknown")), + Err(Error::InvalidConfigValue(_)) + )); + } + + #[test] + fn given_unknown_auth_scope_when_opening_should_fail_validation() { + let mut config = test_config(); + config.auth_scope = Some("roo".to_string()); + let mut sink = SurrealDbSink::new(1, config); + + tokio::runtime::Runtime::new() + .expect("runtime should start") + .block_on(async { + let result = sink.open().await; + + assert!(matches!(result, Err(Error::InvalidConfigValue(_)))); + }); + } + + #[test] + fn given_unknown_payload_format_when_opening_should_fail_validation() { + let mut config = test_config(); + config.payload_format = Some("jsn".to_string()); + let mut sink = SurrealDbSink::new(1, config); + + tokio::runtime::Runtime::new() + .expect("runtime should start") + .block_on(async { + let result = sink.open().await; + + assert!(matches!(result, Err(Error::InvalidConfigValue(_)))); + }); + } + + #[test] + fn given_auto_define_table_with_scoped_auth_when_opening_should_fail_validation() { + let mut config = test_config(); + config.auth_scope = Some("database".to_string()); + config.auto_define_table = Some(true); + let mut sink = SurrealDbSink::new(1, config); + + tokio::runtime::Runtime::new() + .expect("runtime should start") + .block_on(async { + let result = sink.open().await; + + assert!(matches!(result, Err(Error::InvalidConfigValue(_)))); + }); + } + + #[test] + fn given_endpoint_path_when_opening_should_fail_validation() { + let mut config = test_config(); + config.endpoint = "http://127.0.0.1:8000/extra/path".to_string(); + let mut sink = SurrealDbSink::new(1, config); + + tokio::runtime::Runtime::new() + .expect("runtime should start") + .block_on(async { + let result = sink.open().await; + + assert!(matches!(result, Err(Error::InvalidConfigValue(_)))); + }); + } + + #[test] + fn given_invalid_namespace_when_opening_should_fail_validation() { + let mut config = test_config(); + config.namespace = "bad-namespace".to_string(); + let mut sink = SurrealDbSink::new(1, config); + + tokio::runtime::Runtime::new() + .expect("runtime should start") + .block_on(async { + let result = sink.open().await; + + assert!(matches!(result, Err(Error::InvalidConfigValue(_)))); + }); + } + + #[test] + fn given_identifier_values_should_validate_expected_shapes() { + assert!(validate_identifier("table", "iggy_messages").is_ok()); + assert!(validate_identifier("table", "_messages9").is_ok()); + assert!(validate_identifier("table", "").is_err()); + assert!(validate_identifier("table", "9messages").is_err()); + assert!(validate_identifier("table", "messages-name").is_err()); + assert!(validate_identifier("table", "messages]").is_err()); + assert!(validate_identifier("table", "messages\"").is_err()); + assert!(validate_identifier("table", "messages; DROP TABLE x").is_err()); + } + + #[test] + fn given_topic_metadata_should_build_deterministic_record_id() { + let topic_metadata = test_topic_metadata(); + let record_id_prefix = RecordIdPrefix::new(&topic_metadata); + let id = build_record_id(&record_id_prefix, &test_messages_metadata(), 42, 9); + + assert_eq!( + id, + "s746573745f73747265616d_t746573745f746f706963_p7_o9_m0000000000000000000000000000002a" + ); + } + + #[test] + fn given_table_name_should_build_bulk_insert_query() { + let records = [json!({ + "id": "record_1", + "payload": {"message": "hello"} + })]; + + assert_eq!( + String::from_utf8( + build_insert_query("iggy_messages", &records) + .expect("query should build") + .to_vec() + ) + .expect("query should be valid UTF-8"), + r#"INSERT IGNORE INTO iggy_messages [{"id":"record_1","payload":{"message":"hello"}}] RETURN NONE;"# + ); + } + + #[test] + fn given_adversarial_record_values_should_build_escaped_insert_query() { + let records = [json!({ + "id": "record_\"[]", + "payload": { + "text": "quote \" bracket ] brace } semi ; newline \n" + } + })]; + let query = String::from_utf8( + build_insert_query("iggy_messages", &records) + .expect("query should build") + .to_vec(), + ) + .expect("query should be valid UTF-8"); + let json_start = "INSERT IGNORE INTO iggy_messages "; + let json_end = " RETURN NONE;"; + assert!(query.starts_with(json_start)); + assert!(query.ends_with(json_end)); + + let encoded_records = &query[json_start.len()..query.len() - json_end.len()]; + let decoded_records: Vec = + serde_json::from_str(encoded_records).expect("records should stay valid JSON"); + assert_eq!(decoded_records, records.to_vec()); + } + + #[test] + fn given_auto_payload_json_should_store_queryable_json() { + let payload = json_payload(json!({"name": "Alice", "active": true})); + let document = build_auto_payload_document(payload).expect("Failed to build payload"); + + assert_eq!(document.encoding, ENCODING_JSON); + assert_eq!(document.value, json!({"name": "Alice", "active": true})); + } + + #[test] + fn given_auto_payload_text_should_store_text() { + let payload = Payload::Text("hello".to_string()); + let document = build_auto_payload_document(payload).expect("Failed to build payload"); + + assert_eq!(document.encoding, ENCODING_TEXT); + assert_eq!(document.value, Value::String("hello".to_string())); + } + + #[test] + fn given_auto_payload_raw_should_store_base64() { + let payload = Payload::Raw(vec![0, 1, 2, 255]); + let document = build_auto_payload_document(payload).expect("Failed to build payload"); + + assert_eq!(document.encoding, ENCODING_BASE64); + assert_eq!(document.value, Value::String("AAEC/w==".to_string())); + } + + #[test] + fn given_json_payload_format_should_parse_raw_json() { + let payload = Payload::Raw(br#"{"count":3}"#.to_vec()); + let document = build_json_payload_document(payload).expect("Failed to build payload"); + + assert_eq!(document.encoding, ENCODING_JSON); + assert_eq!(document.value, json!({"count": 3})); + } + + #[test] + fn given_json_payload_format_when_invalid_should_fail() { + let payload = Payload::Raw(b"not-json".to_vec()); + let result = build_json_payload_document(payload); + + assert!(matches!(result, Err(Error::InvalidRecordValue(_)))); + } + + #[test] + fn given_text_payload_format_when_invalid_utf8_should_fail() { + let payload = Payload::Raw(vec![0xff, 0xfe]); + let result = build_text_payload_document(payload); + + assert!(matches!(result, Err(Error::InvalidRecordValue(_)))); + } + + #[test] + fn given_headers_should_encode_raw_as_base64_and_values_as_strings() { + let mut headers = BTreeMap::new(); + headers.insert( + HeaderKey::try_from("trace-id").expect("valid key"), + HeaderValue::from_str("abc").expect("valid value"), + ); + headers.insert( + HeaderKey::try_from("binary").expect("valid key"), + HeaderValue::try_from(vec![1_u8, 2, 3]).expect("valid raw"), + ); + + let encoded = encode_headers(&headers).expect("Failed to encode headers"); + + assert_eq!( + encoded, + json!({ + "binary": { + "data": "AQID", + "iggy_header_encoding": "base64" + }, + "trace-id": "abc" + }) + ); + } + + #[test] + fn given_message_should_build_full_record() { + let mut message = test_message(json_payload(json!({"event": "created"}))); + let mut headers = BTreeMap::new(); + headers.insert( + HeaderKey::try_from("source").expect("valid key"), + HeaderValue::from_str("unit-test").expect("valid value"), + ); + message.headers = Some(headers); + + let sink = SurrealDbSink::new(1, test_config()); + let topic_metadata = test_topic_metadata(); + let record_id_prefix = RecordIdPrefix::new(&topic_metadata); + let record = sink + .build_record( + &record_id_prefix, + &topic_metadata, + &test_messages_metadata(), + message, + ) + .expect("Failed to build record"); + let object = record.as_object().expect("record should be object"); + + assert_eq!( + object.get("id"), + Some(&Value::String( + "s746573745f73747265616d_t746573745f746f706963_p7_o9_m0000000000000000000000000000002a" + .to_string() + )) + ); + assert_eq!(object.get("iggy_message_id"), Some(&json!("42"))); + assert_eq!(object.get("iggy_stream"), Some(&json!("test_stream"))); + assert_eq!(object.get("iggy_topic"), Some(&json!("test_topic"))); + assert_eq!(object.get("iggy_partition_id"), Some(&json!("7"))); + assert_eq!(object.get("iggy_offset"), Some(&json!("9"))); + assert_eq!( + object.get("iggy_timestamp"), + Some(&json!("1700000000000000")) + ); + assert_eq!(object.get("iggy_checksum"), Some(&json!("123"))); + assert_eq!( + object.get("iggy_origin_timestamp"), + Some(&json!("1700000000000001")) + ); + assert_eq!(object.get("payload"), Some(&json!({"event": "created"}))); + assert_eq!(object.get("payload_encoding"), Some(&json!("json"))); + assert!(object.contains_key("iggy_headers")); + } + + #[test] + fn given_large_u64_metadata_should_build_record_with_lossless_strings() { + let sink = SurrealDbSink::new(1, test_config()); + let mut message = test_message(Payload::Text("large-metadata".to_string())); + message.offset = u64::MAX; + message.timestamp = u64::MAX; + message.checksum = u64::MAX; + message.origin_timestamp = u64::MAX; + let topic_metadata = test_topic_metadata(); + let record_id_prefix = RecordIdPrefix::new(&topic_metadata); + + let record = sink + .build_record( + &record_id_prefix, + &topic_metadata, + &test_messages_metadata(), + message, + ) + .expect("Failed to build record"); + let object = record.as_object().expect("record should be object"); + + assert_eq!( + object.get("iggy_offset"), + Some(&json!("18446744073709551615")) + ); + assert_eq!( + object.get("iggy_timestamp"), + Some(&json!("18446744073709551615")) + ); + assert_eq!( + object.get("iggy_checksum"), + Some(&json!("18446744073709551615")) + ); + assert_eq!( + object.get("iggy_origin_timestamp"), + Some(&json!("18446744073709551615")) + ); + } + + #[test] + fn given_invalid_batch_when_processing_messages_should_record_error_and_return_error() { + let mut config = test_config(); + config.payload_format = Some("json".to_string()); + let mut sink = SurrealDbSink::new(1, config); + sink.payload_format = PayloadFormat::Json; + let message = test_message(Payload::Raw(b"not-json".to_vec())); + + tokio::runtime::Runtime::new() + .expect("runtime should start") + .block_on(async { + let result = sink + .process_messages( + &test_topic_metadata(), + &test_messages_metadata(), + vec![message], + ) + .await; + + assert!( + matches!(result, Err(Error::InvalidRecordValue(_))), + "batch failures should be observable by direct plugin callers" + ); + }); + + assert_eq!(sink.messages_processed.load(Ordering::Relaxed), 0); + assert_eq!(sink.insertion_errors.load(Ordering::Relaxed), 1); + } + + #[test] + fn given_invalid_chunks_when_processing_messages_should_process_all_chunks() { + let mut config = test_config(); + config.payload_format = Some("json".to_string()); + config.batch_size = Some(1); + let mut sink = SurrealDbSink::new(1, config); + sink.payload_format = PayloadFormat::Json; + let messages = vec![ + test_message(Payload::Raw(b"not-json".to_vec())), + test_message(Payload::Raw(b"also-not-json".to_vec())), + ]; + + tokio::runtime::Runtime::new() + .expect("runtime should start") + .block_on(async { + let result = sink + .process_messages(&test_topic_metadata(), &test_messages_metadata(), messages) + .await; + + assert!(matches!(result, Err(Error::InvalidRecordValue(_)))); + }); + + assert_eq!(sink.messages_processed.load(Ordering::Relaxed), 0); + assert_eq!(sink.insertion_errors.load(Ordering::Relaxed), 2); + } + + #[test] + fn given_malformed_record_in_batch_should_still_attempt_valid_records() { + let mut config = test_config(); + config.payload_format = Some("json".to_string()); + let sink = SurrealDbSink::new(1, config); + let messages = vec![ + test_message(Payload::Raw(b"not-json".to_vec())), + test_message(Payload::Raw(br#"{"valid":true}"#.to_vec())), + ]; + + tokio::runtime::Runtime::new() + .expect("runtime should start") + .block_on(async { + let outcome = sink + .insert_batch( + messages, + &RecordIdPrefix::new(&test_topic_metadata()), + &test_topic_metadata(), + &test_messages_metadata(), + ) + .await; + + assert_eq!(outcome.inserted_count, 0); + assert_eq!(outcome.error_count, 2); + assert!(matches!(outcome.error, Some(Error::InitError(_)))); + }); + } + + #[test] + fn given_endpoint_should_build_http_base_url() { + assert_eq!( + build_base_url("127.0.0.1:8000", false), + "http://127.0.0.1:8000" + ); + assert_eq!( + build_base_url("127.0.0.1:8000", true), + "https://127.0.0.1:8000" + ); + assert_eq!( + build_base_url("http://127.0.0.1:8000/", true), + "http://127.0.0.1:8000" + ); + } + + #[test] + fn given_endpoint_credentials_should_build_sanitized_base_url() { + assert_eq!( + build_base_url("http://user:pass@127.0.0.1:8000/", false), + "http://127.0.0.1:8000" + ); + } + + #[test] + fn given_endpoint_shapes_should_validate_expected_values() { + assert!(validate_endpoint_config("127.0.0.1:8000", false).is_ok()); + assert!(validate_endpoint_config("http://127.0.0.1:8000", true).is_ok()); + assert!(validate_endpoint_config("http://user:pass@127.0.0.1:8000", false).is_err()); + assert!(validate_endpoint_config("http://127.0.0.1:8000/path", false).is_err()); + assert!(validate_endpoint_config("http://127.0.0.1:8000?x=1", false).is_err()); + } + + #[test] + fn given_http_status_service_error_should_be_transient() { + let error = SurrealDbRequestError::HttpStatus { + status: StatusCode::SERVICE_UNAVAILABLE, + body: "retry later".to_string(), + }; + + assert!(is_transient_error(&error)); + assert!(is_timeout_or_service_error(&error)); + } + + #[test] + fn given_transaction_conflict_error_should_be_transient() { + let error = SurrealDbRequestError::Query("Transaction conflict".to_string()); + + assert!(is_transient_error(&error)); + assert!(is_transaction_conflict(&error)); + } + + #[test] + fn given_timeout_error_should_be_transient() { + let error = SurrealDbRequestError::Query("Query timed out".to_string()); + + assert!(is_transient_error(&error)); + assert!(is_timeout_or_service_error(&error)); + } + + #[test] + fn given_connection_text_in_query_error_should_not_be_connection_error() { + let error = SurrealDbRequestError::Query("connection pool size exceeded".to_string()); + + assert!(!is_connection_error(&error)); + } + + #[test] + fn given_non_transient_query_error_should_not_be_transient() { + let error = SurrealDbRequestError::Query("syntax error".to_string()); + + assert!(!is_transient_error(&error)); + } + + #[test] + fn given_metadata_disabled_should_build_minimal_record() { + let mut config = test_config(); + config.include_metadata = Some(false); + config.include_headers = Some(false); + config.include_checksum = Some(false); + config.include_origin_timestamp = Some(false); + let sink = SurrealDbSink::new(1, config); + let message = test_message(Payload::Text("minimal".to_string())); + let topic_metadata = test_topic_metadata(); + let record_id_prefix = RecordIdPrefix::new(&topic_metadata); + + let record = sink + .build_record( + &record_id_prefix, + &topic_metadata, + &test_messages_metadata(), + message, + ) + .expect("Failed to build record"); + let object = record.as_object().expect("record should be object"); + + assert!(object.contains_key("id")); + assert!(object.contains_key("iggy_message_id")); + assert!(object.contains_key("payload")); + assert!(!object.contains_key("iggy_stream")); + assert!(!object.contains_key("iggy_checksum")); + assert!(!object.contains_key("iggy_origin_timestamp")); + assert!(!object.contains_key("iggy_headers")); + } +} diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 0b2f264d03..bd4f70a1f4 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -26,6 +26,7 @@ mod influxdb; mod mongodb; mod postgres; mod quickwit; +mod surrealdb; mod wiremock; /// Prefix on every test container name so `just clean-test-containers` reaps @@ -72,4 +73,8 @@ pub use postgres::{ PostgresSourceJsonbFixture, PostgresSourceMarkFixture, PostgresSourceOps, }; pub use quickwit::{QuickwitFixture, QuickwitOps, QuickwitPreCreatedFixture}; +pub use surrealdb::{ + SurrealDbOps, SurrealDbSinkBatchFixture, SurrealDbSinkFixture, SurrealDbSinkJsonFixture, + SurrealDbSinkRawFixture, +}; pub use wiremock::{WireMockDirectFixture, WireMockWrappedFixture}; diff --git a/core/integration/tests/connectors/fixtures/surrealdb/container.rs b/core/integration/tests/connectors/fixtures/surrealdb/container.rs new file mode 100644 index 0000000000..07a733bfbb --- /dev/null +++ b/core/integration/tests/connectors/fixtures/surrealdb/container.rs @@ -0,0 +1,330 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::connectors::fixtures; +use integration::harness::TestBinaryError; +use serde::Deserialize; +use serde_json::{Value, json}; +use std::time::Duration; +use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers_modules::testcontainers::runners::AsyncRunner; +use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt}; +use tokio::time::sleep; +use tracing::info; + +const SURREALDB_IMAGE: &str = "docker.io/surrealdb/surrealdb"; +const SURREALDB_TAG: &str = "v3.1.4"; +const SURREALDB_PORT: u16 = 8000; +const SURREALDB_READY_MSG: &str = "Started web server on"; +const SURREALDB_BOOT_ATTEMPTS: usize = 120; +const SURREALDB_BOOT_INTERVAL_MS: u64 = 250; + +pub(super) const DEFAULT_TEST_STREAM: &str = "test_stream"; +pub(super) const DEFAULT_TEST_TOPIC: &str = "test_topic"; +pub(super) const DEFAULT_NAMESPACE: &str = "iggy"; +pub(super) const DEFAULT_DATABASE: &str = "connectors"; +pub(super) const DEFAULT_TABLE: &str = "iggy_messages"; +pub(super) const ROOT_USERNAME: &str = "root"; +pub(super) const ROOT_PASSWORD: &str = "root"; + +pub(super) const DEFAULT_POLL_ATTEMPTS: usize = 120; +pub(super) const DEFAULT_POLL_INTERVAL_MS: u64 = 50; + +pub(super) const ENV_SINK_ENDPOINT: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_ENDPOINT"; +pub(super) const ENV_SINK_NAMESPACE: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_NAMESPACE"; +pub(super) const ENV_SINK_DATABASE: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_DATABASE"; +pub(super) const ENV_SINK_TABLE: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_TABLE"; +pub(super) const ENV_SINK_USERNAME: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_USERNAME"; +pub(super) const ENV_SINK_PASSWORD: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_PASSWORD"; +pub(super) const ENV_SINK_AUTH_SCOPE: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_AUTH_SCOPE"; +pub(super) const ENV_SINK_AUTO_DEFINE_TABLE: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_AUTO_DEFINE_TABLE"; +pub(super) const ENV_SINK_DEFINE_INDEXES: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_DEFINE_INDEXES"; +pub(super) const ENV_SINK_BATCH_SIZE: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_BATCH_SIZE"; +pub(super) const ENV_SINK_PAYLOAD_FORMAT: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_PAYLOAD_FORMAT"; +pub(super) const ENV_SINK_STREAMS_0_STREAM: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_STREAMS_0_STREAM"; +pub(super) const ENV_SINK_STREAMS_0_TOPICS: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_STREAMS_0_TOPICS"; +pub(super) const ENV_SINK_STREAMS_0_SCHEMA: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_STREAMS_0_SCHEMA"; +pub(super) const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str = + "IGGY_CONNECTORS_SINK_SURREALDB_STREAMS_0_CONSUMER_GROUP"; +pub(super) const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PATH"; + +#[derive(Clone)] +pub struct SurrealDbClient { + client: reqwest::Client, + base_url: String, +} + +#[derive(Debug, Deserialize)] +struct SurrealSqlStatement { + status: String, + detail: Option, + result: Value, +} + +impl SurrealDbClient { + async fn new(endpoint: &str) -> Result { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: format!("Failed to create SurrealDB HTTP client: {e}"), + })?; + let client = Self { + client, + base_url: format!("http://{endpoint}"), + }; + client.signin().await?; + client.health().await?; + client.ensure_namespace_database().await?; + Ok(client) + } + + pub async fn health(&self) -> Result<(), TestBinaryError> { + let response = self + .client + .get(format!("{}/health", self.base_url)) + .send() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: format!("Failed to check SurrealDB health: {e}"), + })?; + let status = response.status(); + if status.is_success() { + return Ok(()); + } + + let body = response + .text() + .await + .unwrap_or_else(|e| format!("failed to read response body: {e}")); + Err(TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: format!("SurrealDB health check failed with HTTP status {status}: {body}"), + }) + } + + pub async fn query_result(&self, query: &str) -> Result { + let statements = self.execute_sql(query).await?; + statements + .into_iter() + .next() + .map(|statement| statement.result) + .ok_or_else(|| TestBinaryError::InvalidState { + message: "SurrealDB returned no SQL statements".to_string(), + }) + } + + async fn signin(&self) -> Result<(), TestBinaryError> { + let response = self + .client + .post(format!("{}/signin", self.base_url)) + .json(&json!({ + "user": ROOT_USERNAME, + "pass": ROOT_PASSWORD + })) + .send() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: format!("Failed to authenticate with SurrealDB: {e}"), + })?; + let status = response.status(); + if status.is_success() { + return Ok(()); + } + + let body = response + .text() + .await + .unwrap_or_else(|e| format!("failed to read response body: {e}")); + Err(TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: format!("Failed to authenticate with SurrealDB: HTTP status {status}: {body}"), + }) + } + + async fn ensure_namespace_database(&self) -> Result<(), TestBinaryError> { + let query = format!( + "DEFINE NAMESPACE IF NOT EXISTS {DEFAULT_NAMESPACE}; USE NS {DEFAULT_NAMESPACE}; DEFINE DATABASE IF NOT EXISTS {DEFAULT_DATABASE};" + ); + self.execute_sql_request(&query, false).await.map(|_| ()) + } + + async fn execute_sql(&self, query: &str) -> Result, TestBinaryError> { + self.execute_sql_request(query, true).await + } + + async fn execute_sql_request( + &self, + query: &str, + include_scope: bool, + ) -> Result, TestBinaryError> { + let mut request = self + .client + .post(format!("{}/sql", self.base_url)) + .basic_auth(ROOT_USERNAME, Some(ROOT_PASSWORD)) + .header("Accept", "application/json") + .header("Content-Type", "text/plain") + .body(query.to_string()); + + if include_scope { + request = request + .header("Surreal-NS", DEFAULT_NAMESPACE) + .header("Surreal-DB", DEFAULT_DATABASE); + } + + let response = request + .send() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to execute SurrealDB query: {e}"), + })?; + + let status = response.status(); + let body = response + .text() + .await + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to read SurrealDB response: {e}"), + })?; + + if !status.is_success() { + return Err(TestBinaryError::InvalidState { + message: format!("SurrealDB query failed with HTTP status {status}: {body}"), + }); + } + + let statements: Vec = + serde_json::from_str(&body).map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to decode SurrealDB response: {e}; response: {body}"), + })?; + + if let Some(statement) = statements + .iter() + .find(|statement| !statement.status.eq_ignore_ascii_case("OK")) + { + return Err(TestBinaryError::InvalidState { + message: statement + .detail + .clone() + .unwrap_or_else(|| format!("SurrealDB query status: {}", statement.status)), + }); + } + + Ok(statements) + } +} + +pub struct SurrealDbContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub(super) endpoint: String, +} + +impl SurrealDbContainer { + pub(super) async fn start() -> Result { + let container = GenericImage::new(SURREALDB_IMAGE, SURREALDB_TAG) + .with_exposed_port(SURREALDB_PORT.tcp()) + .with_wait_for(WaitFor::message_on_stdout(SURREALDB_READY_MSG)) + .with_mapped_port(0, SURREALDB_PORT.tcp()) + .with_container_name(fixtures::unique_container_name("surrealdb")) + .with_cmd([ + "start", + "--log", + "info", + "--user", + ROOT_USERNAME, + "--pass", + ROOT_PASSWORD, + "memory", + ]) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: format!("Failed to start container: {e}"), + })?; + + let mapped_port = container + .ports() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: format!("Failed to get ports: {e}"), + })? + .map_to_host_port_ipv4(SURREALDB_PORT) + .ok_or_else(|| TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: "No mapping for SurrealDB port".to_string(), + })?; + + let endpoint = format!("127.0.0.1:{mapped_port}"); + let instance = Self { + container, + endpoint, + }; + instance.wait_until_ready().await?; + + info!("SurrealDB container available at {}", instance.endpoint); + Ok(instance) + } + + pub async fn create_client(&self) -> Result { + SurrealDbClient::new(&self.endpoint).await + } + + async fn wait_until_ready(&self) -> Result<(), TestBinaryError> { + let mut last_error = None; + + for _ in 0..SURREALDB_BOOT_ATTEMPTS { + match self.create_client().await { + Ok(_) => return Ok(()), + Err(error) => last_error = Some(error.to_string()), + } + sleep(Duration::from_millis(SURREALDB_BOOT_INTERVAL_MS)).await; + } + + let detail = last_error + .map(|error| format!(" Last error: {error}")) + .unwrap_or_default(); + Err(TestBinaryError::FixtureSetup { + fixture_type: "SurrealDbContainer".to_string(), + message: format!("SurrealDB did not become ready.{detail}"), + }) + } +} + +pub trait SurrealDbOps: Sync { + fn container(&self) -> &SurrealDbContainer; + + fn create_client( + &self, + ) -> impl std::future::Future> + Send { + self.container().create_client() + } +} diff --git a/core/integration/tests/connectors/fixtures/surrealdb/mod.rs b/core/integration/tests/connectors/fixtures/surrealdb/mod.rs new file mode 100644 index 0000000000..7e8e5fdad1 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/surrealdb/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod container; +mod sink; + +pub use container::SurrealDbOps; +pub use sink::{ + SurrealDbSinkBatchFixture, SurrealDbSinkFixture, SurrealDbSinkJsonFixture, + SurrealDbSinkRawFixture, +}; diff --git a/core/integration/tests/connectors/fixtures/surrealdb/sink.rs b/core/integration/tests/connectors/fixtures/surrealdb/sink.rs new file mode 100644 index 0000000000..59f9729334 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/surrealdb/sink.rs @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::container::{ + DEFAULT_DATABASE, DEFAULT_NAMESPACE, DEFAULT_POLL_ATTEMPTS, DEFAULT_POLL_INTERVAL_MS, + DEFAULT_TABLE, DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SINK_AUTH_SCOPE, + ENV_SINK_AUTO_DEFINE_TABLE, ENV_SINK_BATCH_SIZE, ENV_SINK_DATABASE, ENV_SINK_DEFINE_INDEXES, + ENV_SINK_ENDPOINT, ENV_SINK_NAMESPACE, ENV_SINK_PASSWORD, ENV_SINK_PATH, + ENV_SINK_PAYLOAD_FORMAT, ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA, + ENV_SINK_STREAMS_0_STREAM, ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_TABLE, ENV_SINK_USERNAME, + ROOT_PASSWORD, ROOT_USERNAME, SurrealDbClient, SurrealDbContainer, SurrealDbOps, +}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture}; +use serde_json::{Value, json}; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::time::Duration; +use tokio::time::sleep; +use tracing::info; + +pub trait SurrealDbSinkProfile { + const SCHEMA: &'static str; + const BATCH_SIZE: Option; +} + +pub struct SurrealDbSinkJsonProfile; +pub struct SurrealDbSinkRawProfile; +pub struct SurrealDbSinkBatchProfile; + +impl SurrealDbSinkProfile for SurrealDbSinkJsonProfile { + const SCHEMA: &'static str = "json"; + const BATCH_SIZE: Option = None; +} + +impl SurrealDbSinkProfile for SurrealDbSinkRawProfile { + const SCHEMA: &'static str = "raw"; + const BATCH_SIZE: Option = None; +} + +impl SurrealDbSinkProfile for SurrealDbSinkBatchProfile { + const SCHEMA: &'static str = "json"; + const BATCH_SIZE: Option = Some(10); +} + +pub type SurrealDbSinkJsonFixture = SurrealDbSinkFixture; +pub type SurrealDbSinkRawFixture = SurrealDbSinkFixture; +pub type SurrealDbSinkBatchFixture = SurrealDbSinkFixture; + +pub struct SurrealDbSinkFixture

{ + container: SurrealDbContainer, + profile: PhantomData

, +} + +impl

SurrealDbOps for SurrealDbSinkFixture

+where + P: Sync, +{ + fn container(&self) -> &SurrealDbContainer { + &self.container + } +} + +impl

SurrealDbSinkFixture

{ + pub async fn wait_for_records( + &self, + client: &SurrealDbClient, + expected: usize, + ) -> Result, TestBinaryError> { + for _ in 0..DEFAULT_POLL_ATTEMPTS { + let records = self.select_all_records(client).await?; + if records.len() >= expected { + info!( + "Found {} records in SurrealDB table '{DEFAULT_TABLE}'", + records.len() + ); + return Ok(records); + } + sleep(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)).await; + } + + Err(TestBinaryError::InvalidState { + message: format!( + "Expected at least {expected} SurrealDB records after {DEFAULT_POLL_ATTEMPTS} attempts" + ), + }) + } + + pub async fn select_all_records( + &self, + client: &SurrealDbClient, + ) -> Result, TestBinaryError> { + let query = format!("SELECT * FROM {DEFAULT_TABLE};"); + let value = client.query_result(&query).await?; + decode_records_sorted_by_offset(value) + } + + pub async fn select_records_by_message_id( + &self, + client: &SurrealDbClient, + message_id: u128, + ) -> Result, TestBinaryError> { + let message_id = serde_json::to_string(&message_id.to_string()).map_err(|e| { + TestBinaryError::InvalidState { + message: format!("Failed to encode SurrealDB message id: {e}"), + } + })?; + let query = format!("SELECT * FROM {DEFAULT_TABLE} WHERE iggy_message_id = {message_id};"); + let value = client.query_result(&query).await?; + decode_records_sorted_by_offset(value) + } + + pub async fn insert_preseeded_record( + &self, + client: &SurrealDbClient, + record_id: &str, + message_id: u128, + ) -> Result<(), TestBinaryError> { + let records = serde_json::to_string(&json!([ + { + "id": record_id, + "iggy_message_id": message_id.to_string(), + "seed_marker": "preseed-unchanged", + "payload": "preseeded" + } + ])) + .map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to encode SurrealDB preseed record: {e}"), + })?; + let query = format!("INSERT INTO {DEFAULT_TABLE} {records} RETURN NONE;"); + client.query_result(&query).await.map(|_| ()) + } +} + +fn decode_records_sorted_by_offset(value: Value) -> Result, TestBinaryError> { + let mut records: Vec = + serde_json::from_value(value).map_err(|e| TestBinaryError::InvalidState { + message: format!("Failed to decode SurrealDB records: {e}"), + })?; + records.sort_by_key(|record| { + record + .get("iggy_offset") + .and_then(Value::as_str) + .and_then(|offset| offset.parse::().ok()) + .unwrap_or(u64::MAX) + }); + + Ok(records) +} + +#[async_trait] +impl

TestFixture for SurrealDbSinkFixture

+where + P: SurrealDbSinkProfile + Send + Sync, +{ + async fn setup() -> Result { + let container = SurrealDbContainer::start().await?; + Ok(Self { + container, + profile: PhantomData, + }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert( + ENV_SINK_ENDPOINT.to_string(), + self.container.endpoint.clone(), + ); + envs.insert( + ENV_SINK_NAMESPACE.to_string(), + DEFAULT_NAMESPACE.to_string(), + ); + envs.insert(ENV_SINK_DATABASE.to_string(), DEFAULT_DATABASE.to_string()); + envs.insert(ENV_SINK_TABLE.to_string(), DEFAULT_TABLE.to_string()); + envs.insert(ENV_SINK_USERNAME.to_string(), ROOT_USERNAME.to_string()); + envs.insert(ENV_SINK_PASSWORD.to_string(), ROOT_PASSWORD.to_string()); + envs.insert(ENV_SINK_AUTH_SCOPE.to_string(), "root".to_string()); + envs.insert(ENV_SINK_AUTO_DEFINE_TABLE.to_string(), "true".to_string()); + envs.insert(ENV_SINK_DEFINE_INDEXES.to_string(), "true".to_string()); + envs.insert(ENV_SINK_PAYLOAD_FORMAT.to_string(), "auto".to_string()); + envs.insert( + ENV_SINK_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SINK_STREAMS_0_TOPICS.to_string(), + format!("[{}]", DEFAULT_TEST_TOPIC), + ); + envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_string(), P::SCHEMA.to_string()); + envs.insert( + ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_string(), + format!("surrealdb_sink_{}_cg", P::SCHEMA), + ); + envs.insert( + ENV_SINK_PATH.to_string(), + "../../target/debug/libiggy_connector_surrealdb_sink".to_string(), + ); + + if let Some(batch_size) = P::BATCH_SIZE { + envs.insert(ENV_SINK_BATCH_SIZE.to_string(), batch_size.to_string()); + } + + envs + } +} diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index fc624f897f..8f39bbcc70 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -31,6 +31,7 @@ mod random; mod random_source_liveness; mod runtime; mod stdout; +mod surrealdb; use iggy_common::IggyTimestamp; use serde::{Deserialize, Serialize}; diff --git a/core/integration/tests/connectors/surrealdb/mod.rs b/core/integration/tests/connectors/surrealdb/mod.rs new file mode 100644 index 0000000000..7bbb19039e --- /dev/null +++ b/core/integration/tests/connectors/surrealdb/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod surrealdb_sink; + +const TEST_MESSAGE_COUNT: usize = 3; +const LARGE_BATCH_COUNT: usize = 50; +const POLL_ATTEMPTS: usize = 120; +const POLL_INTERVAL_MS: u64 = 50; diff --git a/core/integration/tests/connectors/surrealdb/sink.toml b/core/integration/tests/connectors/surrealdb/sink.toml new file mode 100644 index 0000000000..4f980f8827 --- /dev/null +++ b/core/integration/tests/connectors/surrealdb/sink.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sinks/surrealdb_sink" diff --git a/core/integration/tests/connectors/surrealdb/surrealdb_sink.rs b/core/integration/tests/connectors/surrealdb/surrealdb_sink.rs new file mode 100644 index 0000000000..9205760c49 --- /dev/null +++ b/core/integration/tests/connectors/surrealdb/surrealdb_sink.rs @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{LARGE_BATCH_COUNT, POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT}; +use crate::connectors::fixtures::{ + SurrealDbOps, SurrealDbSinkBatchFixture, SurrealDbSinkFixture, SurrealDbSinkJsonFixture, + SurrealDbSinkRawFixture, +}; +use bytes::Bytes; +use iggy::prelude::{IggyMessage, Partitioning}; +use iggy_common::Identifier; +use iggy_common::MessageClient; +use integration::harness::seeds; +use integration::iggy_harness; +use serde_json::Value; +use std::time::Duration; +use tokio::time::sleep; + +fn build_expected_record_id(message_id: u128, offset: u64) -> String { + let mut id = String::new(); + id.push('s'); + push_hex_component(&mut id, seeds::names::STREAM.as_bytes()); + id.push_str("_t"); + push_hex_component(&mut id, seeds::names::TOPIC.as_bytes()); + id.push_str("_p0_o"); + id.push_str(&offset.to_string()); + id.push_str("_m"); + id.push_str(&format!("{message_id:032x}")); + id +} + +fn push_hex_component(out: &mut String, bytes: &[u8]) { + const HEX: &[u8; 16] = b"0123456789abcdef"; + + for byte in bytes { + out.push(HEX[(byte >> 4) as usize] as char); + out.push(HEX[(byte & 0x0f) as usize] as char); + } +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/surrealdb/sink.toml")), + seed = seeds::connector_stream +)] +async fn json_messages_sink_to_surrealdb(harness: &TestHarness, fixture: SurrealDbSinkJsonFixture) { + let client = harness.root_client().await.unwrap(); + let surreal_client = fixture + .create_client() + .await + .expect("Failed to create SurrealDB client"); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let payloads = [ + serde_json::json!({"name": "Alice", "score": 10}), + serde_json::json!({"name": "Bob", "score": 20}), + serde_json::json!({"name": "Carol", "score": 30}), + ]; + let mut messages: Vec = payloads + .iter() + .enumerate() + .map(|(idx, payload)| { + IggyMessage::builder() + .id((idx + 1) as u128) + .payload(Bytes::from( + serde_json::to_vec(payload).expect("Failed to serialize payload"), + )) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + let records = fixture + .wait_for_records(&surreal_client, TEST_MESSAGE_COUNT) + .await + .expect("Records did not appear in SurrealDB"); + + assert_eq!(records.len(), TEST_MESSAGE_COUNT); + for (idx, record) in records.iter().enumerate() { + assert_eq!( + record["iggy_message_id"], + Value::String((idx + 1).to_string()) + ); + assert_eq!( + record["iggy_stream"], + Value::String(seeds::names::STREAM.to_string()) + ); + assert_eq!( + record["iggy_topic"], + Value::String(seeds::names::TOPIC.to_string()) + ); + assert_eq!(record["iggy_partition_id"], Value::String("0".to_string())); + assert_eq!(record["iggy_offset"], Value::String(idx.to_string())); + assert_eq!( + record["payload_encoding"], + Value::String("json".to_string()) + ); + assert_eq!(record["payload"], payloads[idx]); + assert_eq!( + record["id"].as_str().expect("record id should be string"), + format!( + "iggy_messages:{}", + build_expected_record_id((idx + 1) as u128, idx as u64) + ) + ); + } +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/surrealdb/sink.toml")), + seed = seeds::connector_stream +)] +async fn raw_messages_sink_as_base64(harness: &TestHarness, fixture: SurrealDbSinkRawFixture) { + let client = harness.root_client().await.unwrap(); + let surreal_client = fixture + .create_client() + .await + .expect("Failed to create SurrealDB client"); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let payloads: Vec> = vec![ + b"plain text".to_vec(), + vec![0x00, 0x01, 0x02, 0xff], + vec![0xde, 0xad, 0xbe, 0xef], + ]; + let mut messages: Vec = payloads + .iter() + .enumerate() + .map(|(idx, payload)| { + IggyMessage::builder() + .id((idx + 1) as u128) + .payload(Bytes::from(payload.clone())) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + let records = fixture + .wait_for_records(&surreal_client, payloads.len()) + .await + .expect("Records did not appear in SurrealDB"); + + assert_eq!(records.len(), payloads.len()); + let expected_payloads = ["cGxhaW4gdGV4dA==", "AAEC/w==", "3q2+7w=="]; + for (idx, record) in records.iter().enumerate() { + assert_eq!( + record["payload_encoding"], + Value::String("base64".to_string()) + ); + assert_eq!( + record["payload"], + Value::String(expected_payloads[idx].to_string()) + ); + } +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/surrealdb/sink.toml")), + seed = seeds::connector_stream +)] +async fn large_batch_processed_in_chunks( + harness: &TestHarness, + fixture: SurrealDbSinkBatchFixture, +) { + let client = harness.root_client().await.unwrap(); + let surreal_client = fixture + .create_client() + .await + .expect("Failed to create SurrealDB client"); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + let mut messages: Vec = (0..LARGE_BATCH_COUNT) + .map(|idx| { + IggyMessage::builder() + .id((idx + 1) as u128) + .payload(Bytes::from( + serde_json::to_vec(&serde_json::json!({"idx": idx})) + .expect("Failed to serialize payload"), + )) + .build() + .expect("Failed to build message") + }) + .collect(); + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages"); + + let records = fixture + .wait_for_records(&surreal_client, LARGE_BATCH_COUNT) + .await + .expect("Records did not appear in SurrealDB"); + + assert_eq!(records.len(), LARGE_BATCH_COUNT); + for (idx, record) in records.iter().enumerate() { + assert_eq!(record["iggy_offset"], Value::String(idx.to_string())); + assert_eq!(record["payload"], serde_json::json!({"idx": idx})); + } +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/surrealdb/sink.toml")), + seed = seeds::connector_stream +)] +async fn duplicate_record_id_is_idempotent_replay_not_overwrite( + harness: &TestHarness, + fixture: SurrealDbSinkFixture, +) { + let client = harness.root_client().await.unwrap(); + let surreal_client = fixture + .create_client() + .await + .expect("Failed to create SurrealDB client"); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + fixture + .insert_preseeded_record(&surreal_client, &build_expected_record_id(2, 1), 2) + .await + .expect("Failed to preseed duplicate record"); + + let mut messages: Vec = vec![ + IggyMessage::builder() + .id(1) + .payload(Bytes::from_static(br#"{"message":"one"}"#)) + .build() + .expect("Failed to build message 1"), + IggyMessage::builder() + .id(2) + .payload(Bytes::from_static(br#"{"message":"two"}"#)) + .build() + .expect("Failed to build message 2"), + IggyMessage::builder() + .id(3) + .payload(Bytes::from_static(br#"{"message":"three"}"#)) + .build() + .expect("Failed to build message 3"), + ]; + + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send duplicate batch"); + + let mut id1_inserted = false; + let mut id3_inserted = false; + + for _ in 0..POLL_ATTEMPTS { + id1_inserted = !fixture + .select_records_by_message_id(&surreal_client, 1) + .await + .expect("Failed to query id 1") + .is_empty(); + id3_inserted = !fixture + .select_records_by_message_id(&surreal_client, 3) + .await + .expect("Failed to query id 3") + .is_empty(); + + if id1_inserted && id3_inserted { + break; + } + + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + id1_inserted, + "Expected first non-duplicate record to be inserted" + ); + assert!( + id3_inserted, + "Expected suffix record after duplicate to be inserted" + ); + + let duplicate_records = fixture + .select_records_by_message_id(&surreal_client, 2) + .await + .expect("Failed to query duplicate record"); + assert_eq!( + duplicate_records.len(), + 1, + "Duplicate replay should not create extra records" + ); + assert_eq!( + duplicate_records[0]["seed_marker"], + Value::String("preseed-unchanged".to_string()), + "Existing record must not be overwritten by replay" + ); + assert_eq!( + duplicate_records[0]["payload"], + Value::String("preseeded".to_string()), + "Existing record payload must remain unchanged" + ); + assert_eq!( + fixture + .select_all_records(&surreal_client) + .await + .expect("Failed to select all records") + .len(), + 3 + ); +} diff --git a/scripts/bump-version.sh b/scripts/bump-version.sh index aa4b086518..d1e44e334a 100755 --- a/scripts/bump-version.sh +++ b/scripts/bump-version.sh @@ -87,7 +87,7 @@ EOF } RUST_COMPONENTS="rust-sdk rust-common rust-binary-protocol rust-server rust-cli rust-connector-sdk rust-mcp rust-bench rust-bench-dashboard-frontend rust-bench-dashboard-server rust-bench-report" -CONNECTOR_SINK_COMPONENTS="rust-connector-delta-sink rust-connector-elasticsearch-sink rust-connector-http-sink rust-connector-iceberg-sink rust-connector-influxdb-sink rust-connector-mongodb-sink rust-connector-postgres-sink rust-connector-quickwit-sink rust-connector-stdout-sink" +CONNECTOR_SINK_COMPONENTS="rust-connector-delta-sink rust-connector-elasticsearch-sink rust-connector-http-sink rust-connector-iceberg-sink rust-connector-influxdb-sink rust-connector-mongodb-sink rust-connector-postgres-sink rust-connector-quickwit-sink rust-connector-stdout-sink rust-connector-surrealdb-sink" CONNECTOR_SOURCE_COMPONENTS="rust-connector-elasticsearch-source rust-connector-influxdb-source rust-connector-postgres-source rust-connector-random-source" CONNECTOR_COMPONENTS="rust-connector-runtime ${CONNECTOR_SINK_COMPONENTS} ${CONNECTOR_SOURCE_COMPONENTS}" SDK_COMPONENTS="sdk-python sdk-node sdk-go sdk-csharp sdk-java"