airbyte/cli/smoke_test_source/_scenarios.py (187 additions, 4 deletions)
@@ -3,20 +3,57 @@

Each scenario defines a stream name, JSON schema, optional primary key,
and either inline records or a record generator reference.

Scenarios may additionally declare:

- `incremental`: `True` to advertise `SyncMode.incremental` for the stream.
- `cursor_field`: list[str], the source-defined cursor path for incremental streams.
- `record_generator`: string id of a generator in `get_scenario_records` /
`iter_incremental_scenario_events`.
- `high_volume`: `True` to exclude the stream from the default fast-only scenario set.
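
A minimal illustrative declaration (not one of the registered scenarios;
the stream name and schema below are invented for the example):

    {
        "name": "example_incremental_stream",
        "json_schema": {"type": "object", "properties": {"id": {"type": "integer"}}},
        "primary_key": [["id"]],
        "incremental": True,
        "cursor_field": ["updated_at"],
        "record_generator": "incremental_batch",
        "high_volume": True,
    }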
"""

from __future__ import annotations

import math
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Literal


if TYPE_CHECKING:
from collections.abc import Iterator


_DEFAULT_LARGE_BATCH_COUNT = 1000
"""Default number of records for the `large_batch_stream` generator."""

_DEFAULT_INCREMENTAL_MAX_RECORDS = 1000
"""Safety cap on records emitted by the `incremental_batch_stream` generator.

Applies only when no other limit is in effect (`batch_size` together with
`batch_count`, a stream-level `record_count`, or a caller-supplied
`max_records`); prevents accidental unbounded emission.
"""

HIGH_VOLUME_SCENARIO_NAMES: set[str] = {
"large_batch_stream",
"incremental_batch_stream",
}

PartitionGrain = Literal["day", "week", "month"]

_INCREMENTAL_CURSOR_FIELD: list[str] = ["updated_at"]

_INCREMENTAL_ID_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
"""Fixed origin used to derive stable `id` values for `incremental_batch_stream`.

Using a fixed epoch (rather than `cursor_start`) ensures the same cursor
timestamp always maps to the same `id` across runs, so resuming from state
does not re-emit colliding primary keys.
"""

_CATEGORIES: list[str] = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"]
"""Shared `category` rotation used by the batch-oriented record generators."""

PREDEFINED_SCENARIOS: list[dict[str, Any]] = [
{
"name": "basic_types",
@@ -708,6 +745,28 @@
{"id": 7, "float_value": -1.0, "integer_value": -2147483648},
],
},
{
"name": "incremental_batch_stream",
"description": (
"Incremental stream that honors inbound state, `start_date`, "
"and checkpoint triggers (`batch_size` / `batch_count` / `partition_by`)."
),
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"updated_at": {"type": "string", "format": "date-time"},
"category": {"type": "string"},
"value": {"type": "number"},
},
},
"primary_key": [["id"]],
"incremental": True,
"cursor_field": list(_INCREMENTAL_CURSOR_FIELD),
"record_generator": "incremental_batch",
"high_volume": True,
},
]
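
# A sketch of the fast-only filtering that `high_volume` implies; this is
# illustrative only, and the actual scenario selection logic is assumed to
# live in the CLI rather than in this module:
#
#     fast_scenarios = [
#         s for s in PREDEFINED_SCENARIOS
#         if s["name"] not in HIGH_VOLUME_SCENARIO_NAMES
#     ]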


@@ -716,13 +775,12 @@ def generate_large_batch_records(
) -> list[dict[str, Any]]:
"""Generate records for the large_batch_stream scenario."""
count = scenario.get("record_count", _DEFAULT_LARGE_BATCH_COUNT)
return [
{
"id": i,
"name": f"record_{i:06d}",
"value": float(i) * 1.1,
"category": categories[i % len(categories)],
"category": _CATEGORIES[i % len(_CATEGORIES)],
}
for i in range(1, count + 1)
]
@@ -753,10 +811,135 @@ def generate_large_string_records() -> list[dict[str, Any]]:
def get_scenario_records(
scenario: dict[str, Any],
) -> list[dict[str, Any]]:
"""Get records for a scenario, using generator if specified."""
"""Get records for a scenario, using generator if specified.

Used for full-refresh scenarios. Incremental scenarios should be iterated
via `iter_incremental_scenario_events` instead.
"""
generator = scenario.get("record_generator")
if generator == "large_batch":
return generate_large_batch_records(scenario)
if generator == "large_strings":
return generate_large_string_records()
return scenario.get("records", [])
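
# Illustrative dispatch (assumes the registered `large_batch_stream` scenario
# declares `"record_generator": "large_batch"` and no explicit `record_count`):
#
#     scenario = next(
#         s for s in PREDEFINED_SCENARIOS if s["name"] == "large_batch_stream"
#     )
#     records = get_scenario_records(scenario)
#     assert len(records) == _DEFAULT_LARGE_BATCH_COUNT  # defaults to 1000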


def _partition_bucket(
ts: datetime,
grain: PartitionGrain,
) -> tuple[int, ...]:
"""Return a hashable bucket identifier for `ts` at the given UTC grain.

Two timestamps share a bucket iff they fall within the same calendar
`day`, ISO `week`, or `month` in UTC.
"""
ts_utc = ts.astimezone(timezone.utc)
if grain == "day":
return (ts_utc.year, ts_utc.month, ts_utc.day)
if grain == "week":
# Use `iso.year` (not `ts_utc.year`): an ISO week can straddle a
# calendar year boundary (e.g. Jan 1 may fall in ISO week 52/53 of
# the previous year), so `(iso.year, iso.week)` is the correct bucket.
iso = ts_utc.isocalendar()
return (iso.year, iso.week)
if grain == "month":
return (ts_utc.year, ts_utc.month)
raise ValueError(f"Unsupported partition grain: {grain!r}")
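
# Illustrative check of the ISO-week edge case noted above (doctest-style;
# dates chosen for the example):
#
#     >>> _partition_bucket(datetime(2021, 1, 1, tzinfo=timezone.utc), "week")
#     (2020, 53)   # Jan 1, 2021 falls in ISO week 53 of 2020
#     >>> _partition_bucket(datetime(2021, 1, 4, tzinfo=timezone.utc), "week")
#     (2021, 1)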


def iter_incremental_scenario_events(
scenario: dict[str, Any],
*,
cursor_start: datetime,
cursor_step: timedelta,
batch_size: int | None = None,
batch_count: int | None = None,
partition_by: PartitionGrain | None = None,
max_records: int | None = None,
) -> Iterator[dict[str, Any]]:
"""Yield ordered record/state events for an incremental scenario.

Events are dicts with a `kind` field:

- `{"kind": "record", "data": {...}, "cursor": "<iso-8601>"}`
- `{"kind": "state", "cursor": "<iso-8601>"}`

    Records are emitted at `cursor_start + i * cursor_step` for
    `i = 1, 2, ...`, i.e. always strictly greater than `cursor_start`, so
    resuming from a prior state never re-emits a delivered record.

STATE events are interleaved:

- after every `batch_size` records when `batch_size` is set;
    - when `partition_by` is set, whenever the just-yielded record's
      partition bucket differs from the previous record's bucket
      (the emitted cursor is that of the first record in the new bucket);
- once at the end of the stream (deduped if a trigger already fired
at the final record).

Stopping rules (evaluated in order):

- if `batch_size` and `batch_count` are both set, stop after
`batch_size * batch_count` records;
- else honor the scenario's own `record_count` if present;
- else honor the caller-supplied `max_records`;
- else default to `_DEFAULT_INCREMENTAL_MAX_RECORDS`.
"""
if cursor_step <= timedelta(0):
raise ValueError("`cursor_step` must be a positive duration.")
if batch_size is not None and batch_size < 1:
raise ValueError("`batch_size` must be >= 1.")
if batch_count is not None and batch_count < 1:
raise ValueError("`batch_count` must be >= 1.")

if batch_size is not None and batch_count is not None:
total = batch_size * batch_count
elif "record_count" in scenario:
total = int(scenario["record_count"])
elif max_records is not None:
total = max_records
else:
total = _DEFAULT_INCREMENTAL_MAX_RECORDS

if total <= 0:
return

start_utc = cursor_start.astimezone(timezone.utc)

last_state_emitted_at: str | None = None
current_bucket: tuple[int, ...] | None = None
last_cursor_iso: str | None = None

for i in range(1, total + 1):
ts = start_utc + cursor_step * i
cursor_iso = ts.isoformat().replace("+00:00", "Z")
# Derive `id` from the cursor timestamp so values are stable across
# runs: two syncs that produce a record at the same `updated_at` will
# always assign it the same `id`, even if one run resumes from state
# while the other starts from scratch. This keeps the primary key
# unique on resume.
stable_id = (ts - _INCREMENTAL_ID_EPOCH) // cursor_step
record = {
"id": stable_id,
"updated_at": cursor_iso,
"category": _CATEGORIES[(i - 1) % len(_CATEGORIES)],
"value": round(float(stable_id) * 1.1, 6),
}
yield {"kind": "record", "data": record, "cursor": cursor_iso}
last_cursor_iso = cursor_iso

size_trigger = batch_size is not None and (i % batch_size == 0)

partition_trigger = False
if partition_by is not None:
next_bucket = _partition_bucket(ts, partition_by)
if current_bucket is not None and next_bucket != current_bucket:
partition_trigger = True
current_bucket = next_bucket

if (size_trigger or partition_trigger) and cursor_iso != last_state_emitted_at:
yield {"kind": "state", "cursor": cursor_iso}
last_state_emitted_at = cursor_iso

if last_cursor_iso is not None and last_cursor_iso != last_state_emitted_at:
yield {"kind": "state", "cursor": last_cursor_iso}
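
# Illustrative usage (values chosen for the example; not part of this PR's
# API surface). Running the module as a script prints the interleaving:
# 5 records at Jan 2..6, a STATE after records 2 and 4 (the `batch_size`
# trigger), and one trailing STATE at the final cursor (Jan 6).
if __name__ == "__main__":
    _demo_scenario: dict[str, Any] = {"name": "demo", "record_count": 5}
    for _event in iter_incremental_scenario_events(
        _demo_scenario,
        cursor_start=datetime(2024, 1, 1, tzinfo=timezone.utc),
        cursor_step=timedelta(days=1),
        batch_size=2,
    ):
        print(_event["kind"], _event["cursor"])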