Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 3 additions & 90 deletions clients/python/src/taskbroker_client/scheduler/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@

import heapq
import logging
from collections.abc import Mapping
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, Any

from redis.client import StrictRedis
from rediscluster import RedisCluster
from sentry_sdk import capture_exception
from sentry_sdk.crons import MonitorStatus, capture_checkin

from taskbroker_client.app import TaskbrokerApp
from taskbroker_client.metrics import MetricsBackend
from taskbroker_client.scheduler.config import ScheduleConfig, crontab
from taskbroker_client.scheduler.schedules import CrontabSchedule, Schedule, TimedeltaSchedule
from taskbroker_client.scheduler.storage import RunStorage, RunStorageProtocol
from taskbroker_client.task import Task

logger = logging.getLogger(__name__)
Expand All @@ -23,91 +20,7 @@
from sentry_sdk._types import MonitorConfig


class RunStorage:
    """
    Storage interface for tracking the last run time of tasks.
    This is split out from `ScheduleRunner` to allow us to change storage
    in the future, or adapt taskworkers for other applications should we need to.
    """

    def __init__(
        self, metrics: MetricsBackend, redis: RedisCluster[str] | StrictRedis[str]
    ) -> None:
        # Redis client (cluster or single-node) used as the backing store.
        self._redis = redis
        # Metrics backend used to record read misses.
        self._metrics = metrics

    def _make_key(self, key: str) -> str:
        # All scheduler state shares one key-namespace prefix in Redis.
        return f"tw:scheduler:{key}"

    def set(self, key: str, next_runtime: datetime) -> bool:
        """
        Record a spawn time for a task.
        The next_runtime parameter indicates when the record should expire,
        and a task can be spawned again.

        Returns False when the key is set and a task should not be spawned.
        """
        now = datetime.now(tz=UTC)
        # next_runtime & now could be the same second, and redis gets sad if ex=0
        duration = max(int((next_runtime - now).total_seconds()), 1)

        # nx=True -> write only when absent, so an unexpired key blocks re-spawning.
        result = self._redis.set(self._make_key(key), now.isoformat(), ex=duration, nx=True)
        return bool(result)

    def read(self, key: str) -> datetime | None:
        """
        Retrieve the last run time of a task
        Returns None if last run time has expired or is unknown.
        """
        result = self._redis.get(self._make_key(key))
        if result:
            return datetime.fromisoformat(result)

        # A miss means the key expired (task is due again) or was never written.
        self._metrics.incr("taskworker.scheduler.run_storage.read.miss", tags={"taskname": key})
        return None

    def read_many(
        self,
        storage_keys: list[str],
    ) -> Mapping[str, datetime | None]:
        """
        Retrieve last run times in bulk.

        storage_keys are the new-format keys including the entry key prefix and
        schedule_id suffix (e.g. "my-entry:test:valid:300"). Falls back through
        two legacy formats to allow seamless deploys:

        new: "{entry_key}:{fullname}:{schedule_id}" (e.g. "my-entry:test:valid:300")
        compat: "{fullname}:{schedule_id}" (e.g. "test:valid:300")
        legacy: "{fullname}" (e.g. "test:valid")

        Compat is derived by stripping the entry_key prefix (split on first colon).
        Legacy is derived from compat by stripping the schedule_id suffix (rsplit on last colon).

        Returns a mapping keyed by new storage_key.
        """
        compat_keys = [sk.split(":", 1)[1] for sk in storage_keys]
        legacy_keys = [ck.rsplit(":", 1)[0] for ck in compat_keys]

        # One MGET per key format; each result list aligns index-for-index
        # with storage_keys.
        new_values = self._redis.mget([self._make_key(sk) for sk in storage_keys])
        compat_values = self._redis.mget([self._make_key(ck) for ck in compat_keys])
        legacy_values = self._redis.mget([self._make_key(lk) for lk in legacy_keys])

        run_times: dict[str, datetime | None] = {}
        for storage_key, new_val, compat_val, legacy_val in zip(
            storage_keys, new_values, compat_values, legacy_values
        ):
            # Prefer the new-format value, then fall through compat and legacy.
            value = new_val
            if value is None:
                value = compat_val
            if value is None:
                value = legacy_val
            run_times[storage_key] = datetime.fromisoformat(value) if value else None
        return run_times

    def delete(self, key: str) -> None:
        """remove a task key - mostly for testing."""
        self._redis.delete(self._make_key(key))
