feat(mcp): Add tools to modify stream sync settings and refresh catalog on existing connections#994
Conversation
…og on existing connections Adds two new MCP tools: - refresh_connection_catalog: Triggers a discover operation on a connection's source and updates the catalog with latest stream definitions and sync modes - set_stream_sync_mode: Safely changes the sync mode for a specific stream on a connection, with validation that the mode is supported Core logic lives in CloudConnection (connections.py) and api_util.py, with MCP tools as thin wrappers per the presentation layer pattern. Closes #993 Co-Authored-By: AJ Steers <aj@airbyte.io>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This PyAirbyte VersionYou can test this version of PyAirbyte using the following: # Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1773770361-mcp-stream-sync-mode-refresh-catalog' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1773770361-mcp-stream-sync-mode-refresh-catalog'PR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
📚 Show Repo GuidanceHelpful ResourcesCommunity SupportQuestions? Join the #pyairbyte channel in our Slack workspace. |
📝 WalkthroughWalkthroughAdds a Config API wrapper to fetch a refreshed connection catalog, CloudConnection methods to refresh a connection's catalog and to update a stream's sync settings, and MCP tool endpoints that expose those operations. ChangesRefresh & Stream Sync Tools
Sequence Diagram(s)sequenceDiagram
participant Client as MCP Client
participant MCP as MCP Endpoint
participant CloudConn as CloudConnection
participant APIUtil as API Utility
participant ConfigAPI as Config API
rect rgba(100, 150, 200, 0.5)
Note over Client,ConfigAPI: Refresh Catalog Flow
Client->>MCP: refresh_connection_catalog(connection_id)
MCP->>CloudConn: connection.refresh_catalog()
CloudConn->>APIUtil: get_refreshed_connection_catalog(...)
APIUtil->>ConfigAPI: GET /connections/get?withRefreshedCatalog=true
ConfigAPI-->>APIUtil: Updated catalog
APIUtil-->>CloudConn: Catalog dict
CloudConn->>CloudConn: Validate & store catalog
CloudConn-->>MCP: Confirmation message
MCP-->>Client: Success with stream count & URL
end
rect rgba(150, 100, 200, 0.5)
Note over Client,CloudConn: Set Stream Sync Mode Flow
Client->>MCP: set_stream_sync_mode(connection_id, stream_name, sync_mode, ...)
MCP->>CloudConn: connection.set_stream_sync_mode(...)
CloudConn->>CloudConn: Locate stream in catalog
CloudConn->>CloudConn: Validate sync_mode in supportedSyncModes
CloudConn->>CloudConn: Update stream config
CloudConn->>CloudConn: Save updated catalog
CloudConn-->>MCP: Success
MCP-->>Client: Confirmation with stream & connection details
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…am_sync_mode Addresses CodeRabbit review feedback: - Add stream_namespace parameter to disambiguate same-named streams in different namespaces. Raises PyAirbyteInputError when name is ambiguous. - Add fail-fast guard when switching to incremental mode without a usable cursor field (no existing cursor, no default cursor, no source-defined cursor). Co-Authored-By: AJ Steers <aj@airbyte.io>
There was a problem hiding this comment.
Pull request overview
Adds new Airbyte Cloud MCP tools and core CloudConnection helpers to (1) refresh a connection’s catalog via a forced re-discover and (2) update per-stream sync mode settings safely.
Changes:
- Add MCP tool
refresh_connection_catalogto trigger a refreshed discover and replace the connection catalog. - Add MCP tool
set_stream_sync_modeto update a single stream’ssyncMode(optionally namespace-qualified) and related config. - Add Config API helper
get_refreshed_connection_catalogand CloudConnection methodsrefresh_catalog/set_stream_sync_mode.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
airbyte/mcp/cloud.py |
Adds two new MCP tool wrappers that call CloudConnection methods and return human-readable status strings. |
airbyte/cloud/connections.py |
Implements the core catalog refresh + per-stream sync mode update logic against the stored syncCatalog. |
airbyte/_util/api_util.py |
Adds a Config API helper to fetch a connection with withRefreshedCatalog: true. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return ( | ||
| f"Successfully set sync mode for stream '{stream_name}' " | ||
| f"on connection '{connection_id}' to '{sync_mode}'" | ||
| + ( | ||
| f" with destination sync mode '{destination_sync_mode}'" | ||
| if destination_sync_mode | ||
| else "" | ||
| ) | ||
| + (f" and cursor field '{cursor_field}'" if cursor_field else "") | ||
| + f". URL: {connection.connection_url}" | ||
| ) |
There was a problem hiding this comment.
Reasonable observation. Including namespace in the success message when it was explicitly supplied would improve clarity for multi-namespace sources. Will add if the human reviewer agrees.\n\n---\nDevin session
| api_util.replace_connection_catalog( | ||
| connection_id=self.connection_id, | ||
| configured_catalog_dict=refreshed_catalog, | ||
| api_root=self.workspace.api_root, | ||
| client_id=self.workspace.client_id, | ||
| client_secret=self.workspace.client_secret, | ||
| bearer_token=self.workspace.bearer_token, | ||
| ) |
There was a problem hiding this comment.
Good suggestion. import_raw_catalog does the same replace_connection_catalog call with the same auth params. Switching to self.import_raw_catalog(refreshed_catalog) would reduce duplication. Will apply if the human reviewer agrees.\n\n---\nDevin session
| available_streams = [ | ||
| f"{e.get('stream', {}).get('namespace', '')}.{e.get('stream', {}).get('name', '')}" | ||
| for e in catalog["streams"] | ||
| ] |
There was a problem hiding this comment.
Valid observation — when namespace is None, this produces .users. However, this is a diagnostic context field on an error (not user-facing UI), so the dot-prefix still communicates "no namespace" clearly enough. Happy to clean this up if the human reviewer agrees it's worth addressing.\n\n---\nDevin session
| raise PyAirbyteInputError( | ||
| message=(f"Sync mode '{sync_mode}' is not supported by stream '{stream_name}'."), | ||
| context={ | ||
| "stream_name": stream_name, | ||
| "requested_sync_mode": sync_mode, | ||
| "supported_sync_modes": supported_sync_modes, | ||
| }, | ||
| ) |
There was a problem hiding this comment.
Fair point on consistency — connection_id is included in the other error contexts in this method. Will add if the human reviewer considers it worth a follow-up commit.\n\n---\nDevin session
| config["destinationSyncMode"] = destination_sync_mode | ||
|
|
||
| if cursor_field is not None: | ||
| config["cursorField"] = [cursor_field] |
There was a problem hiding this comment.
This is by design — allowing cursor_field to be set during full_refresh enables pre-configuring the cursor before a subsequent switch to incremental mode (a common two-step workflow). The docstring could be clearer about this, but silently ignoring the parameter would be surprising. Deferring to human reviewer on whether to add a note in the docstring.\n\n---\nDevin session
…stream-sync-mode-refresh-catalog
|
Devin is archived and cannot be woken up. Please unarchive Devin if you want to continue using it. |
5 similar comments
|
Devin is archived and cannot be woken up. Please unarchive Devin if you want to continue using it. |
|
Devin is archived and cannot be woken up. Please unarchive Devin if you want to continue using it. |
|
Devin is archived and cannot be woken up. Please unarchive Devin if you want to continue using it. |
|
Devin is archived and cannot be woken up. Please unarchive Devin if you want to continue using it. |
|
Devin is archived and cannot be woken up. Please unarchive Devin if you want to continue using it. |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@airbyte/mcp/cloud.py`:
- Around line 2721-2730: The success message construction around
stream_name/connection_id/sync_mode currently always prefixes the cursor_field
with "and", which reads awkwardly when destination_sync_mode is None; update the
f-string assembly that uses destination_sync_mode and cursor_field so that if
destination_sync_mode is truthy you keep " and cursor field '{cursor_field}'",
but if destination_sync_mode is falsy you use " with cursor field
'{cursor_field}'" instead; adjust the expression that builds the trailing clause
(referencing destination_sync_mode, cursor_field, and connection.connection_url)
to pick the correct connective.
- Around line 2721-2730: The returned status string construction is
grammatically awkward when destination_sync_mode is None but cursor_field is
present; update the string assembly in the return expression that references
stream_name, connection_id, sync_mode, destination_sync_mode, cursor_field, and
connection.connection_url so that the cursor clause uses "with cursor field
'{cursor_field}'" (or otherwise build the optional clauses consistently) instead
of "and cursor field '{cursor_field}'" to ensure the message reads correctly in
all combinations of optional args.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
| return ( | ||
| f"Successfully set sync mode for stream '{stream_name}' " | ||
| f"on connection '{connection_id}' to '{sync_mode}'" | ||
| + ( | ||
| f" with destination sync mode '{destination_sync_mode}'" | ||
| if destination_sync_mode | ||
| else "" | ||
| ) | ||
| + (f" and cursor field '{cursor_field}'" if cursor_field else "") | ||
| + f". URL: {connection.connection_url}" |
There was a problem hiding this comment.
Grammatically awkward success message when only cursor_field is provided — wdyt about fixing the connective?
When destination_sync_mode is None but cursor_field is set, the message reads:
"Successfully set sync mode for stream 'X' on connection 'Y' to 'incremental' and cursor field 'updated_at'. URL: …"
The bare "and" without a preceding "with …" clause reads oddly. Could "with cursor field" replace "and cursor field" to make it consistent with the destination-mode phrasing?
✏️ Proposed fix
- + (f" and cursor field '{cursor_field}'" if cursor_field else "")
+ + (f" with cursor field '{cursor_field}'" if cursor_field else "")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| return ( | |
| f"Successfully set sync mode for stream '{stream_name}' " | |
| f"on connection '{connection_id}' to '{sync_mode}'" | |
| + ( | |
| f" with destination sync mode '{destination_sync_mode}'" | |
| if destination_sync_mode | |
| else "" | |
| ) | |
| + (f" and cursor field '{cursor_field}'" if cursor_field else "") | |
| + f". URL: {connection.connection_url}" | |
| return ( | |
| f"Successfully set sync mode for stream '{stream_name}' " | |
| f"on connection '{connection_id}' to '{sync_mode}'" | |
| ( | |
| f" with destination sync mode '{destination_sync_mode}'" | |
| if destination_sync_mode | |
| else "" | |
| ) | |
| (f" with cursor field '{cursor_field}'" if cursor_field else "") | |
| f". URL: {connection.connection_url}" |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@airbyte/mcp/cloud.py` around lines 2721 - 2730, The success message
construction around stream_name/connection_id/sync_mode currently always
prefixes the cursor_field with "and", which reads awkwardly when
destination_sync_mode is None; update the f-string assembly that uses
destination_sync_mode and cursor_field so that if destination_sync_mode is
truthy you keep " and cursor field '{cursor_field}'", but if
destination_sync_mode is falsy you use " with cursor field '{cursor_field}'"
instead; adjust the expression that builds the trailing clause (referencing
destination_sync_mode, cursor_field, and connection.connection_url) to pick the
correct connective.
Grammatically awkward message when only cursor_field is provided — wdyt about using "with" instead of "and"?
When destination_sync_mode=None but cursor_field is set, the message reads:
"…to 'incremental' and cursor field 'updated_at'. URL: …"
The "and" appears without a preceding "with …" clause, which is a bit off. Swapping it to "with cursor field" makes it consistent with the destination_sync_mode phrasing and reads naturally regardless of which optional args are provided.
✏️ Proposed fix
- + (f" and cursor field '{cursor_field}'" if cursor_field else "")
+ + (f" with cursor field '{cursor_field}'" if cursor_field else "")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@airbyte/mcp/cloud.py` around lines 2721 - 2730, The returned status string
construction is grammatically awkward when destination_sync_mode is None but
cursor_field is present; update the string assembly in the return expression
that references stream_name, connection_id, sync_mode, destination_sync_mode,
cursor_field, and connection.connection_url so that the cursor clause uses "with
cursor field '{cursor_field}'" (or otherwise build the optional clauses
consistently) instead of "and cursor field '{cursor_field}'" to ensure the
message reads correctly in all combinations of optional args.
Summary
Adds two new MCP tools (and corresponding
CloudConnectioncore methods) to support automated incremental stream testing and configuration workflows:refresh_connection_catalog— Triggers a discover operation on a connection's source (viawithRefreshedCatalog: trueon the Config API) and replaces the connection's catalog with the refreshed result. Equivalent to "Refresh source schema" in the UI.set_stream_sync_mode— Safely modifies the sync mode for a single stream in a connection'ssyncCatalog. Validates that the requested mode is in the stream'ssupportedSyncModesbefore applying. Optionally setsdestinationSyncModeandcursorField.Core logic lives in
CloudConnection(connections.py) andapi_util.py; MCP tools incloud.pyare thin wrappers per the presentation-layer pattern.Closes #993
Review & Testing Checklist for Human
set_stream_sync_modematches streams bynameonly (with optional namespace disambiguation). Verify this is acceptable for multi-namespace sources.refresh_catalogtwo-step flow: The method does a GET (withwithRefreshedCatalog: true) then a separate POST to replace. Confirm this get-then-replace pattern is correct vs. a single update call.409 state-conflictfor the legacypre-1.0-faker-1-to-dev-nullconnection — appears unrelated to this PR). The catalog mutation persistence is verified, but a real sync round-trip afterset_stream_sync_mode → incrementalshould be confirmed on a fresh connection before merging.E2E test results (Devin sandbox workspace)
Ran a script against
pre-1.0-faker-1-to-dev-null(connection0c20f439-f492-4fe1-bbc7-2b419e1b24f5) usingCloudConnection.refresh_catalog()andCloudConnection.set_stream_sync_mode()from this branch:refresh_catalog()— returned 3 streams (products,purchases,users), persisted on the connection.set_stream_sync_mode("products", sync_mode="incremental", destination_sync_mode="append_dedup")— applied successfully;usersandpurchaseswere untouched.set_stream_sync_mode("products", sync_mode="full_refresh", destination_sync_mode="overwrite")— reverted cleanly.sync_moderaisedPyAirbyteInputErrorwith the expectedSupported Sync Modes: ['full_refresh', 'incremental']context.PyAirbyteInputErrorwith the available-streams list.Note: when an existing connection in the same workspace already syncs a given stream to the same destination,
replace_connection_catalogreturns400 connection-conflicting-destination-stream. That's a platform-level constraint (stream uniqueness), not a regression in this PR — but worth noting sincerefresh_catalogwill surface it on shared-stream destinations.Notes
check_guid_created_in_session) and markeddestructive=True, consistent with other catalog-mutating tools.cursorFieldis set as a single-element list ([cursor_field]) to match the Airbyte catalog wire format.main(82bf1e4); merge resolved automatically. All 21 CI checks green on the latest commit.Link to Devin session: https://app.devin.ai/sessions/1a305419379e4f9b838842027b8bdbc9
Requested by: Aaron ("AJ") Steers (@aaronsteers)