From 6ff94d7e184ea07aac10dd2c2b5351c271408586 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 23 Apr 2026 23:57:01 +0000 Subject: [PATCH 1/6] feat(smoke-test-source): Add incremental sync with configurable state checkpoints - Adds a new `incremental_batch_stream` scenario that advertises `SyncMode.incremental` with `updated_at` as its source-defined cursor. - Adds `start_date`, `batch_size`, `batch_count`, `partition_by` (day/week/month), and `cursor_step` to the connector spec. All fields are optional and additive; existing full-refresh scenarios are unchanged. - Cursor coalesce order: state cursor > `start_date` > Jan 1 of the current UTC year. - Emits STATE after every `batch_size` records, at each partition boundary, and once at end-of-stream (dedup when triggers coincide). - Stops after `batch_size * batch_count` records when both are set. - `check()` rejects invalid `start_date`, `batch_size`, `batch_count`, `partition_by`, and `cursor_step`. - Adds 47 unit tests covering cursor coalesce, trigger logic, dedup, full-refresh non-regression, and config validation. --- airbyte/cli/smoke_test_source/_scenarios.py | 167 ++++++- airbyte/cli/smoke_test_source/source.py | 472 ++++++++++++++++++-- tests/unit_tests/test_smoke_test_source.py | 438 ++++++++++++++++++ 3 files changed, 1044 insertions(+), 33 deletions(-) create mode 100644 tests/unit_tests/test_smoke_test_source.py diff --git a/airbyte/cli/smoke_test_source/_scenarios.py b/airbyte/cli/smoke_test_source/_scenarios.py index dba77e5db..b792bd064 100644 --- a/airbyte/cli/smoke_test_source/_scenarios.py +++ b/airbyte/cli/smoke_test_source/_scenarios.py @@ -3,20 +3,45 @@ Each scenario defines a stream name, JSON schema, optional primary key, and either inline records or a record generator reference. + +Scenarios may additionally declare: + +- `incremental`: `True` to advertise `SyncMode.incremental` for the stream. +- `cursor_field`: list[str], the source-defined cursor path for incremental streams. +- `record_generator`: string id of a generator in `get_scenario_records` / + `iter_incremental_scenario_events`. +- `high_volume`: `True` to exclude the stream from the default fast-only scenario set. """ from __future__ import annotations import math -from typing import Any +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Any, Literal + + +if TYPE_CHECKING: + from collections.abc import Iterator _DEFAULT_LARGE_BATCH_COUNT = 1000 +"""Default number of records for the `large_batch_stream` generator.""" + +_DEFAULT_INCREMENTAL_MAX_RECORDS = 1000 +"""Safety cap on records emitted by the `incremental_batch_stream` generator. + +Applies only when neither `batch_count` nor a stream-level `record_count` +is set; prevents accidental unbounded emission. +""" HIGH_VOLUME_SCENARIO_NAMES: set[str] = { "large_batch_stream", } +PartitionGrain = Literal["day", "week", "month"] + +_INCREMENTAL_CURSOR_FIELD: list[str] = ["updated_at"] + PREDEFINED_SCENARIOS: list[dict[str, Any]] = [ { "name": "basic_types", @@ -708,6 +733,28 @@ {"id": 7, "float_value": -1.0, "integer_value": -2147483648}, ], }, + { + "name": "incremental_batch_stream", + "description": ( + "Incremental stream that honors inbound state, `start_date`, " + "and checkpoint triggers (`batch_size` / `batch_count` / `partition_by`)." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "category": {"type": "string"}, + "value": {"type": "number"}, + }, + }, + "primary_key": [["id"]], + "incremental": True, + "cursor_field": list(_INCREMENTAL_CURSOR_FIELD), + "record_generator": "incremental_batch", + "high_volume": False, + }, ] @@ -753,10 +800,126 @@ def generate_large_string_records() -> list[dict[str, Any]]: def get_scenario_records( scenario: dict[str, Any], ) -> list[dict[str, Any]]: - """Get records for a scenario, using generator if specified.""" + """Get records for a scenario, using generator if specified. + + Used for full-refresh scenarios. Incremental scenarios should be iterated + via `iter_incremental_scenario_events` instead. + """ generator = scenario.get("record_generator") if generator == "large_batch": return generate_large_batch_records(scenario) if generator == "large_strings": return generate_large_string_records() return scenario.get("records", []) + + +def _partition_bucket( + ts: datetime, + grain: PartitionGrain, +) -> tuple[int, ...]: + """Return a hashable bucket identifier for `ts` at the given UTC grain. + + Two timestamps share a bucket iff they fall within the same calendar + `day`, ISO `week`, or `month` in UTC. + """ + ts_utc = ts.astimezone(timezone.utc) + if grain == "day": + return (ts_utc.year, ts_utc.month, ts_utc.day) + if grain == "week": + iso = ts_utc.isocalendar() + return (iso.year, iso.week) + if grain == "month": + return (ts_utc.year, ts_utc.month) + raise ValueError(f"Unsupported partition grain: {grain!r}") + + +def iter_incremental_scenario_events( + scenario: dict[str, Any], + *, + cursor_start: datetime, + cursor_step: timedelta, + batch_size: int | None = None, + batch_count: int | None = None, + partition_by: PartitionGrain | None = None, + max_records: int | None = None, +) -> Iterator[dict[str, Any]]: + """Yield ordered record/state events for an incremental scenario. + + Events are dicts with a `kind` field: + + - `{"kind": "record", "data": {...}, "cursor": ""}` + - `{"kind": "state", "cursor": ""}` + + Records are emitted at `cursor_start + (i + 1) * cursor_step` for + `i = 0, 1, ...`, i.e. strictly greater than `cursor_start` so that + resuming from a prior state never re-emits. + + STATE events are interleaved: + + - after every `batch_size` records when `batch_size` is set; + - whenever the next record's partition bucket differs from the + current record's bucket, when `partition_by` is set; + - once at the end of the stream (deduped if a trigger already fired + at the final record). + + Stopping rules (evaluated in order): + + - if `batch_size` and `batch_count` are both set, stop after + `batch_size * batch_count` records; + - else honor the scenario's own `record_count` if present; + - else honor the caller-supplied `max_records`; + - else default to `_DEFAULT_INCREMENTAL_MAX_RECORDS`. + """ + if cursor_step <= timedelta(0): + raise ValueError("`cursor_step` must be a positive duration.") + if batch_size is not None and batch_size < 1: + raise ValueError("`batch_size` must be >= 1.") + if batch_count is not None and batch_count < 1: + raise ValueError("`batch_count` must be >= 1.") + + if batch_size is not None and batch_count is not None: + total = batch_size * batch_count + elif "record_count" in scenario: + total = int(scenario["record_count"]) + elif max_records is not None: + total = max_records + else: + total = _DEFAULT_INCREMENTAL_MAX_RECORDS + + if total <= 0: + return + + categories = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"] + start_utc = cursor_start.astimezone(timezone.utc) + + last_state_emitted_at: str | None = None + current_bucket: tuple[int, ...] | None = None + last_cursor_iso: str | None = None + + for i in range(1, total + 1): + ts = start_utc + cursor_step * i + cursor_iso = ts.isoformat().replace("+00:00", "Z") + record = { + "id": i, + "updated_at": cursor_iso, + "category": categories[(i - 1) % len(categories)], + "value": float(i) * 1.1, + } + yield {"kind": "record", "data": record, "cursor": cursor_iso} + last_cursor_iso = cursor_iso + + size_trigger = batch_size is not None and (i % batch_size == 0) + + partition_trigger = False + if partition_by is not None: + next_bucket = _partition_bucket(ts, partition_by) + if current_bucket is not None and next_bucket != current_bucket: + partition_trigger = True + current_bucket = next_bucket + + if (size_trigger or partition_trigger) and cursor_iso != last_state_emitted_at: + yield {"kind": "state", "cursor": cursor_iso} + last_state_emitted_at = cursor_iso + + if last_cursor_iso is not None and last_cursor_iso != last_state_emitted_at: + yield {"kind": "state", "cursor": last_cursor_iso} diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 46abc56a8..7aeba5254 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -16,7 +16,9 @@ from __future__ import annotations import logging +import re import time +from datetime import date, datetime, timedelta, timezone from typing import TYPE_CHECKING, Any from airbyte_cdk.models import ( @@ -24,7 +26,11 @@ AirbyteConnectionStatus, AirbyteMessage, AirbyteRecordMessage, + AirbyteStateBlob, + AirbyteStateMessage, + AirbyteStateType, AirbyteStream, + AirbyteStreamState, AirbyteStreamStatus, AirbyteStreamStatusTraceMessage, AirbyteTraceMessage, @@ -41,7 +47,9 @@ from airbyte.cli.smoke_test_source._scenarios import ( _DEFAULT_LARGE_BATCH_COUNT, PREDEFINED_SCENARIOS, + PartitionGrain, get_scenario_records, + iter_incremental_scenario_events, ) @@ -49,6 +57,15 @@ from collections.abc import Iterable, Mapping +_VALID_PARTITION_GRAINS: tuple[str, ...] = ("day", "week", "month") + +_ISO_DATE_LENGTH = 10 + +_ISO_DURATION_RE = re.compile( + r"^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?)?$", +) + + logger = logging.getLogger("airbyte") @@ -56,18 +73,147 @@ def _build_streams_from_scenarios( scenarios: list[dict[str, Any]], namespace: str | None = None, ) -> list[AirbyteStream]: - """Build AirbyteStream objects from scenario definitions.""" - return [ - AirbyteStream( - name=scenario["name"], - namespace=namespace, - json_schema=scenario["json_schema"], - supported_sync_modes=[SyncMode.full_refresh], - source_defined_cursor=False, - source_defined_primary_key=scenario.get("primary_key"), - ) - for scenario in scenarios - ] + """Build AirbyteStream objects from scenario definitions. + + Scenarios with `incremental=True` advertise both `full_refresh` and + `incremental` sync modes, set `source_defined_cursor=True`, and expose + the scenario's `cursor_field` as `default_cursor_field`. + """ + streams: list[AirbyteStream] = [] + for scenario in scenarios: + is_incremental = bool(scenario.get("incremental")) + sync_modes = [SyncMode.full_refresh] + if is_incremental: + sync_modes.append(SyncMode.incremental) + stream_kwargs: dict[str, Any] = { + "name": scenario["name"], + "namespace": namespace, + "json_schema": scenario["json_schema"], + "supported_sync_modes": sync_modes, + "source_defined_cursor": is_incremental, + "source_defined_primary_key": scenario.get("primary_key"), + } + if is_incremental and scenario.get("cursor_field"): + stream_kwargs["default_cursor_field"] = list(scenario["cursor_field"]) + streams.append(AirbyteStream(**stream_kwargs)) + return streams + + +def _parse_start_date(value: Any) -> datetime | None: # noqa: ANN401 + """Parse `start_date` config into a UTC datetime, or `None` if unset. + + Accepts ISO-8601 date (`YYYY-MM-DD`) or datetime strings. Naive inputs + are assumed to be UTC. + """ + if value is None or value == "": # noqa: PLC1901 + return None + if isinstance(value, datetime): + dt = value + elif isinstance(value, date): + dt = datetime(value.year, value.month, value.day, tzinfo=timezone.utc) + elif isinstance(value, str): + text = value.strip() + if text.endswith("Z"): + text = text[:-1] + "+00:00" + try: + parsed = datetime.fromisoformat(text) + except ValueError as exc: + if len(text) == _ISO_DATE_LENGTH: + parsed_date = date.fromisoformat(text) + parsed = datetime( + parsed_date.year, + parsed_date.month, + parsed_date.day, + tzinfo=timezone.utc, + ) + else: + raise ValueError(f"Invalid `start_date`: {value!r}") from exc + dt = parsed + else: + raise TypeError(f"Invalid `start_date` type: {type(value).__name__}") + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def _parse_cursor_step(value: Any) -> timedelta: # noqa: ANN401 + """Parse `cursor_step` config into a positive `timedelta`. + + Accepts numeric seconds, numeric strings, or ISO-8601 duration strings + (subset: `PnDTnHnMnS`). Defaults to 1 second when unset. + """ + if value is None or value == "": # noqa: PLC1901 + return timedelta(seconds=1) + if isinstance(value, bool): + raise TypeError("`cursor_step` must be a number of seconds or ISO-8601 duration.") + if isinstance(value, (int, float)): + seconds = float(value) + elif isinstance(value, str): + text = value.strip() + if text.startswith("P"): + match = _ISO_DURATION_RE.match(text) + if not match or match.group(0) == "P": + raise ValueError(f"Invalid ISO-8601 duration for `cursor_step`: {value!r}") + days = int(match.group(1) or 0) + hours = int(match.group(2) or 0) + minutes = int(match.group(3) or 0) + secs = float(match.group(4) or 0) + seconds = days * 86400 + hours * 3600 + minutes * 60 + secs + else: + try: + seconds = float(text) + except ValueError as exc: + raise ValueError(f"Invalid `cursor_step`: {value!r}") from exc + else: + raise TypeError(f"Invalid `cursor_step` type: {type(value).__name__}") + if seconds <= 0: + raise ValueError("`cursor_step` must be a positive duration.") + return timedelta(seconds=seconds) + + +def _default_cursor_start() -> datetime: + """Return Jan 1 of the current UTC year at `00:00:00Z`.""" + now = datetime.now(tz=timezone.utc) + return datetime(now.year, 1, 1, tzinfo=timezone.utc) + + +def _coalesce_partition_grain(raw: Any) -> PartitionGrain | None: # noqa: ANN401 + """Return `raw` as a valid `PartitionGrain`, or `None` when unset/invalid. + + Invalid values fall through to `None` here; `check()` rejects them upfront. + """ + return raw if raw in _VALID_PARTITION_GRAINS else None + + +def _state_cursor_for_stream( + state: list[AirbyteStateMessage] | None, + stream_name: str, + namespace: str | None, + cursor_field: str, +) -> datetime | None: + """Extract the `cursor_field` value from per-stream state, if present.""" + if not state: + return None + for msg in state: + stream_state = getattr(msg, "stream", None) + if not stream_state: + continue + descriptor = getattr(stream_state, "stream_descriptor", None) + if descriptor is None or descriptor.name != stream_name: + continue + if namespace is not None and getattr(descriptor, "namespace", None) != namespace: + continue + blob = getattr(stream_state, "stream_state", None) + if blob is None: + continue + if isinstance(blob, dict): + raw = blob.get(cursor_field) + else: + raw = getattr(blob, cursor_field, None) + if raw is None: + continue + return _parse_start_date(raw) + return None class SourceSmokeTest(Source): @@ -186,6 +332,61 @@ def spec( ), "default": None, }, + "start_date": { + "type": ["string", "null"], + "title": "Start Date", + "format": "date-time", + "description": ( + "Lower bound for incremental streams' cursor. " + "Accepts an ISO-8601 date or date-time. When " + "omitted and no state is provided, the cursor " + "starts at Jan 1 (00:00:00 UTC) of the current year." + ), + "default": None, + }, + "batch_size": { + "type": ["integer", "null"], + "title": "Batch Size", + "minimum": 1, + "description": ( + "Emit a STATE message after every `batch_size` " + "records per incremental stream. Leave unset to " + "disable size-based checkpointing." + ), + "default": None, + }, + "batch_count": { + "type": ["integer", "null"], + "title": "Batch Count", + "minimum": 1, + "description": ( + "When paired with `batch_size`, stop each " + "incremental stream after this many batches. " + "Total records emitted = batch_size * batch_count." + ), + "default": None, + }, + "partition_by": { + "type": ["string", "null"], + "title": "Partition By", + "enum": ["day", "week", "month", None], + "description": ( + "Emit a STATE message at each partition boundary " + "(UTC) of the cursor for incremental streams." + ), + "default": None, + }, + "cursor_step": { + "type": ["string", "number", "null"], + "title": "Cursor Step", + "description": ( + "Spacing between synthetic records for " + "incremental streams, as a number of seconds or " + "an ISO-8601 duration (e.g. `PT1S`, `PT1H`, " + "`P1D`). Defaults to 1 second." + ), + "default": None, + }, }, }, ) @@ -295,6 +496,37 @@ def _validate_custom_scenarios( ) return None + @staticmethod + def _validate_incremental_config( + config: Mapping[str, Any], + ) -> str | None: + """Validate `start_date`, `batch_size`, `batch_count`, `partition_by`, `cursor_step`.""" + try: + _parse_start_date(config.get("start_date")) + except (ValueError, TypeError) as exc: + return str(exc) + try: + _parse_cursor_step(config.get("cursor_step")) + except (ValueError, TypeError) as exc: + return str(exc) + + for key in ("batch_size", "batch_count"): + value = config.get(key) + if value is None: + continue + if isinstance(value, bool) or not isinstance(value, int): + return f"`{key}` must be a positive integer." + if value < 1: + return f"`{key}` must be >= 1." + + partition_by = config.get("partition_by") + if partition_by is not None and partition_by not in _VALID_PARTITION_GRAINS: + return ( + f"`partition_by` must be one of {list(_VALID_PARTITION_GRAINS)} " + f"or null, got {partition_by!r}." + ) + return None + def check( self, logger: logging.Logger, @@ -315,6 +547,13 @@ def check( message=error, ) + incremental_error = self._validate_incremental_config(config) + if incremental_error: + return AirbyteConnectionStatus( + status=Status.FAILED, + message=incremental_error, + ) + scenarios = self._get_all_scenarios(config) if not scenarios: return AirbyteConnectionStatus( @@ -359,28 +598,175 @@ def _stream_status_message( ), ) + @staticmethod + def _build_state_message( + stream_name: str, + namespace: str | None, + cursor_field: str, + cursor_value: str, + ) -> AirbyteMessage: + """Build a per-stream STATE message carrying `{cursor_field: cursor_value}`.""" + return AirbyteMessage( + type=Type.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor( + name=stream_name, + namespace=namespace, + ), + stream_state=AirbyteStateBlob(**{cursor_field: cursor_value}), + ), + ), + ) + + def _emit_record( + self, + stream_name: str, + namespace: str | None, + data: dict[str, Any], + now_ms: int, + ) -> AirbyteMessage: + """Build a RECORD message for `stream_name`.""" + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=stream_name, + namespace=namespace, + data=data, + emitted_at=now_ms, + ), + ) + + def _emit_incremental( # noqa: PLR0913 + self, + *, + scenario: dict[str, Any], + stream_name: str, + namespace: str | None, + state: list[AirbyteStateMessage] | None, + start_date: datetime | None, + cursor_step: timedelta, + batch_size: int | None, + batch_count: int | None, + partition_by: PartitionGrain | None, + now_ms: int, + logger: logging.Logger, + ) -> Iterable[AirbyteMessage]: + """Emit RECORD and STATE messages for an incremental-sync stream.""" + cursor_field_list = scenario.get("cursor_field") or ["updated_at"] + cursor_field = cursor_field_list[0] + cursor_start = ( + _state_cursor_for_stream(state, stream_name, namespace, cursor_field) + or start_date + or _default_cursor_start() + ) + record_count = 0 + state_count = 0 + for event in iter_incremental_scenario_events( + scenario, + cursor_start=cursor_start, + cursor_step=cursor_step, + batch_size=batch_size, + batch_count=batch_count, + partition_by=partition_by, + ): + if event["kind"] == "record": + record_count += 1 + yield self._emit_record(stream_name, namespace, event["data"], now_ms) + else: + state_count += 1 + yield self._build_state_message( + stream_name, + namespace, + cursor_field, + event["cursor"], + ) + logger.info( + f"Emitted {record_count} records and {state_count} " + f"STATE messages for incremental stream '{stream_name}' " + f"(cursor_start={cursor_start.isoformat()})." + ) + + def _emit_incremental_as_full_refresh( # noqa: PLR0913 + self, + *, + scenario: dict[str, Any], + stream_name: str, + namespace: str | None, + start_date: datetime | None, + cursor_step: timedelta, + batch_size: int | None, + batch_count: int | None, + now_ms: int, + logger: logging.Logger, + ) -> Iterable[AirbyteMessage]: + """Emit RECORDs (no STATE) for an incremental scenario read in full-refresh mode.""" + cursor_start = start_date or _default_cursor_start() + count = 0 + for event in iter_incremental_scenario_events( + scenario, + cursor_start=cursor_start, + cursor_step=cursor_step, + batch_size=batch_size, + batch_count=batch_count, + partition_by=None, + ): + if event["kind"] != "record": + continue + count += 1 + yield self._emit_record(stream_name, namespace, event["data"], now_ms) + logger.info(f"Emitted {count} records for full-refresh stream '{stream_name}'.") + + def _emit_full_refresh( + self, + *, + scenario: dict[str, Any], + stream_name: str, + namespace: str | None, + now_ms: int, + logger: logging.Logger, + ) -> Iterable[AirbyteMessage]: + """Emit RECORDs for a classic full-refresh scenario (inline or simple generator).""" + records = get_scenario_records(scenario) + logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.") + for record in records: + yield self._emit_record(stream_name, namespace, record, now_ms) + def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, - state: list[Any] | None = None, # noqa: ARG002 + state: list[AirbyteStateMessage] | None = None, ) -> Iterable[AirbyteMessage]: - """Read records from selected smoke test streams.""" - selected_streams = {stream.stream.name for stream in catalog.streams} + """Read records from selected smoke test streams. + + Streams configured for `SyncMode.incremental` that map to a scenario + with `incremental=True` honor `start_date`, `cursor_step`, and the + checkpoint triggers (`batch_size` / `batch_count` / `partition_by`), + and emit STATE messages accordingly. All other streams use the + existing full-refresh behavior. + """ scenarios = self._get_all_scenarios(config) scenario_map = {s["name"]: s for s in scenarios} - now_ms = int(time.time() * 1000) - namespace = config.get("namespace") - for stream_name in selected_streams: + start_date = _parse_start_date(config.get("start_date")) + cursor_step = _parse_cursor_step(config.get("cursor_step")) + batch_size = config.get("batch_size") + batch_count = config.get("batch_count") + partition_by = _coalesce_partition_grain(config.get("partition_by")) + + now_ms = int(time.time() * 1000) + + for configured in catalog.streams: + stream_name = configured.stream.name scenario = scenario_map.get(stream_name) if not scenario: logger.warning(f"Stream '{stream_name}' not found in scenarios, skipping.") continue - # Emit STARTED status yield self._stream_status_message( stream_name, AirbyteStreamStatus.STARTED, @@ -392,21 +778,45 @@ def read( namespace=namespace, ) - records = get_scenario_records(scenario) - logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.") + is_incremental_run = configured.sync_mode == SyncMode.incremental and bool( + scenario.get("incremental") + ) - for record in records: - yield AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage( - stream=stream_name, - namespace=namespace, - data=record, - emitted_at=now_ms, - ), + if is_incremental_run: + yield from self._emit_incremental( + scenario=scenario, + stream_name=stream_name, + namespace=namespace, + state=state, + start_date=start_date, + cursor_step=cursor_step, + batch_size=batch_size, + batch_count=batch_count, + partition_by=partition_by, + now_ms=now_ms, + logger=logger, + ) + elif scenario.get("record_generator") == "incremental_batch": + yield from self._emit_incremental_as_full_refresh( + scenario=scenario, + stream_name=stream_name, + namespace=namespace, + start_date=start_date, + cursor_step=cursor_step, + batch_size=batch_size, + batch_count=batch_count, + now_ms=now_ms, + logger=logger, + ) + else: + yield from self._emit_full_refresh( + scenario=scenario, + stream_name=stream_name, + namespace=namespace, + now_ms=now_ms, + logger=logger, ) - # Emit COMPLETE status yield self._stream_status_message( stream_name, AirbyteStreamStatus.COMPLETE, diff --git a/tests/unit_tests/test_smoke_test_source.py b/tests/unit_tests/test_smoke_test_source.py new file mode 100644 index 000000000..ee5f2911d --- /dev/null +++ b/tests/unit_tests/test_smoke_test_source.py @@ -0,0 +1,438 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Unit tests for `airbyte.cli.smoke_test_source.source.SourceSmokeTest`. + +Covers: + +- The incremental cursor coalesce rule (state > start_date > Jan 1 UTC). +- STATE emission triggered by `batch_size`, `partition_by`, and end-of-stream. +- Deduplication when triggers coincide. +- `check()` validation for new config fields. +- Non-regression for full-refresh scenarios. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timedelta, timezone + +import pytest +from airbyte_cdk.models import ( + AirbyteStateBlob, + AirbyteStateMessage, + AirbyteStateType, + AirbyteStreamState, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Status, + StreamDescriptor, + SyncMode, + Type, +) + +from airbyte.cli.smoke_test_source._scenarios import ( + _partition_bucket, + iter_incremental_scenario_events, +) +from airbyte.cli.smoke_test_source.source import ( + SourceSmokeTest, + _default_cursor_start, + _parse_cursor_step, + _parse_start_date, + _state_cursor_for_stream, +) + + +_LOGGER = logging.getLogger("airbyte.test") +_STREAM = "incremental_batch_stream" + + +def _source() -> SourceSmokeTest: + return SourceSmokeTest() + + +def _configured_catalog( + sync_mode: SyncMode = SyncMode.incremental, + stream_name: str = _STREAM, +) -> ConfiguredAirbyteCatalog: + src = _source() + catalog = src.discover(_LOGGER, {}) + stream = next(s for s in catalog.streams if s.name == stream_name) + return ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=stream, + sync_mode=sync_mode, + destination_sync_mode=DestinationSyncMode.append, + cursor_field=["updated_at"] + if sync_mode == SyncMode.incremental + else None, + ) + ], + ) + + +def _state_for(stream_name: str, updated_at: str) -> list[AirbyteStateMessage]: + return [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name=stream_name, namespace=None), + stream_state=AirbyteStateBlob(updated_at=updated_at), + ), + ), + ] + + +def _records_and_states(messages: list) -> tuple[list, list]: + return ( + [m for m in messages if m.type == Type.RECORD], + [m for m in messages if m.type == Type.STATE], + ) + + +def _state_cursor(msg) -> str: + return msg.state.stream.stream_state.__dict__["updated_at"] + + +# --------------------------------------------------------------------------- +# Spec / discover +# --------------------------------------------------------------------------- + + +def test_spec_includes_incremental_config_fields(): + spec = _source().spec(_LOGGER) + props = spec.connectionSpecification["properties"] + for key in ( + "start_date", + "batch_size", + "batch_count", + "partition_by", + "cursor_step", + ): + assert key in props, f"spec missing {key!r}" + + +def test_discover_marks_incremental_stream_correctly(): + catalog = _source().discover(_LOGGER, {}) + incr = next(s for s in catalog.streams if s.name == _STREAM) + assert SyncMode.incremental in incr.supported_sync_modes + assert SyncMode.full_refresh in incr.supported_sync_modes + assert incr.source_defined_cursor is True + assert incr.default_cursor_field == ["updated_at"] + + +def test_discover_leaves_full_refresh_scenarios_unchanged(): + catalog = _source().discover(_LOGGER, {}) + basic = next(s for s in catalog.streams if s.name == "basic_types") + assert basic.supported_sync_modes == [SyncMode.full_refresh] + assert basic.source_defined_cursor is False + + +# --------------------------------------------------------------------------- +# Cursor coalesce +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + pytest.param(None, None, id="none"), + pytest.param("", None, id="empty_string"), + pytest.param( + "2024-06-15", + datetime(2024, 6, 15, tzinfo=timezone.utc), + id="date_only", + ), + pytest.param( + "2024-06-15T12:34:56Z", + datetime(2024, 6, 15, 12, 34, 56, tzinfo=timezone.utc), + id="datetime_z", + ), + pytest.param( + "2024-06-15T12:34:56+02:00", + datetime(2024, 6, 15, 10, 34, 56, tzinfo=timezone.utc), + id="datetime_offset_normalized_to_utc", + ), + ], +) +def test_parse_start_date(value, expected): + assert _parse_start_date(value) == expected + + +def test_parse_start_date_rejects_garbage(): + with pytest.raises(ValueError): + _parse_start_date("not-a-date-at-all") + + +@pytest.mark.parametrize( + ("value", "expected_seconds"), + [ + pytest.param(None, 1, id="default"), + pytest.param(30, 30, id="int_seconds"), + pytest.param("30", 30, id="numeric_string"), + pytest.param("PT30S", 30, id="iso_seconds"), + pytest.param("PT1H", 3600, id="iso_hours"), + pytest.param("P1D", 86400, id="iso_days"), + pytest.param("P1DT6H", 86400 + 6 * 3600, id="iso_days_hours"), + ], +) +def test_parse_cursor_step(value, expected_seconds): + assert _parse_cursor_step(value) == timedelta(seconds=expected_seconds) + + +@pytest.mark.parametrize( + "value", + [ + pytest.param(0, id="zero_seconds"), + pytest.param(-5, id="negative_seconds"), + pytest.param("PT", id="empty_duration"), + pytest.param("PTbadS", id="garbage_duration"), + ], +) +def test_parse_cursor_step_rejects_invalid_value(value): + with pytest.raises(ValueError): + _parse_cursor_step(value) + + +def test_parse_cursor_step_rejects_bool_as_type_error(): + with pytest.raises(TypeError): + _parse_cursor_step(True) + + +def test_default_cursor_start_is_jan_1_current_utc_year(): + now = datetime.now(tz=timezone.utc) + assert _default_cursor_start() == datetime(now.year, 1, 1, tzinfo=timezone.utc) + + +def test_state_cursor_returns_none_when_no_state(): + assert _state_cursor_for_stream(None, _STREAM, None, "updated_at") is None + assert _state_cursor_for_stream([], _STREAM, None, "updated_at") is None + + +def test_state_cursor_returns_parsed_value_when_present(): + state = _state_for(_STREAM, "2030-06-15T12:00:00Z") + result = _state_cursor_for_stream(state, _STREAM, None, "updated_at") + assert result == datetime(2030, 6, 15, 12, 0, 0, tzinfo=timezone.utc) + + +def test_state_cursor_returns_none_when_stream_mismatch(): + state = _state_for("other_stream", "2030-06-15T12:00:00Z") + assert _state_cursor_for_stream(state, _STREAM, None, "updated_at") is None + + +# --------------------------------------------------------------------------- +# Incremental iterator +# --------------------------------------------------------------------------- + + +def test_iter_emits_size_triggered_states_plus_terminal(): + events = list( + iter_incremental_scenario_events( + {"name": _STREAM}, + cursor_start=datetime(2024, 1, 1, tzinfo=timezone.utc), + cursor_step=timedelta(hours=1), + batch_size=5, + batch_count=3, + ) + ) + records = [e for e in events if e["kind"] == "record"] + states = [e for e in events if e["kind"] == "state"] + assert len(records) == 15 + assert len(states) == 3 + assert [s["cursor"] for s in states] == [ + "2024-01-01T05:00:00Z", + "2024-01-01T10:00:00Z", + "2024-01-01T15:00:00Z", + ] + + +def test_iter_emits_partition_boundary_states(): + events = list( + iter_incremental_scenario_events( + {"name": _STREAM}, + cursor_start=datetime(2024, 1, 1, tzinfo=timezone.utc), + cursor_step=timedelta(hours=6), + batch_size=4, + batch_count=3, + partition_by="day", + ) + ) + records = [e for e in events if e["kind"] == "record"] + states = [e for e in events if e["kind"] == "state"] + assert len(records) == 12 + assert [s["cursor"] for s in states] == [ + "2024-01-02T00:00:00Z", + "2024-01-03T00:00:00Z", + "2024-01-04T00:00:00Z", + ] + + +def test_iter_dedupes_coincident_triggers(): + events = list( + iter_incremental_scenario_events( + {"name": _STREAM}, + cursor_start=datetime(2024, 1, 1, tzinfo=timezone.utc), + cursor_step=timedelta(hours=12), + batch_size=2, + batch_count=2, + partition_by="day", + ) + ) + states = [e for e in events if e["kind"] == "state"] + assert [s["cursor"] for s in states] == [ + "2024-01-02T00:00:00Z", + "2024-01-03T00:00:00Z", + ] + + +def test_iter_emits_terminal_state_when_no_trigger_at_end(): + events = list( + iter_incremental_scenario_events( + {"name": _STREAM}, + cursor_start=datetime(2024, 1, 1, tzinfo=timezone.utc), + cursor_step=timedelta(hours=1), + batch_size=10, + batch_count=None, + max_records=3, + ) + ) + states = [e for e in events if e["kind"] == "state"] + assert len(states) == 1 + assert states[0]["cursor"] == "2024-01-01T03:00:00Z" + + +@pytest.mark.parametrize( + ("ts", "grain", "expected"), + [ + pytest.param( + datetime(2024, 6, 15, 12, tzinfo=timezone.utc), + "day", + (2024, 6, 15), + id="day", + ), + pytest.param( + datetime(2024, 6, 15, 12, tzinfo=timezone.utc), + "week", + datetime(2024, 6, 15).isocalendar()[:2], + id="week", + ), + pytest.param( + datetime(2024, 6, 15, 12, tzinfo=timezone.utc), + "month", + (2024, 6), + id="month", + ), + ], +) +def test_partition_bucket(ts, grain, expected): + assert _partition_bucket(ts, grain) == tuple(expected) + + +# --------------------------------------------------------------------------- +# read() end-to-end +# --------------------------------------------------------------------------- + + +def test_read_incremental_uses_start_date_when_no_state(): + src = _source() + catalog = _configured_catalog(SyncMode.incremental) + config = { + "scenario_filter": [_STREAM], + "all_fast_streams": False, + "start_date": "2024-01-01T00:00:00Z", + "batch_size": 2, + "batch_count": 1, + "cursor_step": 3600, + } + records, states = _records_and_states(list(src.read(_LOGGER, config, catalog))) + assert len(records) == 2 + assert records[0].record.data["updated_at"] == "2024-01-01T01:00:00Z" + assert records[-1].record.data["updated_at"] == "2024-01-01T02:00:00Z" + assert len(states) == 1 + assert _state_cursor(states[0]) == "2024-01-01T02:00:00Z" + + +def test_read_incremental_state_overrides_start_date(): + src = _source() + catalog = _configured_catalog(SyncMode.incremental) + state = _state_for(_STREAM, "2030-06-15T12:00:00Z") + config = { + "scenario_filter": [_STREAM], + "all_fast_streams": False, + "start_date": "2024-01-01T00:00:00Z", + "batch_size": 2, + "batch_count": 1, + "cursor_step": 3600, + } + records, _ = _records_and_states( + list(src.read(_LOGGER, config, catalog, state=state)) + ) + assert records[0].record.data["updated_at"] == "2030-06-15T13:00:00Z" + + +def test_read_incremental_defaults_to_jan_1_current_utc_year(): + src = _source() + catalog = _configured_catalog(SyncMode.incremental) + config = { + "scenario_filter": [_STREAM], + "all_fast_streams": False, + "batch_size": 1, + "batch_count": 1, + "cursor_step": 3600, + } + records, _ = _records_and_states(list(src.read(_LOGGER, config, catalog))) + expected_year = datetime.now(tz=timezone.utc).year + assert records[0].record.data["updated_at"] == f"{expected_year}-01-01T01:00:00Z" + + +def test_read_full_refresh_scenario_unchanged_by_incremental_config(): + src = _source() + catalog = _configured_catalog(SyncMode.full_refresh, stream_name="basic_types") + config = { + "start_date": "2024-01-01T00:00:00Z", + "batch_size": 5, + "batch_count": 3, + "partition_by": "day", + "cursor_step": 3600, + } + records, states = _records_and_states(list(src.read(_LOGGER, config, catalog))) + assert len(records) == 3 + assert states == [] + + +# --------------------------------------------------------------------------- +# check() +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "config", + [ + pytest.param({}, id="empty"), + pytest.param({"start_date": "2024-01-01"}, id="valid_start_date"), + pytest.param({"batch_size": 10, "batch_count": 5}, id="valid_batches"), + pytest.param({"partition_by": "day"}, id="valid_partition"), + pytest.param({"cursor_step": "PT1H"}, id="valid_cursor_step_iso"), + pytest.param({"cursor_step": 60}, id="valid_cursor_step_int"), + ], +) +def test_check_accepts_valid_configs(config): + result = _source().check(_LOGGER, config) + assert result.status == Status.SUCCEEDED + + +@pytest.mark.parametrize( + "config", + [ + pytest.param({"start_date": "garbage-value-xyz"}, id="bad_start_date"), + pytest.param({"batch_size": 0}, id="zero_batch_size"), + pytest.param({"batch_count": -1}, id="negative_batch_count"), + pytest.param({"partition_by": "year"}, id="bad_partition_grain"), + pytest.param({"cursor_step": "PTbadS"}, id="bad_cursor_step"), + ], +) +def test_check_rejects_invalid_configs(config): + result = _source().check(_LOGGER, config) + assert result.status == Status.FAILED + assert result.message From 0ca95763a128ba26c23574024db9a7e91339adaf Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 24 Apr 2026 00:05:39 +0000 Subject: [PATCH 2/6] Address CodeRabbit review feedback - Tighten _state_cursor_for_stream namespace check to be symmetric so None-namespace callers only match None-namespace state. - Remove None from partition_by enum; the type array handles null. - Reject empty ISO-8601 duration (e.g. 'PT') in _parse_cursor_step. - Add batch_size=True and batch_count=True cases to validation tests. - Use getattr instead of __dict__ to read state cursor in tests. - Make year-boundary tests tolerate Dec 31 -> Jan 1 transition. - Add ISO-week comment explaining iso.year vs ts.year. - Expand _emit_incremental_as_full_refresh docstring. --- airbyte/cli/smoke_test_source/_scenarios.py | 3 +++ airbyte/cli/smoke_test_source/source.py | 13 +++++++++---- tests/unit_tests/test_smoke_test_source.py | 20 ++++++++++++++++---- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/airbyte/cli/smoke_test_source/_scenarios.py b/airbyte/cli/smoke_test_source/_scenarios.py index b792bd064..56efb3659 100644 --- a/airbyte/cli/smoke_test_source/_scenarios.py +++ b/airbyte/cli/smoke_test_source/_scenarios.py @@ -826,6 +826,9 @@ def _partition_bucket( if grain == "day": return (ts_utc.year, ts_utc.month, ts_utc.day) if grain == "week": + # Use `iso.year` (not `ts_utc.year`): an ISO week can straddle a + # calendar year boundary (e.g. Jan 1 may fall in ISO week 52/53 of + # the previous year), so `(iso.year, iso.week)` is the correct bucket. iso = ts_utc.isocalendar() return (iso.year, iso.week) if grain == "month": diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 7aeba5254..7258a345d 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -152,7 +152,7 @@ def _parse_cursor_step(value: Any) -> timedelta: # noqa: ANN401 text = value.strip() if text.startswith("P"): match = _ISO_DURATION_RE.match(text) - if not match or match.group(0) == "P": + if not match or match.group(0) == "P" or not any(match.groups()): raise ValueError(f"Invalid ISO-8601 duration for `cursor_step`: {value!r}") days = int(match.group(1) or 0) hours = int(match.group(2) or 0) @@ -201,7 +201,7 @@ def _state_cursor_for_stream( descriptor = getattr(stream_state, "stream_descriptor", None) if descriptor is None or descriptor.name != stream_name: continue - if namespace is not None and getattr(descriptor, "namespace", None) != namespace: + if getattr(descriptor, "namespace", None) != namespace: continue blob = getattr(stream_state, "stream_state", None) if blob is None: @@ -369,7 +369,7 @@ def spec( "partition_by": { "type": ["string", "null"], "title": "Partition By", - "enum": ["day", "week", "month", None], + "enum": ["day", "week", "month"], "description": ( "Emit a STATE message at each partition boundary " "(UTC) of the cursor for incremental streams." @@ -701,7 +701,12 @@ def _emit_incremental_as_full_refresh( # noqa: PLR0913 now_ms: int, logger: logging.Logger, ) -> Iterable[AirbyteMessage]: - """Emit RECORDs (no STATE) for an incremental scenario read in full-refresh mode.""" + """Emit RECORDs (no STATE) for an incremental scenario read in full-refresh mode. + + Total record count still honors `batch_size * batch_count` when both are + set; otherwise it falls back to the scenario's `record_count` or the + generator's default cap. Only STATE emission is suppressed in this mode. + """ cursor_start = start_date or _default_cursor_start() count = 0 for event in iter_incremental_scenario_events( diff --git a/tests/unit_tests/test_smoke_test_source.py b/tests/unit_tests/test_smoke_test_source.py index ee5f2911d..531f72400 100644 --- a/tests/unit_tests/test_smoke_test_source.py +++ b/tests/unit_tests/test_smoke_test_source.py @@ -92,7 +92,7 @@ def _records_and_states(messages: list) -> tuple[list, list]: def _state_cursor(msg) -> str: - return msg.state.stream.stream_state.__dict__["updated_at"] + return getattr(msg.state.stream.stream_state, "updated_at") # --------------------------------------------------------------------------- @@ -201,8 +201,14 @@ def test_parse_cursor_step_rejects_bool_as_type_error(): def test_default_cursor_start_is_jan_1_current_utc_year(): + # Sample the SUT first so a Dec 31 23:59:59 -> Jan 1 boundary can't make + # `now.year` disagree with the year the SUT observed. + result = _default_cursor_start() now = datetime.now(tz=timezone.utc) - assert _default_cursor_start() == datetime(now.year, 1, 1, tzinfo=timezone.utc) + assert result in { + datetime(now.year, 1, 1, tzinfo=timezone.utc), + datetime(now.year - 1, 1, 1, tzinfo=timezone.utc), + } def test_state_cursor_returns_none_when_no_state(): @@ -382,8 +388,12 @@ def test_read_incremental_defaults_to_jan_1_current_utc_year(): "cursor_step": 3600, } records, _ = _records_and_states(list(src.read(_LOGGER, config, catalog))) - expected_year = datetime.now(tz=timezone.utc).year - assert records[0].record.data["updated_at"] == f"{expected_year}-01-01T01:00:00Z" + now_year = datetime.now(tz=timezone.utc).year + # Tolerate a Dec 31 -> Jan 1 flip between the SUT call and this sample. + assert records[0].record.data["updated_at"] in { + f"{now_year}-01-01T01:00:00Z", + f"{now_year - 1}-01-01T01:00:00Z", + } def test_read_full_refresh_scenario_unchanged_by_incremental_config(): @@ -430,6 +440,8 @@ def test_check_accepts_valid_configs(config): pytest.param({"batch_count": -1}, id="negative_batch_count"), pytest.param({"partition_by": "year"}, id="bad_partition_grain"), pytest.param({"cursor_step": "PTbadS"}, id="bad_cursor_step"), + pytest.param({"batch_size": True}, id="batch_size_bool_true"), + pytest.param({"batch_count": True}, id="batch_count_bool_true"), ], ) def test_check_rejects_invalid_configs(config): From 2927bf97fa124056b55153e45398520dd2ee0337 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 24 Apr 2026 00:14:51 +0000 Subject: [PATCH 3/6] Address CodeRabbit follow-up feedback - _parse_start_date: wrap date.fromisoformat ValueError symmetrically so 10-char invalid dates (e.g. '2024-13-45') surface the same nice 'Invalid start_date: ...' message as other malformed inputs. - _validate_incremental_config: reject batch_count without batch_size, since iter_incremental_scenario_events silently ignores batch_count unless batch_size is also set. - _emit_incremental_as_full_refresh: document that updated_at values shift across year boundaries when start_date is not configured. - Add tests for batch_count-without-batch_size and invalid 10-char start_date. --- airbyte/cli/smoke_test_source/source.py | 15 +++++++++++++-- tests/unit_tests/test_smoke_test_source.py | 2 ++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 7258a345d..fbee22875 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -119,7 +119,10 @@ def _parse_start_date(value: Any) -> datetime | None: # noqa: ANN401 parsed = datetime.fromisoformat(text) except ValueError as exc: if len(text) == _ISO_DATE_LENGTH: - parsed_date = date.fromisoformat(text) + try: + parsed_date = date.fromisoformat(text) + except ValueError as date_exc: + raise ValueError(f"Invalid `start_date`: {value!r}") from date_exc parsed = datetime( parsed_date.year, parsed_date.month, @@ -497,7 +500,7 @@ def _validate_custom_scenarios( return None @staticmethod - def _validate_incremental_config( + def _validate_incremental_config( # noqa: PLR0911 config: Mapping[str, Any], ) -> str | None: """Validate `start_date`, `batch_size`, `batch_count`, `partition_by`, `cursor_step`.""" @@ -519,6 +522,9 @@ def _validate_incremental_config( if value < 1: return f"`{key}` must be >= 1." + if config.get("batch_count") is not None and config.get("batch_size") is None: + return "`batch_count` requires `batch_size` to also be set." + partition_by = config.get("partition_by") if partition_by is not None and partition_by not in _VALID_PARTITION_GRAINS: return ( @@ -706,6 +712,11 @@ def _emit_incremental_as_full_refresh( # noqa: PLR0913 Total record count still honors `batch_size * batch_count` when both are set; otherwise it falls back to the scenario's `record_count` or the generator's default cap. Only STATE emission is suppressed in this mode. + + Note: when `start_date` is not configured, the cursor origin defaults + to Jan 1 of the current UTC year, so `updated_at` values shift across + year boundaries even though record `id` / `category` / `value` remain + deterministic. """ cursor_start = start_date or _default_cursor_start() count = 0 diff --git a/tests/unit_tests/test_smoke_test_source.py b/tests/unit_tests/test_smoke_test_source.py index 531f72400..094f3988c 100644 --- a/tests/unit_tests/test_smoke_test_source.py +++ b/tests/unit_tests/test_smoke_test_source.py @@ -442,6 +442,8 @@ def test_check_accepts_valid_configs(config): pytest.param({"cursor_step": "PTbadS"}, id="bad_cursor_step"), pytest.param({"batch_size": True}, id="batch_size_bool_true"), pytest.param({"batch_count": True}, id="batch_count_bool_true"), + pytest.param({"batch_count": 5}, id="batch_count_without_batch_size"), + pytest.param({"start_date": "2024-13-45"}, id="invalid_10char_date"), ], ) def test_check_rejects_invalid_configs(config): From 2986b0cdd77064f0fb9ea8bbef524619c294a263 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 24 Apr 2026 05:40:49 +0000 Subject: [PATCH 4/6] Address Copilot review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix PK collision on incremental resume: derive `id` from cursor timestamp (via a fixed Unix epoch) so the same `updated_at` always maps to the same `id` — avoids duplicate primary keys across runs that resume from state. - Drop `format: date-time` from `start_date` spec since the parser explicitly accepts both ISO-8601 date and date-time inputs; keeping the format hint could cause UIs to reject date-only input. --- airbyte/cli/smoke_test_source/_scenarios.py | 18 ++++++++++++++++-- airbyte/cli/smoke_test_source/source.py | 8 ++++---- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/airbyte/cli/smoke_test_source/_scenarios.py b/airbyte/cli/smoke_test_source/_scenarios.py index 56efb3659..473e57f04 100644 --- a/airbyte/cli/smoke_test_source/_scenarios.py +++ b/airbyte/cli/smoke_test_source/_scenarios.py @@ -42,6 +42,14 @@ _INCREMENTAL_CURSOR_FIELD: list[str] = ["updated_at"] +_INCREMENTAL_ID_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc) +"""Fixed origin used to derive stable `id` values for `incremental_batch_stream`. + +Using a fixed epoch (rather than `cursor_start`) ensures the same cursor +timestamp always maps to the same `id` across runs, so resuming from state +does not re-emit colliding primary keys. +""" + PREDEFINED_SCENARIOS: list[dict[str, Any]] = [ { "name": "basic_types", @@ -902,11 +910,17 @@ def iter_incremental_scenario_events( for i in range(1, total + 1): ts = start_utc + cursor_step * i cursor_iso = ts.isoformat().replace("+00:00", "Z") + # Derive `id` from the cursor timestamp so values are stable across + # runs: two syncs that produce a record at the same `updated_at` will + # always assign it the same `id`, even if one run resumes from state + # while the other starts from scratch. This keeps the primary key + # unique on resume. + stable_id = (ts - _INCREMENTAL_ID_EPOCH) // cursor_step record = { - "id": i, + "id": stable_id, "updated_at": cursor_iso, "category": categories[(i - 1) % len(categories)], - "value": float(i) * 1.1, + "value": round(float(stable_id) * 1.1, 6), } yield {"kind": "record", "data": record, "cursor": cursor_iso} last_cursor_iso = cursor_iso diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index fbee22875..46e713e3f 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -338,12 +338,12 @@ def spec( "start_date": { "type": ["string", "null"], "title": "Start Date", - "format": "date-time", "description": ( "Lower bound for incremental streams' cursor. " - "Accepts an ISO-8601 date or date-time. When " - "omitted and no state is provided, the cursor " - "starts at Jan 1 (00:00:00 UTC) of the current year." + "Accepts an ISO-8601 date (`YYYY-MM-DD`) or a " + "date-time (`YYYY-MM-DDTHH:MM:SSZ`). When omitted " + "and no state is provided, the cursor starts at " + "Jan 1 (00:00:00 UTC) of the current year." ), "default": None, }, From d2d68923716d9c33c9fe7655f3c6859fa9032be5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 24 Apr 2026 05:50:58 +0000 Subject: [PATCH 5/6] Address CodeRabbit follow-up feedback - Flip incremental_batch_stream to high_volume: True so it stays opt-in (was being pulled into the default fast set). - Hoist shared _CATEGORIES constant used by both batch generators. - Fix partition-trigger docstring wording (no look-ahead; trigger fires on boundary between previous and just-yielded record). - Update _emit_incremental_as_full_refresh docstring to reflect that id/value now shift with start_date (only category is independent). - Update tests to opt into all_slow_streams for the new scenario. --- airbyte/cli/smoke_test_source/_scenarios.py | 16 +++++++++------- airbyte/cli/smoke_test_source/source.py | 9 ++++++--- tests/unit_tests/test_smoke_test_source.py | 6 ++++-- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/airbyte/cli/smoke_test_source/_scenarios.py b/airbyte/cli/smoke_test_source/_scenarios.py index 473e57f04..fb26cafd9 100644 --- a/airbyte/cli/smoke_test_source/_scenarios.py +++ b/airbyte/cli/smoke_test_source/_scenarios.py @@ -50,6 +50,9 @@ does not re-emit colliding primary keys. """ +_CATEGORIES: list[str] = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"] +"""Shared `category` rotation used by the batch-oriented record generators.""" + PREDEFINED_SCENARIOS: list[dict[str, Any]] = [ { "name": "basic_types", @@ -761,7 +764,7 @@ "incremental": True, "cursor_field": list(_INCREMENTAL_CURSOR_FIELD), "record_generator": "incremental_batch", - "high_volume": False, + "high_volume": True, }, ] @@ -771,13 +774,12 @@ def generate_large_batch_records( ) -> list[dict[str, Any]]: """Generate records for the large_batch_stream scenario.""" count = scenario.get("record_count", _DEFAULT_LARGE_BATCH_COUNT) - categories = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"] return [ { "id": i, "name": f"record_{i:06d}", "value": float(i) * 1.1, - "category": categories[i % len(categories)], + "category": _CATEGORIES[i % len(_CATEGORIES)], } for i in range(1, count + 1) ] @@ -868,8 +870,9 @@ def iter_incremental_scenario_events( STATE events are interleaved: - after every `batch_size` records when `batch_size` is set; - - whenever the next record's partition bucket differs from the - current record's bucket, when `partition_by` is set; + - whenever the just-yielded record's partition bucket differs + from the previous record's bucket, when `partition_by` is set + (the emitted cursor is the first record of the new bucket); - once at the end of the stream (deduped if a trigger already fired at the final record). @@ -900,7 +903,6 @@ def iter_incremental_scenario_events( if total <= 0: return - categories = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"] start_utc = cursor_start.astimezone(timezone.utc) last_state_emitted_at: str | None = None @@ -919,7 +921,7 @@ def iter_incremental_scenario_events( record = { "id": stable_id, "updated_at": cursor_iso, - "category": categories[(i - 1) % len(categories)], + "category": _CATEGORIES[(i - 1) % len(_CATEGORIES)], "value": round(float(stable_id) * 1.1, 6), } yield {"kind": "record", "data": record, "cursor": cursor_iso} diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 46e713e3f..0aaa2c248 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -714,9 +714,12 @@ def _emit_incremental_as_full_refresh( # noqa: PLR0913 generator's default cap. Only STATE emission is suppressed in this mode. Note: when `start_date` is not configured, the cursor origin defaults - to Jan 1 of the current UTC year, so `updated_at` values shift across - year boundaries even though record `id` / `category` / `value` remain - deterministic. + to Jan 1 of the current UTC year. `updated_at` values shift across year + boundaries. Record `id` is derived from the cursor timestamp against a + fixed epoch (`stable_id = (ts - _INCREMENTAL_ID_EPOCH) // cursor_step`), + and `value` is derived from `id`, so both are stable per `updated_at` + (resume-safe) but shift when the origin shifts. Only `category`, which + is indexed by the within-run counter, is independent of `start_date`. """ cursor_start = start_date or _default_cursor_start() count = 0 diff --git a/tests/unit_tests/test_smoke_test_source.py b/tests/unit_tests/test_smoke_test_source.py index 094f3988c..05bc6e853 100644 --- a/tests/unit_tests/test_smoke_test_source.py +++ b/tests/unit_tests/test_smoke_test_source.py @@ -56,7 +56,9 @@ def _configured_catalog( stream_name: str = _STREAM, ) -> ConfiguredAirbyteCatalog: src = _source() - catalog = src.discover(_LOGGER, {}) + # `incremental_batch_stream` is `high_volume: True`, so opt into the + # slow-stream set explicitly when building the test catalog. + catalog = src.discover(_LOGGER, {"all_slow_streams": True}) stream = next(s for s in catalog.streams if s.name == stream_name) return ConfiguredAirbyteCatalog( streams=[ @@ -114,7 +116,7 @@ def test_spec_includes_incremental_config_fields(): def test_discover_marks_incremental_stream_correctly(): - catalog = _source().discover(_LOGGER, {}) + catalog = _source().discover(_LOGGER, {"all_slow_streams": True}) incr = next(s for s in catalog.streams if s.name == _STREAM) assert SyncMode.incremental in incr.supported_sync_modes assert SyncMode.full_refresh in incr.supported_sync_modes From 69f5ce15b329307b469b06dd40045a598286ff5c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 24 Apr 2026 05:58:35 +0000 Subject: [PATCH 6/6] Keep HIGH_VOLUME_SCENARIO_NAMES in sync with scenario flags Add `incremental_batch_stream` to `HIGH_VOLUME_SCENARIO_NAMES` so the public constant matches the `high_volume: True` flag on the scenario dict. Caught by Devin Review. --- airbyte/cli/smoke_test_source/_scenarios.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte/cli/smoke_test_source/_scenarios.py b/airbyte/cli/smoke_test_source/_scenarios.py index fb26cafd9..62906832e 100644 --- a/airbyte/cli/smoke_test_source/_scenarios.py +++ b/airbyte/cli/smoke_test_source/_scenarios.py @@ -36,6 +36,7 @@ HIGH_VOLUME_SCENARIO_NAMES: set[str] = { "large_batch_stream", + "incremental_batch_stream", } PartitionGrain = Literal["day", "week", "month"]