__all__ = ("ScheduleEntry", "ScheduleRunner", "RunStorage")


class ScheduleEntry:
Expand Down Expand Up @@ -212,7 +125,7 @@ class ScheduleRunner:
is used in a while loop to spawn tasks and sleep.
"""

def __init__(self, app: TaskbrokerApp, run_storage: RunStorage) -> None:
def __init__(self, app: TaskbrokerApp, run_storage: RunStorageProtocol) -> None:
self._entries: list[ScheduleEntry] = []
self._app = app
self._run_storage = run_storage
Expand Down
188 changes: 188 additions & 0 deletions clients/python/src/taskbroker_client/scheduler/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
from __future__ import annotations

from collections.abc import Mapping
from datetime import UTC, datetime
from typing import Protocol

from redis.client import StrictRedis
from rediscluster import RedisCluster

from taskbroker_client.metrics import MetricsBackend


class RunStorageProtocol(Protocol):
    """
    Storage interface for tracking the last run time of tasks.

    This is split out from `ScheduleRunner` to allow us to change storage
    in the future, or adapt taskworkers for other applications should we need to.
    Implementations: `RunStorage` (Redis backed) and `VolatileRunStorage`
    (in-memory).
    """

    def set(self, key: str, next_runtime: datetime) -> bool:
        """
        Record a spawn time for a task.

        The next_runtime parameter indicates when the record should expire,
        and a task can be spawned again.

        Returns False when an unexpired record for ``key`` already exists,
        meaning a task should not be spawned.
        """
        ...

    def read(self, key: str) -> datetime | None:
        """
        Retrieve the last run time of a task.

        Returns None if last run time has expired or is unknown.
        """
        ...

    def read_many(self, storage_keys: list[str]) -> Mapping[str, datetime | None]:
        """
        Retrieve last run times in bulk.

        Returns a mapping keyed by new storage_key; unknown or expired keys
        map to None.
        """
        ...

    def delete(self, key: str) -> None:
        """Remove a task key - mostly for testing."""
        ...


class VolatileRunStorage(RunStorageProtocol):
"""
An in-memory run storage implementation

