Skip to content

feat(connectors): add generic JDBC source and sink connectors (#2500)#3576

Open
shbhmrzd wants to merge 1 commit into
apache:masterfrom
shbhmrzd:jdbc_source_sink_connectors
Open

feat(connectors): add generic JDBC source and sink connectors (#2500)#3576
shbhmrzd wants to merge 1 commit into
apache:masterfrom
shbhmrzd:jdbc_source_sink_connectors

Conversation

@shbhmrzd

Copy link
Copy Markdown

Which issue does this PR address?

Closes #2500

Rationale

Iggy only shipped PostgreSQL-specific connectors, so every other database needed a purpose-built connector. A generic JDBC pair covers any JDBC-compliant database (PostgreSQL, MySQL, Oracle, SQL Server, H2, and others) through their existing JDBC drivers, instead of writing and maintaining a new connector per database.

What changed?

Before: reading from or writing to anything other than PostgreSQL required a bespoke connector. After: two new connectors talk to any JDBC database through an embedded JVM over JNI, reusing the standard java.sql API and the database's own driver JAR.

The source polls a configured SQL query and produces each row as a JSON message, in bulk (full re-read) or incremental mode (tracks a {last_offset} placeholder and persists the offset across restarts). The sink consumes messages and INSERTs each into a target table, with optional auto-create-table and Iggy metadata columns. Both share one embedded-JVM/JNI bridge, so a single driver JAR serves either direction.

How to review

Area Where to look What to check
Source logic core/connectors/sources/jdbc_source/src/lib.rs poll() then build_query (offset substitution via quote_sql_literal) then read_rows / extract_column_value (JDBC to JSON type mapping) then build_message (offset state persistence)
Sink logic core/connectors/sinks/jdbc_sink/src/lib.rs open() (JVM, connectivity SELECT 1, optional auto-create-table) then consume() then write_messages / insert_batch (batched PreparedStatement) then bind_row
JVM and JNI lifetime get_or_create_jvm, connection acquire/close in both crates one JVM per process (shared across instances), per-row and per-message push_local_frame / pop_local_frame, pooled-connection return, isValid reconnect in the source
Type fidelity source extract_column_value NUMERIC and DECIMAL map to string (no f64 precision loss), binary maps to base64, BIGINT caveat documented
Error handling is_transient_sql_state and the classify helpers in both crates SQLState classes 08, 40, 53, 57, 58 are transient (retried), everything else is permanent. The sink skips and counts permanent batches, retries transient ones
Secrets config Debug impls, the secret serde modules, sanitize_jdbc_url URL and password are SecretString, never logged or serialized
Shared test harness core/integration/tests/connectors/mod.rs setup_runtime, plus send_messages / get_messages helpers used by the sink and source tests
Server change core/server/src/tcp/tcp_listener.rs (3 lines) The connectors integration harness reserves a fixed TCP port (PortReserver) and waits for the server to write current_config.toml to learn the bound address. The server previously wrote that file only when the port was dynamic (port == 0), so a fixed-port server hung. The fix always notifies the config writer once the listener binds. The SO_REUSEPORT cross-shard broadcast stays gated on port == 0, so multi-shard behaviour is unchanged. Worth a check for cluster mode.

Config reference and per-database examples live in each connector's README.md, config.toml, and the core/connectors/runtime/example_config/connectors/jdbc_*.toml files.

Local Execution

  • Passed. Ran the CI-equivalent gates locally: cargo fmt --all -- --check, cargo clippy --all-targets --all-features -- -D warnings, cargo check --all --all-features, cargo sort --check, cargo machete, license headers (hawkeye check), and markdownlint.
  • Tests: 33 source and 22 sink unit tests, plus 7 Dockerized Postgres integration tests (source basic, multi-row, metadata, incremental offset advance, large result set; sink single and multi-row).
  • Validated live end to end: ran iggy-server plus separate source and sink connector runtimes against a Postgres container and pushed 150 messages through source then iggy then sink, confirming all 150 rows arrived intact in the destination table with contiguous offsets and exact decimal values.
  • Pre-commit hooks: prek was not run (not installed locally); the equivalent checks listed above (fmt, clippy, sort, machete, license headers, markdownlint, tests) were run manually instead.

Limitations

  • JNI permits one JVM per process, so a JDBC source and a JDBC sink cannot run in the same connectors-runtime process. Run them in separate runtime processes. Documented in the connector READMEs.
  • JDBC calls are synchronous (blocking) on the runtime worker thread, consistent with the JDBC source model.
  • The connectors require a JVM and a JDBC driver JAR at runtime (unlike the pure-Rust connectors). They are added to the release plugin list with that caveat.

AI Usage

  1. Tools: Claude Code (Claude).
  2. Scope: used substantially for drafting the connector implementations, tests, docs, and example configs, and for an iterative review and hardening pass.
  3. Verification: unit tests, Dockerized Postgres integration tests (including incremental-offset and large-result-set cases), and a manual end-to-end run of the full source to iggy to sink pipeline with 150 messages, inspecting the resulting database rows.
  4. Can explain every line: yes.

…#2500)

Adds JDBC source and sink connectors that let Iggy read from and write to
any JDBC-compliant database (PostgreSQL, MySQL, Oracle, SQL Server, H2) via
an embedded JVM, instead of a bespoke client per database.

Source: polls a SQL query and produces each row as a JSON message; supports
bulk and incremental (offset-tracked) modes with persisted offset state.
Sink: consumes messages and INSERTs each into a target table, with optional
auto-create-table and Iggy metadata columns. Both share one embedded-JVM/JNI
bridge so a single JDBC driver JAR works for either direction.

Includes per-connector docs and example configs, unit tests, Dockerized
Postgres integration tests, and a small TCP listener fix so the connectors
test harness can read the written runtime config.
@github-actions

Copy link
Copy Markdown

Thanks for the PR. It is labeled S-waiting-on-review and queued for review.

Slash commands (own line, regular comment) move it around the queue:

  • /ready - back to S-waiting-on-review after addressing feedback
  • /author - flip to S-waiting-on-author while you finish changes
  • /request-review @user-or-team - request a reviewer

See CONTRIBUTING.md for details.

@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-review PR is waiting on a reviewer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

JDBC Source and Sink Connectors for Iggy

1 participant