;
+
+pub struct SurrealDbSinkFixture {
+ container: SurrealDbContainer,
+ profile: PhantomData
,
+}
+
+impl
SurrealDbOps for SurrealDbSinkFixture
+where
+ P: Sync,
+{
+ fn container(&self) -> &SurrealDbContainer {
+ &self.container
+ }
+}
+
+impl
SurrealDbSinkFixture
{
+ pub async fn wait_for_records(
+ &self,
+ client: &SurrealDbClient,
+ expected: usize,
+ ) -> Result, TestBinaryError> {
+ for _ in 0..DEFAULT_POLL_ATTEMPTS {
+ let records = self.select_all_records(client).await?;
+ if records.len() >= expected {
+ info!(
+ "Found {} records in SurrealDB table '{DEFAULT_TABLE}'",
+ records.len()
+ );
+ return Ok(records);
+ }
+ sleep(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)).await;
+ }
+
+ Err(TestBinaryError::InvalidState {
+ message: format!(
+ "Expected at least {expected} SurrealDB records after {DEFAULT_POLL_ATTEMPTS} attempts"
+ ),
+ })
+ }
+
+ pub async fn select_all_records(
+ &self,
+ client: &SurrealDbClient,
+ ) -> Result, TestBinaryError> {
+ let query = format!("SELECT * FROM {DEFAULT_TABLE};");
+ let value = client.query_result(&query).await?;
+ decode_records_sorted_by_offset(value)
+ }
+
+ pub async fn select_records_by_message_id(
+ &self,
+ client: &SurrealDbClient,
+ message_id: u128,
+ ) -> Result, TestBinaryError> {
+ let message_id = serde_json::to_string(&message_id.to_string()).map_err(|e| {
+ TestBinaryError::InvalidState {
+ message: format!("Failed to encode SurrealDB message id: {e}"),
+ }
+ })?;
+ let query = format!("SELECT * FROM {DEFAULT_TABLE} WHERE iggy_message_id = {message_id};");
+ let value = client.query_result(&query).await?;
+ decode_records_sorted_by_offset(value)
+ }
+
+ pub async fn insert_preseeded_record(
+ &self,
+ client: &SurrealDbClient,
+ record_id: &str,
+ message_id: u128,
+ ) -> Result<(), TestBinaryError> {
+ let records = serde_json::to_string(&json!([
+ {
+ "id": record_id,
+ "iggy_message_id": message_id.to_string(),
+ "seed_marker": "preseed-unchanged",
+ "payload": "preseeded"
+ }
+ ]))
+ .map_err(|e| TestBinaryError::InvalidState {
+ message: format!("Failed to encode SurrealDB preseed record: {e}"),
+ })?;
+ let query = format!("INSERT INTO {DEFAULT_TABLE} {records} RETURN NONE;");
+ client.query_result(&query).await.map(|_| ())
+ }
+}
+
+fn decode_records_sorted_by_offset(value: Value) -> Result, TestBinaryError> {
+ let mut records: Vec =
+ serde_json::from_value(value).map_err(|e| TestBinaryError::InvalidState {
+ message: format!("Failed to decode SurrealDB records: {e}"),
+ })?;
+ records.sort_by_key(|record| {
+ record
+ .get("iggy_offset")
+ .and_then(Value::as_str)
+ .and_then(|offset| offset.parse::().ok())
+ .unwrap_or(u64::MAX)
+ });
+
+ Ok(records)
+}
+
+#[async_trait]
+impl TestFixture for SurrealDbSinkFixture
+where
+ P: SurrealDbSinkProfile + Send + Sync,
+{
+ async fn setup() -> Result {
+ let container = SurrealDbContainer::start().await?;
+ Ok(Self {
+ container,
+ profile: PhantomData,
+ })
+ }
+
+ fn connectors_runtime_envs(&self) -> HashMap {
+ let mut envs = HashMap::new();
+ envs.insert(
+ ENV_SINK_ENDPOINT.to_string(),
+ self.container.endpoint.clone(),
+ );
+ envs.insert(
+ ENV_SINK_NAMESPACE.to_string(),
+ DEFAULT_NAMESPACE.to_string(),
+ );
+ envs.insert(ENV_SINK_DATABASE.to_string(), DEFAULT_DATABASE.to_string());
+ envs.insert(ENV_SINK_TABLE.to_string(), DEFAULT_TABLE.to_string());
+ envs.insert(ENV_SINK_USERNAME.to_string(), ROOT_USERNAME.to_string());
+ envs.insert(ENV_SINK_PASSWORD.to_string(), ROOT_PASSWORD.to_string());
+ envs.insert(ENV_SINK_AUTH_SCOPE.to_string(), "root".to_string());
+ envs.insert(ENV_SINK_AUTO_DEFINE_TABLE.to_string(), "true".to_string());
+ envs.insert(ENV_SINK_DEFINE_INDEXES.to_string(), "true".to_string());
+ envs.insert(ENV_SINK_PAYLOAD_FORMAT.to_string(), "auto".to_string());
+ envs.insert(
+ ENV_SINK_STREAMS_0_STREAM.to_string(),
+ DEFAULT_TEST_STREAM.to_string(),
+ );
+ envs.insert(
+ ENV_SINK_STREAMS_0_TOPICS.to_string(),
+ format!("[{}]", DEFAULT_TEST_TOPIC),
+ );
+ envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_string(), P::SCHEMA.to_string());
+ envs.insert(
+ ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_string(),
+ format!("surrealdb_sink_{}_cg", P::SCHEMA),
+ );
+ envs.insert(
+ ENV_SINK_PATH.to_string(),
+ "../../target/debug/libiggy_connector_surrealdb_sink".to_string(),
+ );
+
+ if let Some(batch_size) = P::BATCH_SIZE {
+ envs.insert(ENV_SINK_BATCH_SIZE.to_string(), batch_size.to_string());
+ }
+
+ envs
+ }
+}
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index fc624f897f..8f39bbcc70 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -31,6 +31,7 @@ mod random;
mod random_source_liveness;
mod runtime;
mod stdout;
+mod surrealdb;
use iggy_common::IggyTimestamp;
use serde::{Deserialize, Serialize};
diff --git a/core/integration/tests/connectors/surrealdb/mod.rs b/core/integration/tests/connectors/surrealdb/mod.rs
new file mode 100644
index 0000000000..7bbb19039e
--- /dev/null
+++ b/core/integration/tests/connectors/surrealdb/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod surrealdb_sink;
+
+const TEST_MESSAGE_COUNT: usize = 3;
+const LARGE_BATCH_COUNT: usize = 50;
+const POLL_ATTEMPTS: usize = 120;
+const POLL_INTERVAL_MS: u64 = 50;
diff --git a/core/integration/tests/connectors/surrealdb/sink.toml b/core/integration/tests/connectors/surrealdb/sink.toml
new file mode 100644
index 0000000000..4f980f8827
--- /dev/null
+++ b/core/integration/tests/connectors/surrealdb/sink.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "../connectors/sinks/surrealdb_sink"
diff --git a/core/integration/tests/connectors/surrealdb/surrealdb_sink.rs b/core/integration/tests/connectors/surrealdb/surrealdb_sink.rs
new file mode 100644
index 0000000000..9205760c49
--- /dev/null
+++ b/core/integration/tests/connectors/surrealdb/surrealdb_sink.rs
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{LARGE_BATCH_COUNT, POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT};
+use crate::connectors::fixtures::{
+ SurrealDbOps, SurrealDbSinkBatchFixture, SurrealDbSinkFixture, SurrealDbSinkJsonFixture,
+ SurrealDbSinkRawFixture,
+};
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage, Partitioning};
+use iggy_common::Identifier;
+use iggy_common::MessageClient;
+use integration::harness::seeds;
+use integration::iggy_harness;
+use serde_json::Value;
+use std::time::Duration;
+use tokio::time::sleep;
+
+fn build_expected_record_id(message_id: u128, offset: u64) -> String {
+ let mut id = String::new();
+ id.push('s');
+ push_hex_component(&mut id, seeds::names::STREAM.as_bytes());
+ id.push_str("_t");
+ push_hex_component(&mut id, seeds::names::TOPIC.as_bytes());
+ id.push_str("_p0_o");
+ id.push_str(&offset.to_string());
+ id.push_str("_m");
+ id.push_str(&format!("{message_id:032x}"));
+ id
+}
+
+fn push_hex_component(out: &mut String, bytes: &[u8]) {
+ const HEX: &[u8; 16] = b"0123456789abcdef";
+
+ for byte in bytes {
+ out.push(HEX[(byte >> 4) as usize] as char);
+ out.push(HEX[(byte & 0x0f) as usize] as char);
+ }
+}
+
+#[iggy_harness(
+ server(connectors_runtime(config_path = "tests/connectors/surrealdb/sink.toml")),
+ seed = seeds::connector_stream
+)]
+async fn json_messages_sink_to_surrealdb(harness: &TestHarness, fixture: SurrealDbSinkJsonFixture) {
+ let client = harness.root_client().await.unwrap();
+ let surreal_client = fixture
+ .create_client()
+ .await
+ .expect("Failed to create SurrealDB client");
+
+ let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+ let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+ let payloads = [
+ serde_json::json!({"name": "Alice", "score": 10}),
+ serde_json::json!({"name": "Bob", "score": 20}),
+ serde_json::json!({"name": "Carol", "score": 30}),
+ ];
+ let mut messages: Vec = payloads
+ .iter()
+ .enumerate()
+ .map(|(idx, payload)| {
+ IggyMessage::builder()
+ .id((idx + 1) as u128)
+ .payload(Bytes::from(
+ serde_json::to_vec(payload).expect("Failed to serialize payload"),
+ ))
+ .build()
+ .expect("Failed to build message")
+ })
+ .collect();
+
+ client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .expect("Failed to send messages");
+
+ let records = fixture
+ .wait_for_records(&surreal_client, TEST_MESSAGE_COUNT)
+ .await
+ .expect("Records did not appear in SurrealDB");
+
+ assert_eq!(records.len(), TEST_MESSAGE_COUNT);
+ for (idx, record) in records.iter().enumerate() {
+ assert_eq!(
+ record["iggy_message_id"],
+ Value::String((idx + 1).to_string())
+ );
+ assert_eq!(
+ record["iggy_stream"],
+ Value::String(seeds::names::STREAM.to_string())
+ );
+ assert_eq!(
+ record["iggy_topic"],
+ Value::String(seeds::names::TOPIC.to_string())
+ );
+ assert_eq!(record["iggy_partition_id"], Value::String("0".to_string()));
+ assert_eq!(record["iggy_offset"], Value::String(idx.to_string()));
+ assert_eq!(
+ record["payload_encoding"],
+ Value::String("json".to_string())
+ );
+ assert_eq!(record["payload"], payloads[idx]);
+ assert_eq!(
+ record["id"].as_str().expect("record id should be string"),
+ format!(
+ "iggy_messages:{}",
+ build_expected_record_id((idx + 1) as u128, idx as u64)
+ )
+ );
+ }
+}
+
+#[iggy_harness(
+ server(connectors_runtime(config_path = "tests/connectors/surrealdb/sink.toml")),
+ seed = seeds::connector_stream
+)]
+async fn raw_messages_sink_as_base64(harness: &TestHarness, fixture: SurrealDbSinkRawFixture) {
+ let client = harness.root_client().await.unwrap();
+ let surreal_client = fixture
+ .create_client()
+ .await
+ .expect("Failed to create SurrealDB client");
+
+ let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+ let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+ let payloads: Vec> = vec![
+ b"plain text".to_vec(),
+ vec![0x00, 0x01, 0x02, 0xff],
+ vec![0xde, 0xad, 0xbe, 0xef],
+ ];
+ let mut messages: Vec = payloads
+ .iter()
+ .enumerate()
+ .map(|(idx, payload)| {
+ IggyMessage::builder()
+ .id((idx + 1) as u128)
+ .payload(Bytes::from(payload.clone()))
+ .build()
+ .expect("Failed to build message")
+ })
+ .collect();
+
+ client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .expect("Failed to send messages");
+
+ let records = fixture
+ .wait_for_records(&surreal_client, payloads.len())
+ .await
+ .expect("Records did not appear in SurrealDB");
+
+ assert_eq!(records.len(), payloads.len());
+ let expected_payloads = ["cGxhaW4gdGV4dA==", "AAEC/w==", "3q2+7w=="];
+ for (idx, record) in records.iter().enumerate() {
+ assert_eq!(
+ record["payload_encoding"],
+ Value::String("base64".to_string())
+ );
+ assert_eq!(
+ record["payload"],
+ Value::String(expected_payloads[idx].to_string())
+ );
+ }
+}
+
+#[iggy_harness(
+ server(connectors_runtime(config_path = "tests/connectors/surrealdb/sink.toml")),
+ seed = seeds::connector_stream
+)]
+async fn large_batch_processed_in_chunks(
+ harness: &TestHarness,
+ fixture: SurrealDbSinkBatchFixture,
+) {
+ let client = harness.root_client().await.unwrap();
+ let surreal_client = fixture
+ .create_client()
+ .await
+ .expect("Failed to create SurrealDB client");
+
+ let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+ let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+ let mut messages: Vec = (0..LARGE_BATCH_COUNT)
+ .map(|idx| {
+ IggyMessage::builder()
+ .id((idx + 1) as u128)
+ .payload(Bytes::from(
+ serde_json::to_vec(&serde_json::json!({"idx": idx}))
+ .expect("Failed to serialize payload"),
+ ))
+ .build()
+ .expect("Failed to build message")
+ })
+ .collect();
+
+ client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .expect("Failed to send messages");
+
+ let records = fixture
+ .wait_for_records(&surreal_client, LARGE_BATCH_COUNT)
+ .await
+ .expect("Records did not appear in SurrealDB");
+
+ assert_eq!(records.len(), LARGE_BATCH_COUNT);
+ for (idx, record) in records.iter().enumerate() {
+ assert_eq!(record["iggy_offset"], Value::String(idx.to_string()));
+ assert_eq!(record["payload"], serde_json::json!({"idx": idx}));
+ }
+}
+
+#[iggy_harness(
+ server(connectors_runtime(config_path = "tests/connectors/surrealdb/sink.toml")),
+ seed = seeds::connector_stream
+)]
+async fn duplicate_record_id_is_idempotent_replay_not_overwrite(
+ harness: &TestHarness,
+ fixture: SurrealDbSinkFixture,
+) {
+ let client = harness.root_client().await.unwrap();
+ let surreal_client = fixture
+ .create_client()
+ .await
+ .expect("Failed to create SurrealDB client");
+
+ let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
+ let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
+
+ fixture
+ .insert_preseeded_record(&surreal_client, &build_expected_record_id(2, 1), 2)
+ .await
+ .expect("Failed to preseed duplicate record");
+
+ let mut messages: Vec = vec![
+ IggyMessage::builder()
+ .id(1)
+ .payload(Bytes::from_static(br#"{"message":"one"}"#))
+ .build()
+ .expect("Failed to build message 1"),
+ IggyMessage::builder()
+ .id(2)
+ .payload(Bytes::from_static(br#"{"message":"two"}"#))
+ .build()
+ .expect("Failed to build message 2"),
+ IggyMessage::builder()
+ .id(3)
+ .payload(Bytes::from_static(br#"{"message":"three"}"#))
+ .build()
+ .expect("Failed to build message 3"),
+ ];
+
+ client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .expect("Failed to send duplicate batch");
+
+ let mut id1_inserted = false;
+ let mut id3_inserted = false;
+
+ for _ in 0..POLL_ATTEMPTS {
+ id1_inserted = !fixture
+ .select_records_by_message_id(&surreal_client, 1)
+ .await
+ .expect("Failed to query id 1")
+ .is_empty();
+ id3_inserted = !fixture
+ .select_records_by_message_id(&surreal_client, 3)
+ .await
+ .expect("Failed to query id 3")
+ .is_empty();
+
+ if id1_inserted && id3_inserted {
+ break;
+ }
+
+ sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+ }
+
+ assert!(
+ id1_inserted,
+ "Expected first non-duplicate record to be inserted"
+ );
+ assert!(
+ id3_inserted,
+ "Expected suffix record after duplicate to be inserted"
+ );
+
+ let duplicate_records = fixture
+ .select_records_by_message_id(&surreal_client, 2)
+ .await
+ .expect("Failed to query duplicate record");
+ assert_eq!(
+ duplicate_records.len(),
+ 1,
+ "Duplicate replay should not create extra records"
+ );
+ assert_eq!(
+ duplicate_records[0]["seed_marker"],
+ Value::String("preseed-unchanged".to_string()),
+ "Existing record must not be overwritten by replay"
+ );
+ assert_eq!(
+ duplicate_records[0]["payload"],
+ Value::String("preseeded".to_string()),
+ "Existing record payload must remain unchanged"
+ );
+ assert_eq!(
+ fixture
+ .select_all_records(&surreal_client)
+ .await
+ .expect("Failed to select all records")
+ .len(),
+ 3
+ );
+}
diff --git a/scripts/bump-version.sh b/scripts/bump-version.sh
index aa4b086518..d1e44e334a 100755
--- a/scripts/bump-version.sh
+++ b/scripts/bump-version.sh
@@ -87,7 +87,7 @@ EOF
}
RUST_COMPONENTS="rust-sdk rust-common rust-binary-protocol rust-server rust-cli rust-connector-sdk rust-mcp rust-bench rust-bench-dashboard-frontend rust-bench-dashboard-server rust-bench-report"
-CONNECTOR_SINK_COMPONENTS="rust-connector-delta-sink rust-connector-elasticsearch-sink rust-connector-http-sink rust-connector-iceberg-sink rust-connector-influxdb-sink rust-connector-mongodb-sink rust-connector-postgres-sink rust-connector-quickwit-sink rust-connector-stdout-sink"
+CONNECTOR_SINK_COMPONENTS="rust-connector-delta-sink rust-connector-elasticsearch-sink rust-connector-http-sink rust-connector-iceberg-sink rust-connector-influxdb-sink rust-connector-mongodb-sink rust-connector-postgres-sink rust-connector-quickwit-sink rust-connector-stdout-sink rust-connector-surrealdb-sink"
CONNECTOR_SOURCE_COMPONENTS="rust-connector-elasticsearch-source rust-connector-influxdb-source rust-connector-postgres-source rust-connector-random-source"
CONNECTOR_COMPONENTS="rust-connector-runtime ${CONNECTOR_SINK_COMPONENTS} ${CONNECTOR_SOURCE_COMPONENTS}"
SDK_COMPONENTS="sdk-python sdk-node sdk-go sdk-csharp sdk-java"