Provides no durability, and is not recommended for applications with timedelta
schedules. Is a reasonable option for applications that use crontab schedules.
"""

def __init__(self) -> None:
    # Maps storage key -> (last_run, expires_at) timestamps.
    self._data: dict[str, tuple[datetime, datetime]] = {}

def set(self, key: str, next_runtime: datetime) -> bool:
    """
    Record a spawn time for *key*, expiring at ``next_runtime``.

    Returns True when the record was written (no entry existed, or the
    existing entry had expired); False when an unexpired entry blocks it.
    """
    now = datetime.now(tz=UTC)
    existing = self._data.get(key)
    if existing is not None and now < existing[1]:
        # An unexpired record already exists; the task must not spawn.
        return False
    self._data[key] = (now, next_runtime)
    return True

def read(self, key: str) -> datetime | None:
    """
    Retrieve the last run time of a task.

    Returns None if last run time has expired or is unknown, matching the
    TTL behaviour of the Redis backed RunStorage.
    """
    entry = self._data.get(key)
    if entry is None:
        return None
    last_run, expires_at = entry
    # Honor the RunStorageProtocol contract: an expired entry reads as
    # unknown. Previously the stored time was returned even after expiry,
    # unlike set(), which already treats expired entries as absent.
    if expires_at <= datetime.now(tz=UTC):
        return None
    return last_run
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read and read_many ignore expiry in volatile storage

Medium Severity

VolatileRunStorage.read() and read_many() return the stored timestamp without checking whether the entry has expired (_data[key][1] <= now). The set() method correctly checks the expiry time before allowing overwrites, but read() ignores it entirely. This violates the interface contract documented in the docstring ("Returns None if last run time has expired or is unknown") and breaks behavioral parity with the Redis-backed RunStorage, where expired keys automatically return None via TTL. If _load_last_run is invoked (e.g. after add() clears the heap) while expired entries exist, stale last-run times will be loaded instead of None.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 0883c0d. Configure here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's ok. If the stored time is in the past, the schedules will still advance.


def read_many(self, storage_keys: list[str]) -> Mapping[str, datetime | None]:
    """
    Retrieve last run times in bulk.

    Returns a mapping keyed by storage_key; keys that are unknown or whose
    expiry has passed map to None, matching the TTL behaviour of the Redis
    backed RunStorage.
    """
    now = datetime.now(tz=UTC)
    results: dict[str, datetime | None] = {}
    for key in storage_keys:
        entry = self._data.get(key)
        if entry is None or entry[1] <= now:
            # Unknown or expired: report None per RunStorageProtocol, the
            # same comparison set() uses when deciding to overwrite.
            results[key] = None
        else:
            results[key] = entry[0]
    return results
Comment on lines +86 to +96
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: VolatileRunStorage.read() and read_many() do not check for expiry, which is inconsistent with the set() method and violates the RunStorageProtocol contract.
Severity: LOW

Suggested Fix

Add an expiry check in VolatileRunStorage.read() and read_many(). Before returning a value, check if its expiry timestamp has passed. If it has, the entry should be treated as expired and not be returned, similar to the logic in the set() method.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: clients/python/src/taskbroker_client/scheduler/storage.py#L83-L96

Potential issue: The `VolatileRunStorage.read()` and `read_many()` methods return stored
`run_time` values without checking if the associated expiry datetime has passed. This is
inconsistent with the `set()` method, which does perform an expiry check, and violates
the contract defined by `RunStorageProtocol`. While the current scheduler implementation
does not trigger incorrect behavior because it only calls these methods on an empty
storage, this inconsistency makes the code brittle. Future refactoring or changes in how
the scheduler uses the storage could lead to bugs where stale, unexpired data is read
and processed, potentially causing missed task executions.

Also affects:

  • clients/python/src/taskbroker_client/scheduler/storage.py:73~81

Did we get this right? 👍 / 👎 to inform future reviews.


def delete(self, key: str) -> None:
    """Remove a task key - mostly for testing."""
    # Absent keys are ignored, mirroring Redis DELETE on a missing key.
    if key in self._data:
        del self._data[key]


class RunStorage(RunStorageProtocol):
    """
    Redis backed scheduler storage.

    Relies on Redis key TTLs for expiry: an unexpired key means the task
    has run recently and must not be spawned again.
    """

    def __init__(
        self, metrics: MetricsBackend, redis: RedisCluster[str] | StrictRedis[str]
    ) -> None:
        # Redis client (cluster or single-node) holding scheduler state.
        self._redis = redis
        # Metrics backend used to record read misses.
        self._metrics = metrics

    def _make_key(self, key: str) -> str:
        # Namespace all scheduler state under a common Redis prefix.
        return "tw:scheduler:" + key

    def set(self, key: str, next_runtime: datetime) -> bool:
        """
        Record a spawn time for a task.
        The next_runtime parameter indicates when the record should expire,
        and a task can be spawned again.

        Returns False when the key is set and a task should not be spawned.
        """
        now = datetime.now(tz=UTC)
        # Clamp the TTL to at least one second: next_runtime & now can land
        # in the same second, and redis gets sad if ex=0.
        expiry_seconds = int((next_runtime - now).total_seconds())
        if expiry_seconds < 1:
            expiry_seconds = 1

        # nx=True -> only write when absent; an unexpired key wins.
        was_set = self._redis.set(
            self._make_key(key), now.isoformat(), ex=expiry_seconds, nx=True
        )
        return bool(was_set)

    def read(self, key: str) -> datetime | None:
        """
        Retrieve the last run time of a task.
        Returns None if last run time has expired or is unknown.
        """
        raw = self._redis.get(self._make_key(key))
        if not raw:
            # Expired-or-missing keys count as misses for observability.
            self._metrics.incr(
                "taskworker.scheduler.run_storage.read.miss", tags={"taskname": key}
            )
            return None
        return datetime.fromisoformat(raw)

    def read_many(
        self,
        storage_keys: list[str],
    ) -> Mapping[str, datetime | None]:
        """
        Retrieve last run times in bulk.

        storage_keys are the new-format keys including the entry key prefix and
        schedule_id suffix (e.g. "my-entry:test:valid:300"). Falls back through
        two legacy formats to allow seamless deploys:

        new: "{entry_key}:{fullname}:{schedule_id}" (e.g. "my-entry:test:valid:300")
        compat: "{fullname}:{schedule_id}" (e.g. "test:valid:300")
        legacy: "{fullname}" (e.g. "test:valid")

        Compat is derived by stripping the entry_key prefix (split on first colon).
        Legacy is derived from compat by stripping the schedule_id suffix (rsplit on last colon).

        Returns a mapping keyed by new storage_key.
        """
        compat_keys = [full_key.split(":", 1)[1] for full_key in storage_keys]
        legacy_keys = [compat.rsplit(":", 1)[0] for compat in compat_keys]

        # One MGET per key format, ordered newest-format first; each result
        # list aligns index-for-index with storage_keys.
        values_by_format = [
            self._redis.mget([self._make_key(k) for k in key_group])
            for key_group in (storage_keys, compat_keys, legacy_keys)
        ]

        run_times: dict[str, datetime | None] = {}
        for index, storage_key in enumerate(storage_keys):
            # First non-None value wins: new, then compat, then legacy.
            found = next(
                (vals[index] for vals in values_by_format if vals[index] is not None),
                None,
            )
            run_times[storage_key] = datetime.fromisoformat(found) if found else None
        return run_times

    def delete(self, key: str) -> None:
        """remove a task key - mostly for testing."""
        self._redis.delete(self._make_key(key))


RedisRunStorage = RunStorage
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused RedisRunStorage alias is dead code

Low Severity

RedisRunStorage is defined as an alias for RunStorage but is never imported or referenced anywhere in the codebase. It isn't included in any __all__, isn't documented, and has no consumers. This dead code could confuse developers about whether RunStorage and RedisRunStorage are distinct classes.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 0883c0d. Configure here.

33 changes: 33 additions & 0 deletions clients/python/tests/scheduler/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from taskbroker_client.metrics import NoOpMetricsBackend
from taskbroker_client.scheduler.config import crontab
from taskbroker_client.scheduler.runner import RunStorage, ScheduleRunner
from taskbroker_client.scheduler.storage import VolatileRunStorage

from ..conftest import freeze_time, producer_factory

Expand Down Expand Up @@ -533,3 +534,35 @@ def test_schedulerunner_two_schedules_same_task(
# Each entry has its own Redis key — neither blocks the other
assert run_storage.read("first:test:valid:300") is not None
assert run_storage.read("second:test:valid:600") is not None


def test_schedulerunner_volatile_storage_crontab(task_app: TaskbrokerApp) -> None:
    """ScheduleRunner drives a crontab schedule end-to-end on VolatileRunStorage."""
    run_storage = VolatileRunStorage()
    runner = ScheduleRunner(app=task_app, run_storage=run_storage)
    runner.add(
        "valid",
        {"task": "test:valid", "schedule": crontab(minute="*/2")},
    )

    registry_namespace = task_app.taskregistry.get("test")
    with patch.object(registry_namespace, "send_task") as send_spy:
        # First tick at an even minute: the */2 crontab fires immediately.
        with freeze_time("2025-01-24 14:24:00 UTC"):
            assert runner.tick() == 120
            assert send_spy.call_count == 1

        # Not due yet — 1 minute into the 2-minute interval.
        with freeze_time("2025-01-24 14:25:00 UTC"):
            assert runner.tick() == 60
            assert send_spy.call_count == 1

        # Due again — no delete() needed; the prior entry has expired so
        # VolatileRunStorage.set() returns True and the entry's in-memory
        # last_run governs re-spawning.
        with freeze_time("2025-01-24 14:26:00 UTC"):
            assert runner.tick() == 120
            assert send_spy.call_count == 2
Loading
Loading