Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/_build_rust_artifacts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ on:
connector_plugins:
type: string
required: false
default: "iggy_connector_elasticsearch_sink,iggy_connector_elasticsearch_source,iggy_connector_iceberg_sink,iggy_connector_postgres_sink,iggy_connector_postgres_source,iggy_connector_quickwit_sink,iggy_connector_random_source,iggy_connector_stdout_sink"
default: "iggy_connector_elasticsearch_sink,iggy_connector_elasticsearch_source,iggy_connector_iceberg_sink,iggy_connector_jdbc_sink,iggy_connector_jdbc_source,iggy_connector_postgres_sink,iggy_connector_postgres_source,iggy_connector_quickwit_sink,iggy_connector_random_source,iggy_connector_stdout_sink"
description: "Comma-separated list of connector plugin crates to build as shared libraries"
outputs:
artifact_name:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/edge-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ jobs:
- `iggy_connector_elasticsearch_sink`
- `iggy_connector_elasticsearch_source`
- `iggy_connector_iceberg_sink`
- `iggy_connector_jdbc_sink`
- `iggy_connector_jdbc_source`
- `iggy_connector_postgres_sink`
- `iggy_connector_postgres_source`
- `iggy_connector_quickwit_sink`
Expand Down
72 changes: 71 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ members = [
"core/connectors/sinks/http_sink",
"core/connectors/sinks/iceberg_sink",
"core/connectors/sinks/influxdb_sink",
"core/connectors/sinks/jdbc_sink",
"core/connectors/sinks/mongodb_sink",
"core/connectors/sinks/postgres_sink",
"core/connectors/sinks/quickwit_sink",
"core/connectors/sinks/stdout_sink",
"core/connectors/sources/elasticsearch_source",
"core/connectors/sources/influxdb_source",
"core/connectors/sources/jdbc_source",
"core/connectors/sources/postgres_source",
"core/connectors/sources/random_source",
"core/consensus",
Expand Down
2 changes: 2 additions & 0 deletions core/connectors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ Each sink should have its own, custom configuration, which is passed along with
- **Doris Sink** - loads JSON messages into Apache Doris tables via the Stream Load HTTP API
- **Elasticsearch Sink** - sends messages to Elasticsearch indices
- **Iceberg Sink** - writes data to Apache Iceberg tables via REST catalog
- **JDBC Sink** - writes messages as rows into any JDBC-compliant database (PostgreSQL, MySQL, Oracle, SQL Server, H2) via an embedded JVM
- **PostgreSQL Sink** - stores messages in PostgreSQL database tables
- **Quickwit Sink** - indexes messages in Quickwit search engine
- **Stdout Sink** - prints messages to standard output (useful for debugging/development)
Expand All @@ -96,6 +97,7 @@ Please refer to the **[Source documentation](https://github.com/apache/iggy/tree
### Available Sources

- **Elasticsearch Source** - polls documents from Elasticsearch indices
- **JDBC Source** - reads rows from any JDBC-compliant database (PostgreSQL, MySQL, Oracle, SQL Server, H2) via an embedded JVM; bulk and incremental modes
- **PostgreSQL Source** - reads rows from PostgreSQL tables with multiple consumption strategies (delete after read, mark as processed, timestamp tracking)
- **Random Source** - generates random test messages (useful for testing/development)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Example JDBC Source Connector Configuration - BULK MODE
# Bulk mode works with ALL JDBC databases without any special requirements
# No tracking column needed - just executes your query and fetches results

type = "source"
key = "jdbc_bulk_example"
enabled = true
version = 0
name = "JDBC Bulk Mode Source"
path = "target/release/libiggy_connector_jdbc_source"
plugin_config_format = "toml"

[plugin_config]
# This example uses PostgreSQL, but bulk mode works identically with:
# MySQL, Oracle, SQL Server, H2, Derby, DB2, etc.
jdbc_url = "jdbc:postgresql://localhost:5432/warehouse"

# Database credentials can be in URL or separate
# jdbc_url = "jdbc:postgresql://localhost:5432/warehouse?user=myuser&password=mypass"
username = "warehouse_user"
password = "secret"

driver_class = "org.postgresql.Driver"
driver_jar_path = "/opt/jdbc-drivers/postgresql-42.6.0.jar"

# Bulk mode: Any valid SELECT query
# Can include JOINs, aggregations, complex WHERE clauses, etc.
query = """
SELECT
p.product_id,\
p.product_name,
p.category,
p.price,
COUNT(o.order_id) as total_orders,
SUM(o.quantity) as total_quantity
FROM products p
LEFT JOIN orders o ON p.product_id = o.product_id
GROUP BY p.product_id, p.product_name, p.category, p.price
"""

# Poll once per hour for daily snapshots
poll_interval = "1h"

# Large batch size for full table scans
batch_size = 10000

# BULK MODE - no tracking column needed!
mode = "bulk"

# Bulk mode benefits:
# - No tracking column required
# - Works with any SELECT query
# - Supports complex queries (JOINs, aggregations, window functions)
# - Perfect for periodic snapshots
# - Universal compatibility with all databases

snake_case_columns = true
include_metadata = false

[[streams]]
stream = "warehouse"
topic = "product_summary"
partition_id = 1
schema = "json"
60 changes: 60 additions & 0 deletions core/connectors/runtime/example_config/connectors/jdbc_h2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Example JDBC Source Connector Configuration for H2 Database
# H2 is useful for testing and development as it's an embedded Java database

type = "source"
key = "jdbc_h2_example"
enabled = true
version = 0
name = "JDBC H2 Source"
path = "target/release/libiggy_connector_jdbc_source"
plugin_config_format = "toml"

[plugin_config]
# H2 connection URL (in-memory database)
jdbc_url = "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"

# H2 JDBC driver
driver_class = "org.h2.Driver"

# Path to H2 driver JAR
# Download from: https://repo1.maven.org/maven2/com/h2database/h2/2.2.224/h2-2.2.224.jar
# Note: Update this path to match where you downloaded the JAR file
driver_jar_path = "/tmp/jdbc-drivers/h2-2.2.224.jar"

# H2 credentials (default)
username = "sa"
password = ""

# Simple query for testing
query = "SELECT * FROM users WHERE id > {last_offset} ORDER BY id"

poll_interval = "10s"
batch_size = 100
tracking_column = "id"
initial_offset = "0"
mode = "incremental"
snake_case_columns = false
include_metadata = true

[[streams]]
stream = "test"
topic = "users"
partition_id = 1
schema = "json"
85 changes: 85 additions & 0 deletions core/connectors/runtime/example_config/connectors/jdbc_mysql.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Example JDBC Source Connector Configuration for MySQL
# This file demonstrates how to configure the JDBC source connector
# to read data from a MySQL database and publish to Iggy streams.

type = "source"
key = "jdbc_mysql_example"
enabled = true
version = 0
name = "JDBC MySQL Source"
path = "target/release/libiggy_connector_jdbc_source"
plugin_config_format = "toml"

[plugin_config]
# JDBC connection URL
# Option 1: Separate credentials (recommended)
jdbc_url = "jdbc:mysql://localhost:3306/ecommerce?useSSL=false&serverTimezone=UTC"

# Option 2: Embedded credentials in URL (alternative)
# jdbc_url = "jdbc:mysql://iggy_user:iggy_password@localhost:3306/ecommerce?useSSL=false&serverTimezone=UTC"

# JDBC driver class name
driver_class = "com.mysql.cj.jdbc.Driver"

# Path to JDBC driver JAR file
# Download from: https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
driver_jar_path = "/opt/jdbc-drivers/mysql-connector-j-8.0.33.jar"

# Database credentials (optional if included in jdbc_url)
username = "iggy_user"
password = "iggy_password"

# SQL query to execute
# Use {last_offset} placeholder for incremental reads
query = "SELECT * FROM orders WHERE updated_at > {last_offset} ORDER BY updated_at ASC"

# How often to poll the database
poll_interval = "30s"

# Maximum number of rows to fetch per poll
batch_size = 1000

# Column to track for incremental reads (must be in query result)
tracking_column = "updated_at"

# Initial offset value for the first poll
initial_offset = "2024-01-01 00:00:00"

# Source mode: "incremental" or "bulk"
# Note: Both modes work with ALL JDBC databases (MySQL, Oracle, PostgreSQL, etc.)
# - incremental: Tracks last offset, avoids duplicate reads
# - bulk: Full table scan, no offset tracking
mode = "incremental"

# Convert column names to snake_case (e.g., OrderDate -> order_date)
snake_case_columns = true

# Include metadata wrapper in output messages
include_metadata = true

# Custom JVM options (optional)
jvm_options = ["-Xmx512m", "-Xms128m"]

# Target Iggy stream and topic
[[streams]]
stream = "ecommerce"
topic = "orders"
partition_id = 1
schema = "json"
Loading