From 48c3a9db6e6fd90c019f48feec060a1434301ac3 Mon Sep 17 00:00:00 2001
From: Diaconu Radu-Mihai <52667211+countradooku@users.noreply.github.com>
Date: Thu, 11 Jun 2026 11:47:30 +0300
Subject: [PATCH 01/15] feat(connectors): add SurrealDB sink connector
SurrealDB is a document database target for Iggy connector users, so the sink writes batches with deterministic record ids and bulk INSERT IGNORE to keep runtime redelivery idempotent without per-message round trips.
Constraint: User explicitly requested the latest SurrealDB Rust SDK and chose to keep it despite BUSL-1.1 license-validation warnings for SurrealDB crates.
Constraint: Local Docker daemon was unavailable, so real-container integration execution could not run here.
Rejected: Per-message SDK writes | too many round trips and weaker batching throughput.
Rejected: Using the testcontainers SurrealDB module | module source hardcodes an older SurrealDB image.
Confidence: medium
Scope-risk: moderate
Directive: Keep record ids deterministic across releases; changing build_record_id breaks replay idempotency.
Tested: cargo fmt --all; cargo sort --no-format --workspace; cargo clippy --all-features --all-targets -- -D warnings; cargo check --all --all-features; cargo test -p iggy_connector_surrealdb_sink; cargo test -p integration --no-run connectors::surrealdb; cargo test --locked --doc; cargo doc --no-deps --all-features --quiet; taplo/license/shellcheck/version/diff/binary checks; prek install
Not-tested: Docker-backed SurrealDB integration execution, because Docker daemon was not running locally.
---
.github/workflows/_build_rust_artifacts.yml | 2 +-
.github/workflows/edge-release.yml | 1 +
Cargo.lock | 1362 ++++++++++++++++-
Cargo.toml | 2 +
core/connectors/README.md | 1 +
.../connectors/surrealdb_sink.toml | 57 +
core/connectors/sinks/README.md | 1 +
.../sinks/surrealdb_sink/Cargo.toml | 49 +
.../connectors/sinks/surrealdb_sink/README.md | 91 ++
.../sinks/surrealdb_sink/config.toml | 57 +
.../sinks/surrealdb_sink/src/lib.rs | 1094 +++++++++++++
core/integration/Cargo.toml | 1 +
.../tests/connectors/fixtures/mod.rs | 5 +
.../fixtures/surrealdb/container.rs | 189 +++
.../connectors/fixtures/surrealdb/mod.rs | 27 +
.../connectors/fixtures/surrealdb/sink.rs | 293 ++++
core/integration/tests/connectors/mod.rs | 1 +
.../tests/connectors/surrealdb/mod.rs | 25 +
.../tests/connectors/surrealdb/sink.toml | 20 +
.../connectors/surrealdb/surrealdb_sink.rs | 354 +++++
scripts/bump-version.sh | 2 +-
21 files changed, 3589 insertions(+), 45 deletions(-)
create mode 100644 core/connectors/runtime/example_config/connectors/surrealdb_sink.toml
create mode 100644 core/connectors/sinks/surrealdb_sink/Cargo.toml
create mode 100644 core/connectors/sinks/surrealdb_sink/README.md
create mode 100644 core/connectors/sinks/surrealdb_sink/config.toml
create mode 100644 core/connectors/sinks/surrealdb_sink/src/lib.rs
create mode 100644 core/integration/tests/connectors/fixtures/surrealdb/container.rs
create mode 100644 core/integration/tests/connectors/fixtures/surrealdb/mod.rs
create mode 100644 core/integration/tests/connectors/fixtures/surrealdb/sink.rs
create mode 100644 core/integration/tests/connectors/surrealdb/mod.rs
create mode 100644 core/integration/tests/connectors/surrealdb/sink.toml
create mode 100644 core/integration/tests/connectors/surrealdb/surrealdb_sink.rs
diff --git a/.github/workflows/_build_rust_artifacts.yml b/.github/workflows/_build_rust_artifacts.yml
index 4902b29b5c..af80e6f4d9 100644
--- a/.github/workflows/_build_rust_artifacts.yml
+++ b/.github/workflows/_build_rust_artifacts.yml
@@ -46,7 +46,7 @@ on:
connector_plugins:
type: string
required: false
- default: "iggy_connector_elasticsearch_sink,iggy_connector_elasticsearch_source,iggy_connector_iceberg_sink,iggy_connector_postgres_sink,iggy_connector_postgres_source,iggy_connector_quickwit_sink,iggy_connector_random_source,iggy_connector_stdout_sink"
+ default: "iggy_connector_elasticsearch_sink,iggy_connector_elasticsearch_source,iggy_connector_iceberg_sink,iggy_connector_postgres_sink,iggy_connector_postgres_source,iggy_connector_quickwit_sink,iggy_connector_random_source,iggy_connector_stdout_sink,iggy_connector_surrealdb_sink"
description: "Comma-separated list of connector plugin crates to build as shared libraries"
outputs:
artifact_name:
diff --git a/.github/workflows/edge-release.yml b/.github/workflows/edge-release.yml
index 8c92c61a01..925a87718f 100644
--- a/.github/workflows/edge-release.yml
+++ b/.github/workflows/edge-release.yml
@@ -109,6 +109,7 @@ jobs:
- `iggy_connector_quickwit_sink`
- `iggy_connector_random_source`
- `iggy_connector_stdout_sink`
+ - `iggy_connector_surrealdb_sink`
## Downloads
diff --git a/Cargo.lock b/Cargo.lock
index 6d8e8c841c..01ab3d5f70 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -223,6 +223,15 @@ dependencies = [
"syn 2.0.117",
]
+[[package]]
+name = "addr"
+version = "0.15.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a93b8a41dbe230ad5087cc721f8d41611de654542180586b315d9f4cf6b72bef"
+dependencies = [
+ "psl-types",
+]
+
[[package]]
name = "adler2"
version = "2.0.1"
@@ -236,7 +245,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common 0.1.7",
- "generic-array",
+ "generic-array 0.14.7",
]
[[package]]
@@ -305,7 +314,7 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee4508988c62edf04abd8d92897fca0c2995d907ce1dfeaf369dac3716a40685"
dependencies = [
- "as-slice",
+ "as-slice 0.2.1",
]
[[package]]
@@ -338,6 +347,19 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
+[[package]]
+name = "ammonia"
+version = "4.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "17e913097e1a2124b46746c980134e8c954bc17a6a59bb3fde96f088d126dde6"
+dependencies = [
+ "cssparser",
+ "html5ever",
+ "maplit",
+ "tendril",
+ "url",
+]
+
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -439,6 +461,15 @@ dependencies = [
"security-framework",
]
+[[package]]
+name = "approx"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6"
+dependencies = [
+ "num-traits",
+]
+
[[package]]
name = "ar_archive_writer"
version = "0.5.1"
@@ -898,6 +929,18 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0f477b951e452a0b6b4a10b53ccd569042d1d01729b519e02074a9c0958a063"
+[[package]]
+name = "as-slice"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "45403b49e3954a4b8428a0ac21a4b7afadccf92bfd96273f1a58cd4812496ae0"
+dependencies = [
+ "generic-array 0.12.4",
+ "generic-array 0.13.3",
+ "generic-array 0.14.7",
+ "stable_deref_trait",
+]
+
[[package]]
name = "as-slice"
version = "0.2.1"
@@ -1216,7 +1259,7 @@ dependencies = [
"futures-util",
"log",
"pin-project-lite",
- "tungstenite",
+ "tungstenite 0.29.0",
]
[[package]]
@@ -1375,6 +1418,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00"
dependencies = [
"aws-lc-sys",
+ "untrusted 0.7.1",
"zeroize",
]
@@ -1867,6 +1911,19 @@ version = "1.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06"
+[[package]]
+name = "bcrypt"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a0f5948f30df5f43ac29d310b7476793be97c50787e6ef4a63d960a0d0be827"
+dependencies = [
+ "base64",
+ "blowfish",
+ "getrandom 0.3.4",
+ "subtle",
+ "zeroize",
+]
+
[[package]]
name = "bdd"
version = "0.0.1"
@@ -2099,7 +2156,7 @@ version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
- "generic-array",
+ "generic-array 0.14.7",
]
[[package]]
@@ -2117,7 +2174,7 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
dependencies = [
- "generic-array",
+ "generic-array 0.14.7",
]
[[package]]
@@ -2142,6 +2199,16 @@ dependencies = [
"piper",
]
+[[package]]
+name = "blowfish"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e412e2cd0f2b2d93e02543ceae7917b3c70331573df19ee046bcbc35e45e87d7"
+dependencies = [
+ "byteorder",
+ "cipher",
+]
+
[[package]]
name = "bnum"
version = "0.12.1"
@@ -2275,6 +2342,12 @@ dependencies = [
"syn 2.0.117",
]
+[[package]]
+name = "boxcar"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "36f64beae40a84da1b4b26ff2761a5b895c12adc41dc25aaee1c4f2bbfe97a6e"
+
[[package]]
name = "brotli"
version = "8.0.3"
@@ -2472,6 +2545,9 @@ name = "bytes"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
+dependencies = [
+ "serde",
+]
[[package]]
name = "bytes-utils"
@@ -2554,6 +2630,15 @@ dependencies = [
"thiserror 2.0.18",
]
+[[package]]
+name = "castaway"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a"
+dependencies = [
+ "rustversion",
+]
+
[[package]]
name = "cbc"
version = "0.1.2"
@@ -2666,7 +2751,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3"
dependencies = [
"chrono",
- "phf",
+ "phf 0.12.1",
+]
+
+[[package]]
+name = "ciborium"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e"
+dependencies = [
+ "ciborium-io",
+ "ciborium-ll",
+ "serde",
+]
+
+[[package]]
+name = "ciborium-io"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757"
+
+[[package]]
+name = "ciborium-ll"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9"
+dependencies = [
+ "ciborium-io",
+ "half",
]
[[package]]
@@ -2728,7 +2840,7 @@ version = "4.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9"
dependencies = [
- "heck",
+ "heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.117",
@@ -3046,7 +3158,7 @@ dependencies = [
"futures-util",
"pin-project-lite",
"rustls-platform-verifier 0.7.0",
- "tungstenite",
+ "tungstenite 0.29.0",
]
[[package]]
@@ -3096,7 +3208,7 @@ dependencies = [
"static-toml",
"strum 0.28.0",
"tracing",
- "tungstenite",
+ "tungstenite 0.29.0",
]
[[package]]
@@ -3415,7 +3527,7 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76"
dependencies = [
- "generic-array",
+ "generic-array 0.14.7",
"rand_core 0.6.4",
"subtle",
"zeroize",
@@ -3427,7 +3539,7 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a"
dependencies = [
- "generic-array",
+ "generic-array 0.14.7",
"rand_core 0.6.4",
"typenum",
]
@@ -3441,6 +3553,29 @@ dependencies = [
"hybrid-array",
]
+[[package]]
+name = "cssparser"
+version = "0.35.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e901edd733a1472f944a45116df3f846f54d37e67e68640ac8bb69689aca2aa"
+dependencies = [
+ "cssparser-macros",
+ "dtoa-short",
+ "itoa",
+ "phf 0.11.3",
+ "smallvec",
+]
+
+[[package]]
+name = "cssparser-macros"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "13b588ba4ac1a99f7f2964d24b3d896ddc6bf847ee3855dbd4366f058cfcd331"
+dependencies = [
+ "quote",
+ "syn 2.0.117",
+]
+
[[package]]
name = "csv"
version = "1.4.0"
@@ -4189,6 +4324,12 @@ dependencies = [
"unicode-xid",
]
+[[package]]
+name = "deunicode"
+version = "1.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "abd57806937c9cc163efc8ea3910e00a62e2aeb0b8119f1793a978088f8f6b04"
+
[[package]]
name = "difflib"
version = "0.4.0"
@@ -4251,6 +4392,64 @@ dependencies = [
"windows-sys 0.61.2",
]
+[[package]]
+name = "diskann"
+version = "0.53.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "376186e025eb294c22f06236b23417608f1867def159c3a61a5c57788a3e889e"
+dependencies = [
+ "anyhow",
+ "bytemuck",
+ "diskann-utils",
+ "diskann-vector",
+ "diskann-wide",
+ "futures-util",
+ "half",
+ "hashbrown 0.16.1",
+ "num-traits",
+ "rand 0.9.4",
+ "thiserror 2.0.18",
+ "tokio",
+]
+
+[[package]]
+name = "diskann-utils"
+version = "0.53.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b70289db1b66826fa1ef2b4113bf2f9d0dedc8df983b2b804c38dc1e519e15e"
+dependencies = [
+ "bytemuck",
+ "cfg-if",
+ "diskann-vector",
+ "diskann-wide",
+ "half",
+ "rand 0.9.4",
+ "rand_distr",
+ "rayon",
+ "thiserror 2.0.18",
+]
+
+[[package]]
+name = "diskann-vector"
+version = "0.53.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f62c9d81aad6e3df6a026b1bb693dbbcfbee5ea93d9e7a5ff15c31576263bc29"
+dependencies = [
+ "cfg-if",
+ "diskann-wide",
+ "half",
+]
+
+[[package]]
+name = "diskann-wide"
+version = "0.53.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "46fcacef8ea9274969f98499456718f3dcaa5d3d7392b3171079653370fa0b20"
+dependencies = [
+ "cfg-if",
+ "half",
+]
+
[[package]]
name = "dispatch2"
version = "0.3.1"
@@ -4312,6 +4511,16 @@ dependencies = [
"const-random",
]
+[[package]]
+name = "dmp"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bb2dfc7a18dffd3ef60a442b72a827126f1557d914620f8fc4d1049916da43c1"
+dependencies = [
+ "trice",
+ "urlencoding",
+]
+
[[package]]
name = "docker_credential"
version = "1.4.0"
@@ -4341,6 +4550,15 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c3cf4824e2d5f025c7b531afcb2325364084a16806f6d47fbc1f5fbd9960590"
+[[package]]
+name = "dtoa-short"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd1511a7b6a56299bd043a9c167a6d2bfb37bf84a6dfceaba651168adfb43c87"
+dependencies = [
+ "dtoa",
+]
+
[[package]]
name = "dtor"
version = "0.8.1"
@@ -4377,6 +4595,16 @@ version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
+[[package]]
+name = "earcutr"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79127ed59a85d7687c409e9978547cffb7dc79675355ed22da6b66fd5f6ead01"
+dependencies = [
+ "itertools 0.11.0",
+ "num-traits",
+]
+
[[package]]
name = "ecdsa"
version = "0.16.9"
@@ -4457,7 +4685,7 @@ dependencies = [
"crypto-bigint",
"digest 0.10.7",
"ff",
- "generic-array",
+ "generic-array 0.14.7",
"group",
"hkdf 0.12.4",
"pem-rfc7468",
@@ -4507,6 +4735,12 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66b7e2430c6dff6a955451e2cfc438f09cea1965a9d6f87f7e3b90decc014099"
+[[package]]
+name = "endian-type"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "869b0adbda23651a9c5c0c3d270aac9fcb52e8622a8f2b17e57802d7791962f2"
+
[[package]]
name = "enumflags2"
version = "0.7.12"
@@ -4851,6 +5085,7 @@ checksum = "35f6839d7b3b98adde531effaf34f0c2badc6f4735d26fe74709d8e513a96ef3"
dependencies = [
"bitflags 2.11.1",
"rustc_version",
+ "serde",
]
[[package]]
@@ -4879,6 +5114,12 @@ dependencies = [
"num-traits",
]
+[[package]]
+name = "float_next_after"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
+
[[package]]
name = "flume"
version = "0.12.0"
@@ -4995,12 +5236,28 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "fst"
+version = "0.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ab85b9b05e3978cc9a9cf8fea7f01b494e1a09ed3037e16ba39edc7a29eb61a"
+
[[package]]
name = "funty"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
+[[package]]
+name = "futf"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df420e2e84819663797d1ec6544b13c5be84629e7bb00dc960d6917db2987843"
+dependencies = [
+ "mac",
+ "new_debug_unreachable",
+]
+
[[package]]
name = "futures"
version = "0.3.32"
@@ -5130,6 +5387,15 @@ dependencies = [
"slab",
]
+[[package]]
+name = "fuzzy-matcher"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54614a3312934d066701a80f20f15fa3b56d67ac7722b39eea5b4c9dd1d66c94"
+dependencies = [
+ "thread_local",
+]
+
[[package]]
name = "generator"
version = "0.8.8"
@@ -5145,6 +5411,24 @@ dependencies = [
"windows-result 0.4.1",
]
+[[package]]
+name = "generic-array"
+version = "0.12.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd"
+dependencies = [
+ "typenum",
+]
+
+[[package]]
+name = "generic-array"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f797e67af32588215eaaab8327027ee8e71b9dd0b2b26996aedf20c030fce309"
+dependencies = [
+ "typenum",
+]
+
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -5156,6 +5440,53 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "geo"
+version = "0.32.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f3901269ec6d4f6068d3f09e5f02f995bd076398dcd1dfec407cd230b02d11b"
+dependencies = [
+ "earcutr",
+ "float_next_after",
+ "geo-types",
+ "geographiclib-rs",
+ "i_overlay",
+ "log",
+ "num-traits",
+ "rand 0.8.6",
+ "robust",
+ "rstar 0.12.2",
+ "serde",
+ "sif-itree",
+ "spade",
+]
+
+[[package]]
+name = "geo-types"
+version = "0.7.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94776032c45f950d30a13af6113c2ad5625316c9abfbccee4dd5a6695f8fe0f5"
+dependencies = [
+ "approx",
+ "num-traits",
+ "rayon",
+ "rstar 0.10.0",
+ "rstar 0.11.0",
+ "rstar 0.12.2",
+ "rstar 0.8.4",
+ "rstar 0.9.3",
+ "serde",
+]
+
+[[package]]
+name = "geographiclib-rs"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c5a7f08910fd98737a6eda7568e7c5e645093e073328eeef49758cfe8b0489c7"
+dependencies = [
+ "libm",
+]
+
[[package]]
name = "getrandom"
version = "0.2.17"
@@ -5215,7 +5546,7 @@ version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e2c0d8c632f8a251ce9a8198079b1022adc586ff4e3d33e18debd40eb463b31"
dependencies = [
- "heck",
+ "heck 0.5.0",
"peg",
"quote",
"serde",
@@ -5753,9 +6084,12 @@ version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b"
dependencies = [
+ "bytemuck",
"cfg-if",
"crunchy",
"num-traits",
+ "rand 0.9.4",
+ "rand_distr",
"zerocopy",
]
@@ -5794,6 +6128,15 @@ dependencies = [
"syn 2.0.117",
]
+[[package]]
+name = "hash32"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4041af86e63ac4298ce40e5cca669066e75b6f1aa3390fe2561ffa5e1d9f4cc"
+dependencies = [
+ "byteorder",
+]
+
[[package]]
name = "hash32"
version = "0.2.1"
@@ -5803,6 +6146,15 @@ dependencies = [
"byteorder",
]
+[[package]]
+name = "hash32"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606"
+dependencies = [
+ "byteorder",
+]
+
[[package]]
name = "hash32"
version = "1.0.0"
@@ -5860,22 +6212,74 @@ dependencies = [
]
[[package]]
-name = "heapless"
-version = "0.7.17"
+name = "headers"
+version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f"
+checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb"
dependencies = [
- "atomic-polyfill",
- "hash32 0.2.1",
- "rustc_version",
- "serde",
- "spin",
- "stable_deref_trait",
+ "base64",
+ "bytes",
+ "headers-core",
+ "http 1.4.1",
+ "httpdate",
+ "mime",
+ "sha1 0.10.6",
]
[[package]]
-name = "heck"
-version = "0.5.0"
+name = "headers-core"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
+dependencies = [
+ "http 1.4.1",
+]
+
+[[package]]
+name = "heapless"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "634bd4d29cbf24424d0a4bfcbf80c6960129dc24424752a7d1d1390607023422"
+dependencies = [
+ "as-slice 0.1.5",
+ "generic-array 0.14.7",
+ "hash32 0.1.1",
+ "stable_deref_trait",
+]
+
+[[package]]
+name = "heapless"
+version = "0.7.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f"
+dependencies = [
+ "atomic-polyfill",
+ "hash32 0.2.1",
+ "rustc_version",
+ "serde",
+ "spin",
+ "stable_deref_trait",
+]
+
+[[package]]
+name = "heapless"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad"
+dependencies = [
+ "hash32 0.3.1",
+ "stable_deref_trait",
+]
+
+[[package]]
+name = "heck"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
+
+[[package]]
+name = "heck"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
@@ -6017,6 +6421,17 @@ dependencies = [
"windows-link 0.2.1",
]
+[[package]]
+name = "html5ever"
+version = "0.35.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "55d958c2f74b664487a2035fe1dadb032c48718a03b63f3ab0b8537db8549ed4"
+dependencies = [
+ "log",
+ "markup5ever",
+ "match_token",
+]
+
[[package]]
name = "http"
version = "0.2.12"
@@ -6253,6 +6668,49 @@ dependencies = [
"tower-service",
]
+[[package]]
+name = "i_float"
+version = "1.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "010025c2c532c8d82e42d0b8bb5184afa449fa6f06c709ea9adcb16c49ae405b"
+dependencies = [
+ "libm",
+]
+
+[[package]]
+name = "i_key_sort"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9190f86706ca38ac8add223b2aed8b1330002b5cdbbce28fb58b10914d38fc27"
+
+[[package]]
+name = "i_overlay"
+version = "4.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "413183068e6e0289e18d7d0a1f661b81546e6918d5453a44570b9ab30cbed1b3"
+dependencies = [
+ "i_float",
+ "i_key_sort",
+ "i_shape",
+ "i_tree",
+ "rayon",
+]
+
+[[package]]
+name = "i_shape"
+version = "1.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ea154b742f7d43dae2897fcd5ead86bc7b5eefcedd305a7ebf9f69d44d61082"
+dependencies = [
+ "i_float",
+]
+
+[[package]]
+name = "i_tree"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35e6d558e6d4c7b82bc51d9c771e7a927862a161a7d87bf2b0541450e0e20915"
+
[[package]]
name = "iana-time-zone"
version = "0.1.65"
@@ -6519,7 +6977,7 @@ dependencies = [
"serde",
"tokio",
"tokio-rustls",
- "tokio-tungstenite",
+ "tokio-tungstenite 0.29.0",
"tracing",
"trait-variant",
"webpki-roots 1.0.7",
@@ -6743,7 +7201,7 @@ dependencies = [
"thiserror 2.0.18",
"tokio",
"tracing",
- "tungstenite",
+ "tungstenite 0.29.0",
"twox-hash",
"ulid",
"url",
@@ -7058,6 +7516,24 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "iggy_connector_surrealdb_sink"
+version = "0.4.1-edge.1"
+dependencies = [
+ "async-trait",
+ "base64",
+ "iggy_common",
+ "iggy_connector_sdk",
+ "secrecy",
+ "serde",
+ "serde_json",
+ "simd-json",
+ "surrealdb",
+ "tokio",
+ "toml 1.1.2+spec-1.1.0",
+ "tracing",
+]
+
[[package]]
name = "iggy_examples"
version = "0.0.6"
@@ -7229,7 +7705,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01"
dependencies = [
"block-padding",
- "generic-array",
+ "generic-array 0.14.7",
]
[[package]]
@@ -7284,6 +7760,7 @@ dependencies = [
"server",
"socket2 0.6.3",
"sqlx",
+ "surrealdb",
"sysinfo 0.39.2",
"tempfile",
"test-case",
@@ -7360,6 +7837,15 @@ version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
+[[package]]
+name = "itertools"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
+dependencies = [
+ "either",
+]
+
[[package]]
name = "itertools"
version = "0.13.0"
@@ -7538,6 +8024,7 @@ version = "10.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba32bfb4ffdeaca3e34431072faf01745c9b26d25504aa7a6cf5684334fc4fc"
dependencies = [
+ "aws-lc-rs",
"base64",
"ed25519-dalek",
"getrandom 0.2.17",
@@ -7736,6 +8223,15 @@ dependencies = [
"lexical-util",
]
+[[package]]
+name = "lexicmp"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e8f89da8fd95c4eb6274e914694bea90c7826523b26f2a2fd863d44b9d42c43"
+dependencies = [
+ "deunicode",
+]
+
[[package]]
name = "libbz2-rs-sys"
version = "0.2.5"
@@ -8041,6 +8537,12 @@ dependencies = [
"twox-hash",
]
+[[package]]
+name = "mac"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
+
[[package]]
name = "macro_magic"
version = "0.5.1"
@@ -8105,6 +8607,34 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58093314a45e00c77d5c508f76e77c3396afbbc0d01506e7fae47b018bac2b1d"
+[[package]]
+name = "maplit"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
+
+[[package]]
+name = "markup5ever"
+version = "0.35.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "311fe69c934650f8f19652b3946075f0fc41ad8757dbb68f1ca14e7900ecc1c3"
+dependencies = [
+ "log",
+ "tendril",
+ "web_atoms",
+]
+
+[[package]]
+name = "match_token"
+version = "0.35.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac84fd3f360fcc43dc5f5d186f02a94192761a080e8bc58621ad4d12296a58cf"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
+]
+
[[package]]
name = "matchers"
version = "0.2.0"
@@ -8120,6 +8650,16 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
+[[package]]
+name = "matrixmultiply"
+version = "0.3.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a06de3016e9fae57a36fd14dba131fccf49f74b40b7fbdb472f96e361ec71a08"
+dependencies = [
+ "autocfg",
+ "rawpointer",
+]
+
[[package]]
name = "maybe-rayon"
version = "0.1.1"
@@ -8446,6 +8986,36 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b"
+[[package]]
+name = "ndarray"
+version = "0.17.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "520080814a7a6b4a6e9070823bb24b4531daac8c4627e08ba5de8c5ef2f2752d"
+dependencies = [
+ "matrixmultiply",
+ "num-complex",
+ "num-integer",
+ "num-traits",
+ "portable-atomic",
+ "portable-atomic-util",
+ "rawpointer",
+]
+
+[[package]]
+name = "ndarray-stats"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b6e54a8b65764f71827a90ca1d56965ec0c67f069f996477bd493402a901d1f"
+dependencies = [
+ "indexmap 2.14.0",
+ "itertools 0.13.0",
+ "ndarray",
+ "noisy_float",
+ "num-integer",
+ "num-traits",
+ "rand 0.8.6",
+]
+
[[package]]
name = "ndk-context"
version = "0.1.1"
@@ -8464,6 +9034,15 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086"
+[[package]]
+name = "nibble_vec"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43"
+dependencies = [
+ "smallvec",
+]
+
[[package]]
name = "nix"
version = "0.31.3"
@@ -8485,6 +9064,15 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "noisy_float"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c16843be85dd410c6a12251c4eca0dd1d3ee8c5725f746c4d5e0fdcec0a864b2"
+dependencies = [
+ "num-traits",
+]
+
[[package]]
name = "nom"
version = "7.1.3"
@@ -9325,6 +9913,12 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ee67f1008b1ba2321834326597b8e186293b049a023cdef258527550b9935b4"
+[[package]]
+name = "path-clean"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "17359afc20d7ab31fdb42bb844c8b3bb1dabd7dcf7e68428492da7f16966fcef"
+
[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -9332,8 +9926,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
dependencies = [
"digest 0.10.7",
+ "hmac 0.12.1",
+ "password-hash",
+ "sha2 0.10.9",
]
+[[package]]
+name = "pdqselect"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ec91767ecc0a0bbe558ce8c9da33c068066c57ecc8bb8477ef8c1ad3ef77c27"
+
[[package]]
name = "pear"
version = "0.2.9"
@@ -9458,13 +10061,100 @@ dependencies = [
"sha2 0.10.9",
]
+[[package]]
+name = "phf"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078"
+dependencies = [
+ "phf_macros 0.11.3",
+ "phf_shared 0.11.3",
+]
+
[[package]]
name = "phf"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7"
dependencies = [
- "phf_shared",
+ "phf_shared 0.12.1",
+]
+
+[[package]]
+name = "phf"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1562dc717473dbaa4c1f85a36410e03c047b2e7df7f45ee938fbef64ae7fadf"
+dependencies = [
+ "phf_macros 0.13.1",
+ "phf_shared 0.13.1",
+ "serde",
+]
+
+[[package]]
+name = "phf_codegen"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a"
+dependencies = [
+ "phf_generator 0.11.3",
+ "phf_shared 0.11.3",
+]
+
+[[package]]
+name = "phf_generator"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d"
+dependencies = [
+ "phf_shared 0.11.3",
+ "rand 0.8.6",
+]
+
+[[package]]
+name = "phf_generator"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "135ace3a761e564ec88c03a77317a7c6b80bb7f7135ef2544dbe054243b89737"
+dependencies = [
+ "fastrand",
+ "phf_shared 0.13.1",
+]
+
+[[package]]
+name = "phf_macros"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216"
+dependencies = [
+ "phf_generator 0.11.3",
+ "phf_shared 0.11.3",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
+]
+
+[[package]]
+name = "phf_macros"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "812f032b54b1e759ccd5f8b6677695d5268c588701effba24601f6932f8269ef"
+dependencies = [
+ "phf_generator 0.13.1",
+ "phf_shared 0.13.1",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
+ "unicase",
+]
+
+[[package]]
+name = "phf_shared"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
+dependencies = [
+ "siphasher",
]
[[package]]
@@ -9476,6 +10166,16 @@ dependencies = [
"siphasher",
]
+[[package]]
+name = "phf_shared"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e57fef6bc5981e38c2ce2d63bfa546861309f875b8a75f092d1d54ae2d64f266"
+dependencies = [
+ "siphasher",
+ "unicase",
+]
+
[[package]]
name = "pico-args"
version = "0.5.0"
@@ -9645,7 +10345,7 @@ dependencies = [
"cobs",
"embedded-io 0.4.0",
"embedded-io 0.6.1",
- "heapless",
+ "heapless 0.7.17",
"serde",
]
@@ -9674,8 +10374,14 @@ dependencies = [
]
[[package]]
-name = "predicates"
-version = "3.1.4"
+name = "precomputed-hash"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
+
+[[package]]
+name = "predicates"
+version = "3.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ada8f2932f28a27ee7b70dd6c1c39ea0675c55a36879ab92f3a715eaa1e63cfe"
dependencies = [
@@ -9956,6 +10662,12 @@ dependencies = [
"thiserror 2.0.18",
]
+[[package]]
+name = "psl-types"
+version = "2.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac"
+
[[package]]
name = "psm"
version = "0.1.31"
@@ -10058,6 +10770,18 @@ dependencies = [
"serde",
]
+[[package]]
+name = "quick_cache"
+version = "0.6.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a3db184a8b66cfe87f0263a1de147a6b554c864d1767c6f7fa4eb0e5497b565"
+dependencies = [
+ "ahash 0.8.12",
+ "equivalent",
+ "hashbrown 0.16.1",
+ "parking_lot",
+]
+
[[package]]
name = "quinn"
version = "0.11.9"
@@ -10143,6 +10867,17 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
+[[package]]
+name = "radix_trie"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b4431027dcd37fc2a73ef740b5f233aa805897935b8bce0195e41bbf9a3289a"
+dependencies = [
+ "endian-type",
+ "nibble_vec",
+ "serde",
+]
+
[[package]]
name = "rand"
version = "0.8.6"
@@ -10219,6 +10954,16 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69"
+[[package]]
+name = "rand_distr"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6a8615d50dcf34fa31f7ab52692afec947c4dd0ab803cc87cb3b0b4570ff7463"
+dependencies = [
+ "num-traits",
+ "rand 0.9.4",
+]
+
[[package]]
name = "rand_xoshiro"
version = "0.8.1"
@@ -10287,6 +11032,12 @@ dependencies = [
"bitflags 2.11.1",
]
+[[package]]
+name = "rawpointer"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3"
+
[[package]]
name = "rayon"
version = "1.12.0"
@@ -10321,6 +11072,12 @@ dependencies = [
"yasna",
]
+[[package]]
+name = "reblessive"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbc4a4ea2a66a41a1152c4b3d86e8954dc087bdf33af35446e6e176db4e73c8c"
+
[[package]]
name = "recursive"
version = "0.1.1"
@@ -10519,6 +11276,7 @@ dependencies = [
"js-sys",
"log",
"mime",
+ "mime_guess",
"percent-encoding",
"pin-project-lite",
"quinn",
@@ -10626,6 +11384,33 @@ dependencies = [
"rand 0.10.1",
]
+[[package]]
+name = "revision"
+version = "0.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e735a8c2864f0b0fd48a55d0a71c081c7cbef8c8958a4665d8de423f20f2d0cf"
+dependencies = [
+ "bytes",
+ "chrono",
+ "geo",
+ "regex",
+ "revision-derive",
+ "roaring",
+ "rust_decimal",
+ "uuid",
+]
+
+[[package]]
+name = "revision-derive"
+version = "0.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f446f8c55ba240992330b09f69fe9e5ec8a2e1ba266843cb9f59d7bc6037c821"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
+]
+
[[package]]
name = "rfc6979"
version = "0.4.0"
@@ -10655,7 +11440,7 @@ dependencies = [
"cfg-if",
"getrandom 0.2.17",
"libc",
- "untrusted",
+ "untrusted 0.9.0",
"windows-sys 0.52.0",
]
@@ -10766,8 +11551,15 @@ checksum = "1dedc5658c6ecb3bdb5ef5f3295bb9253f42dcf3fd1402c03f6b1f7659c3c4a9"
dependencies = [
"bytemuck",
"byteorder",
+ "serde",
]
+[[package]]
+name = "robust"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e27ee8bb91ca0adcf0ecb116293afa12d393f9c2b9b9cd54d33e8078fe19839"
+
[[package]]
name = "rolling-file"
version = "0.2.0"
@@ -10809,6 +11601,67 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "rstar"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a45c0e8804d37e4d97e55c6f258bc9ad9c5ee7b07437009dd152d764949a27c"
+dependencies = [
+ "heapless 0.6.1",
+ "num-traits",
+ "pdqselect",
+ "serde",
+ "smallvec",
+]
+
+[[package]]
+name = "rstar"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b40f1bfe5acdab44bc63e6699c28b74f75ec43afb59f3eda01e145aff86a25fa"
+dependencies = [
+ "heapless 0.7.17",
+ "num-traits",
+ "serde",
+ "smallvec",
+]
+
+[[package]]
+name = "rstar"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f39465655a1e3d8ae79c6d9e007f4953bfc5d55297602df9dc38f9ae9f1359a"
+dependencies = [
+ "heapless 0.7.17",
+ "num-traits",
+ "serde",
+ "smallvec",
+]
+
+[[package]]
+name = "rstar"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73111312eb7a2287d229f06c00ff35b51ddee180f017ab6dec1f69d62ac098d6"
+dependencies = [
+ "heapless 0.7.17",
+ "num-traits",
+ "serde",
+ "smallvec",
+]
+
+[[package]]
+name = "rstar"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "421400d13ccfd26dfa5858199c30a5d76f9c54e0dba7575273025b43c5175dbb"
+dependencies = [
+ "heapless 0.8.0",
+ "num-traits",
+ "serde",
+ "smallvec",
+]
+
[[package]]
name = "rust-embed"
version = "8.11.0"
@@ -10853,6 +11706,16 @@ dependencies = [
"ordered-multimap",
]
+[[package]]
+name = "rust-stemmers"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e46a2036019fdb888131db7a4c847a1063a7493f971ed94ea82c67eada63ca54"
+dependencies = [
+ "serde",
+ "serde_derive",
+]
+
[[package]]
name = "rust_decimal"
version = "1.42.0"
@@ -11034,7 +11897,7 @@ dependencies = [
"aws-lc-rs",
"ring",
"rustls-pki-types",
- "untrusted",
+ "untrusted 0.9.0",
]
[[package]]
@@ -11067,6 +11930,15 @@ version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
+[[package]]
+name = "salsa20"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213"
+dependencies = [
+ "cipher",
+]
+
[[package]]
name = "same-file"
version = "1.0.6"
@@ -11144,6 +12016,18 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+[[package]]
+name = "scrypt"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f"
+dependencies = [
+ "password-hash",
+ "pbkdf2",
+ "salsa20",
+ "sha2 0.10.9",
+]
+
[[package]]
name = "sd-notify"
version = "0.5.0"
@@ -11184,7 +12068,7 @@ checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc"
dependencies = [
"base16ct",
"der",
- "generic-array",
+ "generic-array 0.14.7",
"pkcs8",
"subtle",
"zeroize",
@@ -11209,7 +12093,7 @@ dependencies = [
"aes",
"cbc",
"futures-util",
- "generic-array",
+ "generic-array 0.14.7",
"getrandom 0.2.17",
"hkdf 0.12.4",
"num",
@@ -11770,6 +12654,12 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
+[[package]]
+name = "sif-itree"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7f45b8998ced5134fb1d75732c77842a3e888f19c1ff98481822e8fbfbf930b"
+
[[package]]
name = "signal-hook-registry"
version = "1.4.8"
@@ -11942,7 +12832,7 @@ version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451"
dependencies = [
- "heck",
+ "heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.117",
@@ -11992,6 +12882,18 @@ dependencies = [
"url",
]
+[[package]]
+name = "spade"
+version = "2.15.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9699399fd9349b00b184f5635b074f9ec93afffef30c853f8c875b32c0f8c7fa"
+dependencies = [
+ "hashbrown 0.16.1",
+ "num-traits",
+ "robust",
+ "smallvec",
+]
+
[[package]]
name = "spin"
version = "0.9.8"
@@ -12103,7 +13005,7 @@ dependencies = [
"cfg-if",
"dotenvy",
"either",
- "heck",
+ "heck 0.5.0",
"hex",
"proc-macro2",
"quote",
@@ -12136,7 +13038,7 @@ dependencies = [
"either",
"futures-core",
"futures-util",
- "generic-array",
+ "generic-array 0.14.7",
"log",
"percent-encoding",
"serde",
@@ -12263,6 +13165,28 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
+[[package]]
+name = "storekey"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd9a94571bde7369ecaac47cec2e6844642d99166bd452fbd8def74b5b917b2f"
+dependencies = [
+ "bytes",
+ "storekey-derive",
+ "uuid",
+]
+
+[[package]]
+name = "storekey-derive"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6079d53242246522ec982de613c5c952cc7b1380ef2f8622fcdab9bfe73c0098"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
+]
+
[[package]]
name = "strict-num"
version = "0.1.1"
@@ -12272,6 +13196,31 @@ dependencies = [
"float-cmp 0.9.0",
]
+[[package]]
+name = "string_cache"
+version = "0.8.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f"
+dependencies = [
+ "new_debug_unreachable",
+ "parking_lot",
+ "phf_shared 0.11.3",
+ "precomputed-hash",
+ "serde",
+]
+
+[[package]]
+name = "string_cache_codegen"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c711928715f1fe0fe509c53b43e993a9a557babc2d0a3567d0a3006f1ac931a0"
+dependencies = [
+ "phf_generator 0.11.3",
+ "phf_shared 0.11.3",
+ "proc-macro2",
+ "quote",
+]
+
[[package]]
name = "stringcase"
version = "0.4.0"
@@ -12342,7 +13291,7 @@ version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7"
dependencies = [
- "heck",
+ "heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.117",
@@ -12354,7 +13303,7 @@ version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab85eea0270ee17587ed4156089e10b9e6880ee688791d45a905f5b1ca36f664"
dependencies = [
- "heck",
+ "heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.117",
@@ -12366,6 +13315,224 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
+[[package]]
+name = "surrealdb"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d9deae5add485b4b5492f9f1a05500e24e4ad272896547835e0c9696b44d05b"
+dependencies = [
+ "anyhow",
+ "async-channel",
+ "boxcar",
+ "chrono",
+ "futures",
+ "getrandom 0.3.4",
+ "indexmap 2.14.0",
+ "js-sys",
+ "path-clean",
+ "reqwest 0.13.4",
+ "ring",
+ "rustls",
+ "rustls-pki-types",
+ "semver",
+ "serde",
+ "serde_json",
+ "surrealdb-core",
+ "surrealdb-types",
+ "surrealdb-types-derive",
+ "tokio",
+ "tokio-tungstenite 0.28.0",
+ "tokio-tungstenite-wasm",
+ "tokio-util",
+ "tracing",
+ "url",
+ "uuid",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "wasmtimer",
+ "web-sys",
+]
+
+[[package]]
+name = "surrealdb-collections"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5e09c94ab7abc679f18b47367d70db7ee902553f4fec4fb0a9c92286938abc3"
+dependencies = [
+ "revision",
+ "storekey",
+]
+
+[[package]]
+name = "surrealdb-core"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f3097d6247661f5c5a319fa1f4683d5beba2956c6b86615ad69b55f988ab135b"
+dependencies = [
+ "addr",
+ "ahash 0.8.12",
+ "ammonia",
+ "anyhow",
+ "argon2",
+ "async-channel",
+ "async-stream",
+ "base64",
+ "bcrypt",
+ "blake3",
+ "bytes",
+ "chrono",
+ "ciborium",
+ "dashmap",
+ "deunicode",
+ "diskann",
+ "diskann-utils",
+ "diskann-vector",
+ "dmp",
+ "fastnum",
+ "fst",
+ "futures",
+ "fuzzy-matcher",
+ "geo",
+ "geo-types",
+ "getrandom 0.3.4",
+ "half",
+ "headers",
+ "hex",
+ "http 1.4.1",
+ "humantime",
+ "ipnet",
+ "jsonwebtoken",
+ "lexicmp",
+ "md-5 0.10.6",
+ "memchr",
+ "mime",
+ "ndarray",
+ "ndarray-stats",
+ "num-traits",
+ "num_cpus",
+ "object_store",
+ "parking_lot",
+ "path-clean",
+ "pbkdf2",
+ "phf 0.13.1",
+ "pin-project-lite",
+ "quick_cache",
+ "radix_trie",
+ "rand 0.9.4",
+ "rand_core 0.6.4",
+ "rayon",
+ "reblessive",
+ "regex",
+ "revision",
+ "ring",
+ "roaring",
+ "rust-stemmers",
+ "rust_decimal",
+ "scrypt",
+ "semver",
+ "serde",
+ "serde_json",
+ "sha1 0.10.6",
+ "sha2 0.10.9",
+ "storekey",
+ "strsim",
+ "subtle",
+ "surrealdb-collections",
+ "surrealdb-protocol",
+ "surrealdb-strand",
+ "surrealdb-types",
+ "sysinfo 0.37.2",
+ "thiserror 2.0.18",
+ "tokio",
+ "tokio-util",
+ "tracing",
+ "ulid",
+ "unicase",
+ "url",
+ "uuid",
+ "vart",
+ "wasm-bindgen-futures",
+ "wasmtimer",
+ "web-time",
+]
+
+[[package]]
+name = "surrealdb-protocol"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f4e06f586c9179a02349b88b0c18e3a0850c55431aa513e0cd66529c00da1af"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "chrono",
+ "flatbuffers",
+ "futures",
+ "geo",
+ "prost",
+ "prost-types",
+ "rust_decimal",
+ "semver",
+ "serde",
+ "serde_json",
+ "tonic",
+ "tonic-prost",
+ "uuid",
+]
+
+[[package]]
+name = "surrealdb-strand"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ef42225a4fe1f281b52520ee6042ce1545ffbeae2431034635662538efad740"
+dependencies = [
+ "revision",
+ "serde",
+ "storekey",
+]
+
+[[package]]
+name = "surrealdb-types"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44b03150b36ee46f24cc2dbb22659426c42661ea484769d6345ebb3542e88fa7"
+dependencies = [
+ "anyhow",
+ "async-channel",
+ "bytes",
+ "castaway",
+ "chrono",
+ "flatbuffers",
+ "geo",
+ "hex",
+ "http 1.4.1",
+ "papaya",
+ "rand 0.9.4",
+ "regex",
+ "rust_decimal",
+ "semver",
+ "serde",
+ "serde_json",
+ "surrealdb-protocol",
+ "surrealdb-types-derive",
+ "tracing",
+ "ulid",
+ "url",
+ "uuid",
+]
+
+[[package]]
+name = "surrealdb-types-derive"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "023873912963cb7cd58b5d158f68081af629dba13236152f96a4cff8b3dd3fc6"
+dependencies = [
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
+]
+
[[package]]
name = "svgtypes"
version = "0.15.3"
@@ -12581,6 +13748,17 @@ dependencies = [
"windows-sys 0.61.2",
]
+[[package]]
+name = "tendril"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d24a120c5fc464a3458240ee02c299ebcb9d67b5249c8848b09d639dca8d7bb0"
+dependencies = [
+ "futf",
+ "mac",
+ "utf-8",
+]
+
[[package]]
name = "terminal_size"
version = "0.4.4"
@@ -12907,6 +14085,22 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "tokio-tungstenite"
+version = "0.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857"
+dependencies = [
+ "futures-util",
+ "log",
+ "rustls",
+ "rustls-pki-types",
+ "tokio",
+ "tokio-rustls",
+ "tungstenite 0.28.0",
+ "webpki-roots 0.26.11",
+]
+
[[package]]
name = "tokio-tungstenite"
version = "0.29.0"
@@ -12919,10 +14113,29 @@ dependencies = [
"rustls-pki-types",
"tokio",
"tokio-rustls",
- "tungstenite",
+ "tungstenite 0.29.0",
"webpki-roots 0.26.11",
]
+[[package]]
+name = "tokio-tungstenite-wasm"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ccecee909c02b8863f9bda87253127eb4da0e7e1342330b2583fbc4d1795c2f8"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "http 1.4.1",
+ "httparse",
+ "js-sys",
+ "thiserror 2.0.18",
+ "tokio",
+ "tokio-tungstenite 0.28.0",
+ "wasm-bindgen",
+ "web-sys",
+]
+
[[package]]
name = "tokio-util"
version = "0.7.18"
@@ -13280,6 +14493,17 @@ dependencies = [
"syn 2.0.117",
]
+[[package]]
+name = "trice"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3aaab10ae9fac0b10f392752bf56f0fd20845f39037fec931e8537b105b515a"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+ "web-sys",
+]
+
[[package]]
name = "try-lock"
version = "0.2.5"
@@ -13295,6 +14519,26 @@ dependencies = [
"core_maths",
]
+[[package]]
+name = "tungstenite"
+version = "0.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442"
+dependencies = [
+ "bytes",
+ "data-encoding",
+ "http 1.4.1",
+ "httparse",
+ "log",
+ "rand 0.9.4",
+ "rustls",
+ "rustls-pki-types",
+ "sha1 0.10.6",
+ "thiserror 2.0.18",
+ "url",
+ "utf-8",
+]
+
[[package]]
name = "tungstenite"
version = "0.29.0"
@@ -13448,6 +14692,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe"
dependencies = [
"rand 0.9.4",
+ "serde",
"web-time",
]
@@ -13569,6 +14814,12 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
+[[package]]
+name = "untrusted"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
+
[[package]]
name = "untrusted"
version = "0.9.0"
@@ -13650,6 +14901,12 @@ dependencies = [
"xmlwriter",
]
+[[package]]
+name = "utf-8"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
+
[[package]]
name = "utf8-width"
version = "0.1.8"
@@ -13769,6 +15026,12 @@ dependencies = [
"ryu",
]
+[[package]]
+name = "vart"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1982d899e57d646498709735f16e9224cf1e8680676ad687f930cf8b5b555ae"
+
[[package]]
name = "vcpkg"
version = "0.2.15"
@@ -13897,6 +15160,7 @@ dependencies = [
"cfg-if",
"once_cell",
"rustversion",
+ "serde",
"wasm-bindgen-macro",
"wasm-bindgen-shared",
]
@@ -14048,6 +15312,18 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "web_atoms"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57ffde1dc01240bdf9992e3205668b235e59421fd085e8a317ed98da0178d414"
+dependencies = [
+ "phf 0.11.3",
+ "phf_codegen",
+ "string_cache",
+ "string_cache_codegen",
+]
+
[[package]]
name = "webpki-root-certs"
version = "1.0.7"
@@ -14668,7 +15944,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc"
dependencies = [
"anyhow",
- "heck",
+ "heck 0.5.0",
"wit-parser",
]
@@ -14679,7 +15955,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
dependencies = [
"anyhow",
- "heck",
+ "heck 0.5.0",
"indexmap 2.14.0",
"prettyplease",
"syn 2.0.117",
diff --git a/Cargo.toml b/Cargo.toml
index 29828a7696..c2729645b1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@ members = [
"core/connectors/sinks/postgres_sink",
"core/connectors/sinks/quickwit_sink",
"core/connectors/sinks/stdout_sink",
+ "core/connectors/sinks/surrealdb_sink",
"core/connectors/sources/elasticsearch_source",
"core/connectors/sources/influxdb_source",
"core/connectors/sources/postgres_source",
@@ -289,6 +290,7 @@ sqlx = { version = "0.9.0", features = [
static-toml = "1.3.0"
strum = { version = "0.28.0", features = ["derive"] }
strum_macros = "0.28.0"
+surrealdb = { version = "3.1.4", default-features = false, features = ["protocol-ws", "rustls"] }
syn = { version = "2", features = ["full", "extra-traits"] }
sysinfo = "0.39.2"
tempfile = "3.27.0"
diff --git a/core/connectors/README.md b/core/connectors/README.md
index b7d24cfbab..e078b46b77 100644
--- a/core/connectors/README.md
+++ b/core/connectors/README.md
@@ -86,6 +86,7 @@ Each sink should have its own, custom configuration, which is passed along with
- **PostgreSQL Sink** - stores messages in PostgreSQL database tables
- **Quickwit Sink** - indexes messages in Quickwit search engine
- **Stdout Sink** - prints messages to standard output (useful for debugging/development)
+- **SurrealDB Sink** - writes messages into SurrealDB with deterministic record IDs for idempotent replay
## Source
diff --git a/core/connectors/runtime/example_config/connectors/surrealdb_sink.toml b/core/connectors/runtime/example_config/connectors/surrealdb_sink.toml
new file mode 100644
index 0000000000..1ca41ea963
--- /dev/null
+++ b/core/connectors/runtime/example_config/connectors/surrealdb_sink.toml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "surrealdb"
+enabled = true
+version = 0
+name = "SurrealDB sink"
+path = "target/release/libiggy_connector_surrealdb_sink"
+plugin_config_format = "toml"
+verbose = false
+benchmark = false
+
+[[streams]]
+stream = "example_stream"
+topics = ["example_topic"]
+schema = "json"
+batch_length = 1000
+poll_interval = "5ms"
+consumer_group = "surrealdb_sink_connector"
+
+[plugin_config]
+endpoint = "127.0.0.1:8000"
+namespace = "iggy"
+database = "connectors"
+table = "iggy_messages"
+username = "root"
+password = "root"
+auth_scope = "root"
+use_tls = false
+auto_define_table = true
+define_indexes = true
+batch_size = 1000
+payload_format = "auto"
+include_metadata = true
+include_headers = true
+include_checksum = true
+include_origin_timestamp = true
+query_timeout = "30s"
+max_retries = 3
+retry_delay = "100ms"
+max_retry_delay = "5s"
+verbose_logging = false
diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md
index 9aaaa2ffaa..1029f79335 100644
--- a/core/connectors/sinks/README.md
+++ b/core/connectors/sinks/README.md
@@ -15,6 +15,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s
| **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas |
| **quickwit_sink** | Indexes messages in Quickwit search engine for log analytics |
| **stdout_sink** | Prints messages to standard output (useful for debugging and development) |
+| **surrealdb_sink** | Writes messages into SurrealDB with deterministic record IDs for idempotent replay |
The sink is represented by the single `Sink` trait, which defines the basic interface for all sink connectors. It provides methods for initializing the sink, writing data to external destination, and closing the sink.
diff --git a/core/connectors/sinks/surrealdb_sink/Cargo.toml b/core/connectors/sinks/surrealdb_sink/Cargo.toml
new file mode 100644
index 0000000000..8d7866f979
--- /dev/null
+++ b/core/connectors/sinks/surrealdb_sink/Cargo.toml
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_surrealdb_sink"
+version = "0.4.1-edge.1"
+description = "Iggy SurrealDB sink connector for writing stream messages into SurrealDB"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming", "surrealdb", "sink"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+publish = false
+
+[lib]
+crate-type = ["cdylib", "lib"]
+
+[dependencies]
+async-trait = { workspace = true }
+base64 = { workspace = true }
+iggy_common = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+secrecy = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+surrealdb = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+
+[dev-dependencies]
+simd-json = { workspace = true }
+toml = { workspace = true }
diff --git a/core/connectors/sinks/surrealdb_sink/README.md b/core/connectors/sinks/surrealdb_sink/README.md
new file mode 100644
index 0000000000..a2f2171634
--- /dev/null
+++ b/core/connectors/sinks/surrealdb_sink/README.md
@@ -0,0 +1,91 @@
+# SurrealDB Sink Connector
+
+Writes Apache Iggy stream messages into SurrealDB over the Rust WebSocket SDK.
+
+The sink writes one SurrealQL bulk `INSERT IGNORE` per connector batch. Each
+record uses a deterministic SurrealDB record id derived from stream, topic,
+partition and Iggy message id, so replayed batches are idempotent and existing
+records are left untouched.
+
+## Configuration
+
+```toml
+type = "sink"
+key = "surrealdb"
+enabled = true
+version = 0
+name = "SurrealDB sink"
+path = "target/release/libiggy_connector_surrealdb_sink"
+plugin_config_format = "toml"
+
+[[streams]]
+stream = "example_stream"
+topics = ["example_topic"]
+schema = "json"
+batch_length = 1000
+poll_interval = "5ms"
+consumer_group = "surrealdb_sink_connector"
+
+[plugin_config]
+endpoint = "127.0.0.1:8000"
+namespace = "iggy"
+database = "connectors"
+table = "iggy_messages"
+username = "root"
+password = "root"
+auth_scope = "root"
+use_tls = false
+auto_define_table = true
+define_indexes = true
+batch_size = 1000
+payload_format = "auto"
+include_metadata = true
+include_headers = true
+include_checksum = true
+include_origin_timestamp = true
+query_timeout = "30s"
+max_retries = 3
+retry_delay = "100ms"
+max_retry_delay = "5s"
+verbose_logging = false
+```
+
+### Plugin Fields
+
+| Field | Default | Description |
+| --- | --- | --- |
+| `endpoint` | required | SurrealDB WebSocket host and port without scheme, for example `127.0.0.1:8000`. |
+| `namespace` | required | SurrealDB namespace selected during `open()`. |
+| `database` | required | SurrealDB database selected during `open()`. |
+| `table` | required | Target table. Must be a safe SurrealQL identifier. |
+| `username` / `password` | none | Optional credentials. |
+| `auth_scope` | `root` | `root`, `namespace`, `database`, or `none`. |
+| `use_tls` | `false` | Uses `wss://` when true, `ws://` otherwise. |
+| `auto_define_table` | `false` | Runs `DEFINE TABLE IF NOT EXISTS
SCHEMALESS`. |
+| `define_indexes` | `false` | Defines an offset index on stream/topic/partition/offset. Requires `auto_define_table`. |
+| `batch_size` | `1000` | Maximum number of records per SurrealDB request. |
+| `payload_format` | `auto` | `auto`, `json`, `text`, or `base64`. |
+| `include_metadata` | `true` | Stores stream/topic/partition/offset/timestamps/schema fields. |
+| `include_headers` | `true` | Stores Iggy headers as a deterministic object. Raw headers are base64 encoded. |
+| `include_checksum` | `true` | Stores `iggy_checksum`. |
+| `include_origin_timestamp` | `true` | Stores `iggy_origin_timestamp`. |
+| `query_timeout` | `30s` | SurrealDB SDK query timeout. |
+| `max_retries` | `3` | Total attempts for transient write failures. |
+| `retry_delay` | `100ms` | Base retry delay. |
+| `max_retry_delay` | `5s` | Capped exponential retry delay. |
+| `verbose_logging` | `false` | Emits per-batch success logs at `info`. |
+
+## Stored Shape
+
+With metadata enabled, records contain:
+
+- `id`: deterministic SurrealDB record id key
+- `iggy_message_id`: original Iggy message id as a string
+- `iggy_stream`, `iggy_topic`, `iggy_partition_id`, `iggy_offset`
+- `iggy_timestamp`, `iggy_origin_timestamp`, `iggy_checksum`, `iggy_schema`
+- `iggy_headers`
+- `payload`
+- `payload_encoding`
+
+`payload_format = "auto"` stores decoded JSON payloads as queryable SurrealDB
+values, text payloads as strings, and binary payloads as base64 strings.
diff --git a/core/connectors/sinks/surrealdb_sink/config.toml b/core/connectors/sinks/surrealdb_sink/config.toml
new file mode 100644
index 0000000000..1ca41ea963
--- /dev/null
+++ b/core/connectors/sinks/surrealdb_sink/config.toml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "surrealdb"
+enabled = true
+version = 0
+name = "SurrealDB sink"
+path = "target/release/libiggy_connector_surrealdb_sink"
+plugin_config_format = "toml"
+verbose = false
+benchmark = false
+
+[[streams]]
+stream = "example_stream"
+topics = ["example_topic"]
+schema = "json"
+batch_length = 1000
+poll_interval = "5ms"
+consumer_group = "surrealdb_sink_connector"
+
+[plugin_config]
+endpoint = "127.0.0.1:8000"
+namespace = "iggy"
+database = "connectors"
+table = "iggy_messages"
+username = "root"
+password = "root"
+auth_scope = "root"
+use_tls = false
+auto_define_table = true
+define_indexes = true
+batch_size = 1000
+payload_format = "auto"
+include_metadata = true
+include_headers = true
+include_checksum = true
+include_origin_timestamp = true
+query_timeout = "30s"
+max_retries = 3
+retry_delay = "100ms"
+max_retry_delay = "5s"
+verbose_logging = false
diff --git a/core/connectors/sinks/surrealdb_sink/src/lib.rs b/core/connectors/sinks/surrealdb_sink/src/lib.rs
new file mode 100644
index 0000000000..1d7aaf0af1
--- /dev/null
+++ b/core/connectors/sinks/surrealdb_sink/src/lib.rs
@@ -0,0 +1,1094 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use base64::Engine;
+use base64::engine::general_purpose;
+use iggy_connector_sdk::convert::owned_value_to_serde_json;
+use iggy_connector_sdk::retry::{exponential_backoff, jitter, parse_duration};
+use iggy_connector_sdk::{
+ ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector,
+};
+use secrecy::{ExposeSecret, SecretString};
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value, json};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::Duration;
+use surrealdb::Surreal;
+use surrealdb::engine::remote::ws::{Client as WsClient, Ws, Wss};
+use surrealdb::opt::Config;
+use surrealdb::opt::auth::{Database, Namespace, Root};
+use tracing::{debug, error, info, warn};
+
+sink_connector!(SurrealDbSink);
+
+const DEFAULT_BATCH_SIZE: usize = 1000;
+const DEFAULT_QUERY_TIMEOUT: &str = "30s";
+const DEFAULT_MAX_RETRIES: u32 = 3;
+const DEFAULT_RETRY_DELAY: &str = "100ms";
+const DEFAULT_MAX_RETRY_DELAY: &str = "5s";
+const ENCODING_BASE64: &str = "base64";
+const ENCODING_JSON: &str = "json";
+const ENCODING_TEXT: &str = "text";
+
+type SurrealDbClient = Surreal;
+
+#[derive(Debug)]
+pub struct SurrealDbSink {
+ id: u32,
+ client: Option,
+ config: SurrealDbSinkConfig,
+ table: String,
+ auth_scope: AuthScope,
+ payload_format: PayloadFormat,
+ batch_size: usize,
+ query_timeout: Duration,
+ max_retries: u32,
+ retry_delay: Duration,
+ max_retry_delay: Duration,
+ include_metadata: bool,
+ include_headers: bool,
+ include_checksum: bool,
+ include_origin_timestamp: bool,
+ auto_define_table: bool,
+ define_indexes: bool,
+ verbose: bool,
+ messages_processed: AtomicU64,
+ insertion_errors: AtomicU64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SurrealDbSinkConfig {
+ pub endpoint: String,
+ pub namespace: String,
+ pub database: String,
+ pub table: String,
+ pub username: Option,
+ #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")]
+ pub password: Option,
+ pub auth_scope: Option,
+ pub use_tls: Option,
+ pub auto_define_table: Option,
+ pub define_indexes: Option,
+ pub batch_size: Option,
+ pub payload_format: Option,
+ pub include_metadata: Option,
+ pub include_headers: Option,
+ pub include_checksum: Option,
+ pub include_origin_timestamp: Option,
+ pub query_timeout: Option,
+ pub max_retries: Option,
+ pub retry_delay: Option,
+ pub max_retry_delay: Option,
+ pub verbose_logging: Option,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum AuthScope {
+ Root,
+ Namespace,
+ Database,
+ None,
+}
+
+impl AuthScope {
+ fn from_config(value: Option<&str>) -> Self {
+ match value {
+ Some(value) if value.eq_ignore_ascii_case("namespace") => AuthScope::Namespace,
+ Some(value) if value.eq_ignore_ascii_case("database") => AuthScope::Database,
+ Some(value) if value.eq_ignore_ascii_case("none") => AuthScope::None,
+ Some(value) if value.eq_ignore_ascii_case("root") => AuthScope::Root,
+ Some(value) => {
+ warn!("Unknown SurrealDB auth scope '{value}', defaulting to root");
+ AuthScope::Root
+ }
+ None => AuthScope::Root,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum PayloadFormat {
+ Auto,
+ Json,
+ Text,
+ Base64,
+}
+
+impl PayloadFormat {
+ fn from_config(value: Option<&str>) -> Self {
+ match value {
+ Some(value) if value.eq_ignore_ascii_case("json") => PayloadFormat::Json,
+ Some(value) if value.eq_ignore_ascii_case("text") => PayloadFormat::Text,
+ Some(value) if value.eq_ignore_ascii_case("base64") => PayloadFormat::Base64,
+ Some(value) if value.eq_ignore_ascii_case("binary") => PayloadFormat::Base64,
+ Some(value) if value.eq_ignore_ascii_case("auto") => PayloadFormat::Auto,
+ Some(value) => {
+ warn!("Unknown SurrealDB payload format '{value}', defaulting to auto");
+ PayloadFormat::Auto
+ }
+ None => PayloadFormat::Auto,
+ }
+ }
+}
+
+#[derive(Debug)]
+struct PayloadDocument {
+ value: Value,
+ encoding: &'static str,
+}
+
+#[derive(Debug)]
+struct BatchInsertOutcome {
+ inserted_count: u64,
+ error: Option,
+}
+
+impl SurrealDbSink {
+ pub fn new(id: u32, config: SurrealDbSinkConfig) -> Self {
+ let table = config.table.clone();
+ let auth_scope = AuthScope::from_config(config.auth_scope.as_deref());
+ let payload_format = PayloadFormat::from_config(config.payload_format.as_deref());
+ let batch_size = config
+ .batch_size
+ .unwrap_or(DEFAULT_BATCH_SIZE as u32)
+ .max(1) as usize;
+ let query_timeout = parse_duration(config.query_timeout.as_deref(), DEFAULT_QUERY_TIMEOUT);
+ let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY);
+ let max_retry_delay =
+ parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY);
+
+ SurrealDbSink {
+ id,
+ client: None,
+ config,
+ table,
+ auth_scope,
+ payload_format,
+ batch_size,
+ query_timeout,
+ max_retries: DEFAULT_MAX_RETRIES,
+ retry_delay,
+ max_retry_delay,
+ include_metadata: true,
+ include_headers: true,
+ include_checksum: true,
+ include_origin_timestamp: true,
+ auto_define_table: false,
+ define_indexes: false,
+ verbose: false,
+ messages_processed: AtomicU64::new(0),
+ insertion_errors: AtomicU64::new(0),
+ }
+ .with_config_defaults()
+ }
+
+ fn with_config_defaults(mut self) -> Self {
+ self.max_retries = self
+ .config
+ .max_retries
+ .unwrap_or(DEFAULT_MAX_RETRIES)
+ .max(1);
+ self.include_metadata = self.config.include_metadata.unwrap_or(true);
+ self.include_headers = self.config.include_headers.unwrap_or(true);
+ self.include_checksum = self.config.include_checksum.unwrap_or(true);
+ self.include_origin_timestamp = self.config.include_origin_timestamp.unwrap_or(true);
+ self.auto_define_table = self.config.auto_define_table.unwrap_or(false);
+ self.define_indexes = self.config.define_indexes.unwrap_or(false);
+ self.verbose = self.config.verbose_logging.unwrap_or(false);
+
+ if self.max_retry_delay < self.retry_delay {
+ warn!(
+ "SurrealDB sink ID: {} max_retry_delay is smaller than retry_delay. Using retry_delay as max_retry_delay.",
+ self.id
+ );
+ self.max_retry_delay = self.retry_delay;
+ }
+
+ self
+ }
+}
+
+#[async_trait]
+impl Sink for SurrealDbSink {
+ async fn open(&mut self) -> Result<(), Error> {
+ validate_identifier("table", &self.table)?;
+
+ info!(
+ "Opening SurrealDB sink connector with ID: {}. Endpoint: {}, namespace: {}, database: {}, table: {}",
+ self.id,
+ redact_endpoint(&self.config.endpoint),
+ self.config.namespace,
+ self.config.database,
+ self.table
+ );
+
+ let client = self.connect().await?;
+ self.signin_if_configured(&client).await?;
+ client
+ .use_ns(&self.config.namespace)
+ .use_db(&self.config.database)
+ .await
+ .map_err(|e| Error::InitError(format!("Failed to select namespace/database: {e}")))?;
+ client
+ .health()
+ .await
+ .map_err(|e| Error::InitError(format!("SurrealDB health check failed: {e}")))?;
+
+ if self.auto_define_table {
+ self.ensure_table(&client).await?;
+ }
+
+ self.client = Some(client);
+ info!(
+ "Opened SurrealDB sink connector ID: {} for table: {}",
+ self.id, self.table
+ );
+ Ok(())
+ }
+
+ async fn consume(
+ &self,
+ topic_metadata: &TopicMetadata,
+ messages_metadata: MessagesMetadata,
+ messages: Vec,
+ ) -> Result<(), Error> {
+ self.process_messages(topic_metadata, &messages_metadata, &messages)
+ .await
+ }
+
+ async fn close(&mut self) -> Result<(), Error> {
+ info!("Closing SurrealDB sink connector with ID: {}", self.id);
+ self.client.take();
+
+ let messages_processed = self.messages_processed.load(Ordering::Relaxed);
+ let insertion_errors = self.insertion_errors.load(Ordering::Relaxed);
+ info!(
+ "SurrealDB sink ID: {} processed {} messages with {} errors",
+ self.id, messages_processed, insertion_errors
+ );
+ Ok(())
+ }
+}
+
+impl SurrealDbSink {
+ async fn connect(&self) -> Result {
+ let config = Config::new().query_timeout(self.query_timeout);
+ let endpoint = self.config.endpoint.as_str();
+
+ if self.config.use_tls.unwrap_or(false) {
+ Surreal::new::((endpoint, config))
+ .await
+ .map_err(|e| Error::InitError(format!("Failed to connect to SurrealDB: {e}")))
+ } else {
+ Surreal::new::((endpoint, config))
+ .await
+ .map_err(|e| Error::InitError(format!("Failed to connect to SurrealDB: {e}")))
+ }
+ }
+
+ async fn signin_if_configured(&self, client: &SurrealDbClient) -> Result<(), Error> {
+ if self.auth_scope == AuthScope::None {
+ return Ok(());
+ }
+
+ let username = self.config.username.as_ref().ok_or_else(|| {
+ Error::InitError(
+ "SurrealDB username is required when auth_scope is not none".to_string(),
+ )
+ })?;
+ let password = self.config.password.as_ref().ok_or_else(|| {
+ Error::InitError(
+ "SurrealDB password is required when auth_scope is not none".to_string(),
+ )
+ })?;
+ let password = password.expose_secret().to_string();
+
+ match self.auth_scope {
+ AuthScope::Root => {
+ client
+ .signin(Root {
+ username: username.clone(),
+ password,
+ })
+ .await
+ }
+ AuthScope::Namespace => {
+ client
+ .signin(Namespace {
+ namespace: self.config.namespace.clone(),
+ username: username.clone(),
+ password,
+ })
+ .await
+ }
+ AuthScope::Database => {
+ client
+ .signin(Database {
+ namespace: self.config.namespace.clone(),
+ database: self.config.database.clone(),
+ username: username.clone(),
+ password,
+ })
+ .await
+ }
+ AuthScope::None => return Ok(()),
+ }
+ .map(|_| ())
+ .map_err(|e| Error::InitError(format!("Failed to authenticate with SurrealDB: {e}")))
+ }
+
+ async fn ensure_table(&self, client: &SurrealDbClient) -> Result<(), Error> {
+ let table = &self.table;
+ let mut query = format!("DEFINE TABLE IF NOT EXISTS {table} SCHEMALESS;");
+
+ if self.define_indexes {
+ let offset_index = format!("{table}_iggy_offset_idx");
+ validate_identifier("index", &offset_index)?;
+ query.push_str(&format!(
+ " DEFINE INDEX IF NOT EXISTS {offset_index} ON TABLE {table} FIELDS iggy_stream, iggy_topic, iggy_partition_id, iggy_offset;"
+ ));
+ }
+
+ client
+ .query(query)
+ .await
+ .map_err(|e| Error::InitError(format!("Failed to define SurrealDB table: {e}")))?
+ .check()
+ .map_err(|e| Error::InitError(format!("Failed to define SurrealDB table: {e}")))?;
+
+ Ok(())
+ }
+
+ async fn process_messages(
+ &self,
+ topic_metadata: &TopicMetadata,
+ messages_metadata: &MessagesMetadata,
+ messages: &[ConsumedMessage],
+ ) -> Result<(), Error> {
+ let client = self
+ .client
+ .as_ref()
+ .ok_or_else(|| Error::InitError("SurrealDB sink is not connected".to_string()))?;
+
+ let mut successful_inserts = 0u64;
+ let mut last_error = None;
+
+ for batch in messages.chunks(self.batch_size) {
+ let outcome = self
+ .insert_batch(client, batch, topic_metadata, messages_metadata)
+ .await;
+ successful_inserts += outcome.inserted_count;
+
+ if let Some(error) = outcome.error {
+ self.insertion_errors
+ .fetch_add(batch.len() as u64, Ordering::Relaxed);
+ error!(
+ "Failed to insert SurrealDB batch for connector ID: {}, table: {}, error: {error}",
+ self.id, self.table
+ );
+ last_error = Some(error);
+ }
+ }
+
+ self.messages_processed
+ .fetch_add(successful_inserts, Ordering::Relaxed);
+
+ if self.verbose {
+ info!(
+ "SurrealDB sink ID: {} wrote {successful_inserts} messages to table '{}'",
+ self.id, self.table
+ );
+ } else {
+ debug!(
+ "SurrealDB sink ID: {} wrote {successful_inserts} messages to table '{}'",
+ self.id, self.table
+ );
+ }
+
+ if let Some(error) = last_error {
+ Err(error)
+ } else {
+ Ok(())
+ }
+ }
+
+ async fn insert_batch(
+ &self,
+ client: &SurrealDbClient,
+ messages: &[ConsumedMessage],
+ topic_metadata: &TopicMetadata,
+ messages_metadata: &MessagesMetadata,
+ ) -> BatchInsertOutcome {
+ if messages.is_empty() {
+ return BatchInsertOutcome {
+ inserted_count: 0,
+ error: None,
+ };
+ }
+
+ let mut records = Vec::with_capacity(messages.len());
+ for message in messages {
+ match self.build_record(topic_metadata, messages_metadata, message) {
+ Ok(record) => records.push(record),
+ Err(error) => {
+ return BatchInsertOutcome {
+ inserted_count: 0,
+ error: Some(error),
+ };
+ }
+ }
+ }
+
+ self.insert_records_with_retry(client, records).await
+ }
+
+ async fn insert_records_with_retry(
+ &self,
+ client: &SurrealDbClient,
+ records: Vec,
+ ) -> BatchInsertOutcome {
+ let mut attempts = 0u32;
+ let query = build_insert_query(&self.table);
+ let record_count = records.len() as u64;
+ let variables = json!({ "records": records });
+
+ loop {
+ let result = client
+ .query(&query)
+ .bind(variables.clone())
+ .await
+ .and_then(|response| response.check());
+
+ match result {
+ Ok(_) => {
+ return BatchInsertOutcome {
+ inserted_count: record_count,
+ error: None,
+ };
+ }
+ Err(error) => {
+ attempts += 1;
+ if !is_transient_error(&error) || attempts >= self.max_retries {
+ return BatchInsertOutcome {
+ inserted_count: 0,
+ error: Some(Error::CannotStoreData(format!(
+ "SurrealDB batch insert failed after {attempts} attempts: {error}"
+ ))),
+ };
+ }
+
+ let delay = jitter(exponential_backoff(
+ self.retry_delay,
+ attempts.saturating_sub(1),
+ self.max_retry_delay,
+ ));
+ warn!(
+ "Transient SurrealDB write error for connector ID: {} (attempt {attempts}/{}): {error}. Retrying in {:?}.",
+ self.id, self.max_retries, delay
+ );
+ tokio::time::sleep(delay).await;
+ }
+ }
+ }
+ }
+
+ fn build_record(
+ &self,
+ topic_metadata: &TopicMetadata,
+ messages_metadata: &MessagesMetadata,
+ message: &ConsumedMessage,
+ ) -> Result {
+ let mut record = Map::new();
+ record.insert(
+ "id".to_string(),
+ Value::String(build_record_id(
+ topic_metadata,
+ messages_metadata,
+ message.id,
+ )),
+ );
+ record.insert(
+ "iggy_message_id".to_string(),
+ Value::String(message.id.to_string()),
+ );
+
+ if self.include_metadata {
+ record.insert(
+ "iggy_stream".to_string(),
+ Value::String(topic_metadata.stream.clone()),
+ );
+ record.insert(
+ "iggy_topic".to_string(),
+ Value::String(topic_metadata.topic.clone()),
+ );
+ record.insert(
+ "iggy_partition_id".to_string(),
+ Value::Number(messages_metadata.partition_id.into()),
+ );
+ record.insert(
+ "iggy_offset".to_string(),
+ Value::Number(message.offset.into()),
+ );
+ record.insert(
+ "iggy_timestamp".to_string(),
+ Value::Number(message.timestamp.into()),
+ );
+ record.insert(
+ "iggy_schema".to_string(),
+ Value::String(messages_metadata.schema.to_string()),
+ );
+ }
+
+ if self.include_checksum {
+ record.insert(
+ "iggy_checksum".to_string(),
+ Value::Number(message.checksum.into()),
+ );
+ }
+
+ if self.include_origin_timestamp {
+ record.insert(
+ "iggy_origin_timestamp".to_string(),
+ Value::Number(message.origin_timestamp.into()),
+ );
+ }
+
+ if self.include_headers
+ && let Some(headers) = &message.headers
+ && !headers.is_empty()
+ {
+ record.insert("iggy_headers".to_string(), encode_headers(headers)?);
+ }
+
+ let payload = self.build_payload_document(&message.payload)?;
+ record.insert("payload".to_string(), payload.value);
+ record.insert(
+ "payload_encoding".to_string(),
+ Value::String(payload.encoding.to_string()),
+ );
+
+ Ok(Value::Object(record))
+ }
+
+ fn build_payload_document(&self, payload: &Payload) -> Result {
+ match self.payload_format {
+ PayloadFormat::Auto => build_auto_payload_document(payload),
+ PayloadFormat::Json => build_json_payload_document(payload),
+ PayloadFormat::Text => build_text_payload_document(payload),
+ PayloadFormat::Base64 => build_base64_payload_document(payload),
+ }
+ }
+}
+
+fn build_insert_query(table: &str) -> String {
+ format!("INSERT IGNORE INTO {table} $records RETURN NONE;")
+}
+
+fn build_auto_payload_document(payload: &Payload) -> Result {
+ match payload {
+ Payload::Json(value) => Ok(PayloadDocument {
+ value: owned_value_to_serde_json(value),
+ encoding: ENCODING_JSON,
+ }),
+ Payload::Text(text) | Payload::Proto(text) => Ok(PayloadDocument {
+ value: Value::String(text.clone()),
+ encoding: ENCODING_TEXT,
+ }),
+ Payload::Raw(_) | Payload::FlatBuffer(_) | Payload::Avro(_) => {
+ build_base64_payload_document(payload)
+ }
+ }
+}
+
+fn build_json_payload_document(payload: &Payload) -> Result {
+ match payload {
+ Payload::Json(value) => Ok(PayloadDocument {
+ value: owned_value_to_serde_json(value),
+ encoding: ENCODING_JSON,
+ }),
+ _ => {
+ let bytes = payload.try_to_bytes()?;
+ let value = serde_json::from_slice(&bytes)
+ .map_err(|e| Error::InvalidRecordValue(format!("Invalid JSON payload: {e}")))?;
+ Ok(PayloadDocument {
+ value,
+ encoding: ENCODING_JSON,
+ })
+ }
+ }
+}
+
+fn build_text_payload_document(payload: &Payload) -> Result {
+ match payload {
+ Payload::Text(text) | Payload::Proto(text) => Ok(PayloadDocument {
+ value: Value::String(text.clone()),
+ encoding: ENCODING_TEXT,
+ }),
+ _ => {
+ let bytes = payload.try_to_bytes()?;
+ let text = String::from_utf8(bytes)
+ .map_err(|e| Error::InvalidRecordValue(format!("Invalid UTF-8 payload: {e}")))?;
+ Ok(PayloadDocument {
+ value: Value::String(text),
+ encoding: ENCODING_TEXT,
+ })
+ }
+ }
+}
+
+fn build_base64_payload_document(payload: &Payload) -> Result {
+ let bytes = payload.try_to_bytes()?;
+ Ok(PayloadDocument {
+ value: Value::String(general_purpose::STANDARD.encode(bytes)),
+ encoding: ENCODING_BASE64,
+ })
+}
+
+fn encode_headers(
+ headers: &std::collections::BTreeMap,
+) -> Result {
+ let mut encoded = Map::new();
+
+ for (key, value) in headers {
+ let value = if let Ok(raw) = value.as_raw() {
+ json!({
+ "data": general_purpose::STANDARD.encode(raw),
+ "iggy_header_encoding": ENCODING_BASE64
+ })
+ } else {
+ Value::String(value.to_string_value())
+ };
+ encoded.insert(key.to_string_value(), value);
+ }
+
+ Ok(Value::Object(encoded))
+}
+
+fn build_record_id(
+ topic_metadata: &TopicMetadata,
+ messages_metadata: &MessagesMetadata,
+ message_id: u128,
+) -> String {
+ let mut id = String::with_capacity(
+ topic_metadata.stream.len() * 2 + topic_metadata.topic.len() * 2 + 48,
+ );
+ id.push('s');
+ push_hex_component(&mut id, topic_metadata.stream.as_bytes());
+ id.push_str("_t");
+ push_hex_component(&mut id, topic_metadata.topic.as_bytes());
+ id.push_str("_p");
+ id.push_str(&messages_metadata.partition_id.to_string());
+ id.push_str("_m");
+ id.push_str(&format!("{message_id:032x}"));
+ id
+}
+
+fn push_hex_component(out: &mut String, bytes: &[u8]) {
+ const HEX: &[u8; 16] = b"0123456789abcdef";
+
+ for byte in bytes {
+ out.push(HEX[(byte >> 4) as usize] as char);
+ out.push(HEX[(byte & 0x0f) as usize] as char);
+ }
+}
+
+fn validate_identifier(field: &str, value: &str) -> Result<(), Error> {
+ let mut chars = value.chars();
+ let Some(first) = chars.next() else {
+ return Err(Error::InvalidConfigValue(format!(
+ "SurrealDB {field} cannot be empty"
+ )));
+ };
+
+ if !(first == '_' || first.is_ascii_alphabetic()) {
+ return Err(Error::InvalidConfigValue(format!(
+ "SurrealDB {field} must start with an ASCII letter or underscore"
+ )));
+ }
+
+ if chars.any(|ch| !(ch == '_' || ch.is_ascii_alphanumeric())) {
+ return Err(Error::InvalidConfigValue(format!(
+ "SurrealDB {field} must contain only ASCII letters, digits, and underscores"
+ )));
+ }
+
+ Ok(())
+}
+
+fn is_transient_error(error: &surrealdb::Error) -> bool {
+ let message = error.to_string().to_ascii_lowercase();
+ message.contains("timeout")
+ || message.contains("timed out")
+ || message.contains("connection")
+ || message.contains("network")
+ || message.contains("websocket")
+ || message.contains("channel")
+ || message.contains("broken pipe")
+ || message.contains("reset by peer")
+ || message.contains("temporarily unavailable")
+ || message.contains("service unavailable")
+}
+
+fn redact_endpoint(endpoint: &str) -> String {
+ if let Some((_, host)) = endpoint.rsplit_once('@') {
+ return format!("***@{}", host);
+ }
+
+ endpoint.to_string()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use iggy_common::{HeaderKey, HeaderValue};
+ use iggy_connector_sdk::Schema;
+ use std::collections::BTreeMap;
+ use std::str::FromStr;
+
+ fn test_config() -> SurrealDbSinkConfig {
+ SurrealDbSinkConfig {
+ endpoint: "127.0.0.1:8000".to_string(),
+ namespace: "iggy".to_string(),
+ database: "connectors".to_string(),
+ table: "iggy_messages".to_string(),
+ username: Some("root".to_string()),
+ password: Some(SecretString::from("root")),
+ auth_scope: None,
+ use_tls: None,
+ auto_define_table: None,
+ define_indexes: None,
+ batch_size: None,
+ payload_format: None,
+ include_metadata: None,
+ include_headers: None,
+ include_checksum: None,
+ include_origin_timestamp: None,
+ query_timeout: None,
+ max_retries: None,
+ retry_delay: None,
+ max_retry_delay: None,
+ verbose_logging: None,
+ }
+ }
+
+ fn test_topic_metadata() -> TopicMetadata {
+ TopicMetadata {
+ stream: "test_stream".to_string(),
+ topic: "test_topic".to_string(),
+ }
+ }
+
+ fn test_messages_metadata() -> MessagesMetadata {
+ MessagesMetadata {
+ partition_id: 7,
+ current_offset: 0,
+ schema: Schema::Json,
+ }
+ }
+
+ fn test_message(payload: Payload) -> ConsumedMessage {
+ ConsumedMessage {
+ id: 42,
+ offset: 9,
+ checksum: 123,
+ timestamp: 1_700_000_000_000_000,
+ origin_timestamp: 1_700_000_000_000_001,
+ headers: None,
+ payload,
+ }
+ }
+
+ fn json_payload(value: serde_json::Value) -> Payload {
+ let mut bytes = serde_json::to_vec(&value).expect("Failed to serialize JSON");
+ Payload::Json(simd_json::to_owned_value(&mut bytes).expect("Failed to parse JSON"))
+ }
+
+ #[test]
+ fn given_default_config_should_apply_expected_runtime_values() {
+ let sink = SurrealDbSink::new(1, test_config());
+
+ assert_eq!(sink.batch_size, DEFAULT_BATCH_SIZE);
+ assert_eq!(sink.auth_scope, AuthScope::Root);
+ assert_eq!(sink.payload_format, PayloadFormat::Auto);
+ assert_eq!(sink.query_timeout, Duration::from_secs(30));
+ assert_eq!(sink.max_retries, DEFAULT_MAX_RETRIES);
+ assert_eq!(sink.retry_delay, Duration::from_millis(100));
+ assert_eq!(sink.max_retry_delay, Duration::from_secs(5));
+ assert!(sink.include_metadata);
+ assert!(sink.include_headers);
+ assert!(sink.include_checksum);
+ assert!(sink.include_origin_timestamp);
+ assert!(!sink.auto_define_table);
+ assert!(!sink.define_indexes);
+ }
+
+ #[test]
+ fn given_config_overrides_should_apply_expected_values() {
+ let mut config = test_config();
+ config.auth_scope = Some("database".to_string());
+ config.payload_format = Some("base64".to_string());
+ config.batch_size = Some(10);
+ config.query_timeout = Some("5s".to_string());
+ config.max_retries = Some(5);
+ config.retry_delay = Some("250ms".to_string());
+ config.max_retry_delay = Some("2s".to_string());
+ config.include_metadata = Some(false);
+ config.include_headers = Some(false);
+ config.include_checksum = Some(false);
+ config.include_origin_timestamp = Some(false);
+ config.auto_define_table = Some(true);
+ config.define_indexes = Some(true);
+ config.verbose_logging = Some(true);
+
+ let sink = SurrealDbSink::new(1, config);
+
+ assert_eq!(sink.auth_scope, AuthScope::Database);
+ assert_eq!(sink.payload_format, PayloadFormat::Base64);
+ assert_eq!(sink.batch_size, 10);
+ assert_eq!(sink.query_timeout, Duration::from_secs(5));
+ assert_eq!(sink.max_retries, 5);
+ assert_eq!(sink.retry_delay, Duration::from_millis(250));
+ assert_eq!(sink.max_retry_delay, Duration::from_secs(2));
+ assert!(!sink.include_metadata);
+ assert!(!sink.include_headers);
+ assert!(!sink.include_checksum);
+ assert!(!sink.include_origin_timestamp);
+ assert!(sink.auto_define_table);
+ assert!(sink.define_indexes);
+ assert!(sink.verbose);
+ }
+
+ #[test]
+ fn given_reversed_retry_delays_should_clamp_max_retry_delay() {
+ let mut config = test_config();
+ config.retry_delay = Some("5s".to_string());
+ config.max_retry_delay = Some("100ms".to_string());
+
+ let sink = SurrealDbSink::new(1, config);
+
+ assert_eq!(sink.retry_delay, Duration::from_secs(5));
+ assert_eq!(sink.max_retry_delay, Duration::from_secs(5));
+ }
+
+ #[test]
+ fn given_payload_format_inputs_should_map_expected_variant() {
+ let cases = [
+ (None, PayloadFormat::Auto),
+ (Some("auto"), PayloadFormat::Auto),
+ (Some("json"), PayloadFormat::Json),
+ (Some("text"), PayloadFormat::Text),
+ (Some("base64"), PayloadFormat::Base64),
+ (Some("binary"), PayloadFormat::Base64),
+ (Some("unknown"), PayloadFormat::Auto),
+ ];
+
+ for (input, expected) in cases {
+ assert_eq!(PayloadFormat::from_config(input), expected);
+ }
+ }
+
+ #[test]
+ fn given_auth_scope_inputs_should_map_expected_variant() {
+ let cases = [
+ (None, AuthScope::Root),
+ (Some("root"), AuthScope::Root),
+ (Some("namespace"), AuthScope::Namespace),
+ (Some("database"), AuthScope::Database),
+ (Some("none"), AuthScope::None),
+ (Some("unknown"), AuthScope::Root),
+ ];
+
+ for (input, expected) in cases {
+ assert_eq!(AuthScope::from_config(input), expected);
+ }
+ }
+
+ #[test]
+ fn given_identifier_values_should_validate_expected_shapes() {
+ assert!(validate_identifier("table", "iggy_messages").is_ok());
+ assert!(validate_identifier("table", "_messages9").is_ok());
+ assert!(validate_identifier("table", "").is_err());
+ assert!(validate_identifier("table", "9messages").is_err());
+ assert!(validate_identifier("table", "messages-name").is_err());
+ assert!(validate_identifier("table", "messages; DROP TABLE x").is_err());
+ }
+
+ #[test]
+ fn given_topic_metadata_should_build_deterministic_record_id() {
+ let id = build_record_id(&test_topic_metadata(), &test_messages_metadata(), 42);
+
+ assert_eq!(
+ id,
+ "s746573745f73747265616d_t746573745f746f706963_p7_m0000000000000000000000000000002a"
+ );
+ }
+
+ #[test]
+ fn given_table_name_should_build_bulk_insert_query() {
+ assert_eq!(
+ build_insert_query("iggy_messages"),
+ "INSERT IGNORE INTO iggy_messages $records RETURN NONE;"
+ );
+ }
+
+ #[test]
+ fn given_auto_payload_json_should_store_queryable_json() {
+ let payload = json_payload(json!({"name": "Alice", "active": true}));
+ let document = build_auto_payload_document(&payload).expect("Failed to build payload");
+
+ assert_eq!(document.encoding, ENCODING_JSON);
+ assert_eq!(document.value, json!({"name": "Alice", "active": true}));
+ }
+
+ #[test]
+ fn given_auto_payload_text_should_store_text() {
+ let payload = Payload::Text("hello".to_string());
+ let document = build_auto_payload_document(&payload).expect("Failed to build payload");
+
+ assert_eq!(document.encoding, ENCODING_TEXT);
+ assert_eq!(document.value, Value::String("hello".to_string()));
+ }
+
+ #[test]
+ fn given_auto_payload_raw_should_store_base64() {
+ let payload = Payload::Raw(vec![0, 1, 2, 255]);
+ let document = build_auto_payload_document(&payload).expect("Failed to build payload");
+
+ assert_eq!(document.encoding, ENCODING_BASE64);
+ assert_eq!(document.value, Value::String("AAEC/w==".to_string()));
+ }
+
+ #[test]
+ fn given_json_payload_format_should_parse_raw_json() {
+ let payload = Payload::Raw(br#"{"count":3}"#.to_vec());
+ let document = build_json_payload_document(&payload).expect("Failed to build payload");
+
+ assert_eq!(document.encoding, ENCODING_JSON);
+ assert_eq!(document.value, json!({"count": 3}));
+ }
+
+ #[test]
+ fn given_json_payload_format_when_invalid_should_fail() {
+ let payload = Payload::Raw(b"not-json".to_vec());
+ let result = build_json_payload_document(&payload);
+
+ assert!(matches!(result, Err(Error::InvalidRecordValue(_))));
+ }
+
+ #[test]
+ fn given_text_payload_format_when_invalid_utf8_should_fail() {
+ let payload = Payload::Raw(vec![0xff, 0xfe]);
+ let result = build_text_payload_document(&payload);
+
+ assert!(matches!(result, Err(Error::InvalidRecordValue(_))));
+ }
+
+ #[test]
+ fn given_headers_should_encode_raw_as_base64_and_values_as_strings() {
+ let mut headers = BTreeMap::new();
+ headers.insert(
+ HeaderKey::try_from("trace-id").expect("valid key"),
+ HeaderValue::from_str("abc").expect("valid value"),
+ );
+ headers.insert(
+ HeaderKey::try_from("binary").expect("valid key"),
+ HeaderValue::try_from(vec![1_u8, 2, 3]).expect("valid raw"),
+ );
+
+ let encoded = encode_headers(&headers).expect("Failed to encode headers");
+
+ assert_eq!(
+ encoded,
+ json!({
+ "binary": {
+ "data": "AQID",
+ "iggy_header_encoding": "base64"
+ },
+ "trace-id": "abc"
+ })
+ );
+ }
+
+ #[test]
+ fn given_message_should_build_full_record() {
+ let mut message = test_message(json_payload(json!({"event": "created"})));
+ let mut headers = BTreeMap::new();
+ headers.insert(
+ HeaderKey::try_from("source").expect("valid key"),
+ HeaderValue::from_str("unit-test").expect("valid value"),
+ );
+ message.headers = Some(headers);
+
+ let sink = SurrealDbSink::new(1, test_config());
+ let record = sink
+ .build_record(&test_topic_metadata(), &test_messages_metadata(), &message)
+ .expect("Failed to build record");
+ let object = record.as_object().expect("record should be object");
+
+ assert_eq!(
+ object.get("id"),
+ Some(&Value::String(
+ "s746573745f73747265616d_t746573745f746f706963_p7_m0000000000000000000000000000002a"
+ .to_string()
+ ))
+ );
+ assert_eq!(object.get("iggy_message_id"), Some(&json!("42")));
+ assert_eq!(object.get("iggy_stream"), Some(&json!("test_stream")));
+ assert_eq!(object.get("iggy_topic"), Some(&json!("test_topic")));
+ assert_eq!(object.get("iggy_partition_id"), Some(&json!(7)));
+ assert_eq!(object.get("iggy_offset"), Some(&json!(9)));
+ assert_eq!(object.get("iggy_checksum"), Some(&json!(123)));
+ assert_eq!(object.get("payload"), Some(&json!({"event": "created"})));
+ assert_eq!(object.get("payload_encoding"), Some(&json!("json")));
+ assert!(object.contains_key("iggy_headers"));
+ }
+
+ #[test]
+ fn given_metadata_disabled_should_build_minimal_record() {
+ let mut config = test_config();
+ config.include_metadata = Some(false);
+ config.include_headers = Some(false);
+ config.include_checksum = Some(false);
+ config.include_origin_timestamp = Some(false);
+ let sink = SurrealDbSink::new(1, config);
+ let message = test_message(Payload::Text("minimal".to_string()));
+
+ let record = sink
+ .build_record(&test_topic_metadata(), &test_messages_metadata(), &message)
+ .expect("Failed to build record");
+ let object = record.as_object().expect("record should be object");
+
+ assert!(object.contains_key("id"));
+ assert!(object.contains_key("iggy_message_id"));
+ assert!(object.contains_key("payload"));
+ assert!(!object.contains_key("iggy_stream"));
+ assert!(!object.contains_key("iggy_checksum"));
+ assert!(!object.contains_key("iggy_origin_timestamp"));
+ assert!(!object.contains_key("iggy_headers"));
+ }
+
+ #[test]
+ fn given_endpoint_with_credentials_should_redact_prefix() {
+ assert_eq!(
+ redact_endpoint("user:pass@127.0.0.1:8000"),
+ "***@127.0.0.1:8000"
+ );
+ assert_eq!(redact_endpoint("127.0.0.1:8000"), "127.0.0.1:8000");
+ }
+}
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 81ba70ed95..26b7fdd38c 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -82,6 +82,7 @@ serial_test = { workspace = true }
server = { workspace = true }
socket2 = { workspace = true }
sqlx = { workspace = true }
+surrealdb = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
test-case = { workspace = true }
diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs
index 0b2f264d03..bd4f70a1f4 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/fixtures/mod.rs
@@ -26,6 +26,7 @@ mod influxdb;
mod mongodb;
mod postgres;
mod quickwit;
+mod surrealdb;
mod wiremock;
/// Prefix on every test container name so `just clean-test-containers` reaps
@@ -72,4 +73,8 @@ pub use postgres::{
PostgresSourceJsonbFixture, PostgresSourceMarkFixture, PostgresSourceOps,
};
pub use quickwit::{QuickwitFixture, QuickwitOps, QuickwitPreCreatedFixture};
+pub use surrealdb::{
+ SurrealDbOps, SurrealDbSinkBatchFixture, SurrealDbSinkFixture, SurrealDbSinkJsonFixture,
+ SurrealDbSinkRawFixture,
+};
pub use wiremock::{WireMockDirectFixture, WireMockWrappedFixture};
diff --git a/core/integration/tests/connectors/fixtures/surrealdb/container.rs b/core/integration/tests/connectors/fixtures/surrealdb/container.rs
new file mode 100644
index 0000000000..864d32a6ac
--- /dev/null
+++ b/core/integration/tests/connectors/fixtures/surrealdb/container.rs
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::connectors::fixtures;
+use integration::harness::TestBinaryError;
+use std::time::Duration;
+use surrealdb::Surreal;
+use surrealdb::engine::remote::ws::{Client as WsClient, Ws};
+use surrealdb::opt::auth::Root;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, ImageExt};
+use tokio::time::sleep;
+use tracing::info;
+
+const SURREALDB_IMAGE: &str = "docker.io/surrealdb/surrealdb";
+const SURREALDB_TAG: &str = "v3.1.4";
+const SURREALDB_PORT: u16 = 8000;
+const SURREALDB_READY_MSG: &str = "Started web server on";
+const SURREALDB_BOOT_ATTEMPTS: usize = 120;
+const SURREALDB_BOOT_INTERVAL_MS: u64 = 250;
+
+pub(super) const DEFAULT_TEST_STREAM: &str = "test_stream";
+pub(super) const DEFAULT_TEST_TOPIC: &str = "test_topic";
+pub(super) const DEFAULT_NAMESPACE: &str = "iggy";
+pub(super) const DEFAULT_DATABASE: &str = "connectors";
+pub(super) const DEFAULT_TABLE: &str = "iggy_messages";
+pub(super) const ROOT_USERNAME: &str = "root";
+pub(super) const ROOT_PASSWORD: &str = "root";
+
+pub(super) const DEFAULT_POLL_ATTEMPTS: usize = 120;
+pub(super) const DEFAULT_POLL_INTERVAL_MS: u64 = 50;
+
+pub(super) const ENV_SINK_ENDPOINT: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_ENDPOINT";
+pub(super) const ENV_SINK_NAMESPACE: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_NAMESPACE";
+pub(super) const ENV_SINK_DATABASE: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_DATABASE";
+pub(super) const ENV_SINK_TABLE: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_TABLE";
+pub(super) const ENV_SINK_USERNAME: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_USERNAME";
+pub(super) const ENV_SINK_PASSWORD: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_PASSWORD";
+pub(super) const ENV_SINK_AUTH_SCOPE: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_AUTH_SCOPE";
+pub(super) const ENV_SINK_AUTO_DEFINE_TABLE: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_AUTO_DEFINE_TABLE";
+pub(super) const ENV_SINK_DEFINE_INDEXES: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_DEFINE_INDEXES";
+pub(super) const ENV_SINK_BATCH_SIZE: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_BATCH_SIZE";
+pub(super) const ENV_SINK_PAYLOAD_FORMAT: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_PLUGIN_CONFIG_PAYLOAD_FORMAT";
+pub(super) const ENV_SINK_STREAMS_0_STREAM: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_STREAMS_0_STREAM";
+pub(super) const ENV_SINK_STREAMS_0_TOPICS: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_STREAMS_0_TOPICS";
+pub(super) const ENV_SINK_STREAMS_0_SCHEMA: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_STREAMS_0_SCHEMA";
+pub(super) const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str =
+ "IGGY_CONNECTORS_SINK_SURREALDB_STREAMS_0_CONSUMER_GROUP";
+pub(super) const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_SURREALDB_PATH";
+
+pub type SurrealDbClient = Surreal;
+
+pub struct SurrealDbContainer {
+ #[allow(dead_code)]
+ container: ContainerAsync,
+ pub(super) endpoint: String,
+}
+
+impl SurrealDbContainer {
+ pub(super) async fn start() -> Result {
+ let container = GenericImage::new(SURREALDB_IMAGE, SURREALDB_TAG)
+ .with_exposed_port(SURREALDB_PORT.tcp())
+ .with_wait_for(WaitFor::message_on_stdout(SURREALDB_READY_MSG))
+ .with_mapped_port(0, SURREALDB_PORT.tcp())
+ .with_container_name(fixtures::unique_container_name("surrealdb"))
+ .with_cmd([
+ "start",
+ "--log",
+ "info",
+ "--user",
+ ROOT_USERNAME,
+ "--pass",
+ ROOT_PASSWORD,
+ "memory",
+ ])
+ .start()
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "SurrealDbContainer".to_string(),
+ message: format!("Failed to start container: {e}"),
+ })?;
+
+ let mapped_port = container
+ .ports()
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "SurrealDbContainer".to_string(),
+ message: format!("Failed to get ports: {e}"),
+ })?
+ .map_to_host_port_ipv4(SURREALDB_PORT)
+ .ok_or_else(|| TestBinaryError::FixtureSetup {
+ fixture_type: "SurrealDbContainer".to_string(),
+ message: "No mapping for SurrealDB port".to_string(),
+ })?;
+
+ let endpoint = format!("127.0.0.1:{mapped_port}");
+ let instance = Self {
+ container,
+ endpoint,
+ };
+ instance.wait_until_ready().await?;
+
+ info!("SurrealDB container available at {}", instance.endpoint);
+ Ok(instance)
+ }
+
+ pub async fn create_client(&self) -> Result {
+ let client = Surreal::new::(self.endpoint.as_str())
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "SurrealDbContainer".to_string(),
+ message: format!("Failed to connect to SurrealDB: {e}"),
+ })?;
+
+ client
+ .signin(Root {
+ username: ROOT_USERNAME.to_string(),
+ password: ROOT_PASSWORD.to_string(),
+ })
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "SurrealDbContainer".to_string(),
+ message: format!("Failed to authenticate with SurrealDB: {e}"),
+ })?;
+
+ client
+ .use_ns(DEFAULT_NAMESPACE)
+ .use_db(DEFAULT_DATABASE)
+ .await
+ .map_err(|e| TestBinaryError::FixtureSetup {
+ fixture_type: "SurrealDbContainer".to_string(),
+ message: format!("Failed to select namespace/database: {e}"),
+ })?;
+
+ Ok(client)
+ }
+
+ async fn wait_until_ready(&self) -> Result<(), TestBinaryError> {
+ for _ in 0..SURREALDB_BOOT_ATTEMPTS {
+ if let Ok(client) = self.create_client().await
+ && client.health().await.is_ok()
+ {
+ return Ok(());
+ }
+ sleep(Duration::from_millis(SURREALDB_BOOT_INTERVAL_MS)).await;
+ }
+
+ Err(TestBinaryError::FixtureSetup {
+ fixture_type: "SurrealDbContainer".to_string(),
+ message: "SurrealDB did not become ready".to_string(),
+ })
+ }
+}
+
+pub trait SurrealDbOps: Sync {
+ fn container(&self) -> &SurrealDbContainer;
+
+ fn create_client(
+ &self,
+ ) -> impl std::future::Future