Skip to content

feat(connectors): add MySQL source polling connector#3568

Open
tusharagrahari wants to merge 1 commit into
apache:masterfrom
tusharagrahari:feature/mysql-source-polling
Open

feat(connectors): add MySQL source polling connector#3568
tusharagrahari wants to merge 1 commit into
apache:masterfrom
tusharagrahari:feature/mysql-source-polling

Conversation

@tusharagrahari

Copy link
Copy Markdown

Which issue does this PR address?

Relates to #3445

Rationale

MySQL is a widely used relational database with no existing Iggy source connector. This adds incremental table polling so users can stream MySQL rows into Iggy topics without CDC infrastructure.

What changed?

Before this PR, there was no way to source data from MySQL into Iggy.

This adds a mysql_source connector plugin supporting incremental table polling via a configurable tracking column. Rows are streamed as JSON, raw bytes, or text depending on the configured payload format. Post-processing options (delete after read, mark as processed) and custom SQL queries with parameter substitution are also supported. CDC (binlog-based) is out of scope and planned as a follow-up.

Local Execution

  • Passed
  • Pre-commit hooks ran

AI Usage

  1. Tools: Claude (Claude Code)
  2. Scope: Writing unit and integration tests, thinking through edge cases, and pair-programming during development.
  3. Verification: Ran all tests locally against a real MySQL container, validated end-to-end polling flow, and ran the full pre-commit hook suite.
  4. Yes, I can explain every line of code if asked.

@github-actions

Copy link
Copy Markdown

Thanks for the PR. It is labeled S-waiting-on-review and queued for review.

Slash commands (own line, regular comment) move it around the queue:

  • /ready - back to S-waiting-on-review after addressing feedback
  • /author - flip to S-waiting-on-author while you finish changes
  • /request-review @user-or-team - request a reviewer

See CONTRIBUTING.md for details.

@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 25, 2026
@codecov

codecov Bot commented Jun 25, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 76.27907% with 204 lines in your changes missing coverage. Please review.
✅ Project coverage is 44.91%. Comparing base (bc68827) to head (1c4d3d9).

Files with missing lines Patch % Lines
core/connectors/sources/mysql_source/src/lib.rs 76.27% 168 Missing and 36 partials ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3568       +/-   ##
=============================================
- Coverage     74.02%   44.91%   -29.11%     
  Complexity      937      937               
=============================================
  Files          1247     1245        -2     
  Lines        127567   112197    -15370     
  Branches     103435    88110    -15325     
=============================================
- Hits          94427    50391    -44036     
- Misses        30103    59073    +28970     
+ Partials       3037     2733      -304     
Components Coverage Δ
Rust Core 37.30% <76.27%> (-37.37%) ⬇️
Java SDK 62.44% <ø> (ø)
C# SDK 71.40% <ø> (-0.71%) ⬇️
Python SDK 88.88% <ø> (ø)
PHP SDK 84.29% <ø> (ø)
Node SDK 91.35% <ø> (+0.22%) ⬆️
Go SDK 40.14% <ø> (ø)
Files with missing lines Coverage Δ
core/connectors/sources/mysql_source/src/lib.rs 76.27% <76.27%> (ø)

... and 375 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@atharvalade atharvalade left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @tusharagrahari, thanks for the contribution and for taking the time to put this together.

A couple of process things before we dig into review:

  1. Issue approval: I can see you created #3445 yourself, which is great. However, per our CONTRIBUTING.md, new functionality needs maintainer approval on the issue before coding begins (a good-first-issue label or an explicit comment). Connectors are also listed as a high-risk area that needs a design discussion in the issue first. We typically ask first-time contributors to start with something labeled good-first-issue to get familiar with the repo's workflow and CI, then move on to bigger features like this.

  2. Failing CI: Could you take a look and get those green?

Not saying this won't get merged, the implementation looks good to start with. Just want to make sure we follow the process so there are no surprises down the line.

