Skip to content

Commit 969c7b3

Browse files
authored
Feat: Add the ability to specify a model end date (#2287)
1 parent 0384db3 commit 969c7b3

File tree

7 files changed

+100
-3
lines changed

7 files changed

+100
-3
lines changed

docs/reference/model_configuration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Configuration options for SQLMesh model properties. Supported by all model kinds
2020
| `cron` | The cron expression specifying how often the model should be refreshed. (Default: `@daily`) | str | N |
2121
| `interval_unit` | The temporal granularity of the model's data intervals. Supported values: `year`, `month`, `day`, `hour`, `half_hour`, `quarter_hour`, `five_minute`. (Default: inferred from `cron`) | str | N |
2222
| `start` | The date/time that determines the earliest date interval that should be processed by a model. Can be a datetime string, epoch time in milliseconds, or a relative datetime such as `1 year ago`. | str \| int | N |
23+
| `end` | The date/time that determines the latest date interval that should be processed by a model. Can be a datetime string, epoch time in milliseconds, or a relative datetime such as `1 year ago`. | str \| int | N |
2324
| `batch_size` | The maximum number of intervals that can be evaluated in a single backfill task. If this is `None`, all intervals will be processed as part of a single task. If this is set, a model's backfill will be chunked such that each individual task only contains jobs with the maximum of `batch_size` intervals. (Default: `None`) | int | N |
2425
| `grains` | The column(s) whose combination uniquely identifies each row in the model | str \| array[str] | N |
2526
| `references` | The model column(s) used to join to other models' grains | str \| array[str] | N |
@@ -43,6 +44,7 @@ The SQLMesh project-level `model_defaults` key supports the following options, d
4344
- cron
4445
- owner
4546
- start
47+
- end
4648
- batch_size
4749
- storage_format
4850

sqlmesh/core/model/definition.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ class _Model(ModelMeta, frozen=True):
101101
start: The earliest date that the model will be backfilled for. If this is None,
102102
then the date is inferred by taking the most recent start date of its ancestors.
103103
The start date can be a static datetime or a relative datetime like "1 year ago"
104+
end: The date that the model will be backfilled up until. Follows the same syntax as 'start',
105+
and should be omitted if there is no end date.
104106
batch_size: The maximum number of incremental intervals that can be run per backfill job. If this is None,
105107
then backfilling this model will do all of history in one job. If this is set, a model's backfill
106108
will be chunked such that each individual job will only contain jobs with max `batch_size` intervals.

sqlmesh/core/node.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sqlglot import exp
1010

1111
from sqlmesh.utils.cron import CroniterCache
12-
from sqlmesh.utils.date import TimeLike, to_datetime
12+
from sqlmesh.utils.date import TimeLike, to_datetime, validate_date_range
1313
from sqlmesh.utils.errors import ConfigError
1414
from sqlmesh.utils.pydantic import (
1515
PydanticModel,
@@ -172,6 +172,8 @@ class _Node(PydanticModel):
172172
start: The earliest date that the node will be executed for. If this is None,
173173
then the date is inferred by taking the most recent start date of its ancestors.
174174
The start date can be a static datetime or a relative datetime like "1 year ago"
175+
end: The latest date that the node will be executed for. If this is None,
176+
the date from the scheduler will be used
175177
cron: A cron string specifying how often the node should be run, leveraging the
176178
[croniter](https://github.com/kiorky/croniter) library.
177179
interval_unit: The duration of an interval for the node. By default, it is computed from the cron expression.
@@ -185,6 +187,7 @@ class _Node(PydanticModel):
185187
description: t.Optional[str] = None
186188
owner: t.Optional[str] = None
187189
start: t.Optional[TimeLike] = None
190+
end: t.Optional[TimeLike] = None
188191
cron: str = "@daily"
189192
interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
190193
tags: t.List[str] = []
@@ -207,7 +210,7 @@ def _name_validator(cls, v: t.Any) -> t.Optional[str]:
207210
return v.meta["sql"]
208211
return str(v)
209212

210-
@field_validator("start", mode="before")
213+
@field_validator("start", "end", mode="before")
211214
@classmethod
212215
def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]:
213216
if isinstance(v, exp.Expression):
@@ -255,6 +258,7 @@ def _node_root_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
255258
raise ConfigError(
256259
f"Interval unit of '{interval_unit}' is larger than cron period of '{cron}'"
257260
)
261+
validate_date_range(values.get("start"), values.get("end"))
258262
return values
259263

260264
@property

sqlmesh/core/snapshot/definition.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1482,12 +1482,17 @@ def missing_intervals(
14821482
snapshot.intervals = snapshot.intervals.copy()
14831483
snapshot.remove_interval(interval, execution_time)
14841484

1485+
missing_interval_end_date = snapshot_end_date
1486+
node_end_date = snapshot.node.end
1487+
if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)):
1488+
missing_interval_end_date = node_end_date
1489+
14851490
intervals = snapshot.missing_intervals(
14861491
max(
14871492
to_datetime(snapshot_start_date),
14881493
to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
14891494
),
1490-
snapshot_end_date,
1495+
missing_interval_end_date,
14911496
execution_time=execution_time,
14921497
deployability_index=deployability_index,
14931498
ignore_cron=ignore_cron,

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,15 @@ def _create_cadence_dag_for_snapshot(
108108
f"Can't create a cadence DAG for the paused snapshot {snapshot.snapshot_id}"
109109
)
110110

111+
end_date = None
112+
if snapshot.node.end:
113+
end_date = pendulum.instance(to_datetime(snapshot.node.end))
114+
111115
with DAG(
112116
dag_id=dag_id,
113117
schedule_interval=snapshot.node.cron,
114118
start_date=pendulum.instance(to_datetime(snapshot.unpaused_ts)),
119+
end_date=end_date,
115120
max_active_runs=1,
116121
catchup=True,
117122
is_paused_upon_creation=False,

tests/core/test_model.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3244,3 +3244,43 @@ def my_model(context, **kwargs):
32443244

32453245
assert m.default_catalog == "catalog"
32463246
assert m.depends_on == {'"catalog"."other"."table"'}
3247+
3248+
3249+
def test_end_date():
3250+
expressions = d.parse(
3251+
f"""
3252+
MODEL (
3253+
name db.table,
3254+
kind INCREMENTAL_BY_TIME_RANGE (
3255+
time_column ts,
3256+
),
3257+
start '2023-01-01',
3258+
end '2023-06-01'
3259+
);
3260+
3261+
SELECT 1::int AS a, 2::int AS b, now::timestamp as ts
3262+
"""
3263+
)
3264+
model = load_sql_based_model(expressions)
3265+
3266+
assert model.start == "2023-01-01"
3267+
assert model.end == "2023-06-01"
3268+
assert model.interval_unit == IntervalUnit.DAY
3269+
3270+
with pytest.raises(ConfigError, match=".*Start date.+can't be greater than end date.*"):
3271+
load_sql_based_model(
3272+
d.parse(
3273+
f"""
3274+
MODEL (
3275+
name db.table,
3276+
kind INCREMENTAL_BY_TIME_RANGE (
3277+
time_column ts,
3278+
),
3279+
start '2024-01-01',
3280+
end '2023-06-01'
3281+
);
3282+
3283+
SELECT 1::int AS a, 2::int AS b, now::timestamp as ts
3284+
"""
3285+
)
3286+
)

tests/core/test_scheduler.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
IncrementalByUniqueKeyKind,
1111
TimeColumn,
1212
)
13+
from sqlmesh.core.node import IntervalUnit
1314
from sqlmesh.core.scheduler import Scheduler, compute_interval_params
1415
from sqlmesh.core.snapshot import Snapshot, SnapshotEvaluator
1516
from sqlmesh.utils.date import to_datetime
@@ -234,3 +235,41 @@ def test_circuit_breaker(scheduler: Scheduler):
234235
"2022-01-30",
235236
circuit_breaker=lambda: True,
236237
)
238+
239+
240+
def test_intervals_with_end_date_on_model(mocker: MockerFixture, make_snapshot):
241+
snapshot: Snapshot = make_snapshot(
242+
SqlModel(
243+
name="name",
244+
kind=IncrementalByTimeRangeKind(time_column="ds", batch_size=1),
245+
interval_unit=IntervalUnit.DAY,
246+
start="2023-01-01",
247+
end="2023-01-31",
248+
query=parse_one("SELECT ds FROM parent.tbl"),
249+
)
250+
)
251+
252+
snapshot_evaluator = SnapshotEvaluator(adapter=mocker.MagicMock(), ddl_concurrent_tasks=1)
253+
scheduler = Scheduler(
254+
snapshots=[snapshot],
255+
snapshot_evaluator=snapshot_evaluator,
256+
state_sync=mocker.MagicMock(),
257+
max_workers=2,
258+
default_catalog=None,
259+
)
260+
261+
# generate for 1 year to show that the returned batches should only cover
262+
# the range defined on the model itself
263+
batches = scheduler.batches(start="2023-01-01", end="2024-01-01")[snapshot]
264+
265+
assert len(batches) == 31 # days in Jan 2023
266+
assert batches[0] == (to_datetime("2023-01-01"), to_datetime("2023-01-02"))
267+
assert batches[-1] == (to_datetime("2023-01-31"), to_datetime("2023-02-01"))
268+
269+
# generate for less than 1 month to ensure that the scheduler end date
270+
# takes precedence over the model end date when it is the earlier of the two
271+
batches = scheduler.batches(start="2023-01-01", end="2023-01-10")[snapshot]
272+
273+
assert len(batches) == 10
274+
assert batches[0] == (to_datetime("2023-01-01"), to_datetime("2023-01-02"))
275+
assert batches[-1] == (to_datetime("2023-01-10"), to_datetime("2023-01-11"))

0 commit comments

Comments
 (0)