Skip to content

Commit 877787e

Browse files
authored
Fix!: Handle lookback correctly on incremental models with an end date defined (#2962)
1 parent ad5ce8d commit 877787e

File tree

2 files changed

+102
-10
lines changed

2 files changed

+102
-10
lines changed

sqlmesh/core/snapshot/definition.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -902,10 +902,23 @@ def missing_intervals(
902902
upper_bound_ts = to_timestamp(execution_time)
903903
end_ts = min(end_ts, upper_bound_ts)
904904

905-
lookback = self.model.lookback if self.is_model else 0
905+
lookback = 0
906+
model_end_ts: t.Optional[int] = None
907+
908+
if self.is_model:
909+
lookback = self.model.lookback
910+
model_end_ts = (
911+
to_timestamp(make_inclusive_end(self.model.end)) if self.model.end else None
912+
)
906913

907914
return compute_missing_intervals(
908-
interval_unit, tuple(intervals), start_ts, end_ts, upper_bound_ts, lookback
915+
interval_unit,
916+
tuple(intervals),
917+
start_ts,
918+
end_ts,
919+
upper_bound_ts,
920+
lookback,
921+
model_end_ts,
909922
)
910923

911924
def categorize_as(self, category: SnapshotChangeCategory) -> None:
@@ -1641,6 +1654,7 @@ def compute_missing_intervals(
16411654
end_ts: int,
16421655
upper_bound_ts: int,
16431656
lookback: int,
1657+
model_end_ts: t.Optional[int],
16441658
) -> Intervals:
16451659
"""Computes all missing intervals between start and end given intervals.
16461660
@@ -1651,6 +1665,7 @@ def compute_missing_intervals(
16511665
end_ts: Exclusive timestamp end.
16521666
upper_bound_ts: The exclusive upper bound timestamp for lookback.
16531667
lookback: A lookback window.
1668+
model_end_ts: The inclusive end timestamp set on the model (if one is set)
16541669
16551670
Returns:
16561671
A list of all timestamps in this range.
@@ -1660,20 +1675,17 @@ def compute_missing_intervals(
16601675

16611676
# get all individual timestamps with the addition of extra lookback timestamps up to the execution date
16621677
# when a model has lookback, we need to check all the intervals between itself and its lookback exist.
1678+
intervals_beyond_end_ts = 0
16631679
while True:
16641680
ts = to_timestamp(croniter.get_next(estimate=True))
16651681

16661682
if ts < end_ts:
16671683
timestamps.append(ts)
1668-
else:
1669-
croniter.get_prev(estimate=True)
1670-
break
1671-
1672-
for _ in range(lookback):
1673-
ts = to_timestamp(croniter.get_next(estimate=True))
1674-
if ts < upper_bound_ts:
1684+
elif lookback and ts < upper_bound_ts:
16751685
timestamps.append(ts)
1686+
intervals_beyond_end_ts += 1
16761687
else:
1688+
croniter.get_prev(estimate=True)
16771689
break
16781690

16791691
missing = []
@@ -1697,7 +1709,8 @@ def compute_missing_intervals(
16971709
elif current_ts >= low and compare_ts < high:
16981710
break
16991711
else:
1700-
missing.append((current_ts, next_ts))
1712+
if model_end_ts is None or compare_ts < model_end_ts or i > intervals_beyond_end_ts + 1:
1713+
missing.append((current_ts, next_ts))
17011714

17021715
return missing
17031716

tests/core/test_snapshot.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,85 @@ def test_missing_intervals_end_bounded_with_ignore_cron(make_snapshot):
370370
]
371371

372372

373+
def test_missing_intervals_past_end_date_with_lookback(make_snapshot):
374+
snapshot: Snapshot = make_snapshot(
375+
SqlModel(
376+
name="test_model",
377+
kind=IncrementalByTimeRangeKind(time_column=TimeColumn(column="ds"), lookback=2),
378+
owner="owner",
379+
cron="@daily",
380+
query=parse_one("SELECT 1, ds FROM name"),
381+
start="2023-01-01",
382+
end="2023-01-05", # inclusive, equivalent to to_timestamp('2023-01-05 23:59:59.999999')
383+
)
384+
)
385+
386+
start_time = to_timestamp("2023-01-01")
387+
end_time = to_timestamp(
388+
"2023-01-06"
389+
) # exclusive because to_timestamp() returns a timestamp and not a date
390+
assert snapshot.inclusive_exclusive(snapshot.node.start, snapshot.node.end) == (
391+
start_time,
392+
end_time,
393+
)
394+
395+
# baseline - all intervals missing
396+
assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == [
397+
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
398+
(to_timestamp("2023-01-02"), to_timestamp("2023-01-03")),
399+
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
400+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
401+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
402+
]
403+
404+
# fully backfill model - no intervals missing
405+
snapshot.add_interval(start_time, end_time)
406+
407+
# even though lookback=2, because every interval has been filled,
408+
# there should be no missing intervals
409+
assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == []
410+
411+
# however, when running for a new interval, this triggers lookback
412+
# in this case, we remove the most recent interval (the one for 2023-01-05) to simulate it being new
413+
# since lookback=2 days, this triggers missing intervals for 2023-01-03, 2023-01-04, 2023-01-05
414+
snapshot.remove_interval(interval=(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")))
415+
assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == [
416+
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
417+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
418+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
419+
]
420+
421+
# put the interval we just removed back to make the model fully backfilled again
422+
snapshot.add_interval(to_timestamp("2023-01-05"), to_timestamp("2023-01-06"))
423+
assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == []
424+
425+
# running on the end date + 1 day (2023-01-07)
426+
# 2023-01-06 "would" run and since lookback=2 this pulls in 2023-01-04 and 2023-01-05 as well
427+
# however, only 2023-01-04 and 2023-01-05 are within the model end date
428+
end_time = to_timestamp("2023-01-07")
429+
assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == [
430+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
431+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
432+
]
433+
434+
# running on the end date + 2 days (2023-01-08)
435+
# 2023-01-07 "would" run and since lookback=2 this pulls in 2023-01-06 and 2023-01-05 as well
436+
# however, only 2023-01-05 is within the model end date
437+
end_time = to_timestamp("2023-01-08")
438+
assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == [
439+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06"))
440+
]
441+
442+
# running on the end date + 3 days (2023-01-09)
443+
# no missing intervals because subtracting 2 days for lookback exceeds the models end date
444+
end_time = to_timestamp("2023-01-09")
445+
assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == []
446+
447+
# running way in the future, no missing intervals because subtracting 2 days for lookback still exceeds the models end date
448+
end_time = to_timestamp("2024-01-01")
449+
assert snapshot.missing_intervals(start_time, end_time, execution_time=end_time) == []
450+
451+
373452
def test_incremental_time_self_reference(make_snapshot):
374453
snapshot = make_snapshot(
375454
SqlModel(

0 commit comments

Comments
 (0)