Comment thread core/connectors/sources/mysql_source/src/lib.rs
Comment on lines +545 to +548
if !processed_ids.is_empty() {
self.mark_or_delete_processed_rows(pool, table, pk_column, &processed_ids)
.await?;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark_or_delete_processed_rows runs inside poll_tables() before the messages reach the runtime's producer.send(). If the process crashes or publish fails after this point, those rows are gone from MySQL but never delivered to Iggy. It would be better to return the pending IDs in ProducedMessages and let the runtime execute the delete after confirmed send.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

valid concern, I hit this too. The delete-before-send window exists because the SDK has no post-send ack primitive, not a conscious design choice. I modeled it on postgres_source as the reference, but that's context not a justification.

Proper fix needs a post-send hook on the Source trait — a SDK-level change. Happy to open a separate issue for it if useful.

Comment on lines +498 to +547
for table in &self.config.tables {
let table_config = RowProcessingConfig {
table,
..row_config
};

// Get last offset with minimal lock time
let last_offset = {
let state = self.state.lock().await;
state.tracking_offsets.get(table).cloned()
};

let query = if let Some(custom_query) = &self.config.custom_query {
self.validate_custom_query(custom_query)?;
self.substitute_query_params(custom_query, table, &last_offset, batch_size)
} else {
self.build_polling_query(table, tracking_column, &last_offset, batch_size)?
};

// Database I/O without holding the lock
let rows = with_retry(
|| sqlx::query(sqlx::AssertSqlSafe(query.as_str())).fetch_all(pool),
self.get_max_retries(),
self.retry_delay.as_millis() as u64,
)
.await?;

let mut max_offset: Option<String> = None;
let mut processed_ids: Vec<String> = Vec::new();

let mut count_per_table = 0;
for row in rows {
let processed = self.process_row(&row, &table_config)?;

if let Some(pk) = processed.row_pk {
processed_ids.push(pk);
}
if let Some(offset) = processed.max_offset {
max_offset = Some(offset);
}

messages.push(processed.message);
count_per_table += 1;
total_processed += 1;
}

// Database I/O without holding the lock
if !processed_ids.is_empty() {
self.mark_or_delete_processed_rows(pool, table, pk_column, &processed_ids)
.await?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If mark_or_delete_processed_rows succeeds for table A but a later operation fails for table B, the ? discards ALL collected messages (including A's). Table A's rows are already deleted/marked but its messages never get published. each table's batch needs independent error handling so one table's failure doesn't poison the rest.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworked poll_tables into two phases with per-table isolation:
Phase 1 — fetch + process, no side effects. Each table builds its own TableBatch. On failure (decode error, bad query), that table is logged and skipped for the cycle — its rows stay in MySQL and retry next poll. Other tables keep going.
Phase 2 — commit, per-table. mark_or_delete runs per table; only on success are messages appended and the offset advanced. Failures log and continue — one table's mark failure can't discard another's already-committed messages. Also added retry for transient errors (deadlock/lock-wait/connection drop) via with_retry.

Two things this doesn't cover -
Mark still happens before producer.send, so a send failure can still drop a batch. That needs a post-send ack on the Source trait — same gap exists in postgres_source, so I'd rather track it as a separate issue. Happy to open one if you agree.
With per-table skip, poll() returns Ok, so a stuck table only surfaces in the error log. Open to adding a metric if you'd prefer it more visible.

Comment thread core/connectors/sources/mysql_source/src/lib.rs
@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels Jun 25, 2026
Comment thread .gitignore
Comment on lines +45 to +46
local_test_connectors
local_test_runtime.toml

@hubcio hubcio Jun 26, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont see this file being added anywhere in the test in this PR nor in iggy as a whole. please drop it and add it to your own local .gitignore_global

@tusharagrahari

Copy link
Copy Markdown
Author

Hi @atharvalade, thanks for the detailed feedback!

On the process side — I did reach out on the Discord community before starting, where I got clearance to proceed with the connector work. I understand that might not substitute for a formal maintainer approval comment on the issue itself, and I'll make sure to get that explicitly documented going forward before picking up anything in high-risk areas.

On the CI failures — on it, will get those green shortly.

Also working through the inline review comments now. Will push the fixes once the CI is sorted.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-author PR is waiting on author response

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants