feat(connectors): add generic JDBC source and sink connectors (#2500) #3576
Open
shbhmrzd wants to merge 1 commit into
…#2500) Adds JDBC source and sink connectors that let Iggy read from and write to any JDBC-compliant database (PostgreSQL, MySQL, Oracle, SQL Server, H2) via an embedded JVM, instead of a bespoke client per database. Source: polls a SQL query and produces each row as a JSON message; supports bulk and incremental (offset-tracked) modes with persisted offset state. Sink: consumes messages and INSERTs each into a target table, with optional auto-create-table and Iggy metadata columns. Both share one embedded-JVM/JNI bridge so a single JDBC driver JAR works for either direction. Includes per-connector docs and example configs, unit tests, Dockerized Postgres integration tests, and a small TCP listener fix so the connectors test harness can read the written runtime config.
Thanks for the PR. It is labeled … Slash commands (own line, regular comment) move it around the queue: …
See CONTRIBUTING.md for details.
Which issue does this PR address?
Closes #2500
Rationale
Iggy only shipped PostgreSQL-specific connectors, so every other database needed a purpose-built connector. A generic JDBC pair covers any JDBC-compliant database (PostgreSQL, MySQL, Oracle, SQL Server, H2, and others) through their existing JDBC drivers, instead of writing and maintaining a new connector per database.
What changed?
Before: reading from or writing to anything other than PostgreSQL required a bespoke connector. After: two new connectors talk to any JDBC database through an embedded JVM over JNI, reusing the standard java.sql API and the database's own driver JAR.
The source polls a configured SQL query and produces each row as a JSON message, in bulk (full re-read) or incremental mode (tracks a {last_offset} placeholder and persists the offset across restarts). The sink consumes messages and INSERTs each into a target table, with optional auto-create-table and Iggy metadata columns. Both share one embedded-JVM/JNI bridge, so a single driver JAR serves either direction.
How to review
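As orientation for two of the helpers named in this description, here is a minimal sketch of how offset substitution and SQLSTATE classification could look. It is not the PR's code: the function names are borrowed from the description, but the signatures, the `initial_offset` parameter, and the quote-doubling escape are assumptions made for illustration. The transient classes (08, 40, 53, 57, 58) are the ones listed in this description.

```rust
/// Hypothetical stand-in for `quote_sql_literal`: wraps the value in single
/// quotes and doubles any embedded single quote (standard SQL escaping).
/// Assumption: the real helper may escape differently per database.
fn quote_sql_literal(value: &str) -> String {
    format!("'{}'", value.replace('\'', "''"))
}

/// Hypothetical stand-in for `build_query`: replaces every `{last_offset}`
/// placeholder with the quoted persisted offset, falling back to an
/// assumed `initial_offset` when no offset has been stored yet.
fn build_query(template: &str, last_offset: Option<&str>, initial_offset: &str) -> String {
    let offset = last_offset.unwrap_or(initial_offset);
    template.replace("{last_offset}", &quote_sql_literal(offset))
}

/// Sketch of the transient/permanent split: the SQLSTATE class is the first
/// two characters of the five-character code. Classes 08, 40, 53, 57 and 58
/// are treated as transient (retry); anything else, including a code that is
/// too short to have a class, is treated as permanent.
fn is_transient_sql_state(sql_state: &str) -> bool {
    matches!(
        sql_state.get(..2),
        Some("08" | "40" | "53" | "57" | "58")
    )
}
```

Under these assumptions, `build_query("SELECT * FROM t WHERE ts > {last_offset}", None, "1970-01-01")` yields a query ending in `ts > '1970-01-01'`, and `is_transient_sql_state("23505")` (a unique-constraint violation in PostgreSQL) returns `false`, so such a batch would be skipped rather than retried.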
core/connectors/sources/jdbc_source/src/lib.rs: poll() then build_query (offset substitution via quote_sql_literal) then read_rows/extract_column_value (JDBC to JSON type mapping) then build_message (offset state persistence)
core/connectors/sinks/jdbc_sink/src/lib.rs: open() (JVM, connectivity SELECT 1, optional auto-create-table) then consume() then write_messages/insert_batch (batched PreparedStatement) then bind_row
get_or_create_jvm, connection acquire/close in both crates: push_local_frame/pop_local_frame, pooled-connection return, isValid reconnect in the source
extract_column_value: NUMERIC and DECIMAL map to string (no f64 precision loss), binary maps to base64, BIGINT caveat documented
is_transient_sql_state and the classify helpers in both crates: 08, 40, 53, 57, 58 are transient (retried), everything else is permanent. The sink skips and counts permanent batches, retries transient ones
Debug impls, the secret serde modules, sanitize_jdbc_url: SecretString, never logged or serialized
core/integration/tests/connectors/mod.rs: setup_runtime, plus send_messages/get_messages helpers used by the sink and source tests
core/server/src/tcp/tcp_listener.rs (3 lines): … PortReserver) and waits for the server to write current_config.toml to learn the bound address. The server previously wrote that file only when the port was dynamic (port == 0), so a fixed-port server hung. The fix always notifies the config writer once the listener binds. The SO_REUSEPORT cross-shard broadcast stays gated on port == 0, so multi-shard behaviour is unchanged. Worth a check for cluster mode.
Config reference and per-database examples live in each connector's README.md, config.toml, and the core/connectors/runtime/example_config/connectors/jdbc_*.toml files.
Local Execution
cargo fmt --all -- --check, cargo clippy --all-targets --all-features -- -D warnings, cargo check --all --all-features, cargo sort --check, cargo machete, license headers (hawkeye check), and markdownlint.
Ran iggy-server plus separate source and sink connector runtimes against a Postgres container and pushed 150 messages through source then iggy then sink, confirming all 150 rows arrived intact in the destination table with contiguous offsets and exact decimal values.
prek was not run (not installed locally); the equivalent checks listed above (fmt, clippy, sort, machete, license headers, markdownlint, tests) were run manually instead.
Limitations
AI Usage