feat(mcp): Add tools to modify stream sync settings and refresh catalog on existing connections#994

Open
Aaron ("AJ") Steers (aaronsteers) wants to merge 3 commits into main from devin/1773770361-mcp-stream-sync-mode-refresh-catalog

Conversation

@aaronsteers
Member

@aaronsteers Aaron ("AJ") Steers (aaronsteers) commented Mar 17, 2026

Summary

Adds two new MCP tools (and corresponding CloudConnection core methods) to support automated incremental stream testing and configuration workflows:

  1. refresh_connection_catalog — Triggers a discover operation on a connection's source (via withRefreshedCatalog: true on the Config API) and replaces the connection's catalog with the refreshed result. Equivalent to "Refresh source schema" in the UI.

  2. set_stream_sync_mode — Safely modifies the sync mode for a single stream in a connection's syncCatalog. Validates that the requested mode is in the stream's supportedSyncModes before applying. Optionally sets destinationSyncMode and cursorField.

Core logic lives in CloudConnection (connections.py) and api_util.py; MCP tools in cloud.py are thin wrappers per the presentation-layer pattern.
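
As a rough illustration of the validate-then-update behavior described for set_stream_sync_mode, here is a minimal sketch that operates on a plain catalog dict. The function name `apply_stream_sync_mode`, the use of `ValueError` in place of `PyAirbyteInputError`, and the sample catalog are assumptions made for illustration; the real logic lives in connections.py and may differ.

```python
from __future__ import annotations

from copy import deepcopy
from typing import Any


def apply_stream_sync_mode(
    catalog: dict[str, Any],
    stream_name: str,
    sync_mode: str,
    destination_sync_mode: str | None = None,
    cursor_field: str | None = None,
) -> dict[str, Any]:
    """Return a copy of `catalog` with one stream's sync settings updated.

    Hypothetical helper: matches by stream name only and raises ValueError
    when the stream is missing or the mode is unsupported.
    """
    updated = deepcopy(catalog)
    for entry in updated["streams"]:
        if entry["stream"]["name"] != stream_name:
            continue
        supported = entry["stream"].get("supportedSyncModes", [])
        if sync_mode not in supported:
            raise ValueError(
                f"Sync mode '{sync_mode}' is not supported by stream "
                f"'{stream_name}'. Supported sync modes: {supported}"
            )
        config = entry.setdefault("config", {})
        config["syncMode"] = sync_mode
        if destination_sync_mode is not None:
            config["destinationSyncMode"] = destination_sync_mode
        if cursor_field is not None:
            # Single-element list, as noted for the catalog wire format.
            config["cursorField"] = [cursor_field]
        return updated
    raise ValueError(f"Stream '{stream_name}' not found in catalog.")


# Illustrative catalog (assumed shape, trimmed to the fields used above).
sample_catalog = {
    "streams": [
        {
            "stream": {
                "name": "products",
                "supportedSyncModes": ["full_refresh", "incremental"],
            },
            "config": {"syncMode": "full_refresh", "destinationSyncMode": "overwrite"},
        },
        {
            "stream": {"name": "users", "supportedSyncModes": ["full_refresh"]},
            "config": {"syncMode": "full_refresh", "destinationSyncMode": "overwrite"},
        },
    ]
}

updated = apply_stream_sync_mode(
    sample_catalog,
    "products",
    "incremental",
    destination_sync_mode="append_dedup",
    cursor_field="updated_at",
)
print(updated["streams"][0]["config"])
```

Returning a modified copy keeps the input untouched, so a failed validation cannot leave a half-edited catalog behind before it is persisted.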

Closes #993

Review & Testing Checklist for Human

  • Namespace-qualified streams: set_stream_sync_mode matches streams by name only (with optional namespace disambiguation). Verify this is acceptable for multi-namespace sources.
  • refresh_catalog two-step flow: The method does a GET (with withRefreshedCatalog: true) then a separate POST to replace. Confirm this get-then-replace pattern is correct vs. a single update call.
  • Real sync after mutation: My e2e run wasn't able to trigger a manual sync on the test connection (Cloud returned 409 state-conflict for the legacy pre-1.0-faker-1-to-dev-null connection — appears unrelated to this PR). The catalog mutation persistence is verified, but a real sync round-trip after set_stream_sync_mode → incremental should be confirmed on a fresh connection before merging.
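
The two-step flow raised in the second checklist item can be sketched as follows. This is a hypothetical outline, not the code in this PR: `refresh_then_replace`, the injected `fetch_refreshed` and `replace_catalog` callables, and the non-empty-streams check are assumptions standing in for the Config API calls and for whatever validation connections.py actually performs.

```python
from __future__ import annotations

from typing import Any, Callable


def refresh_then_replace(
    fetch_refreshed: Callable[[], dict[str, Any]],
    replace_catalog: Callable[[dict[str, Any]], None],
) -> dict[str, Any]:
    """Fetch a freshly discovered catalog, then persist it in a second call."""
    # Step 1: read the connection with a refreshed (re-discovered) catalog.
    refreshed = fetch_refreshed()
    # Assumed validation: refuse to overwrite with an empty catalog.
    if not refreshed.get("streams"):
        raise ValueError("Refreshed catalog has no streams; not replacing.")
    # Step 2: replace the stored catalog with the refreshed one.
    replace_catalog(refreshed)
    return refreshed


# In-memory stand-ins for the two API calls (illustration only).
stored = {"catalog": {"streams": [{"stream": {"name": "users"}}]}}


def fake_fetch() -> dict[str, Any]:
    names = ("products", "purchases", "users")
    return {"streams": [{"stream": {"name": name}} for name in names]}


def fake_replace(catalog: dict[str, Any]) -> None:
    stored["catalog"] = catalog


result = refresh_then_replace(fake_fetch, fake_replace)
print(len(result["streams"]), "streams persisted")
```

Because the read and the write are separate calls, a change made to the connection between them could be overwritten; a single update call, if the API offers one, would avoid that window.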

E2E test results (Devin sandbox workspace)

Ran a script against pre-1.0-faker-1-to-dev-null (connection 0c20f439-f492-4fe1-bbc7-2b419e1b24f5) using CloudConnection.refresh_catalog() and CloudConnection.set_stream_sync_mode() from this branch:

  1. refresh_catalog() — returned 3 streams (products, purchases, users), persisted on the connection.
  2. set_stream_sync_mode("products", sync_mode="incremental", destination_sync_mode="append_dedup") — applied successfully; users and purchases were untouched.
  3. set_stream_sync_mode("products", sync_mode="full_refresh", destination_sync_mode="overwrite") — reverted cleanly.
  4. Invalid sync_mode raised PyAirbyteInputError with the expected Supported Sync Modes: ['full_refresh', 'incremental'] context.
  5. Missing stream raised PyAirbyteInputError with the available-streams list.

Note: when an existing connection in the same workspace already syncs a given stream to the same destination, replace_connection_catalog returns 400 connection-conflicting-destination-stream. That's a platform-level constraint (stream uniqueness), not a regression in this PR — but worth noting since refresh_catalog will surface it on shared-stream destinations.

Notes

  • Both tools are gated by safe mode (check_guid_created_in_session) and marked destructive=True, consistent with other catalog-mutating tools.
  • No unit tests added — the core methods operate against the Config API which requires integration-level testing.
  • The cursorField is set as a single-element list ([cursor_field]) to match the Airbyte catalog wire format.
  • Updated from main (82bf1e4); merge resolved automatically. All 21 CI checks green on the latest commit.

Link to Devin session: https://app.devin.ai/sessions/1a305419379e4f9b838842027b8bdbc9
Requested by: Aaron ("AJ") Steers (@aaronsteers)



…og on existing connections

Adds two new MCP tools:
- refresh_connection_catalog: Triggers a discover operation on a connection's source
  and updates the catalog with latest stream definitions and sync modes
- set_stream_sync_mode: Safely changes the sync mode for a specific stream on a
  connection, with validation that the mode is supported

Core logic lives in CloudConnection (connections.py) and api_util.py, with MCP
tools as thin wrappers per the presentation layer pattern.

Closes #993

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1773770361-mcp-stream-sync-mode-refresh-catalog' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1773770361-mcp-stream-sync-mode-refresh-catalog'

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /uv-lock - Updates uv.lock file
  • /test-pr - Runs tests with the updated PyAirbyte
  • /prerelease - Builds and publishes a prerelease version to PyPI
📚 Show Repo Guidance

Helpful Resources

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.


@github-actions

github-actions Bot commented Mar 17, 2026

PyTest Results (Fast Tests Only, No Creds)

343 tests  ±0   343 ✅ ±0   6m 6s ⏱️ -26s
  1 suites ±0     0 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 5e24c4a. ± Comparison against base commit 82bf1e4.

♻️ This comment has been updated with latest results.

@aaronsteers Aaron ("AJ") Steers (aaronsteers) marked this pull request as ready for review March 17, 2026 18:14
@coderabbitai
Contributor

coderabbitai Bot commented Mar 17, 2026

📝 Walkthrough

Walkthrough

Adds a Config API wrapper to fetch a refreshed connection catalog, CloudConnection methods to refresh a connection's catalog and to update a stream's sync settings, and MCP tool endpoints that expose those operations.

Changes

Refresh & Stream Sync Tools

| Layer / File(s) | Summary |
|---|---|
| Data / API call: airbyte/_util/api_util.py | New get_refreshed_connection_catalog(...) calls the Config API with withRefreshedCatalog: true and returns the refreshed connection catalog dict. |
| Core Domain: airbyte/cloud/connections.py | Added refresh_catalog() which calls the API util, validates the returned catalog, replaces the connection's catalog, and returns the refreshed syncCatalog. Added set_stream_sync_mode(...) which locates a stream (optionally by namespace), validates supported sync modes and cursor requirements, updates that stream's config (syncMode, destinationSyncMode, cursorField) and persists the catalog. |
| Tooling / Surface: airbyte/mcp/cloud.py | Added MCP tool functions refresh_connection_catalog(...) and set_stream_sync_mode(...) which validate session/workspace/connection, call the CloudConnection methods, and return human-readable success messages including stream counts and connection URLs. |

Sequence Diagram(s)

sequenceDiagram
    participant Client as MCP Client
    participant MCP as MCP Endpoint
    participant CloudConn as CloudConnection
    participant APIUtil as API Utility
    participant ConfigAPI as Config API

    rect rgba(100, 150, 200, 0.5)
    Note over Client,ConfigAPI: Refresh Catalog Flow
    Client->>MCP: refresh_connection_catalog(connection_id)
    MCP->>CloudConn: connection.refresh_catalog()
    CloudConn->>APIUtil: get_refreshed_connection_catalog(...)
    APIUtil->>ConfigAPI: GET /connections/get?withRefreshedCatalog=true
    ConfigAPI-->>APIUtil: Updated catalog
    APIUtil-->>CloudConn: Catalog dict
    CloudConn->>CloudConn: Validate & store catalog
    CloudConn-->>MCP: Confirmation message
    MCP-->>Client: Success with stream count & URL
    end

    rect rgba(150, 100, 200, 0.5)
    Note over Client,CloudConn: Set Stream Sync Mode Flow
    Client->>MCP: set_stream_sync_mode(connection_id, stream_name, sync_mode, ...)
    MCP->>CloudConn: connection.set_stream_sync_mode(...)
    CloudConn->>CloudConn: Locate stream in catalog
    CloudConn->>CloudConn: Validate sync_mode in supportedSyncModes
    CloudConn->>CloudConn: Update stream config
    CloudConn->>CloudConn: Save updated catalog
    CloudConn-->>MCP: Success
    MCP-->>Client: Confirmation with stream & connection details
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately and concisely describes the main changes: adding MCP tools for catalog refresh and stream sync mode modification on existing connections. |
| Linked Issues check | ✅ Passed | All core requirements from issue #993 are met: refresh_connection_catalog triggers discover with withRefreshedCatalog flag, set_stream_sync_mode validates sync modes and safely modifies stream settings, both tools are exposed as MCP tools with proper gating. |
| Out of Scope Changes check | ✅ Passed | All changes are directly scoped to implementing the two requested MCP tools and their supporting core methods; no extraneous modifications detected. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |


coderabbitai[bot]

This comment was marked as resolved.

…am_sync_mode

Addresses CodeRabbit review feedback:
- Add stream_namespace parameter to disambiguate same-named streams in
  different namespaces. Raises PyAirbyteInputError when name is ambiguous.
- Add fail-fast guard when switching to incremental mode without a usable
  cursor field (no existing cursor, no default cursor, no source-defined cursor).
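
The two guards described in this commit message might look roughly like the sketch below. It is an assumption-laden illustration rather than the committed code: the helper names `find_stream_entry` and `require_usable_cursor` are hypothetical, `ValueError` stands in for `PyAirbyteInputError`, and the `defaultCursorField` and `sourceDefinedCursor` keys are assumed to be where the catalog records a default or source-defined cursor.

```python
from __future__ import annotations

from typing import Any


def find_stream_entry(
    catalog: dict[str, Any],
    stream_name: str,
    stream_namespace: str | None = None,
) -> dict[str, Any]:
    """Return the one catalog entry matching the name (and namespace, if given)."""
    matches = [
        entry
        for entry in catalog["streams"]
        if entry["stream"]["name"] == stream_name
        and (
            stream_namespace is None
            or entry["stream"].get("namespace") == stream_namespace
        )
    ]
    if not matches:
        raise ValueError(f"Stream '{stream_name}' not found.")
    if len(matches) > 1:
        namespaces = [m["stream"].get("namespace") for m in matches]
        raise ValueError(
            f"Stream name '{stream_name}' is ambiguous across namespaces "
            f"{namespaces}; pass stream_namespace to disambiguate."
        )
    return matches[0]


def require_usable_cursor(
    entry: dict[str, Any], cursor_field: str | None = None
) -> None:
    """Fail fast if switching to incremental would leave no cursor to use."""
    stream = entry["stream"]
    config = entry.get("config", {})
    has_cursor = bool(
        cursor_field  # explicitly requested by the caller
        or config.get("cursorField")  # already configured on the stream
        or stream.get("defaultCursorField")  # assumed key: default cursor
        or stream.get("sourceDefinedCursor")  # assumed key: source-defined
    )
    if not has_cursor:
        raise ValueError(
            f"Stream '{stream['name']}' has no usable cursor field for "
            "incremental sync; pass cursor_field explicitly."
        )


# Illustrative catalog with a duplicated stream name across namespaces.
demo_catalog = {
    "streams": [
        {"stream": {"name": "users", "namespace": "public",
                    "defaultCursorField": ["updated_at"]}, "config": {}},
        {"stream": {"name": "users", "namespace": "archive"}, "config": {}},
        {"stream": {"name": "orders", "namespace": "public"}, "config": {}},
    ]
}

archived_users = find_stream_entry(demo_catalog, "users", "archive")
require_usable_cursor(archived_users, cursor_field="updated_at")
```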

Co-Authored-By: AJ Steers <aj@airbyte.io>
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment


✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 3 additional findings.


@github-actions

github-actions Bot commented Mar 17, 2026

PyTest Results (Full)

413 tests  ±0   395 ✅ ±0   24m 37s ⏱️ - 1m 38s
  1 suites ±0    18 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 5e24c4a. ± Comparison against base commit 82bf1e4.

♻️ This comment has been updated with latest results.

Contributor

Copilot AI left a comment


Pull request overview

Adds new Airbyte Cloud MCP tools and core CloudConnection helpers to (1) refresh a connection’s catalog via a forced re-discover and (2) update per-stream sync mode settings safely.

Changes:

  • Add MCP tool refresh_connection_catalog to trigger a refreshed discover and replace the connection catalog.
  • Add MCP tool set_stream_sync_mode to update a single stream’s syncMode (optionally namespace-qualified) and related config.
  • Add Config API helper get_refreshed_connection_catalog and CloudConnection methods refresh_catalog / set_stream_sync_mode.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

| File | Description |
|---|---|
| airbyte/mcp/cloud.py | Adds two new MCP tool wrappers that call CloudConnection methods and return human-readable status strings. |
| airbyte/cloud/connections.py | Implements the core catalog refresh + per-stream sync mode update logic against the stored syncCatalog. |
| airbyte/_util/api_util.py | Adds a Config API helper to fetch a connection with withRefreshedCatalog: true. |


Comment thread airbyte/mcp/cloud.py
Comment on lines +2712 to +2722
return (
    f"Successfully set sync mode for stream '{stream_name}' "
    f"on connection '{connection_id}' to '{sync_mode}'"
    + (
        f" with destination sync mode '{destination_sync_mode}'"
        if destination_sync_mode
        else ""
    )
    + (f" and cursor field '{cursor_field}'" if cursor_field else "")
    + f". URL: {connection.connection_url}"
)

Reasonable observation. Including namespace in the success message when it was explicitly supplied would improve clarity for multi-namespace sources. Will add if the human reviewer agrees.

---
Devin session

Comment on lines +681 to +688
api_util.replace_connection_catalog(
    connection_id=self.connection_id,
    configured_catalog_dict=refreshed_catalog,
    api_root=self.workspace.api_root,
    client_id=self.workspace.client_id,
    client_secret=self.workspace.client_secret,
    bearer_token=self.workspace.bearer_token,
)

Good suggestion. import_raw_catalog does the same replace_connection_catalog call with the same auth params. Switching to self.import_raw_catalog(refreshed_catalog) would reduce duplication. Will apply if the human reviewer agrees.

---
Devin session

Comment on lines +742 to +745
available_streams = [
    f"{e.get('stream', {}).get('namespace', '')}.{e.get('stream', {}).get('name', '')}"
    for e in catalog["streams"]
]

Valid observation: when namespace is None, this produces .users. However, this is a diagnostic context field on an error (not user-facing UI), so the dot-prefix still communicates "no namespace" clearly enough. Happy to clean this up if the human reviewer agrees it's worth addressing.

---
Devin session
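
If the dot-prefixed names were to be cleaned up, one possible shape is a small formatter that only adds the namespace when one is present. The helper name `qualified_stream_name` is hypothetical and this is only a sketch of the idea, not a change made in this PR; it treats both a missing namespace key and an explicit None (or empty string) as "no namespace".

```python
from __future__ import annotations

from typing import Any


def qualified_stream_name(entry: dict[str, Any]) -> str:
    """Format a catalog entry as 'namespace.name', or just 'name' if unqualified."""
    stream = entry.get("stream", {})
    name = stream.get("name", "")
    namespace = stream.get("namespace")  # may be absent, None, or ""
    return f"{namespace}.{name}" if namespace else name


example_entries = [
    {"stream": {"name": "users"}},
    {"stream": {"name": "users", "namespace": None}},
    {"stream": {"name": "users", "namespace": "public"}},
]
available_streams = [qualified_stream_name(e) for e in example_entries]
print(available_streams)
```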

Comment on lines +774 to +781
raise PyAirbyteInputError(
    message=(f"Sync mode '{sync_mode}' is not supported by stream '{stream_name}'."),
    context={
        "stream_name": stream_name,
        "requested_sync_mode": sync_mode,
        "supported_sync_modes": supported_sync_modes,
    },
)

Fair point on consistency: connection_id is included in the other error contexts in this method. Will add if the human reviewer considers it worth a follow-up commit.

---
Devin session

config["destinationSyncMode"] = destination_sync_mode

if cursor_field is not None:
    config["cursorField"] = [cursor_field]

This is by design: allowing cursor_field to be set during full_refresh enables pre-configuring the cursor before a subsequent switch to incremental mode (a common two-step workflow). The docstring could be clearer about this, but silently ignoring the parameter would be surprising. Deferring to human reviewer on whether to add a note in the docstring.

---
Devin session

@devin-ai-integration
Contributor

Devin is archived and cannot be woken up. Please unarchive Devin if you want to continue using it.


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@airbyte/mcp/cloud.py`:
- Around line 2721-2730: The success message construction around
stream_name/connection_id/sync_mode currently always prefixes the cursor_field
with "and", which reads awkwardly when destination_sync_mode is None; update the
f-string assembly that uses destination_sync_mode and cursor_field so that if
destination_sync_mode is truthy you keep " and cursor field '{cursor_field}'",
but if destination_sync_mode is falsy you use " with cursor field
'{cursor_field}'" instead; adjust the expression that builds the trailing clause
(referencing destination_sync_mode, cursor_field, and connection.connection_url)
to pick the correct connective.
- Around line 2721-2730: The returned status string construction is
grammatically awkward when destination_sync_mode is None but cursor_field is
present; update the string assembly in the return expression that references
stream_name, connection_id, sync_mode, destination_sync_mode, cursor_field, and
connection.connection_url so that the cursor clause uses "with cursor field
'{cursor_field}'" (or otherwise build the optional clauses consistently) instead
of "and cursor field '{cursor_field}'" to ensure the message reads correctly in
all combinations of optional args.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: cc5fee52-5ffb-4554-b99c-9f7f2d75f89d

📥 Commits

Reviewing files that changed from the base of the PR and between e6fce67 and 5e24c4a.

📒 Files selected for processing (1)
  • airbyte/mcp/cloud.py

Comment thread airbyte/mcp/cloud.py
Comment on lines +2721 to +2730
return (
    f"Successfully set sync mode for stream '{stream_name}' "
    f"on connection '{connection_id}' to '{sync_mode}'"
    + (
        f" with destination sync mode '{destination_sync_mode}'"
        if destination_sync_mode
        else ""
    )
    + (f" and cursor field '{cursor_field}'" if cursor_field else "")
    + f". URL: {connection.connection_url}"

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Grammatically awkward success message when only cursor_field is provided — wdyt about fixing the connective?

When destination_sync_mode is None but cursor_field is set, the message reads:

"Successfully set sync mode for stream 'X' on connection 'Y' to 'incremental' and cursor field 'updated_at'. URL: …"

The bare "and" without a preceding "with …" clause reads oddly. Could "with cursor field" replace "and cursor field" to make it consistent with the destination-mode phrasing?

✏️ Proposed fix
-    + (f" and cursor field '{cursor_field}'" if cursor_field else "")
+    + (f" with cursor field '{cursor_field}'" if cursor_field else "")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-return (
-    f"Successfully set sync mode for stream '{stream_name}' "
-    f"on connection '{connection_id}' to '{sync_mode}'"
-    + (
-        f" with destination sync mode '{destination_sync_mode}'"
-        if destination_sync_mode
-        else ""
-    )
-    + (f" and cursor field '{cursor_field}'" if cursor_field else "")
-    + f". URL: {connection.connection_url}"
+return (
+    f"Successfully set sync mode for stream '{stream_name}' "
+    f"on connection '{connection_id}' to '{sync_mode}'"
+    + (
+        f" with destination sync mode '{destination_sync_mode}'"
+        if destination_sync_mode
+        else ""
+    )
+    + (f" with cursor field '{cursor_field}'" if cursor_field else "")
+    + f". URL: {connection.connection_url}"

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Grammatically awkward message when only cursor_field is provided — wdyt about using "with" instead of "and"?

When destination_sync_mode=None but cursor_field is set, the message reads:

"…to 'incremental' and cursor field 'updated_at'. URL: …"

The "and" appears without a preceding "with …" clause, which is a bit off. Swapping it to "with cursor field" makes it consistent with the destination_sync_mode phrasing and reads naturally regardless of which optional args are provided.

✏️ Proposed fix
-    + (f" and cursor field '{cursor_field}'" if cursor_field else "")
+    + (f" with cursor field '{cursor_field}'" if cursor_field else "")
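
An alternative to swapping the connective is to collect the optional clauses in a list and join them, so the wording stays natural for every combination of arguments. The sketch below is an illustration of that idea only; `build_success_message` is a hypothetical standalone function, and `connection_url` is passed in as a plain string instead of being read from a connection object.

```python
from __future__ import annotations


def build_success_message(
    stream_name: str,
    connection_id: str,
    sync_mode: str,
    connection_url: str,
    destination_sync_mode: str | None = None,
    cursor_field: str | None = None,
) -> str:
    """Build the status string with 'with ... and ...' only when clauses exist."""
    clauses: list[str] = []
    if destination_sync_mode:
        clauses.append(f"destination sync mode '{destination_sync_mode}'")
    if cursor_field:
        clauses.append(f"cursor field '{cursor_field}'")
    # One leading "with", then "and" between clauses; empty when none are set.
    suffix = f" with {' and '.join(clauses)}" if clauses else ""
    return (
        f"Successfully set sync mode for stream '{stream_name}' "
        f"on connection '{connection_id}' to '{sync_mode}'"
        f"{suffix}. URL: {connection_url}"
    )


message = build_success_message(
    "X", "Y", "incremental", "https://example.invalid/c", cursor_field="updated_at"
)
print(message)
```

With both optional arguments set this yields "with destination sync mode '…' and cursor field '…'", whereas the one-word fix would produce two consecutive "with" clauses in that case.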


Development

Successfully merging this pull request may close these issues.

feat(mcp): Add tools to modify stream sync settings and refresh catalog on existing connections

2 participants