From 5d72692da30cfe8897f4624a2679be2dc37d6cd6 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Wed, 22 Apr 2026 15:15:45 -0400 Subject: [PATCH 1/3] feat: Add volatile schedule storage Seer has been running without durable storage for scheduled tasks, and it also does not have a redis instance provisioned. With seer's scheduled tasks all using `crontab` expressions, adding a durable storage is very low value, as durable storage is only required for `timedelta` schedules. Having a volatile storage backend will make it simpler and cheaper to shift seer to taskbroker. --- .../src/taskbroker_client/scheduler/runner.py | 93 +--------- .../taskbroker_client/scheduler/storage.py | 174 ++++++++++++++++++ clients/python/tests/scheduler/test_runner.py | 33 ++++ .../python/tests/scheduler/test_storage.py | 56 ++++++ 4 files changed, 266 insertions(+), 90 deletions(-) create mode 100644 clients/python/src/taskbroker_client/scheduler/storage.py create mode 100644 clients/python/tests/scheduler/test_storage.py diff --git a/clients/python/src/taskbroker_client/scheduler/runner.py b/clients/python/src/taskbroker_client/scheduler/runner.py index fde9938f..a04b07c4 100644 --- a/clients/python/src/taskbroker_client/scheduler/runner.py +++ b/clients/python/src/taskbroker_client/scheduler/runner.py @@ -2,19 +2,16 @@ import heapq import logging -from collections.abc import Mapping from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING, Any -from redis.client import StrictRedis -from rediscluster import RedisCluster from sentry_sdk import capture_exception from sentry_sdk.crons import MonitorStatus, capture_checkin from taskbroker_client.app import TaskbrokerApp -from taskbroker_client.metrics import MetricsBackend from taskbroker_client.scheduler.config import ScheduleConfig, crontab from taskbroker_client.scheduler.schedules import CrontabSchedule, Schedule, TimedeltaSchedule +from taskbroker_client.scheduler.storage import RunStorage, RunStorageProtocol from taskbroker_client.task import Task logger = logging.getLogger(__name__) @@ -23,91 +20,7 @@ from sentry_sdk._types import MonitorConfig -class RunStorage: - """ - Storage interface for tracking the last run time of tasks. - This is split out from `ScheduleRunner` to allow us to change storage - in the future, or adapt taskworkers for other applications should we need to. - """ - - def __init__( - self, metrics: MetricsBackend, redis: RedisCluster[str] | StrictRedis[str] - ) -> None: - self._redis = redis - self._metrics = metrics - - def _make_key(self, key: str) -> str: - return f"tw:scheduler:{key}" - - def set(self, key: str, next_runtime: datetime) -> bool: - """ - Record a spawn time for a task. - The next_runtime parameter indicates when the record should expire, - and a task can be spawned again. - - Returns False when the key is set and a task should not be spawned. - """ - now = datetime.now(tz=UTC) - # next_runtime & now could be the same second, and redis gets sad if ex=0 - duration = max(int((next_runtime - now).total_seconds()), 1) - - result = self._redis.set(self._make_key(key), now.isoformat(), ex=duration, nx=True) - return bool(result) - - def read(self, key: str) -> datetime | None: - """ - Retrieve the last run time of a task - Returns None if last run time has expired or is unknown. - """ - result = self._redis.get(self._make_key(key)) - if result: - return datetime.fromisoformat(result) - - self._metrics.incr("taskworker.scheduler.run_storage.read.miss", tags={"taskname": key}) - return None - - def read_many( - self, - storage_keys: list[str], - ) -> Mapping[str, datetime | None]: - """ - Retrieve last run times in bulk. - - storage_keys are the new-format keys including the entry key prefix and - schedule_id suffix (e.g. "my-entry:test:valid:300"). Falls back through - two legacy formats to allow seamless deploys: - - new: "{entry_key}:{fullname}:{schedule_id}" (e.g. "my-entry:test:valid:300") - compat: "{fullname}:{schedule_id}" (e.g. "test:valid:300") - legacy: "{fullname}" (e.g. "test:valid") - - Compat is derived by stripping the entry_key prefix (split on first colon). - Legacy is derived from compat by stripping the schedule_id suffix (rsplit on last colon). - - Returns a mapping keyed by new storage_key. - """ - compat_keys = [sk.split(":", 1)[1] for sk in storage_keys] - legacy_keys = [ck.rsplit(":", 1)[0] for ck in compat_keys] - - new_values = self._redis.mget([self._make_key(sk) for sk in storage_keys]) - compat_values = self._redis.mget([self._make_key(ck) for ck in compat_keys]) - legacy_values = self._redis.mget([self._make_key(lk) for lk in legacy_keys]) - - run_times: dict[str, datetime | None] = {} - for storage_key, new_val, compat_val, legacy_val in zip( - storage_keys, new_values, compat_values, legacy_values - ): - value = new_val - if value is None: - value = compat_val - if value is None: - value = legacy_val - run_times[storage_key] = datetime.fromisoformat(value) if value else None - return run_times - - def delete(self, key: str) -> None: - """remove a task key - mostly for testing.""" - self._redis.delete(self._make_key(key)) +__all__ = ("ScheduleEntry", "ScheduleRunner", "RunStorage") class ScheduleEntry: @@ -212,7 +125,7 @@ class ScheduleRunner: is used in a while loop to spawn tasks and sleep. """ - def __init__(self, app: TaskbrokerApp, run_storage: RunStorage) -> None: + def __init__(self, app: TaskbrokerApp, run_storage: RunStorageProtocol) -> None: self._entries: list[ScheduleEntry] = [] self._app = app self._run_storage = run_storage diff --git a/clients/python/src/taskbroker_client/scheduler/storage.py b/clients/python/src/taskbroker_client/scheduler/storage.py new file mode 100644 index 00000000..994c6048 --- /dev/null +++ b/clients/python/src/taskbroker_client/scheduler/storage.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +from collections.abc import Mapping +from datetime import UTC, datetime +from typing import Protocol + +from redis.client import StrictRedis +from rediscluster import RedisCluster + +from taskbroker_client.metrics import MetricsBackend + + +class RunStorageProtocol(Protocol): + """ + Storage interface for tracking the last run time of tasks. + This is split out from `ScheduleRunner` to allow us to change storage + in the future, or adapt taskworkers for other applications should we need to. + """ + + def set(self, key: str, next_runtime: datetime) -> bool: ... + + """ + Record a spawn time for a task. + The next_runtime parameter indicates when the record should expire, + and a task can be spawned again. + + Returns False when the key is set and a task should not be spawned. + """ + + def read(self, key: str) -> datetime | None: ... + + """ + Retrieve the last run time of a task + Returns None if last run time has expired or is unknown. + """ + + def read_many(self, storage_keys: list[str]) -> Mapping[str, datetime | None]: ... + + """ + Retrieve last run times in bulk. + + Returns a mapping keyed by new storage_key. + """ + + def delete(self, key: str) -> None: ... + + """remove a task key - mostly for testing.""" + + +class VolatileRunStorage(RunStorageProtocol): + """ + An in-memory run storage implementation + + Provides no durability, and is not recommended for applications with timedelta + schedules. Is a reasonable option for applications that use crontab schedules. + """ + + def __init__(self) -> None: + self._data: dict[str, datetime] = {} + + def set(self, key: str, next_runtime: datetime) -> bool: + now = datetime.now(tz=UTC) + self._data[key] = now + return True + + def read(self, key: str) -> datetime | None: + """ + Retrieve the last run time of a task + Returns None if last run time has expired or is unknown. + """ + return self._data.get(key, None) + + def read_many(self, storage_keys: list[str]) -> Mapping[str, datetime | None]: + """ + Retrieve last run times in bulk. + + Returns a mapping keyed by new storage_key. + """ + results = {} + for key in storage_keys: + results[key] = self._data.get(key) + return results + + def delete(self, key: str) -> None: + """remove a task key - mostly for testing.""" + self._data.pop(key, None) + + +class RunStorage(RunStorageProtocol): + """ + Redis backed scheduler storage + """ + + def __init__( + self, metrics: MetricsBackend, redis: RedisCluster[str] | StrictRedis[str] + ) -> None: + self._redis = redis + self._metrics = metrics + + def _make_key(self, key: str) -> str: + return f"tw:scheduler:{key}" + + def set(self, key: str, next_runtime: datetime) -> bool: + """ + Record a spawn time for a task. + The next_runtime parameter indicates when the record should expire, + and a task can be spawned again. + + Returns False when the key is set and a task should not be spawned. + """ + now = datetime.now(tz=UTC) + # next_runtime & now could be the same second, and redis gets sad if ex=0 + duration = max(int((next_runtime - now).total_seconds()), 1) + + result = self._redis.set(self._make_key(key), now.isoformat(), ex=duration, nx=True) + return bool(result) + + def read(self, key: str) -> datetime | None: + """ + Retrieve the last run time of a task + Returns None if last run time has expired or is unknown. + """ + result = self._redis.get(self._make_key(key)) + if result: + return datetime.fromisoformat(result) + + self._metrics.incr("taskworker.scheduler.run_storage.read.miss", tags={"taskname": key}) + return None + + def read_many( + self, + storage_keys: list[str], + ) -> Mapping[str, datetime | None]: + """ + Retrieve last run times in bulk. + + storage_keys are the new-format keys including the entry key prefix and + schedule_id suffix (e.g. "my-entry:test:valid:300"). Falls back through + two legacy formats to allow seamless deploys: + + new: "{entry_key}:{fullname}:{schedule_id}" (e.g. "my-entry:test:valid:300") + compat: "{fullname}:{schedule_id}" (e.g. "test:valid:300") + legacy: "{fullname}" (e.g. "test:valid") + + Compat is derived by stripping the entry_key prefix (split on first colon). + Legacy is derived from compat by stripping the schedule_id suffix (rsplit on last colon). + + Returns a mapping keyed by new storage_key. + """ + compat_keys = [sk.split(":", 1)[1] for sk in storage_keys] + legacy_keys = [ck.rsplit(":", 1)[0] for ck in compat_keys] + + new_values = self._redis.mget([self._make_key(sk) for sk in storage_keys]) + compat_values = self._redis.mget([self._make_key(ck) for ck in compat_keys]) + legacy_values = self._redis.mget([self._make_key(lk) for lk in legacy_keys]) + + run_times: dict[str, datetime | None] = {} + for storage_key, new_val, compat_val, legacy_val in zip( + storage_keys, new_values, compat_values, legacy_values + ): + value = new_val + if value is None: + value = compat_val + if value is None: + value = legacy_val + run_times[storage_key] = datetime.fromisoformat(value) if value else None + return run_times + + def delete(self, key: str) -> None: + """remove a task key - mostly for testing.""" + self._redis.delete(self._make_key(key)) + + +RedisRunStorage = RunStorage diff --git a/clients/python/tests/scheduler/test_runner.py b/clients/python/tests/scheduler/test_runner.py index 792b6cd3..c35e0b9b 100644 --- a/clients/python/tests/scheduler/test_runner.py +++ b/clients/python/tests/scheduler/test_runner.py @@ -8,6 +8,7 @@ from taskbroker_client.metrics import NoOpMetricsBackend from taskbroker_client.scheduler.config import crontab from taskbroker_client.scheduler.runner import RunStorage, ScheduleRunner +from taskbroker_client.scheduler.storage import VolatileRunStorage from ..conftest import freeze_time, producer_factory @@ -533,3 +534,35 @@ def test_schedulerunner_two_schedules_same_task( # Each entry has its own Redis key — neither blocks the other assert run_storage.read("first:test:valid:300") is not None assert run_storage.read("second:test:valid:600") is not None + + +def test_schedulerunner_volatile_storage_crontab(task_app: TaskbrokerApp) -> None: + storage = VolatileRunStorage() + schedule_set = ScheduleRunner(app=task_app, run_storage=storage) + schedule_set.add( + "valid", + { + "task": "test:valid", + "schedule": crontab(minute="*/2"), + }, + ) + + namespace = task_app.taskregistry.get("test") + with patch.object(namespace, "send_task") as mock_send: + with freeze_time("2025-01-24 14:24:00 UTC"): + sleep_time = schedule_set.tick() + assert sleep_time == 120 + assert mock_send.call_count == 1 + + # Not due yet — 1 minute into the 2-minute interval + with freeze_time("2025-01-24 14:25:00 UTC"): + sleep_time = schedule_set.tick() + assert sleep_time == 60 + assert mock_send.call_count == 1 + + # Due again — no delete() needed; VolatileRunStorage.set() always returns True + # and the entry's in-memory last_run governs re-spawning + with freeze_time("2025-01-24 14:26:00 UTC"): + sleep_time = schedule_set.tick() + assert sleep_time == 120 + assert mock_send.call_count == 2 diff --git a/clients/python/tests/scheduler/test_storage.py b/clients/python/tests/scheduler/test_storage.py new file mode 100644 index 00000000..9f9857dc --- /dev/null +++ b/clients/python/tests/scheduler/test_storage.py @@ -0,0 +1,56 @@ +from datetime import UTC, datetime + +from taskbroker_client.scheduler.storage import VolatileRunStorage + +from ..conftest import freeze_time + + +def test_volatile_run_storage_set_returns_true() -> None: + storage = VolatileRunStorage() + with freeze_time("2025-01-24 14:25:00 UTC"): + result = storage.set("key", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)) + assert result is True + + +def test_volatile_run_storage_set_stores_now_not_next_runtime() -> None: + storage = VolatileRunStorage() + now = datetime(2025, 1, 24, 14, 25, 0, tzinfo=UTC) + next_runtime = datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC) + with freeze_time(now): + storage.set("key", next_runtime) + result = storage.read("key") + assert result == now + + +def test_volatile_run_storage_read_after_set() -> None: + storage = VolatileRunStorage() + now = datetime(2025, 1, 24, 14, 25, 0, tzinfo=UTC) + with freeze_time(now): + storage.set("key", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)) + assert storage.read("key") == now + assert storage.read("not-defined") is None + + +def test_volatile_run_storage_read_many_mixed() -> None: + storage = VolatileRunStorage() + t1 = datetime(2025, 1, 24, 14, 25, 0, tzinfo=UTC) + with freeze_time(t1): + storage.set("present-1", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)) + storage.set("present-2", datetime(2025, 1, 24, 14, 31, 0, tzinfo=UTC)) + + result = storage.read_many(["present-1", "missing", "present-2"]) + assert result == {"present-1": t1, "missing": None, "present-2": t1} + + result = storage.read_many(["a", "b", "c"]) + assert result == {"a": None, "b": None, "c": None} + + +def test_volatile_run_storage_delete() -> None: + storage = VolatileRunStorage() + with freeze_time("2025-01-24 14:25:00 UTC"): + storage.set("key", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)) + assert storage.read("key") is not None + + storage.delete("key") + storage.delete("missing") + assert storage.read("key") is None From 0883c0db63965f3d2f438a0cbd69031f84587725 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 24 Apr 2026 10:02:07 -0400 Subject: [PATCH 2/3] Fix potential infinite loop --- .../taskbroker_client/scheduler/storage.py | 26 ++++++++++++++----- .../python/tests/scheduler/test_storage.py | 10 +++++-- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/clients/python/src/taskbroker_client/scheduler/storage.py b/clients/python/src/taskbroker_client/scheduler/storage.py index 994c6048..d7d7fcf4 100644 --- a/clients/python/src/taskbroker_client/scheduler/storage.py +++ b/clients/python/src/taskbroker_client/scheduler/storage.py @@ -56,19 +56,29 @@ class VolatileRunStorage(RunStorageProtocol): """ def __init__(self) -> None: - self._data: dict[str, datetime] = {} + self._data: dict[str, tuple[datetime, datetime]] = {} def set(self, key: str, next_runtime: datetime) -> bool: now = datetime.now(tz=UTC) - self._data[key] = now - return True + + if key not in self._data: + self._data[key] = (now, next_runtime) + return True + existing_expires = self._data[key][1] + if existing_expires <= now: + self._data[key] = (now, next_runtime) + return True + return False def read(self, key: str) -> datetime | None: """ Retrieve the last run time of a task Returns None if last run time has expired or is unknown. """ - return self._data.get(key, None) + value = self._data.get(key, None) + if value is None: + return None + return value[0] def read_many(self, storage_keys: list[str]) -> Mapping[str, datetime | None]: """ @@ -76,9 +86,13 @@ def read_many(self, storage_keys: list[str]) -> Mapping[str, datetime | None]: Returns a mapping keyed by new storage_key. """ - results = {} + results: dict[str, datetime | None] = {} for key in storage_keys: - results[key] = self._data.get(key) + value = self._data.get(key, None) + if value is None: + results[key] = value + else: + results[key] = value[0] return results def delete(self, key: str) -> None: diff --git a/clients/python/tests/scheduler/test_storage.py b/clients/python/tests/scheduler/test_storage.py index 9f9857dc..db24de8b 100644 --- a/clients/python/tests/scheduler/test_storage.py +++ b/clients/python/tests/scheduler/test_storage.py @@ -5,11 +5,17 @@ from ..conftest import freeze_time -def test_volatile_run_storage_set_returns_true() -> None: +def test_volatile_run_storage_set() -> None: storage = VolatileRunStorage() with freeze_time("2025-01-24 14:25:00 UTC"): result = storage.set("key", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)) - assert result is True + assert result is True + with freeze_time("2025-01-24 14:26:00 UTC"): + result = storage.set("key", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)) + assert result is False + with freeze_time("2025-01-24 14:30:01 UTC"): + result = storage.set("key", datetime(2025, 1, 24, 14, 35, 0, tzinfo=UTC)) + assert result is True def test_volatile_run_storage_set_stores_now_not_next_runtime() -> None: From e023f9e084d45ee0213e282f1d8d39755aa642b1 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 24 Apr 2026 10:16:30 -0400 Subject: [PATCH 3/3] Fix docstrings --- .../taskbroker_client/scheduler/storage.py | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/clients/python/src/taskbroker_client/scheduler/storage.py b/clients/python/src/taskbroker_client/scheduler/storage.py index d7d7fcf4..70ebb442 100644 --- a/clients/python/src/taskbroker_client/scheduler/storage.py +++ b/clients/python/src/taskbroker_client/scheduler/storage.py @@ -17,34 +17,34 @@ class RunStorageProtocol(Protocol): in the future, or adapt taskworkers for other applications should we need to. """ - def set(self, key: str, next_runtime: datetime) -> bool: ... - - """ - Record a spawn time for a task. - The next_runtime parameter indicates when the record should expire, - and a task can be spawned again. - - Returns False when the key is set and a task should not be spawned. - """ - - def read(self, key: str) -> datetime | None: ... - - """ - Retrieve the last run time of a task - Returns None if last run time has expired or is unknown. - """ + def set(self, key: str, next_runtime: datetime) -> bool: + """ + Record a spawn time for a task. + The next_runtime parameter indicates when the record should expire, + and a task can be spawned again. - def read_many(self, storage_keys: list[str]) -> Mapping[str, datetime | None]: ... + Returns False when the key is set and a task should not be spawned. + """ + ... - """ - Retrieve last run times in bulk. + def read(self, key: str) -> datetime | None: + """ + Retrieve the last run time of a task + Returns None if last run time has expired or is unknown. + """ + ... - Returns a mapping keyed by new storage_key. - """ + def read_many(self, storage_keys: list[str]) -> Mapping[str, datetime | None]: + """ + Retrieve last run times in bulk. - def delete(self, key: str) -> None: ... + Returns a mapping keyed by new storage_key. + """ + ... - """remove a task key - mostly for testing.""" + def delete(self, key: str) -> None: + """remove a task key - mostly for testing.""" + ... class VolatileRunStorage(RunStorageProtocol):