feat(smoke-test-source): Add incremental sync with configurable state checkpoints #1016
Aaron ("AJ") Steers (aaronsteers) wants to merge 6 commits into
Conversation
… checkpoints

- Adds a new `incremental_batch_stream` scenario that advertises `SyncMode.incremental` with `updated_at` as its source-defined cursor.
- Adds `start_date`, `batch_size`, `batch_count`, `partition_by` (day/week/month), and `cursor_step` to the connector spec. All fields are optional and additive; existing full-refresh scenarios are unchanged.
- Cursor coalesce order: state cursor > `start_date` > Jan 1 of the current UTC year.
- Emits STATE after every `batch_size` records, at each partition boundary, and once at end-of-stream (dedup when triggers coincide).
- Stops after `batch_size * batch_count` records when both are set.
- `check()` rejects invalid `start_date`, `batch_size`, `batch_count`, `partition_by`, and `cursor_step`.
- Adds 47 unit tests covering cursor coalesce, trigger logic, dedup, full-refresh non-regression, and config validation.
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version — you can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1776987918-smoke-test-incremental' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1776987918-smoke-test-incremental'

PR Slash Commands: Airbyte Maintainers can execute slash commands on this PR.

Community Support: Questions? Join the #pyairbyte channel in our Slack workspace.
📝 Walkthrough

Adds incremental sync to the smoke-test source: scenario metadata now supports incremental descriptors and cursor fields; a new incremental event generator with partitioning and checkpointing; connector config + validation for cursor controls; discovery/read now emit RECORD and per-stream STATE for incremental scenarios; tests added.

Changes
Sequence Diagram(s)

sequenceDiagram
participant Caller as Caller
participant Source as SourceSmokeTest
participant Generator as iter_incremental_scenario_events
participant StateStore as State Store
Caller->>Source: read(config, catalog, state)
Source->>Source: determine stream type (incremental vs full-refresh)
alt incremental
Source->>StateStore: fetch per-stream state (if present)
StateStore-->>Source: cursor_start or none
Source->>Generator: start(scenario, cursor_start, cursor_step, batch_size, partition_by)
loop emit events
Generator->>Source: RECORD (with cursor, id)
alt checkpoint triggered (batch / partition / stop)
Generator->>Source: STATE (updated cursor)
Source->>StateStore: persist/update stream state
end
Source->>Caller: yield RECORD/STATE
end
Generator->>Source: final STATE (if not already emitted)
Source->>Caller: yield final STATE
else full-refresh
Source->>Generator: produce full-refresh records
Source->>Caller: yield RECORDs only
end
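The RECORD/STATE interleaving in the loop above can be sketched as a plain generator. `iter_with_checkpoints` is a hypothetical stand-in for the connector's `iter_incremental_scenario_events`, reduced to just the batch-size trigger and the end-of-stream dedup:

```python
from collections.abc import Iterator

def iter_with_checkpoints(
    records: list[dict], batch_size: int
) -> Iterator[tuple[str, dict]]:
    last_checkpointed = None
    cursor = None
    for i, rec in enumerate(records, start=1):
        cursor = rec["updated_at"]
        yield ("RECORD", rec)
        if i % batch_size == 0:  # batch-size trigger
            last_checkpointed = cursor
            yield ("STATE", {"updated_at": cursor})
    # End-of-stream STATE, deduplicated when the last record
    # already triggered a batch checkpoint.
    if cursor is not None and cursor != last_checkpointed:
        yield ("STATE", {"updated_at": cursor})

records = [
    {"id": i, "updated_at": f"2024-01-0{i}T00:00:00Z"} for i in range(1, 6)
]
events = list(iter_with_checkpoints(records, batch_size=2))
state_count = sum(1 for kind, _ in events if kind == "STATE")
print(state_count)  # 3: after records 2 and 4, plus a final STATE for record 5
```

With four records and `batch_size=2`, the second batch checkpoint and the end-of-stream checkpoint coincide, so only two STATE events are emitted.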
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Want me to highlight specific tests or helper functions (cursor parsing, partitioning, state-deduplication) for a focused review, wdyt?

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
- Tighten `_state_cursor_for_stream` namespace check to be symmetric so None-namespace callers only match None-namespace state.
- Remove None from partition_by enum; the type array handles null.
- Reject empty ISO-8601 duration (e.g. 'PT') in `_parse_cursor_step`.
- Add batch_size=True and batch_count=True cases to validation tests.
- Use getattr instead of __dict__ to read state cursor in tests.
- Make year-boundary tests tolerate Dec 31 -> Jan 1 transition.
- Add ISO-week comment explaining iso.year vs ts.year.
- Expand `_emit_incremental_as_full_refresh` docstring.
- `_parse_start_date`: wrap `date.fromisoformat` ValueError symmetrically so 10-char invalid dates (e.g. '2024-13-45') surface the same nice 'Invalid start_date: ...' message as other malformed inputs.
- `_validate_incremental_config`: reject batch_count without batch_size, since `iter_incremental_scenario_events` silently ignores batch_count unless batch_size is also set.
- `_emit_incremental_as_full_refresh`: document that updated_at values shift across year boundaries when start_date is not configured.
- Add tests for batch_count-without-batch_size and invalid 10-char start_date.
The 0.2.0 wrapper still works with PyAirbyte >= 0.41.0 — the image build pulls the latest PyAirbyte at image-build time, so once airbytehq/PyAirbyte#1016 ships the new scenario will be available. Leaving the floor at 0.41.0 keeps CI resolvable until then.
🧹 Nitpick comments (2)
tests/unit_tests/test_smoke_test_source.py (2)
311-335: Small nit on `tuple(expected)` — redundant since `expected` is already a tuple, wdyt?

For the day/month params, `expected` is a literal tuple; for week, `datetime(...).isocalendar()[:2]` also returns a plain tuple (slicing a named tuple yields `tuple`). So `tuple(expected)` is a no-op wrap. You can drop it for a hair more clarity, but it's harmless — up to you.

♻️ Optional tweak
 def test_partition_bucket(ts, grain, expected):
-    assert _partition_bucket(ts, grain) == tuple(expected)
+    assert _partition_bucket(ts, grain) == expected

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit_tests/test_smoke_test_source.py` around lines 311 - 335, The test wraps the expected values in tuple(expected) redundantly in test_partition_bucket; update the assertions to compare directly using expected (i.e., change assert _partition_bucket(ts, grain) == tuple(expected) to assert _partition_bucket(ts, grain) == expected) and adjust the three param cases (day/week/month) accordingly; locate the assertion in the test_partition_bucket function referencing _partition_bucket to make this small cleanup.
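For context on what this test exercises, here is a hypothetical reimplementation of the bucket logic (the real helper is `_partition_bucket` in the source; this sketch only assumes the day/week/month grains described in the PR):

```python
from datetime import datetime

def partition_bucket(ts: datetime, grain: str) -> tuple:
    if grain == "day":
        return (ts.year, ts.month, ts.day)
    if grain == "week":
        # ISO week: isocalendar()'s year can differ from ts.year
        # around the Dec 31 / Jan 1 boundary.
        iso = ts.isocalendar()
        return (iso[0], iso[1])
    if grain == "month":
        return (ts.year, ts.month)
    raise ValueError(f"Unknown partition grain: {grain!r}")

# Dec 31, 2024 falls in ISO week 1 of 2025, which is why the
# year-boundary tests need to tolerate the transition:
print(partition_bucket(datetime(2024, 12, 31), "week"))  # (2025, 1)
```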
94-95: Tiny nit: `getattr` without a default reads like defensive code that isn't defending anything — intentional, wdyt?

`getattr(obj, "updated_at")` with no default is equivalent to `obj.state.stream.stream_state.updated_at` and will still raise `AttributeError` if missing. If the intent is "fail loud when the cursor attribute is absent", plain attribute access communicates that a bit more clearly. If the intent was to tolerate absence on `AirbyteStateBlob` (which is a dynamic/extra=allow pydantic model), adding a default (e.g. `None`) would make that explicit. Totally optional though — happy to leave it as-is if you prefer.

♻️ Optional tweak
 def _state_cursor(msg) -> str:
-    return getattr(msg.state.stream.stream_state, "updated_at")
+    return msg.state.stream.stream_state.updated_at

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit_tests/test_smoke_test_source.py` around lines 94 - 95, The helper _state_cursor currently uses getattr(msg.state.stream.stream_state, "updated_at") which offers no default and is therefore equivalent to direct attribute access; change it to explicit attribute access msg.state.stream.stream_state.updated_at to communicate the "fail loud" intent more clearly, or if you intended to tolerate a missing cursor, change to getattr(msg.state.stream.stream_state, "updated_at", None) to make that behavior explicit—update the _state_cursor function accordingly (choose one of the two options).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 75a62930-73a7-4810-b811-399844cdd218
📒 Files selected for processing (2)
- airbyte/cli/smoke_test_source/source.py
- tests/unit_tests/test_smoke_test_source.py
✅ Files skipped from review due to trivial changes (1)
- airbyte/cli/smoke_test_source/source.py
Pull request overview
Adds opt-in incremental sync support to SourceSmokeTest with configurable cursor start coalescing and forced per-stream state checkpoint emission.
Changes:
- Introduces a new incremental scenario (`incremental_batch_stream`) with `updated_at` as the source-defined cursor and a synthetic incremental event generator.
- Adds config parsing + validation for `start_date`, `cursor_step`, `batch_size`, `batch_count`, and `partition_by`, and emits STREAM state messages during incremental reads.
- Adds a comprehensive unit test suite covering cursor coalescing, checkpoint triggers/deduping, end-to-end `read()`, and `check()` validation.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| airbyte/cli/smoke_test_source/source.py | Implements incremental config parsing/validation, cursor coalescing, and STATE emission in read() for incremental-enabled scenarios. |
| airbyte/cli/smoke_test_source/_scenarios.py | Adds the incremental scenario definition and the incremental record/state event iterator with size/partition/end triggers. |
| tests/unit_tests/test_smoke_test_source.py | Adds unit tests validating incremental behavior, config validation, and non-regression for full-refresh scenarios. |
"start_date": {
    "type": ["string", "null"],
    "title": "Start Date",
    "format": "date-time",
start_date is documented/parsed as accepting either an ISO-8601 date (YYYY-MM-DD) or datetime, but the connector spec declares format: date-time. Some platforms/UIs treat format as a validator or render a datetime-only picker, which can reject date-only inputs that the implementation explicitly supports. Consider removing the format constraint or switching it to a schema that clearly allows both date and date-time inputs (or, alternatively, restrict parsing to date-time only to match the spec).
-    "format": "date-time",
Agreed — the format: date-time hint was inconsistent with _parse_start_date, which explicitly accepts both 10-char ISO dates and full ISO-8601 datetimes. Some Airbyte UIs render a datetime-only picker for format: date-time and would reject 2024-06-15.
Fixed in 2986b0c by removing the format constraint and clarifying the description to call out both accepted shapes (YYYY-MM-DD and YYYY-MM-DDTHH:MM:SSZ).
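The dual-shape behavior under discussion can be sketched as follows. `parse_start_date` is a hypothetical illustration, not the connector's `_parse_start_date`; it assumes datetime inputs carry a trailing `Z` or an explicit offset:

```python
from datetime import date, datetime, timezone

def parse_start_date(raw: str) -> datetime:
    try:
        if len(raw) == 10:
            # 10-char ISO date, e.g. "2024-06-15" -> midnight UTC
            d = date.fromisoformat(raw)
            return datetime(d.year, d.month, d.day, tzinfo=timezone.utc)
        # Full ISO-8601 datetime; normalize "Z" to an explicit offset
        # for fromisoformat() compatibility on older Pythons.
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
    except ValueError as exc:
        raise ValueError(f"Invalid start_date: {raw!r}") from exc

print(parse_start_date("2024-06-15"))            # 2024-06-15 00:00:00+00:00
print(parse_start_date("2024-06-15T12:30:00Z"))  # 2024-06-15 12:30:00+00:00
```

A `format: date-time` picker would reject the first input even though the parser accepts it, which is the mismatch the review flagged.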
"partition_by": {
    "type": ["string", "null"],
    "title": "Partition By",
    "enum": ["day", "week", "month"],
🟡 partition_by JSON Schema enum rejects null despite type and default allowing it
The partition_by spec field declares "type": ["string", "null"] and "default": None, but the "enum": ["day", "week", "month"] does not include null. In JSON Schema draft-07, the enum keyword independently restricts allowed values, so a null value passes the type check but fails the enum check. Verified using the jsonschema library (already a project dependency): jsonschema.validate(None, schema) raises ValidationError: None is not one of ['day', 'week', 'month']. This means that users who leave partition_by unset (or explicitly set to null) would fail config validation if the Airbyte platform performs strict JSON Schema validation against the connector spec.
-    "enum": ["day", "week", "month"],
+    "enum": ["day", "week", "month", None],
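The draft-07 behavior described above can be reproduced directly with the `jsonschema` library (assuming it is installed; the schemas below are copies of the spec fragment, not the connector's code):

```python
import jsonschema

without_null = {"type": ["string", "null"], "enum": ["day", "week", "month"]}
with_null = {"type": ["string", "null"], "enum": ["day", "week", "month", None]}

# enum restricts values independently of type, so null fails here...
try:
    jsonschema.validate(None, without_null)
    rejected = False
except jsonschema.ValidationError:
    rejected = True
print(rejected)  # True

# ...and passes once None is an enum member.
jsonschema.validate(None, with_null)
jsonschema.validate("day", with_null)
```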
- Fix PK collision on incremental resume: derive `id` from cursor timestamp (via a fixed Unix epoch) so the same `updated_at` always maps to the same `id` — avoids duplicate primary keys across runs that resume from state.
- Drop `format: date-time` from `start_date` spec since the parser explicitly accepts both ISO-8601 date and date-time inputs; keeping the format hint could cause UIs to reject date-only input.
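The PK-stability idea in the first bullet can be sketched like this (hypothetical names throughout; the point is only that `id` is a pure function of `updated_at`):

```python
from datetime import datetime, timezone

# Fixed reference epoch: any constant works, as long as it never changes,
# because ids only need to be a stable function of the cursor timestamp.
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def record_id_for_cursor(updated_at: datetime, step_seconds: int = 1) -> int:
    # Same updated_at -> same id on every run, including resumes from
    # state, so re-generated records never collide on primary key.
    return int((updated_at - _EPOCH).total_seconds()) // step_seconds

ts = datetime(2024, 6, 15, tzinfo=timezone.utc)
assert record_id_for_cursor(ts) == record_id_for_cursor(ts)
```

Deriving `id` from the run's record counter instead would restart at 0 on resume and reuse ids already emitted before the checkpoint, which is the collision the commit fixes.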
- Flip incremental_batch_stream to high_volume: True so it stays opt-in (was being pulled into the default fast set).
- Hoist shared _CATEGORIES constant used by both batch generators.
- Fix partition-trigger docstring wording (no look-ahead; trigger fires on boundary between previous and just-yielded record).
- Update _emit_incremental_as_full_refresh docstring to reflect that id/value now shift with start_date (only category is independent).
- Update tests to opt into all_slow_streams for the new scenario.
Add `incremental_batch_stream` to `HIGH_VOLUME_SCENARIO_NAMES` so the public constant matches the `high_volume: True` flag on the scenario dict. Caught by Devin Review.
Ran the connector end-to-end against the new scenario.

Unit tests (51 passed)

Demo 1 — initial run, composed checkpoints
Config: … — exactly 6 records (= …)

Demo 2 — resume from state (coalesce + PK stability)
Same config, input state …; state cursor wins over …

Demo 3 — month partition
Config: …

Full evidence page: https://app.devin.ai/attachments/c653760d-8f2e-46de-ac19-64876f76d625/evidence_full.png
Summary

Adds incremental sync support to `SourceSmokeTest` (in `airbyte/cli/smoke_test_source/`) with force-emitted state checkpoints, per the requirements:

- Cursor start coalescing: state cursor > `start_date` config > Jan 1 of the current UTC year.
- STATE after every `batch_size` records (paired with `batch_count` to bound total records = `batch_size * batch_count`).
- `partition_by: day | week | month` (UTC) calendar-boundary checkpoints.

Scope is opt-in per scenario: a new `incremental_batch_stream` scenario advertises `SyncMode.incremental` with `updated_at` as its source-defined cursor. All existing full-refresh scenarios (`basic_types`, `timestamp_types`, `large_batch_stream`, etc.) are untouched. All new spec fields are optional — no breaking changes for existing catalogs/configs.

New spec fields (all optional):
- `start_date` — ISO-8601 date or datetime; lower bound for incremental cursor.
- `batch_size` — int ≥ 1; emit STATE every N records.
- `batch_count` — int ≥ 1; paired with `batch_size` to bound record emission.
- `partition_by` — `day` | `week` | `month`; emit STATE at UTC calendar boundaries.
- `cursor_step` — seconds (number) or ISO-8601 duration (e.g. `PT1H`, `P1D`); defaults to 1 s.

Behavioral notes:
- Records carry `updated_at` strictly greater than the resolved cursor start, so resuming from a prior STATE never re-emits a record.
- In `full_refresh` mode, records are emitted with the same generator (deterministic) but no STATE messages.
- `check()` rejects invalid `start_date`, negative/zero `batch_size`/`batch_count`, unknown `partition_by`, and unparseable `cursor_step` before the sync starts.

Covered by 47 unit tests in `tests/unit_tests/test_smoke_test_source.py` — coalesce order, trigger logic, dedup, full-refresh non-regression, and config validation. Local `ruff check`, `ruff format --check`, `pyrefly check`, and full unit-test suite all pass.

A separate follow-up PR will bump `source-smoke-test`'s Docker image tag in `airbytehq/airbyte` once this lands.

Review & Testing Checklist for Human
- Confirm the cursor coalesce order (state > `start_date` → Jan 1 UTC) matches the intended semantics.
- Confirm that only `incremental_batch_stream` is incremental (`large_batch_stream` stays full-refresh-only).
- Exercise `check()` validation for the new spec fields (`start_date`, `batch_size`, `batch_count`, `partition_by`, `cursor_step`).
- Try a `partition_by=day` run with `cursor_step=PT6H` to see STATE messages at day boundaries.

Notes
- Implementation lives in `airbyte/cli/smoke_test_source/{source.py,_scenarios.py}`, following the existing layout for this source. Happy to move shared helpers out to a public module in a follow-up if preferred.
- `large_batch_stream` unchanged; cursor field = `updated_at`; no extra triggers for v1.

Link to Devin session: https://app.devin.ai/sessions/aed4dcdceb4e444c98be17b2bfcd2602
Requested by: Aaron ("AJ") Steers (@aaronsteers)
Summary by CodeRabbit
New Features
Tests