diff --git a/.gitignore b/.gitignore index f0d8b6250b..9a65de36e9 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,5 @@ go.work core/bench/dashboard/frontend/dist LICENSE-binary **/LICENSE-binary +local_test_connectors +local_test_runtime.toml diff --git a/Cargo.lock b/Cargo.lock index 5c9905cab9..563f715027 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6947,6 +6947,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_mysql_source" +version = "0.1.0" +dependencies = [ + "async-trait", + "base64", + "chrono", + "dashmap", + "humantime", + "iggy_common", + "iggy_connector_sdk", + "secrecy", + "serde", + "serde_json", + "simd-json", + "sqlx", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "iggy_connector_postgres_sink" version = "0.4.1-edge.1" diff --git a/Cargo.toml b/Cargo.toml index 4d196d9632..902418cadd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/influxdb_source", + "core/connectors/sources/mysql_source", "core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", @@ -303,7 +304,7 @@ tempfile = "3.27.0" terminal_size = { version = "0.4.4" } test-case = "3.3.1" testcontainers = { version = "0.27.3", features = ["reusable-containers"] } -testcontainers-modules = { version = "0.15.0", features = ["postgres", "http_wait"] } +testcontainers-modules = { version = "0.15.0", features = ["postgres", "mysql", "http_wait"] } thiserror = "2.0.18" tokio = { version = "1.52.3", features = ["full"] } tokio-rustls = "0.26.4" diff --git a/core/connectors/sources/mysql_source/Cargo.toml b/core/connectors/sources/mysql_source/Cargo.toml new file mode 100644 index 0000000000..7496e38674 --- /dev/null +++ b/core/connectors/sources/mysql_source/Cargo.toml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_mysql_source" +version = "0.1.0" +description = "Iggy MySQL source connector supporting table polling for message streaming platform" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "mysql", "polling"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "README.md" +publish = false + +[package.metadata.cargo-machete] +ignored = ["dashmap", "simd-json"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +chrono = { workspace = true } +dashmap = { workspace = true } +humantime = { workspace = true } +iggy_common = { workspace = true } +iggy_connector_sdk = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +sqlx = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/core/connectors/sources/mysql_source/README.md b/core/connectors/sources/mysql_source/README.md new file mode 100644 index 0000000000..b8028a44cd --- /dev/null +++ b/core/connectors/sources/mysql_source/README.md @@ -0,0 +1,382 @@ +# MySQL Source Connector + +The MySQL source connector fetches data from MySQL databases and streams it to Iggy topics. It supports incremental table polling with flexible payload extraction. + +> **Note:** Only polling mode is available. Binlog CDC support is planned for a future release. + +## Features + +- **Table Polling**: Incrementally fetch data from MySQL tables using a tracking column +- **Flexible Payload Extraction**: Extract BLOB, TEXT, or JSON columns directly as payload +- **Custom Queries**: Use custom SQL queries with parameter substitution +- **Delete After Read**: Automatically delete rows after processing +- **Mark as Processed**: Mark rows as processed using a boolean column +- **Multiple Tables**: Monitor multiple tables simultaneously +- **Batch Processing**: Fetch data in configurable batch sizes +- **Offset Tracking**: Keep track of processed records to avoid duplicates + +## Configuration + +```toml +[plugin_config] +connection_string = "mysql://user:pass@localhost:3306/database" +tables = ["users", "orders"] +poll_interval = "1s" +batch_size = 1000 +tracking_column = "id" +initial_offset = "0" +max_connections = 10 +snake_case_columns = false +include_metadata = true + +# Payload extraction (optional) +payload_column = "payload" +payload_format = "bytea" + +# Delete/mark processed (optional) +delete_after_read = false +processed_column = "is_processed" +primary_key_column = "id" + +# Custom query (optional) +custom_query = "SELECT * FROM $table WHERE id > $offset ORDER BY id LIMIT $limit" +``` + +## Configuration Options + +| Option | Type | Default | Description | +| ------ | ---- | ------- | ----------- | +| `connection_string` | string | required | MySQL connection string (`mysql://user:pass@host:3306/db`) | +| `tables` | array | required | List of tables to monitor | +| `poll_interval` | string | `10s` | How often to poll (e.g., `1s`, `5m`) | +| `batch_size` | u32 | `1000` | Max rows per poll | +| `tracking_column` | string | `id` | Column for incremental updates | +| `initial_offset` | string | none | Starting value for tracking column | +| `max_connections` | u32 | `10` | Max database connections | +| `snake_case_columns` | bool | `false` | Convert column names to snake_case | +| `include_metadata` | bool | `true` | Wrap results with metadata envelope | +| `payload_column` | string | none | Column to extract directly as payload | +| `payload_format` | string | `bytea` | Format of payload_column: `bytea`, `text`, or `json_direct` | +| `delete_after_read` | bool | `false` | Delete rows after reading | +| `processed_column` | string | none | Boolean column to mark as processed | +| `primary_key_column` | string | tracking_column | PK for delete/mark operations | +| `custom_query` | string | none | Custom SQL with parameter substitution | +| `verbose_logging` | bool | `false` | Log at info level instead of debug | +| `max_retries` | u32 | `3` | Max retry attempts for transient errors | +| `retry_delay` | string | `1s` | Base delay between retries (e.g., `500ms`, `2s`) | + +## Output Modes + +### JSON Mode (Default) + +When `payload_column` is not set, each row is wrapped in a `DatabaseRecord` JSON structure: + +```json +{ + "table_name": "users", + "operation_type": "SELECT", + "timestamp": "2024-01-15T10:30:00Z", + "data": { + "id": 123, + "name": "John Doe", + "email": "john@example.com" + }, + "old_data": null +} +``` + +The stream config should use `schema = "json"`. + +When `include_metadata = false`, the envelope is omitted and the row columns are serialized directly: + +```json +{ + "id": 123, + "name": "John Doe", + "email": "john@example.com" +} +``` + +### Payload Column Extraction + +When `payload_column` is set, the connector extracts that column directly as the Iggy message payload. The `payload_format` option determines how the column is read: + +| Format | Column Type | Schema | Description | +| ------ | ----------- | ------ | ----------- | +| `bytea` / `raw` | `BLOB`, `BINARY`, `VARBINARY` | `raw` | Raw bytes passthrough | +| `text` | `TEXT`, `VARCHAR` | `text` | UTF-8 text | +| `json_direct` / `jsonb` | `JSON` | `json` | JSON object serialized to bytes | + +## Payload Format Examples + +### BLOB (Raw Bytes) + +Extract raw bytes from a BLOB column: + +```sql +CREATE TABLE message_queue ( + id INT AUTO_INCREMENT PRIMARY KEY, + payload BLOB NOT NULL +); +``` + +```toml +[[streams]] +stream = "messages" +topic = "queue" +schema = "raw" +batch_length = 100 + +[plugin_config] +tables = ["message_queue"] +tracking_column = "id" +payload_column = "payload" +payload_format = "bytea" +``` + +### TEXT + +Extract text from a TEXT column: + +```sql +CREATE TABLE logs ( + id INT AUTO_INCREMENT PRIMARY KEY, + message TEXT NOT NULL +); +``` + +```toml +[[streams]] +stream = "logs" +topic = "app_logs" +schema = "text" +batch_length = 100 + +[plugin_config] +tables = ["logs"] +tracking_column = "id" +payload_column = "message" +payload_format = "text" +``` + +### JSON (Direct) + +Extract a JSON column directly as JSON payload (without `DatabaseRecord` wrapper): + +```sql +CREATE TABLE events ( + id INT AUTO_INCREMENT PRIMARY KEY, + data JSON NOT NULL +); +``` + +```toml +[[streams]] +stream = "events" +topic = "user_events" +schema = "json" +batch_length = 100 + +[plugin_config] +tables = ["events"] +tracking_column = "id" +payload_column = "data" +payload_format = "json_direct" +``` + +## Custom Query Parameters + +When using `custom_query`, these placeholders are available: + +| Placeholder | Replaced With | +| ----------- | ------------- | +| `$table` | Current table name | +| `$offset` | Last processed offset (or `initial_offset`) | +| `$limit` | `batch_size` value | +| `$now` | Current UTC timestamp (RFC3339) | +| `$now_unix` | Current Unix timestamp (seconds) | + +Example: + +```sql +SELECT * FROM $table +WHERE created_at > '$offset' + AND (scheduled_at IS NULL OR scheduled_at <= '$now') +ORDER BY created_at +LIMIT $limit +``` + +## Delete After Read / Mark as Processed + +### Delete After Read + +Deletes rows from the source table after successful processing: + +```toml +[plugin_config] +delete_after_read = true +primary_key_column = "id" +``` + +### Mark as Processed + +Updates a boolean column instead of deleting: + +```toml +[plugin_config] +processed_column = "is_processed" +primary_key_column = "id" +``` + +Your table needs the boolean column: + +```sql +ALTER TABLE users ADD COLUMN is_processed BOOLEAN DEFAULT false; +``` + +When `processed_column` is set, the connector automatically adds a `WHERE is_processed = FALSE` filter to the polling query, so only unprocessed rows are fetched. + +## Supported Column Types + +The connector handles these MySQL types in JSON mode: + +| MySQL Type | JSON Output | +| ---------- | ----------- | +| `BOOLEAN` | boolean | +| `TINYINT` | number (i8 → i64) | +| `SMALLINT` | number (i16 → i64) | +| `MEDIUMINT`, `INT` | number (i32 → i64) | +| `BIGINT` | number (i64) | +| `TINYINT UNSIGNED` | number (u8 → u64) | +| `SMALLINT UNSIGNED` | number (u16 → u64) | +| `MEDIUMINT UNSIGNED`, `INT UNSIGNED` | number (u32 → u64) | +| `BIGINT UNSIGNED` | number (u64) | +| `FLOAT` | number (f32 → f64) | +| `DOUBLE` | number (f64) | +| `DECIMAL` | number (parsed as f64, precision may be lost) | +| `BIT` | number (u64) | +| `YEAR` | number (u16 → u64) | +| `DATE` | string (YYYY-MM-DD) | +| `TIME` | string (HH:MM:SS) | +| `DATETIME`, `TIMESTAMP` | string (YYYY-MM-DD HH:MM:SS) | +| `CHAR`, `VARCHAR`, `TINYTEXT`, `TEXT`, `MEDIUMTEXT`, `LONGTEXT` | string | +| `ENUM`, `SET` | string | +| `BINARY`, `VARBINARY`, `TINYBLOB`, `BLOB`, `MEDIUMBLOB`, `LONGBLOB` | base64 string | +| `GEOMETRY` | base64 string | +| `JSON` | object | +| `NULL` | null | +| Unknown | string (text fallback), or base64 string (binary fallback) | + +> **DECIMAL precision:** MySQL `DECIMAL` is an arbitrary-precision decimal type. The connector reads it as a string then parses it as `f64`, which introduces floating-point rounding for high-precision values. If exactness is required, use a custom query to cast the column to `CHAR` and handle it as a string in the consumer. + +## Reliability Features + +### Automatic Retries + +The connector automatically retries transient database errors with linear backoff (`retry_delay * attempt_number`). Transient errors include: + +| MySQL Error | Meaning | +| ----------- | ------- | +| `1213` | Deadlock, retry candidate | +| `1205` | Lock wait timeout | +| `1053` | Server shutdown in progress | +| `1152` | Connection aborted during handshake | +| `2006` | Server has gone away | +| `2013` | Lost connection during query | +| `1040` | Too many connections | +| `1041` | Out of memory | + +Non-transient errors (syntax errors, constraint violations, missing tables) fail immediately without retry. + +Configure with `max_retries` (default: `3`) and `retry_delay` (default: `1s`). + +### SQL Injection Protection + +All table names and column names are quoted with MySQL backtick syntax. Single quotes in offset values are escaped (`'` → `''`). NUL bytes in identifiers are rejected. + +## Example Configs + +### Basic Polling (JSON Mode) + +```toml +[[streams]] +stream = "user_events" +topic = "users" +schema = "json" +batch_length = 100 + +[plugin_config] +connection_string = "mysql://user:pass@localhost:3306/mydb" +tables = ["users"] +poll_interval = "1s" +tracking_column = "updated_at" +``` + +### Raw Payload Passthrough + +```toml +[[streams]] +stream = "messages" +topic = "queue" +schema = "raw" +batch_length = 100 + +[plugin_config] +connection_string = "mysql://user:pass@localhost:3306/mydb" +tables = ["message_queue"] +poll_interval = "100ms" +tracking_column = "id" +payload_column = "payload" +payload_format = "bytea" +delete_after_read = true +``` + +### JSON Direct Extraction + +```toml +[[streams]] +stream = "events" +topic = "user_events" +schema = "json" +batch_length = 100 + +[plugin_config] +connection_string = "mysql://user:pass@localhost:3306/mydb" +tables = ["events"] +poll_interval = "1s" +tracking_column = "id" +payload_column = "data" +payload_format = "json_direct" +``` + +### Multiple Tables with Timestamp Tracking + +```toml +[[streams]] +stream = "audit" +topic = "changes" +schema = "json" +batch_length = 100 + +[plugin_config] +connection_string = "mysql://user:pass@localhost:3306/mydb" +tables = ["users", "orders", "products"] +poll_interval = "5s" +tracking_column = "updated_at" +initial_offset = "2024-01-01 00:00:00" +include_metadata = true +``` + +### Flat-Schema Sinks (Iceberg, Delta) + +In the default JSON mode, each row is wrapped in a `DatabaseRecord` envelope. Sinks like Iceberg and Delta expect flat JSON matching the target schema, so the envelope must be unwrapped. + +**Option A — use the `unwrap_envelope` transform** on the sink side to extract the `data` field: + +```toml +[transforms.unwrap_envelope] +enabled = true +field = "data" +``` + +**Option B — bypass the envelope** by setting `include_metadata = false` on the source, which serializes columns directly without the wrapper. diff --git a/core/connectors/sources/mysql_source/config.toml b/core/connectors/sources/mysql_source/config.toml new file mode 100644 index 0000000000..e180fa47f2 --- /dev/null +++ b/core/connectors/sources/mysql_source/config.toml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "source" +key = "mysql" +enabled = true +version = 0 +name = "MySQL source" +path = "../../target/release/libiggy_connector_mysql_source" +verbose = false + +[[streams]] +stream = "user_events" +topic = "users" +schema = "json" +batch_length = 100 + +[plugin_config] +connection_string = "mysql://user:pass@localhost:3306/database" +tables = ["users", "orders"] +poll_interval = "1s" +batch_size = 1000 +tracking_column = "id" +initial_offset = "0" +max_connections = 10 +snake_case_columns = false +include_metadata = true +# payload_column = "payload" # read raw payload from this column instead of serializing all columns +# payload_format = "json_direct" # required when payload_column is set: bytea | text | json_direct +# primary_key_column = "id" # PK used for delete_after_read / processed_column (defaults to tracking_column) +# delete_after_read = false # delete rows after successfully producing them +# processed_column = "is_processed" # alternative to delete: mark rows with TRUE instead of deleting diff --git a/core/connectors/sources/mysql_source/src/lib.rs b/core/connectors/sources/mysql_source/src/lib.rs new file mode 100644 index 0000000000..6cd17c96b9 --- /dev/null +++ b/core/connectors/sources/mysql_source/src/lib.rs @@ -0,0 +1,1358 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use base64::Engine; +use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; +use humantime::Duration as HumanDuration; +use iggy_common::{DateTime, Utc}; +use iggy_connector_sdk::{ + ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, +}; +use secrecy::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; +use sqlx::mysql::{MySqlDatabaseError, MySqlRow}; +use sqlx::{Column, MySql, Pool, Row, TypeInfo, mysql::MySqlPoolOptions}; +use std::collections::HashMap; +use std::str::FromStr; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +source_connector!(MySqlSource); + +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "1s"; + +#[derive(Debug)] +pub struct MySqlSource { + pub id: u32, + pool: Option>, + config: MySqlSourceConfig, + state: Mutex, + verbose: bool, + retry_delay: Duration, + poll_interval: Duration, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MySqlSourceConfig { + #[serde(serialize_with = "iggy_common::serde_secret::serialize_secret")] + pub connection_string: SecretString, + pub tables: Vec, + pub poll_interval: Option, + pub batch_size: Option, + pub tracking_column: Option, + pub initial_offset: Option, + pub max_connections: Option, + pub custom_query: Option, + pub snake_case_columns: Option, + pub include_metadata: Option, + pub delete_after_read: Option, + pub processed_column: Option, + pub primary_key_column: Option, + pub payload_column: Option, + pub payload_format: Option, + pub verbose_logging: Option, + pub max_retries: Option, + pub retry_delay: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum PayloadFormat { + #[default] + Json, + Bytea, + Text, + JsonDirect, +} + +struct ProcessedRow { + message: ProducedMessage, + max_offset: Option, + row_pk: Option, +} + +impl PayloadFormat { + fn from_config(s: Option<&str>) -> Self { + match s.map(|s| s.to_lowercase()).as_deref() { + Some("bytea") | Some("raw") => PayloadFormat::Bytea, + Some("text") => PayloadFormat::Text, + Some("json_direct") | Some("jsonb") | Some("jsonb_direct") => PayloadFormat::JsonDirect, + _ => PayloadFormat::Json, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DatabaseRecord { + pub table_name: String, + pub operation_type: String, + pub timestamp: DateTime, + pub data: serde_json::Value, + pub old_data: Option, +} + +#[derive(Clone, Copy)] +struct RowProcessingConfig<'a> { + table: &'a str, + tracking_column: &'a str, + pk_column: &'a str, + payload_format: PayloadFormat, + payload_col: &'a str, + snake_case_columns: bool, + include_metadata: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +struct State { + last_poll_time: DateTime, + tracking_offsets: HashMap, + processed_rows: u64, +} + +const CONNECTOR_NAME: &str = "MySQL source"; + +#[async_trait] +impl Source for MySqlSource { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening MySQL source connector with ID: {}, Tables: {:?}", + self.id, self.config.tables + ); + + if let Some(ref col) = self.config.payload_column + && !col.is_empty() + && PayloadFormat::from_config(self.config.payload_format.as_deref()) + == PayloadFormat::Json + { + return Err(Error::InitError( + "payload_format must be 'bytea', 'text', or 'json_direct' when payload_column is set" + .to_string(), + )); + } + self.connect().await?; + + info!( + "MySQL source connector with ID: {} opened successfully", + self.id + ); + Ok(()) + } + + async fn poll(&self) -> Result { + let poll_interval = self.poll_interval; + tokio::time::sleep(poll_interval).await; + + let messages = self.poll_tables().await?; + + let state = self.state.lock().await; + if self.verbose { + info!( + "MySQL source connector ID: {} produced {} messages. Total processed: {}", + self.id, + messages.len(), + state.processed_rows + ); + } else { + debug!( + "MySQL source connector ID: {} produced {} messages. Total processed: {}", + self.id, + messages.len(), + state.processed_rows + ); + } + + let schema = match self.payload_format() { + PayloadFormat::Bytea => Schema::Raw, + PayloadFormat::Text => Schema::Text, + PayloadFormat::JsonDirect | PayloadFormat::Json => Schema::Json, + }; + + let persisted_state = self.serialize_state(&state); + + Ok(ProducedMessages { + schema, + messages, + state: persisted_state, + }) + } + + async fn close(&mut self) -> Result<(), Error> { + if let Some(pool) = self.pool.take() { + pool.close().await; + info!("MySQL connection pool closed for connector ID: {}", self.id); + } + + let state = self.state.lock().await; + info!( + "MySQL source connector ID: {} closed. Total rows processed: {}", + self.id, state.processed_rows + ); + Ok(()) + } +} + +impl MySqlSource { + pub fn new(id: u32, config: MySqlSourceConfig, state: Option) -> Self { + let verbose = config.verbose_logging.unwrap_or(false); + let restored_state = state + .and_then(|s| s.deserialize::(CONNECTOR_NAME, id)) + .inspect(|s| { + info!( + "Restored state for {CONNECTOR_NAME} connector with ID: {id}. \ + Tracking offsets: {:?}, processed rows: {}", + s.tracking_offsets, s.processed_rows + ); + }); + + let delay_str = config.retry_delay.as_deref().unwrap_or(DEFAULT_RETRY_DELAY); + let retry_delay = HumanDuration::from_str(delay_str) + .map(|duration| duration.into()) + .unwrap_or_else(|_| Duration::from_secs(1)); + let interval_str = config.poll_interval.as_deref().unwrap_or("10s"); + let poll_interval = HumanDuration::from_str(interval_str) + .map(|duration| duration.into()) + .unwrap_or_else(|_| Duration::from_secs(10)); + MySqlSource { + id, + pool: None, + config, + state: Mutex::new(restored_state.unwrap_or(State { + last_poll_time: Utc::now(), + tracking_offsets: HashMap::new(), + processed_rows: 0, + })), + verbose, + retry_delay, + poll_interval, + } + } + + async fn connect(&mut self) -> Result<(), Error> { + let max_connections = self.config.max_connections.unwrap_or(10); + let redacted = redact_connection_string(self.config.connection_string.expose_secret()); + + info!("Connecting to MySQL with max {max_connections} connections: {redacted}"); + + let pool = MySqlPoolOptions::new() + .max_connections(max_connections) + .connect(self.config.connection_string.expose_secret()) + .await + .map_err(|e| Error::InitError(format!("Failed to connect to MySQL: {e}")))?; + + sqlx::query("SELECT 1") + .execute(&pool) + .await + .map_err(|e| Error::InitError(format!("Database connectivity test failed: {e}")))?; + + self.pool = Some(pool); + info!("Connected to MySQL database with {max_connections} max connections"); + Ok(()) + } + + fn payload_format(&self) -> PayloadFormat { + if let Some(ref payload_col) = self.config.payload_column + && !payload_col.is_empty() + { + return PayloadFormat::from_config(self.config.payload_format.as_deref()); + } + PayloadFormat::Json + } + + fn serialize_state(&self, state: &State) -> Option { + ConnectorState::serialize(state, CONNECTOR_NAME, self.id) + } + + fn get_pool(&self) -> Result<&Pool, Error> { + self.pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string())) + } + + fn extract_payload_column( + &self, + row: &MySqlRow, + column_index: usize, + format: PayloadFormat, + ) -> Result, Error> { + match format { + PayloadFormat::Bytea => { + let bytes: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(bytes.unwrap_or_default()) + } + PayloadFormat::Text => { + let text: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(text.unwrap_or_default().into_bytes()) + } + PayloadFormat::JsonDirect => { + let json_value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + simd_json::to_vec(&json_value.unwrap_or(serde_json::Value::Null)) + .map_err(|_| Error::InvalidRecord) + } + PayloadFormat::Json => Err(Error::InvalidConfig), // unreachable! if payload_column is there then payload_format can never be json + } + } + + fn substitute_query_params( + &self, + query: &str, + table: &str, + last_offset: &Option, + batch_size: u32, + ) -> String { + let offset_value = last_offset + .clone() + .or_else(|| self.config.initial_offset.clone()) + .unwrap_or_default(); + + let now = Utc::now(); + + query + .replace("$table", table) + .replace("$offset", &offset_value) + .replace("$limit", &batch_size.to_string()) + .replace("$now", &now.to_rfc3339()) + .replace("$now_unix", &now.timestamp().to_string()) + } + + fn validate_custom_query(&self, query: &str) -> Result<(), Error> { + let query_upper = query.to_uppercase(); + if !query_upper.contains("SELECT") { + warn!("Custom query should contain SELECT statement"); + } + if query.contains("$table") && self.config.tables.is_empty() { + return Err(Error::InvalidConfig); + } + Ok(()) + } + + fn build_polling_query( + &self, + table: &str, + tracking_column: &str, + last_offset: &Option, + batch_size: u32, + ) -> Result { + let quoted_table = quote_qualified_identifier(table)?; + let quoted_tracking = quote_identifier(tracking_column)?; + + let base_query = format!("SELECT * FROM {quoted_table}"); + + let mut conditions = Vec::new(); + + if let Some(offset) = last_offset { + conditions.push(format!( + "{quoted_tracking} > {}", + format_offset_value(offset) + )); + } else if let Some(initial) = &self.config.initial_offset { + conditions.push(format!( + "{quoted_tracking} > {}", + format_offset_value(initial) + )); + } + + if let Some(processed_col) = &self.config.processed_column { + let quoted_processed = quote_identifier(processed_col)?; + conditions.push(format!("{quoted_processed} = FALSE")); + } + + let where_clause = if conditions.is_empty() { + String::new() + } else { + format!(" WHERE {}", conditions.join(" AND ")) + }; + + let order_clause = format!(" ORDER BY {quoted_tracking} ASC"); + let limit_clause = format!(" LIMIT {batch_size}"); + + Ok(format!( + "{base_query}{where_clause}{order_clause}{limit_clause}" + )) + } + + fn get_max_retries(&self) -> u32 { + self.config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES) + } + + async fn mark_or_delete_processed_rows( + &self, + pool: &Pool, + table: &str, + pk_column: &str, + ids: &[String], + ) -> Result<(), Error> { + if ids.is_empty() { + return Ok(()); + } + + let quoted_table = quote_qualified_identifier(table)?; + let quoted_pk = quote_identifier(pk_column)?; + + let ids_list = ids + .iter() + .map(|id| { + if id.parse::().is_ok() { + id.clone() + } else { + format!( + "'{}'", + id.replace('\\', "\\\\") + .replace('\'', "''") + .replace('\0', "") + ) + } + }) + .collect::>() + .join(", "); + + if self.config.delete_after_read.unwrap_or(false) { + let delete_query = + format!("DELETE FROM {quoted_table} WHERE {quoted_pk} IN ({ids_list})"); + + if self.verbose { + info!("Deleting {} processed rows from '{table}'", ids.len()); + } else { + debug!("Deleting {} processed rows from '{table}'", ids.len()); + } + + sqlx::query(sqlx::AssertSqlSafe(delete_query)) + .execute(pool) + .await + .map_err(|e| { + error!("Failed to delete processed rows: {e}"); + Error::InvalidRecord + })?; + } else if let Some(processed_col) = &self.config.processed_column { + let quoted_processed = quote_identifier(processed_col)?; + let update_query = format!( + "UPDATE {quoted_table} SET {quoted_processed} = TRUE WHERE {quoted_pk} IN ({ids_list})" + ); + + if self.verbose { + info!("Marking {} rows as processed in '{table}'", ids.len()); + } else { + debug!("Marking {} rows as processed in '{table}'", ids.len()); + } + + sqlx::query(sqlx::AssertSqlSafe(update_query)) + .execute(pool) + .await + .map_err(|e| { + error!("Failed to mark rows as processed: {e}"); + Error::InvalidRecord + })?; + } + + Ok(()) + } + + async fn poll_tables(&self) -> Result, Error> { + let pool = self.get_pool()?; + let mut messages = Vec::new(); + + let batch_size = self.config.batch_size.unwrap_or(1000); + let tracking_column = self.config.tracking_column.as_deref().unwrap_or("id"); + let pk_column = self + .config + .primary_key_column + .as_deref() + .unwrap_or(tracking_column); + + let row_config = RowProcessingConfig { + table: "", + tracking_column, + pk_column, + payload_format: self.payload_format(), + payload_col: self.config.payload_column.as_deref().unwrap_or(""), + snake_case_columns: self.config.snake_case_columns.unwrap_or(false), + include_metadata: self.config.include_metadata.unwrap_or(true), + }; + + // Collect state updates to apply after processing + let mut state_updates: Vec<(String, String)> = Vec::new(); + let mut total_processed: u64 = 0; + + for table in &self.config.tables { + let table_config = RowProcessingConfig { + table, + ..row_config + }; + + // Get last offset with minimal lock time + let last_offset = { + let state = self.state.lock().await; + state.tracking_offsets.get(table).cloned() + }; + + let query = if let Some(custom_query) = &self.config.custom_query { + self.validate_custom_query(custom_query)?; + self.substitute_query_params(custom_query, table, &last_offset, batch_size) + } else { + self.build_polling_query(table, tracking_column, &last_offset, batch_size)? + }; + + // Database I/O without holding the lock + let rows = with_retry( + || sqlx::query(sqlx::AssertSqlSafe(query.as_str())).fetch_all(pool), + self.get_max_retries(), + self.retry_delay.as_millis() as u64, + ) + .await?; + + let mut max_offset: Option = None; + let mut processed_ids: Vec = Vec::new(); + + let mut count_per_table = 0; + for row in rows { + let processed = self.process_row(&row, &table_config)?; + + if let Some(pk) = processed.row_pk { + processed_ids.push(pk); + } + if let Some(offset) = processed.max_offset { + max_offset = Some(offset); + } + + messages.push(processed.message); + count_per_table += 1; + total_processed += 1; + } + + // Database I/O without holding the lock + if !processed_ids.is_empty() { + self.mark_or_delete_processed_rows(pool, table, pk_column, &processed_ids) + .await?; + } + + // Collect offset update for later + if let Some(offset) = max_offset { + state_updates.push((table.clone(), offset)); + } + + if self.verbose { + info!("Fetched {} rows from table '{table}'", count_per_table); + } else { + debug!("Fetched {} rows from table '{table}'", count_per_table); + } + } + + // Apply all state updates with a single lock acquisition + { + let mut state = self.state.lock().await; + state.processed_rows += total_processed; + for (table, offset) in state_updates { + state.tracking_offsets.insert(table, offset); + } + state.last_poll_time = Utc::now(); + } + + Ok(messages) + } + + fn process_row( + &self, + row: &MySqlRow, + config: &RowProcessingConfig, + ) -> Result { + let mut row_pk: Option = None; + let mut max_offset: Option = None; + let mut extracted_payload: Option> = None; + + // Payload column set: only extract it plus tracking/pk columns. + // Avoids extract_column_value on every other column since the data map + // built below would be discarded anyway. + if !config.payload_col.is_empty() { + for (i, column) in row.columns().iter().enumerate() { + let name = column.name(); + if name == config.payload_col { + extracted_payload = + Some(self.extract_payload_column(row, i, config.payload_format)?); + } + if name == config.tracking_column { + max_offset = value_as_string(&extract_column_value(row, i)?); + } + if name == config.pk_column { + row_pk = value_as_string(&extract_column_value(row, i)?); + } + } + } + + if extracted_payload.is_none() { + let mut data = serde_json::Map::new(); + for (i, column) in row.columns().iter().enumerate() { + let name = column.name(); + let column_name = if config.snake_case_columns { + to_snake_case(name) + } else { + name.to_string() + }; + let value = extract_column_value(row, i)?; + if name == config.tracking_column { + max_offset = value_as_string(&value); + } + if name == config.pk_column { + row_pk = value_as_string(&value); + } + data.insert(column_name, value); + } + + extracted_payload = Some(if config.include_metadata { + let record = DatabaseRecord { + table_name: config.table.to_string(), + operation_type: "SELECT".to_string(), + timestamp: Utc::now(), + data: serde_json::Value::Object(data), + old_data: None, + }; + simd_json::to_vec(&record).map_err(|_| Error::InvalidRecord)? + } else { + simd_json::to_vec(&data).map_err(|_| Error::InvalidRecord)? + }); + } + + // Both paths above always assign extracted_payload before reaching here. + Ok(build_processed_row( + extracted_payload.unwrap(), + max_offset, + row_pk, + )) + } +} + +fn extract_column_value(row: &MySqlRow, column_index: usize) -> Result { + let column = &row.columns()[column_index]; + let type_name = column.type_info().name(); + + match type_name { + "BOOLEAN" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::Bool) + .unwrap_or(serde_json::Value::Null)) + } + "TINYINT" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as i64)) + .unwrap_or(serde_json::Value::Null)) + } + "SMALLINT" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as i64)) + .unwrap_or(serde_json::Value::Null)) + } + "MEDIUMINT" | "INT" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as i64)) + .unwrap_or(serde_json::Value::Null)) + } + "BIGINT" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::from) + .unwrap_or(serde_json::Value::Null)) + } + "TINYINT UNSIGNED" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as u64)) + .unwrap_or(serde_json::Value::Null)) + } + "SMALLINT UNSIGNED" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as u64)) + .unwrap_or(serde_json::Value::Null)) + } + "MEDIUMINT UNSIGNED" | "INT UNSIGNED" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as u64)) + .unwrap_or(serde_json::Value::Null)) + } + "BIGINT UNSIGNED" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::from) + .unwrap_or(serde_json::Value::Null)) + } + "FLOAT" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as f64)) + .unwrap_or(serde_json::Value::Null)) + } + "DOUBLE" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::from) + .unwrap_or(serde_json::Value::Null)) + } + "DECIMAL" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .and_then(|s| s.parse::().ok()) + .map(serde_json::Value::from) + .unwrap_or(serde_json::Value::Null)) + } + "BIT" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::from) + .unwrap_or(serde_json::Value::Null)) + } + "YEAR" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|v| serde_json::Value::from(v as u64)) + .unwrap_or(serde_json::Value::Null)) + } + "DATE" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|d| serde_json::Value::String(d.to_string())) + .unwrap_or(serde_json::Value::Null)) + } + "TIME" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|t| serde_json::Value::String(t.to_string())) + .unwrap_or(serde_json::Value::Null)) + } + "DATETIME" | "TIMESTAMP" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|dt| serde_json::Value::String(dt.to_string())) + .unwrap_or(serde_json::Value::Null)) + } + "CHAR" | "VARCHAR" | "TINYTEXT" | "TEXT" | "MEDIUMTEXT" | "LONGTEXT" | "ENUM" | "SET" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null)) + } + "BINARY" | "VARBINARY" | "TINYBLOB" | "BLOB" | "MEDIUMBLOB" | "LONGBLOB" | "GEOMETRY" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|bytes| { + serde_json::Value::String( + base64::engine::general_purpose::STANDARD.encode(&bytes), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "JSON" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value.unwrap_or(serde_json::Value::Null)) + } + "NULL" => Ok(serde_json::Value::Null), + _ => { + let column_name = column.name(); + warn!( + "Column '{column_name}' has unrecognized MySQL type '{type_name}', \ + attempting text extraction" + ); + if let Ok(text) = row.try_get::, _>(column_index) { + return Ok(text + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null)); + } + if let Ok(bytes) = row.try_get::>, _>(column_index) { + return Ok(bytes + .map(|b| { + serde_json::Value::String( + base64::engine::general_purpose::STANDARD.encode(&b), + ) + }) + .unwrap_or(serde_json::Value::Null)); + } + error!( + "Column '{column_name}' has unsupported MySQL type '{type_name}', \ + returning null" + ); + Ok(serde_json::Value::Null) + } + } +} + +fn value_as_string(value: &serde_json::Value) -> Option { + match value { + serde_json::Value::String(s) => Some(s.clone()), + serde_json::Value::Number(n) => Some(n.to_string()), + _ => None, + } +} + +fn build_processed_row( + payload: Vec, + max_offset: Option, + row_pk: Option, +) -> ProcessedRow { + let now = Utc::now().timestamp_millis() as u64; + ProcessedRow { + message: ProducedMessage { + id: Some(Uuid::new_v4().as_u128()), + headers: None, + checksum: None, + timestamp: Some(now), + origin_timestamp: Some(now), + payload, + }, + max_offset, + row_pk, + } +} + +fn to_snake_case(input: &str) -> String { + let mut result = String::new(); + let mut prev_was_uppercase = false; + for (i, ch) in input.chars().enumerate() { + if ch.is_uppercase() { + if i > 0 && !prev_was_uppercase { + result.push('_'); + } + if let Some(lc) = ch.to_lowercase().next() { + result.push(lc); + } else { + result.push(ch); + } + prev_was_uppercase = true; + } else { + result.push(ch); + prev_was_uppercase = false; + } + } + result +} + +fn redact_connection_string(conn_str: &str) -> String { + if let Some(scheme_end) = conn_str.find("://") { + let scheme = &conn_str[..scheme_end + 3]; + let rest = &conn_str[scheme_end + 3..]; + let preview: String = rest.chars().take(3).collect(); + return format!("{scheme}{preview}***"); + } + let preview: String = conn_str.chars().take(3).collect(); + format!("{preview}***") +} + +fn quote_identifier(name: &str) -> Result { + if name.is_empty() { + return Err(Error::InvalidConfigValue( + "identifier must not be empty".to_string(), + )); + } + if name.contains('\0') { + return Err(Error::InvalidConfigValue(format!( + "identifier '{name}' contains NUL byte" + ))); + } + let escaped = name.replace('`', "``"); + Ok(format!("`{escaped}`")) +} + +fn quote_qualified_identifier(name: &str) -> Result { + if !name.contains('.') { + return quote_identifier(name); + } + let parts: Result, _> = name.split('.').map(quote_identifier).collect(); + Ok(parts?.join(".")) +} + +fn format_offset_value(value: &str) -> String { + if value.parse::().is_ok() || value.parse::().is_ok() { + value.to_string() + } else { + let escaped = value + .replace('\\', "\\\\") + .replace('\'', "''") + .replace('\0', ""); + format!("'{escaped}'") + } +} + +fn is_transient_error(e: &sqlx::Error) -> bool { + match e { + sqlx::Error::Io(_) => true, + sqlx::Error::PoolTimedOut => true, + sqlx::Error::PoolClosed => false, + sqlx::Error::Protocol(_) => false, + // MySQL surfaces a numeric error code (e.g. 1213) and a SQLSTATE (e.g. "40001"). + // `DatabaseError::code()` returns the SQLSTATE, so matching it against MySQL error + // numbers never fires. Downcast to the driver error and compare `number()` instead. + sqlx::Error::Database(db_err) => db_err + .try_downcast_ref::() + .is_some_and(|mysql_err| { + matches!( + mysql_err.number(), + // concurrency + 1213 | 1205 | + // server unavailability + 1053 | 1152 | 1080 | + // connection/network + 2006 | 2013 | 1158 | 1159 | 1160 | 1161 | + // resource exhaustion + 1040 | 1041 + ) + }), + _ => false, + } +} + +async fn with_retry(operation: F, max_retries: u32, delay_ms: u64) -> Result +where + F: Fn() -> Fut, + Fut: std::future::Future>, +{ + let mut attempts = 0; + loop { + match operation().await { + Ok(result) => return Ok(result), + Err(e) => { + attempts += 1; + if attempts >= max_retries || !is_transient_error(&e) { + error!("Database operation failed after {attempts} attempts: {e}"); + return Err(Error::InvalidRecord); + } + warn!( + "Transient database error (attempt {attempts}/{max_retries}): {e}. Retrying in {delay_ms}ms..." + ); + tokio::time::sleep(Duration::from_millis(delay_ms * attempts as u64)).await; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Baseline polling config; individual tests override only the fields they exercise. + fn test_config() -> MySqlSourceConfig { + MySqlSourceConfig { + connection_string: SecretString::from("mysql://localhost/db"), + tables: vec!["users".to_string()], + poll_interval: Some("5s".to_string()), + batch_size: Some(500), + tracking_column: Some("id".to_string()), + initial_offset: None, + max_connections: None, + custom_query: None, + snake_case_columns: None, + include_metadata: None, + delete_after_read: None, + processed_column: None, + primary_key_column: None, + payload_column: None, + payload_format: None, + verbose_logging: None, + max_retries: None, + retry_delay: None, + } + } + + #[test] + fn given_persisted_state_should_restore_tracking_offsets() { + // A connector restarted with prior state must resume from the saved + // per-table offsets and processed-row count, not re-poll from scratch. + let state = State { + last_poll_time: Utc::now(), + tracking_offsets: HashMap::from([ + ("users".to_string(), "100".to_string()), + ("orders".to_string(), "2024-01-15T10:30:00Z".to_string()), + ]), + processed_rows: 500, + }; + let connector_state = + ConnectorState::serialize(&state, "test", 1).expect("Failed to serialize state"); + + let source = MySqlSource::new(1, test_config(), Some(connector_state)); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let restored = source.state.lock().await; + assert_eq!( + restored.tracking_offsets.get("users"), + Some(&"100".to_string()) + ); + assert_eq!( + restored.tracking_offsets.get("orders"), + Some(&"2024-01-15T10:30:00Z".to_string()) + ); + assert_eq!(restored.processed_rows, 500); + }); + } + + #[test] + fn given_no_state_should_start_fresh() { + // First-ever run (no persisted state) starts with empty offsets so the + // first poll picks up everything from the initial_offset / table start. + let source = MySqlSource::new(1, test_config(), None); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let state = source.state.lock().await; + assert!(state.tracking_offsets.is_empty()); + assert_eq!(state.processed_rows, 0); + }); + } + + #[test] + fn given_invalid_state_should_start_fresh() { + // Corrupt/unreadable persisted state must degrade to a fresh start + // rather than panicking and crash-looping the connector. + let invalid_state = ConnectorState(b"not valid msgpack".to_vec()); + let source = MySqlSource::new(1, test_config(), Some(invalid_state)); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let state = source.state.lock().await; + assert!(state.tracking_offsets.is_empty()); + assert_eq!(state.processed_rows, 0); + }); + } + + #[test] + fn state_should_be_serializable_and_deserializable() { + // The full State (timestamp + offsets + count) must survive a + // MessagePack round-trip unchanged, since that is what gets persisted. + let original = State { + last_poll_time: DateTime::parse_from_rfc3339("2024-01-15T10:30:00Z") + .unwrap() + .with_timezone(&Utc), + tracking_offsets: HashMap::from([("table1".to_string(), "42".to_string())]), + processed_rows: 1000, + }; + + let connector_state = + ConnectorState::serialize(&original, "test", 1).expect("Failed to serialize state"); + let deserialized: State = connector_state + .deserialize("test", 1) + .expect("Failed to deserialize state"); + + assert_eq!(original.last_poll_time, deserialized.last_poll_time); + assert_eq!(original.tracking_offsets, deserialized.tracking_offsets); + assert_eq!(original.processed_rows, deserialized.processed_rows); + } + + #[test] + fn given_last_offset_should_filter_order_and_limit() { + // With a known last offset, the query must fetch only newer rows, + // ordered ascending by the tracking column, capped at the batch size. + let source = MySqlSource::new(1, test_config(), None); + let query = source + .build_polling_query("users", "id", &Some("100".to_string()), 500) + .expect("Failed to build query"); + assert_eq!( + query, + "SELECT * FROM `users` WHERE `id` > 100 ORDER BY `id` ASC LIMIT 500" + ); + } + + #[test] + fn given_initial_offset_and_no_last_offset_should_use_initial() { + // On the first poll (no last offset yet) the configured initial_offset + // seeds the WHERE clause so we skip rows the operator wants ignored. + let mut config = test_config(); + config.initial_offset = Some("1000".to_string()); + let source = MySqlSource::new(1, config, None); + let query = source + .build_polling_query("users", "id", &None, 500) + .expect("Failed to build query"); + assert_eq!( + query, + "SELECT * FROM `users` WHERE `id` > 1000 ORDER BY `id` ASC LIMIT 500" + ); + } + + #[test] + fn given_no_offset_should_omit_where_clause() { + // No last offset and no initial_offset means "read from the beginning": + // no WHERE filter, but ordering + limit still bound the batch. + let source = MySqlSource::new(1, test_config(), None); + let query = source + .build_polling_query("users", "id", &None, 500) + .expect("Failed to build query"); + assert_eq!(query, "SELECT * FROM `users` ORDER BY `id` ASC LIMIT 500"); + } + + #[test] + fn given_processed_column_should_append_unprocessed_filter() { + // When a processed_column is configured, each poll must also exclude + // already-handled rows (`col` = FALSE) so they are not re-emitted. + let mut config = test_config(); + config.processed_column = Some("is_processed".to_string()); + let source = MySqlSource::new(1, config, None); + let query = source + .build_polling_query("events", "id", &None, 100) + .expect("Failed to build query"); + assert!(query.contains("`is_processed` = FALSE")); + } + + #[test] + fn given_offset_value_should_quote_only_non_numeric() { + // Numeric offsets are emitted bare (correct comparison + no cast), + // while string offsets (e.g. timestamps) must be single-quoted literals. + let source = MySqlSource::new(1, test_config(), None); + + let numeric = source + .build_polling_query("users", "id", &Some("42".to_string()), 100) + .expect("Failed to build query"); + assert!(numeric.contains("`id` > 42")); + assert!(!numeric.contains("'42'")); + + let string = source + .build_polling_query("users", "updated_at", &Some("2024-01-01".to_string()), 100) + .expect("Failed to build query"); + assert!(string.contains("`updated_at` > '2024-01-01'")); + } + + #[test] + fn given_qualified_table_should_backtick_each_segment() { + // A `db.table` target must quote each segment independently so the + // dot stays a schema separator, not part of a single quoted name. + let source = MySqlSource::new(1, test_config(), None); + let query = source + .build_polling_query("mydb.users", "id", &None, 100) + .expect("Failed to build query"); + assert!(query.contains("FROM `mydb`.`users`")); + } + + #[test] + fn given_custom_query_should_substitute_table_offset_limit() { + // Placeholders in an operator-provided query must be filled with the + // current table, resolved offset, and batch size before execution. + let source = MySqlSource::new(1, test_config(), None); + let query = "SELECT * FROM $table WHERE id > $offset ORDER BY id LIMIT $limit"; + let result = source.substitute_query_params(query, "events", &Some("100".to_string()), 50); + assert!(result.contains("FROM events")); + assert!(result.contains("id > 100")); + assert!(result.contains("LIMIT 50")); + } + + #[test] + fn given_custom_query_with_time_params_should_substitute_now() { + // Time placeholders must be expanded to a concrete timestamp so no + // literal `$now` reaches the database. + let source = MySqlSource::new(1, test_config(), None); + let query = "SELECT * FROM $table WHERE created_at < '$now' OR ts < $now_unix"; + let result = source.substitute_query_params(query, "logs", &None, 100); + assert!(result.contains("FROM logs")); + assert!(!result.contains("$now")); + assert!(!result.contains("$now_unix")); + } + + #[test] + fn given_no_last_offset_should_fall_back_to_initial_offset() { + // In the custom-query path too, a missing last offset must fall back to + // the configured initial_offset rather than substituting an empty value. + let mut config = test_config(); + config.initial_offset = Some("500".to_string()); + let source = MySqlSource::new(1, config, None); + let result = source.substitute_query_params( + "SELECT * FROM $table WHERE id > $offset", + "data", + &None, + 100, + ); + assert!(result.contains("id > 500")); + } + + #[test] + fn given_table_placeholder_and_no_tables_should_fail() { + // A $table placeholder with no configured tables can never resolve, + // so validation must reject it instead of querying a literal "$table". + let mut config = test_config(); + config.tables = vec![]; + let source = MySqlSource::new(1, config, None); + let result = source.validate_custom_query("SELECT * FROM $table"); + assert!(matches!(result, Err(Error::InvalidConfig))); + } + + #[test] + fn given_valid_custom_query_should_pass() { + // A well-formed SELECT with tables configured passes validation. + let source = MySqlSource::new(1, test_config(), None); + let result = source.validate_custom_query("SELECT * FROM $table WHERE id > $offset"); + assert!(result.is_ok()); + } + + #[test] + fn given_backtick_in_identifier_should_escape() { + // An embedded backtick must be doubled so it cannot terminate the + // quoted identifier and inject trailing SQL. + let result = quote_identifier("col`name").expect("Failed to quote"); + assert_eq!(result, "`col``name`"); + } + + #[test] + fn given_empty_or_nul_identifier_should_fail() { + // Empty names and NUL bytes are never valid identifiers and must be + // rejected rather than producing malformed/unsafe SQL. + assert!(quote_identifier("").is_err()); + assert!(quote_identifier("bad\0name").is_err()); + } + + #[test] + fn given_qualified_identifier_should_quote_each_segment_and_reject_empty() { + // Each segment of a db.table name is quoted independently; an empty + // segment (leading/trailing dot) is rejected. + let quoted = quote_qualified_identifier("mydb.users").expect("Failed to quote"); + assert_eq!(quoted, "`mydb`.`users`"); + assert!(quote_qualified_identifier("mydb.").is_err()); + assert!(quote_qualified_identifier(".users").is_err()); + } + + #[test] + fn given_string_offset_value_should_escape_sql_metacharacters() { + // A non-numeric offset is interpolated into the WHERE clause, so quotes + // and backslashes must be escaped to prevent breaking out of the literal. + assert_eq!(format_offset_value("O'Brien"), "'O''Brien'"); + assert_eq!(format_offset_value("a\\b"), "'a\\\\b'"); + assert_eq!(format_offset_value("42"), "42"); + } + + #[test] + fn given_payload_format_strings_should_map_to_variants() { + // Operator-facing aliases (and casing) must map to the right variant; + // unknown/missing values default to Json. + assert_eq!( + PayloadFormat::from_config(Some("bytea")), + PayloadFormat::Bytea + ); + assert_eq!( + PayloadFormat::from_config(Some("RAW")), + PayloadFormat::Bytea + ); + assert_eq!( + PayloadFormat::from_config(Some("text")), + PayloadFormat::Text + ); + assert_eq!( + PayloadFormat::from_config(Some("json_direct")), + PayloadFormat::JsonDirect + ); + assert_eq!( + PayloadFormat::from_config(Some("jsonb")), + PayloadFormat::JsonDirect + ); + assert_eq!( + PayloadFormat::from_config(Some("unknown")), + PayloadFormat::Json + ); + assert_eq!(PayloadFormat::from_config(None), PayloadFormat::Json); + } + + #[test] + fn given_empty_payload_column_should_force_json() { + // payload_format only takes effect when a payload_column is set; without + // one the source always builds the full JSON record regardless of config. + let mut config = test_config(); + config.payload_column = None; + config.payload_format = Some("bytea".to_string()); + let source = MySqlSource::new(1, config, None); + assert_eq!(source.payload_format(), PayloadFormat::Json); + + let mut config = test_config(); + config.payload_column = Some("data".to_string()); + config.payload_format = Some("bytea".to_string()); + let source = MySqlSource::new(1, config, None); + assert_eq!(source.payload_format(), PayloadFormat::Bytea); + } + + #[test] + fn given_valid_poll_interval_and_retry_delay_should_parse() { + // Valid humantime strings are parsed into the corresponding Durations. + let mut config = test_config(); + config.poll_interval = Some("5s".to_string()); + config.retry_delay = Some("2s".to_string()); + let source = MySqlSource::new(1, config, None); + assert_eq!(source.poll_interval, Duration::from_secs(5)); + assert_eq!(source.retry_delay, Duration::from_secs(2)); + } + + #[test] + fn given_invalid_or_missing_cadence_should_fall_back_to_defaults() { + // Unparsable or absent values fall back to the documented defaults + // (10s poll interval, 1s retry delay) so the connector still runs. + let mut config = test_config(); + config.poll_interval = Some("not-a-duration".to_string()); + config.retry_delay = None; + let source = MySqlSource::new(1, config, None); + assert_eq!(source.poll_interval, Duration::from_secs(10)); + assert_eq!(source.retry_delay, Duration::from_secs(1)); + } + + #[test] + fn given_pool_errors_should_classify_transience() { + // A pool timeout is worth retrying (likely transient contention); a + // closed pool is terminal and must not be retried. + assert!(is_transient_error(&sqlx::Error::PoolTimedOut)); + assert!(!is_transient_error(&sqlx::Error::PoolClosed)); + } +} diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 0b2f264d03..a8d21e4995 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -24,6 +24,7 @@ mod http; mod iceberg; mod influxdb; mod mongodb; +mod mysql; mod postgres; mod quickwit; mod wiremock; @@ -66,6 +67,10 @@ pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture, }; +pub use mysql::{ + MySqlOps, MySqlSourceDeleteFixture, MySqlSourceJsonDirectFixture, MySqlSourceJsonFixture, + MySqlSourceMarkFixture, MySqlSourceNoMetadataFixture, MySqlSourceOps, MySqlSourceRawFixture, +}; pub use postgres::{ PostgresOps, PostgresSinkByteaFixture, PostgresSinkFixture, PostgresSinkJsonFixture, PostgresSourceByteaFixture, PostgresSourceDeleteFixture, PostgresSourceJsonFixture, diff --git a/core/integration/tests/connectors/fixtures/mysql/container.rs b/core/integration/tests/connectors/fixtures/mysql/container.rs new file mode 100644 index 0000000000..4eb305c0ff --- /dev/null +++ b/core/integration/tests/connectors/fixtures/mysql/container.rs @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use integration::harness::TestBinaryError; +use sqlx::mysql::MySqlPoolOptions; +use sqlx::{MySql, Pool}; + +use crate::connectors::fixtures; +use testcontainers_modules::{ + mysql, + testcontainers::{ContainerAsync, ImageExt, runners::AsyncRunner}, +}; + +pub(super) const MYSQL_PORT: u16 = 3306; + +pub(super) const ENV_SOURCE_CONNECTION_STRING: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_CONNECTION_STRING"; +pub(super) const ENV_SOURCE_TABLES: &str = "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_TABLES"; +pub(super) const ENV_SOURCE_TRACKING_COLUMN: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_TRACKING_COLUMN"; +pub(super) const ENV_SOURCE_STREAMS_0_STREAM: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_STREAMS_0_STREAM"; +pub(super) const ENV_SOURCE_STREAMS_0_TOPIC: &str = "IGGY_CONNECTORS_SOURCE_MYSQL_STREAMS_0_TOPIC"; +pub(super) const ENV_SOURCE_STREAMS_0_SCHEMA: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_STREAMS_0_SCHEMA"; +pub(super) const ENV_SOURCE_POLL_INTERVAL: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_POLL_INTERVAL"; +pub(super) const ENV_SOURCE_PATH: &str = "IGGY_CONNECTORS_SOURCE_MYSQL_PATH"; +pub(super) const ENV_SOURCE_PAYLOAD_COLUMN: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_PAYLOAD_COLUMN"; +pub(super) const ENV_SOURCE_PAYLOAD_FORMAT: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_PAYLOAD_FORMAT"; +pub(super) const ENV_SOURCE_DELETE_AFTER_READ: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_DELETE_AFTER_READ"; +pub(super) const ENV_SOURCE_PRIMARY_KEY_COLUMN: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_PRIMARY_KEY_COLUMN"; +pub(super) const ENV_SOURCE_PROCESSED_COLUMN: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_PROCESSED_COLUMN"; +pub(super) const ENV_SOURCE_INCLUDE_METADATA: &str = + "IGGY_CONNECTORS_SOURCE_MYSQL_PLUGIN_CONFIG_INCLUDE_METADATA"; + +pub(super) const DEFAULT_TEST_STREAM: &str = "test_stream"; +pub(super) const DEFAULT_TEST_TOPIC: &str = "test_topic"; + +pub(super) const ENV_SOURCE_PLUGIN_PATH: &str = "../../target/debug/libiggy_connector_mysql_source"; + +pub trait MySqlOps: Sync { + fn container(&self) -> &MySqlContainer; + + fn create_pool( + &self, + ) -> impl std::future::Future, TestBinaryError>> + Send { + self.container().create_pool() + } +} + +pub trait MySqlSourceOps: MySqlOps { + fn table_name(&self) -> &str; + + fn count_rows<'a>( + &'a self, + pool: &'a Pool, + ) -> impl std::future::Future + Send + 'a { + async move { + let query = format!("SELECT COUNT(*) FROM `{}`", self.table_name()); + let count: (i64,) = sqlx::query_as(sqlx::AssertSqlSafe(query)) + .fetch_one(pool) + .await + .unwrap_or_else(|e| panic!("Failed to count rows: {e}")); + count.0 + } + } +} + +pub struct MySqlContainer { + #[allow(dead_code)] + container: ContainerAsync, + pub(super) connection_string: String, +} + +impl MySqlContainer { + pub(super) async fn start() -> Result { + let container = mysql::Mysql::default() + .with_container_name(fixtures::unique_container_name("mysql")) + .start() + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MySqlContainer".to_string(), + message: format!("Failed to start container: {e}"), + })?; + + let host_port = container + .get_host_port_ipv4(MYSQL_PORT) + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MySqlContainer".to_string(), + message: format!("Failed to get port: {e}"), + })?; + + let connection_string = format!("mysql://root@localhost:{host_port}/test"); + + Ok(Self { + container, + connection_string, + }) + } + + pub async fn create_pool(&self) -> Result, TestBinaryError> { + MySqlPoolOptions::new() + .max_connections(1) + .connect(&self.connection_string) + .await + .map_err(|e| TestBinaryError::FixtureSetup { + fixture_type: "MySqlContainer".to_string(), + message: format!("Failed to connect: {e}"), + }) + } +} diff --git a/core/integration/tests/connectors/fixtures/mysql/mod.rs b/core/integration/tests/connectors/fixtures/mysql/mod.rs new file mode 100644 index 0000000000..6bac344b10 --- /dev/null +++ b/core/integration/tests/connectors/fixtures/mysql/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod container; +mod source; + +pub use container::{MySqlOps, MySqlSourceOps}; +pub use source::{ + MySqlSourceDeleteFixture, MySqlSourceJsonDirectFixture, MySqlSourceJsonFixture, + MySqlSourceMarkFixture, MySqlSourceNoMetadataFixture, MySqlSourceRawFixture, +}; diff --git a/core/integration/tests/connectors/fixtures/mysql/source.rs b/core/integration/tests/connectors/fixtures/mysql/source.rs new file mode 100644 index 0000000000..c5224e1e9d --- /dev/null +++ b/core/integration/tests/connectors/fixtures/mysql/source.rs @@ -0,0 +1,607 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::container::{ + DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SOURCE_CONNECTION_STRING, + ENV_SOURCE_DELETE_AFTER_READ, ENV_SOURCE_INCLUDE_METADATA, ENV_SOURCE_PATH, + ENV_SOURCE_PAYLOAD_COLUMN, ENV_SOURCE_PAYLOAD_FORMAT, ENV_SOURCE_PLUGIN_PATH, + ENV_SOURCE_POLL_INTERVAL, ENV_SOURCE_PRIMARY_KEY_COLUMN, ENV_SOURCE_PROCESSED_COLUMN, + ENV_SOURCE_STREAMS_0_SCHEMA, ENV_SOURCE_STREAMS_0_STREAM, ENV_SOURCE_STREAMS_0_TOPIC, + ENV_SOURCE_TABLES, ENV_SOURCE_TRACKING_COLUMN, MySqlContainer, MySqlOps, MySqlSourceOps, +}; +use async_trait::async_trait; +use integration::harness::{TestBinaryError, TestFixture}; +use sqlx::{MySql, Pool}; +use std::collections::HashMap; + +/// MySQL source fixture for JSON rows with metadata. +/// +/// Creates a table with typed columns that get serialized as JSON with metadata. +/// The boolean column is declared `BOOLEAN` (i.e. `tinyint(1)`) so `sqlx-mysql` +/// reports it as `"BOOLEAN"` and the source emits JSON `true`/`false`, letting the +/// shared `TestMessage { active: bool }` deserialize unchanged. +pub struct MySqlSourceJsonFixture { + container: MySqlContainer, +} + +impl MySqlOps for MySqlSourceJsonFixture { + fn container(&self) -> &MySqlContainer { + &self.container + } +} + +impl MySqlSourceOps for MySqlSourceJsonFixture { + fn table_name(&self) -> &str { + Self::TABLE + } +} + +impl MySqlSourceJsonFixture { + const TABLE: &'static str = "test_messages"; + + pub async fn create_table(&self, pool: &Pool) { + let query = format!( + "CREATE TABLE IF NOT EXISTS `{}` ( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + `count` INT NOT NULL, + amount DOUBLE NOT NULL, + active BOOLEAN NOT NULL, + `timestamp` BIGINT NOT NULL + )", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to create table: {e}")); + } + + #[allow(clippy::too_many_arguments)] + pub async fn insert_row( + &self, + pool: &Pool, + id: i32, + name: &str, + count: i32, + amount: f64, + active: bool, + timestamp: i64, + ) { + let query = format!( + "INSERT INTO `{}` (id, name, `count`, amount, active, `timestamp`) VALUES (?, ?, ?, ?, ?, ?)", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .bind(id) + .bind(name) + .bind(count) + .bind(amount) + .bind(active) + .bind(timestamp) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to insert row: {e}")); + } +} + +#[async_trait] +impl TestFixture for MySqlSourceJsonFixture { + async fn setup() -> Result { + let container = MySqlContainer::start().await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert( + ENV_SOURCE_CONNECTION_STRING.to_string(), + self.container.connection_string.clone(), + ); + envs.insert(ENV_SOURCE_TABLES.to_string(), format!("[{}]", Self::TABLE)); + envs.insert(ENV_SOURCE_TRACKING_COLUMN.to_string(), "id".to_string()); + envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_string(), "true".to_string()); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert(ENV_SOURCE_POLL_INTERVAL.to_string(), "10ms".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + ENV_SOURCE_PLUGIN_PATH.to_string(), + ); + envs + } +} + +/// MySQL source fixture with `include_metadata = false`. +/// +/// Same typed table as [`MySqlSourceJsonFixture`], but the connector is configured +/// to emit the bare column map with no `DatabaseRecord` envelope. +pub struct MySqlSourceNoMetadataFixture { + container: MySqlContainer, +} + +impl MySqlOps for MySqlSourceNoMetadataFixture { + fn container(&self) -> &MySqlContainer { + &self.container + } +} + +impl MySqlSourceOps for MySqlSourceNoMetadataFixture { + fn table_name(&self) -> &str { + Self::TABLE + } +} + +impl MySqlSourceNoMetadataFixture { + const TABLE: &'static str = "test_no_metadata"; + + pub async fn create_table(&self, pool: &Pool) { + let query = format!( + "CREATE TABLE IF NOT EXISTS `{}` ( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + `count` INT NOT NULL, + amount DOUBLE NOT NULL, + active BOOLEAN NOT NULL, + `timestamp` BIGINT NOT NULL + )", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to create table: {e}")); + } + + #[allow(clippy::too_many_arguments)] + pub async fn insert_row( + &self, + pool: &Pool, + id: i32, + name: &str, + count: i32, + amount: f64, + active: bool, + timestamp: i64, + ) { + let query = format!( + "INSERT INTO `{}` (id, name, `count`, amount, active, `timestamp`) VALUES (?, ?, ?, ?, ?, ?)", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .bind(id) + .bind(name) + .bind(count) + .bind(amount) + .bind(active) + .bind(timestamp) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to insert row: {e}")); + } +} + +#[async_trait] +impl TestFixture for MySqlSourceNoMetadataFixture { + async fn setup() -> Result { + let container = MySqlContainer::start().await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert( + ENV_SOURCE_CONNECTION_STRING.to_string(), + self.container.connection_string.clone(), + ); + envs.insert(ENV_SOURCE_TABLES.to_string(), format!("[{}]", Self::TABLE)); + envs.insert(ENV_SOURCE_TRACKING_COLUMN.to_string(), "id".to_string()); + envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_string(), "false".to_string()); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert(ENV_SOURCE_POLL_INTERVAL.to_string(), "10ms".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + ENV_SOURCE_PLUGIN_PATH.to_string(), + ); + envs + } +} + +/// MySQL source fixture for a `BLOB` payload column. +pub struct MySqlSourceRawFixture { + container: MySqlContainer, +} + +impl MySqlOps for MySqlSourceRawFixture { + fn container(&self) -> &MySqlContainer { + &self.container + } +} + +impl MySqlSourceOps for MySqlSourceRawFixture { + fn table_name(&self) -> &str { + Self::TABLE + } +} + +impl MySqlSourceRawFixture { + const TABLE: &'static str = "test_payloads"; + + pub async fn create_table(&self, pool: &Pool) { + let query = format!( + "CREATE TABLE IF NOT EXISTS `{}` ( + id INT AUTO_INCREMENT PRIMARY KEY, + payload BLOB NOT NULL + )", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to create table: {e}")); + } + + pub async fn insert_payload(&self, pool: &Pool, id: i32, payload: &[u8]) { + let query = format!("INSERT INTO `{}` (id, payload) VALUES (?, ?)", Self::TABLE); + sqlx::query(sqlx::AssertSqlSafe(query)) + .bind(id) + .bind(payload) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to insert payload: {e}")); + } +} + +#[async_trait] +impl TestFixture for MySqlSourceRawFixture { + async fn setup() -> Result { + let container = MySqlContainer::start().await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert( + ENV_SOURCE_CONNECTION_STRING.to_string(), + self.container.connection_string.clone(), + ); + envs.insert(ENV_SOURCE_TABLES.to_string(), format!("[{}]", Self::TABLE)); + envs.insert(ENV_SOURCE_TRACKING_COLUMN.to_string(), "id".to_string()); + envs.insert(ENV_SOURCE_PAYLOAD_COLUMN.to_string(), "payload".to_string()); + envs.insert(ENV_SOURCE_PAYLOAD_FORMAT.to_string(), "bytea".to_string()); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "raw".to_string()); + envs.insert(ENV_SOURCE_POLL_INTERVAL.to_string(), "10ms".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + ENV_SOURCE_PLUGIN_PATH.to_string(), + ); + envs + } +} + +/// MySQL source fixture for a native `JSON` payload column. +pub struct MySqlSourceJsonDirectFixture { + container: MySqlContainer, +} + +impl MySqlOps for MySqlSourceJsonDirectFixture { + fn container(&self) -> &MySqlContainer { + &self.container + } +} + +impl MySqlSourceOps for MySqlSourceJsonDirectFixture { + fn table_name(&self) -> &str { + Self::TABLE + } +} + +impl MySqlSourceJsonDirectFixture { + const TABLE: &'static str = "test_json_payloads"; + + pub async fn create_table(&self, pool: &Pool) { + let query = format!( + "CREATE TABLE IF NOT EXISTS `{}` ( + id INT AUTO_INCREMENT PRIMARY KEY, + data JSON NOT NULL + )", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to create table: {e}")); + } + + pub async fn insert_json(&self, pool: &Pool, id: i32, data: &serde_json::Value) { + let query = format!("INSERT INTO `{}` (id, data) VALUES (?, ?)", Self::TABLE); + sqlx::query(sqlx::AssertSqlSafe(query)) + .bind(id) + .bind(data) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to insert json: {e}")); + } +} + +#[async_trait] +impl TestFixture for MySqlSourceJsonDirectFixture { + async fn setup() -> Result { + let container = MySqlContainer::start().await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert( + ENV_SOURCE_CONNECTION_STRING.to_string(), + self.container.connection_string.clone(), + ); + envs.insert(ENV_SOURCE_TABLES.to_string(), format!("[{}]", Self::TABLE)); + envs.insert(ENV_SOURCE_TRACKING_COLUMN.to_string(), "id".to_string()); + envs.insert(ENV_SOURCE_PAYLOAD_COLUMN.to_string(), "data".to_string()); + envs.insert( + ENV_SOURCE_PAYLOAD_FORMAT.to_string(), + "json_direct".to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert(ENV_SOURCE_POLL_INTERVAL.to_string(), "10ms".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + ENV_SOURCE_PLUGIN_PATH.to_string(), + ); + envs + } +} + +/// MySQL source fixture with `delete_after_read` enabled. +pub struct MySqlSourceDeleteFixture { + container: MySqlContainer, +} + +impl MySqlOps for MySqlSourceDeleteFixture { + fn container(&self) -> &MySqlContainer { + &self.container + } +} + +impl MySqlSourceOps for MySqlSourceDeleteFixture { + fn table_name(&self) -> &str { + Self::TABLE + } +} + +impl MySqlSourceDeleteFixture { + const TABLE: &'static str = "test_delete_rows"; + + pub async fn create_table(&self, pool: &Pool) { + let query = format!( + "CREATE TABLE IF NOT EXISTS `{}` ( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + `value` INT NOT NULL + )", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to create table: {e}")); + } + + pub async fn insert_row(&self, pool: &Pool, name: &str, value: i32) { + let query = format!( + "INSERT INTO `{}` (name, `value`) VALUES (?, ?)", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .bind(name) + .bind(value) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to insert row: {e}")); + } + + pub async fn count_rows(&self, pool: &Pool) -> i64 { + MySqlSourceOps::count_rows(self, pool).await + } +} + +#[async_trait] +impl TestFixture for MySqlSourceDeleteFixture { + async fn setup() -> Result { + let container = MySqlContainer::start().await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert( + ENV_SOURCE_CONNECTION_STRING.to_string(), + self.container.connection_string.clone(), + ); + envs.insert(ENV_SOURCE_TABLES.to_string(), format!("[{}]", Self::TABLE)); + envs.insert(ENV_SOURCE_TRACKING_COLUMN.to_string(), "id".to_string()); + envs.insert(ENV_SOURCE_PRIMARY_KEY_COLUMN.to_string(), "id".to_string()); + envs.insert(ENV_SOURCE_DELETE_AFTER_READ.to_string(), "true".to_string()); + envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_string(), "true".to_string()); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert(ENV_SOURCE_POLL_INTERVAL.to_string(), "10ms".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + ENV_SOURCE_PLUGIN_PATH.to_string(), + ); + envs + } +} + +/// MySQL source fixture with `processed_column` marking. +pub struct MySqlSourceMarkFixture { + container: MySqlContainer, +} + +impl MySqlOps for MySqlSourceMarkFixture { + fn container(&self) -> &MySqlContainer { + &self.container + } +} + +impl MySqlSourceOps for MySqlSourceMarkFixture { + fn table_name(&self) -> &str { + Self::TABLE + } +} + +impl MySqlSourceMarkFixture { + const TABLE: &'static str = "test_mark_rows"; + + pub async fn create_table(&self, pool: &Pool) { + let query = format!( + "CREATE TABLE IF NOT EXISTS `{}` ( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + `value` INT NOT NULL, + is_processed BOOLEAN NOT NULL DEFAULT FALSE + )", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to create table: {e}")); + } + + pub async fn insert_row(&self, pool: &Pool, name: &str, value: i32) { + let query = format!( + "INSERT INTO `{}` (name, `value`, is_processed) VALUES (?, ?, ?)", + Self::TABLE + ); + sqlx::query(sqlx::AssertSqlSafe(query)) + .bind(name) + .bind(value) + .bind(false) + .execute(pool) + .await + .unwrap_or_else(|e| panic!("Failed to insert row: {e}")); + } + + pub async fn count_rows(&self, pool: &Pool) -> i64 { + MySqlSourceOps::count_rows(self, pool).await + } + + pub async fn count_unprocessed(&self, pool: &Pool) -> i64 { + let query = format!( + "SELECT COUNT(*) FROM `{}` WHERE is_processed = FALSE", + Self::TABLE + ); + let count: (i64,) = sqlx::query_as(sqlx::AssertSqlSafe(query)) + .fetch_one(pool) + .await + .unwrap_or_else(|e| panic!("Failed to count rows: {e}")); + count.0 + } + + pub async fn count_processed(&self, pool: &Pool) -> i64 { + let query = format!( + "SELECT COUNT(*) FROM `{}` WHERE is_processed = TRUE", + Self::TABLE + ); + let count: (i64,) = sqlx::query_as(sqlx::AssertSqlSafe(query)) + .fetch_one(pool) + .await + .unwrap_or_else(|e| panic!("Failed to count rows: {e}")); + count.0 + } +} + +#[async_trait] +impl TestFixture for MySqlSourceMarkFixture { + async fn setup() -> Result { + let container = MySqlContainer::start().await?; + Ok(Self { container }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = HashMap::new(); + envs.insert( + ENV_SOURCE_CONNECTION_STRING.to_string(), + self.container.connection_string.clone(), + ); + envs.insert(ENV_SOURCE_TABLES.to_string(), format!("[{}]", Self::TABLE)); + envs.insert(ENV_SOURCE_TRACKING_COLUMN.to_string(), "id".to_string()); + envs.insert(ENV_SOURCE_PRIMARY_KEY_COLUMN.to_string(), "id".to_string()); + envs.insert( + ENV_SOURCE_PROCESSED_COLUMN.to_string(), + "is_processed".to_string(), + ); + envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_string(), "true".to_string()); + envs.insert( + ENV_SOURCE_STREAMS_0_STREAM.to_string(), + DEFAULT_TEST_STREAM.to_string(), + ); + envs.insert( + ENV_SOURCE_STREAMS_0_TOPIC.to_string(), + DEFAULT_TEST_TOPIC.to_string(), + ); + envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_string(), "json".to_string()); + envs.insert(ENV_SOURCE_POLL_INTERVAL.to_string(), "10ms".to_string()); + envs.insert( + ENV_SOURCE_PATH.to_string(), + ENV_SOURCE_PLUGIN_PATH.to_string(), + ); + envs + } +} diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index fc624f897f..071900eee0 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -25,6 +25,7 @@ mod http_config_provider; mod iceberg; mod influxdb; mod mongodb; +mod mysql; mod postgres; mod quickwit; mod random; diff --git a/core/integration/tests/connectors/mysql/mod.rs b/core/integration/tests/connectors/mysql/mod.rs new file mode 100644 index 0000000000..86cab59eee --- /dev/null +++ b/core/integration/tests/connectors/mysql/mod.rs @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod mysql_source; + +use crate::connectors::TestMessage; +use serde::Deserialize; + +const TEST_MESSAGE_COUNT: usize = 3; +const POLL_ATTEMPTS: usize = 100; +const POLL_INTERVAL_MS: u64 = 50; + +#[derive(Debug, Deserialize)] +struct DatabaseRecord { + table_name: String, + operation_type: String, + data: TestMessage, +} diff --git a/core/integration/tests/connectors/mysql/mysql_source.rs b/core/integration/tests/connectors/mysql/mysql_source.rs new file mode 100644 index 0000000000..518d38051c --- /dev/null +++ b/core/integration/tests/connectors/mysql/mysql_source.rs @@ -0,0 +1,619 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{DatabaseRecord, POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT}; +use crate::connectors::create_test_messages; +use crate::connectors::fixtures::{ + MySqlOps, MySqlSourceDeleteFixture, MySqlSourceJsonDirectFixture, MySqlSourceJsonFixture, + MySqlSourceMarkFixture, MySqlSourceNoMetadataFixture, MySqlSourceOps, MySqlSourceRawFixture, +}; +use iggy_common::MessageClient; +use iggy_common::{Consumer, Identifier, PollingStrategy}; +use integration::harness::seeds; +use integration::iggy_harness; +use std::time::Duration; +use tokio::time::sleep; + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/mysql/source.toml")), + seed = seeds::connector_stream +)] +async fn json_rows_source_produces_messages_to_iggy( + harness: &TestHarness, + fixture: MySqlSourceJsonFixture, +) { + let client = harness.root_client().await.unwrap(); + let pool = fixture.create_pool().await.expect("Failed to create pool"); + fixture.create_table(&pool).await; + + let test_messages = create_test_messages(TEST_MESSAGE_COUNT); + for msg in &test_messages { + fixture + .insert_row( + &pool, + msg.id as i32, + &msg.name, + msg.count as i32, + msg.amount, + msg.active, + msg.timestamp, + ) + .await; + } + pool.close().await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(record) = serde_json::from_slice(&msg.payload) { + received.push(record); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= TEST_MESSAGE_COUNT, + "Expected at least {TEST_MESSAGE_COUNT} messages, got {}", + received.len() + ); + + for (i, record) in received.iter().enumerate() { + assert_eq!( + record.table_name, + fixture.table_name(), + "Table name mismatch at record {i}" + ); + assert_eq!( + record.operation_type, "SELECT", + "Operation type mismatch at record {i}" + ); + assert_eq!( + record.data, test_messages[i], + "Message data mismatch at record {i}" + ); + } +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/mysql/source.toml")), + seed = seeds::connector_stream +)] +async fn bare_payload_source_omits_metadata_envelope( + harness: &TestHarness, + fixture: MySqlSourceNoMetadataFixture, +) { + let client = harness.root_client().await.unwrap(); + let pool = fixture.create_pool().await.expect("Failed to create pool"); + fixture.create_table(&pool).await; + + let test_messages = create_test_messages(TEST_MESSAGE_COUNT); + for msg in &test_messages { + fixture + .insert_row( + &pool, + msg.id as i32, + &msg.name, + msg.count as i32, + msg.amount, + msg.active, + msg.timestamp, + ) + .await; + } + pool.close().await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= TEST_MESSAGE_COUNT, + "Expected at least {TEST_MESSAGE_COUNT} messages, got {}", + received.len() + ); + + for (i, payload) in received.iter().enumerate() { + let object = payload + .as_object() + .unwrap_or_else(|| panic!("Record {i} is not a JSON object: {payload}")); + assert!( + !object.contains_key("table_name") && !object.contains_key("operation_type"), + "Record {i} should be a bare column map with no metadata envelope, got {payload}" + ); + assert_eq!( + object["id"].as_u64(), + Some(test_messages[i].id), + "Bare-payload id mismatch at record {i}" + ); + assert_eq!( + object["name"].as_str(), + Some(test_messages[i].name.as_str()), + "Bare-payload name mismatch at record {i}" + ); + assert_eq!( + object["active"].as_bool(), + Some(test_messages[i].active), + "Bare-payload active mismatch at record {i}" + ); + } +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/mysql/source.toml")), + seed = seeds::connector_stream +)] +async fn raw_rows_source_produces_raw_messages_to_iggy( + harness: &TestHarness, + fixture: MySqlSourceRawFixture, +) { + let client = harness.root_client().await.unwrap(); + let pool = fixture.create_pool().await.expect("Failed to create pool"); + fixture.create_table(&pool).await; + + let payloads: Vec> = vec![ + b"hello world".to_vec(), + vec![0x00, 0x01, 0x02, 0xFF, 0xFE], + serde_json::to_vec(&serde_json::json!({"key": "value", "number": 42})) + .expect("Failed to serialize json"), + ]; + + for (i, payload) in payloads.iter().enumerate() { + fixture.insert_payload(&pool, (i + 1) as i32, payload).await; + } + pool.close().await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let mut received: Vec> = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + received.push(msg.payload.to_vec()); + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= TEST_MESSAGE_COUNT, + "Expected at least {TEST_MESSAGE_COUNT} messages, got {}", + received.len() + ); + + for (i, payload) in received.iter().enumerate() { + assert_eq!(payload, &payloads[i], "Payload mismatch at index {i}"); + } +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/mysql/source.toml")), + seed = seeds::connector_stream +)] +async fn json_direct_rows_source_produces_json_messages_to_iggy( + harness: &TestHarness, + fixture: MySqlSourceJsonDirectFixture, +) { + let client = harness.root_client().await.unwrap(); + let pool = fixture.create_pool().await.expect("Failed to create pool"); + fixture.create_table(&pool).await; + + let json_payloads: Vec = vec![ + serde_json::json!({"name": "Alice", "score": 100}), + serde_json::json!({"items": ["a", "b", "c"]}), + serde_json::json!({"nested": {"deep": {"value": 42}}}), + ]; + + for (i, payload) in json_payloads.iter().enumerate() { + fixture.insert_json(&pool, (i + 1) as i32, payload).await; + } + pool.close().await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= TEST_MESSAGE_COUNT, + "Expected at least {TEST_MESSAGE_COUNT} messages, got {}", + received.len() + ); + + for (i, payload) in received.iter().enumerate() { + assert_eq!( + payload, &json_payloads[i], + "JSON payload mismatch at index {i}" + ); + } +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/mysql/source.toml")), + seed = seeds::connector_stream +)] +async fn delete_after_read_source_removes_rows_after_producing( + harness: &TestHarness, + fixture: MySqlSourceDeleteFixture, +) { + let client = harness.root_client().await.unwrap(); + let pool = fixture.create_pool().await.expect("Failed to create pool"); + fixture.create_table(&pool).await; + + for i in 0..TEST_MESSAGE_COUNT { + fixture + .insert_row(&pool, &format!("row_{i}"), (i * 10) as i32) + .await; + } + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= TEST_MESSAGE_COUNT, + "Expected at least {TEST_MESSAGE_COUNT} messages, got {}", + received.len() + ); + + let mut final_count = -1i64; + for _ in 0..POLL_ATTEMPTS { + final_count = fixture.count_rows(&pool).await; + if final_count == 0 { + break; + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + assert_eq!( + final_count, 0, + "Expected 0 rows after delete_after_read, got {final_count}" + ); + + pool.close().await; +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/mysql/source.toml")), + seed = seeds::connector_stream +)] +async fn processed_column_source_marks_rows_after_producing( + harness: &TestHarness, + fixture: MySqlSourceMarkFixture, +) { + let client = harness.root_client().await.unwrap(); + let pool = fixture.create_pool().await.expect("Failed to create pool"); + fixture.create_table(&pool).await; + + for i in 0..TEST_MESSAGE_COUNT { + fixture + .insert_row(&pool, &format!("row_{i}"), (i * 10) as i32) + .await; + } + + let initial_unprocessed = fixture.count_unprocessed(&pool).await; + let initial_processed = fixture.count_processed(&pool).await; + assert_eq!( + initial_unprocessed + initial_processed, + TEST_MESSAGE_COUNT as i64, + "Expected {TEST_MESSAGE_COUNT} total rows before processing, got {} unprocessed + {} processed", + initial_unprocessed, + initial_processed + ); + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "test_consumer".try_into().unwrap(); + + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(json) = serde_json::from_slice(&msg.payload) { + received.push(json); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert!( + received.len() >= TEST_MESSAGE_COUNT, + "Expected at least {TEST_MESSAGE_COUNT} messages, got {}", + received.len() + ); + + let mut final_unprocessed = -1i64; + let mut final_processed = -1i64; + for _ in 0..POLL_ATTEMPTS { + final_unprocessed = fixture.count_unprocessed(&pool).await; + final_processed = fixture.count_processed(&pool).await; + if final_unprocessed == 0 && final_processed == TEST_MESSAGE_COUNT as i64 { + break; + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + assert_eq!( + final_unprocessed, 0, + "Expected 0 unprocessed rows after processing, got {final_unprocessed}" + ); + assert_eq!( + final_processed, TEST_MESSAGE_COUNT as i64, + "Expected {TEST_MESSAGE_COUNT} processed rows after processing, got {final_processed}" + ); + + let total_count = fixture.count_rows(&pool).await; + assert_eq!( + total_count, TEST_MESSAGE_COUNT as i64, + "Rows should not be deleted, expected {TEST_MESSAGE_COUNT}, got {total_count}" + ); + + pool.close().await; +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/mysql/source.toml")), + seed = seeds::connector_stream +)] +async fn state_persists_across_connector_restart( + harness: &mut TestHarness, + fixture: MySqlSourceJsonFixture, +) { + let pool = fixture.create_pool().await.expect("Failed to create pool"); + fixture.create_table(&pool).await; + + let first_batch = create_test_messages(TEST_MESSAGE_COUNT); + for msg in &first_batch { + fixture + .insert_row( + &pool, + msg.id as i32, + &msg.name, + msg.count as i32, + msg.amount, + msg.active, + msg.timestamp, + ) + .await; + } + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = "state_test_consumer".try_into().unwrap(); + + let client = harness.root_client().await.unwrap(); + let received_before = { + let mut received: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(record) = serde_json::from_slice(&msg.payload) { + received.push(record); + } + } + if received.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + received + }; + assert_eq!(received_before.len(), TEST_MESSAGE_COUNT); + + harness + .server_mut() + .stop_dependents() + .expect("Failed to stop connectors"); + + let second_batch_start_id = (TEST_MESSAGE_COUNT + 1) as i32; + for i in 0..TEST_MESSAGE_COUNT { + fixture + .insert_row( + &pool, + second_batch_start_id + i as i32, + &format!("user_batch2_{i}"), + ((TEST_MESSAGE_COUNT + i) * 10) as i32, + (TEST_MESSAGE_COUNT + i) as f64 * 99.99, + i % 2 == 0, + iggy_common::IggyTimestamp::now().as_micros() as i64, + ) + .await; + } + + harness + .server_mut() + .start_dependents() + .await + .expect("Failed to restart connectors"); + sleep(Duration::from_secs(2)).await; + + let mut received_after: Vec = Vec::new(); + for _ in 0..POLL_ATTEMPTS { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + 10, + true, + ) + .await + { + for msg in polled.messages { + if let Ok(record) = serde_json::from_slice(&msg.payload) { + received_after.push(record); + } + } + if received_after.len() >= TEST_MESSAGE_COUNT { + break; + } + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + + assert_eq!(received_after.len(), TEST_MESSAGE_COUNT); + + for record in &received_after { + assert!( + record.data.id > TEST_MESSAGE_COUNT as u64, + "After restart, got ID {} from first batch", + record.data.id + ); + } + + pool.close().await; +} diff --git a/core/integration/tests/connectors/mysql/source.toml b/core/integration/tests/connectors/mysql/source.toml new file mode 100644 index 0000000000..b3b73e091c --- /dev/null +++ b/core/integration/tests/connectors/mysql/source.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "../connectors/sources/mysql_source"