From 757db24eb438723b01c627cba3074626fc15ac8d Mon Sep 17 00:00:00 2001 From: Mark Story Date: Thu, 16 Apr 2026 10:26:08 -0400 Subject: [PATCH 1/2] feat(schedules) Improve schedule entry isolation In #590 I made schedule storage keys embed their schedule data, and another issue was identified that a task with two schedules could have an overlapping storage key. These changes embed the schedule entry name into the storage key to ensure that schedule timing is unique for each entry. Now, changing the schedule name, task name, or schedule timing will reset the schedule timing. Refs STREAM-676 --- .../src/taskbroker_client/scheduler/runner.py | 34 ++++-- clients/python/tests/scheduler/test_runner.py | 110 +++++++++++++----- 2 files changed, 103 insertions(+), 41 deletions(-) diff --git a/clients/python/src/taskbroker_client/scheduler/runner.py b/clients/python/src/taskbroker_client/scheduler/runner.py index bfbcdfa8..fde9938f 100644 --- a/clients/python/src/taskbroker_client/scheduler/runner.py +++ b/clients/python/src/taskbroker_client/scheduler/runner.py @@ -73,22 +73,36 @@ def read_many( """ Retrieve last run times in bulk. - storage_keys are the new-format keys including the schedule_id suffix - (e.g. "test:valid:300"). Falls back to the legacy key (derived by - stripping the suffix) when the new key has no data, allowing a seamless - first-deploy transition. + storage_keys are the new-format keys including the entry key prefix and + schedule_id suffix (e.g. "my-entry:test:valid:300"). Falls back through + two legacy formats to allow seamless deploys: - Returns a mapping keyed by storage_key. + new: "{entry_key}:{fullname}:{schedule_id}" (e.g. "my-entry:test:valid:300") + compat: "{fullname}:{schedule_id}" (e.g. "test:valid:300") + legacy: "{fullname}" (e.g. "test:valid") + + Compat is derived by stripping the entry_key prefix (split on first colon). + Legacy is derived from compat by stripping the schedule_id suffix (rsplit on last colon).
+ + Returns a mapping keyed by new storage_key. """ - legacy_keys = [sk.rsplit(":", 1)[0] for sk in storage_keys] + compat_keys = [sk.split(":", 1)[1] for sk in storage_keys] + legacy_keys = [ck.rsplit(":", 1)[0] for ck in compat_keys] new_values = self._redis.mget([self._make_key(sk) for sk in storage_keys]) + compat_values = self._redis.mget([self._make_key(ck) for ck in compat_keys]) legacy_values = self._redis.mget([self._make_key(lk) for lk in legacy_keys]) run_times: dict[str, datetime | None] = {} - for storage_key, new_val, legacy_val in zip(storage_keys, new_values, legacy_values): - raw = new_val if new_val is not None else legacy_val - run_times[storage_key] = datetime.fromisoformat(raw) if raw else None + for storage_key, new_val, compat_val, legacy_val in zip( + storage_keys, new_values, compat_values, legacy_values + ): + value = new_val + if value is None: + value = compat_val + if value is None: + value = legacy_val + run_times[storage_key] = datetime.fromisoformat(value) if value else None return run_times def delete(self, key: str) -> None: @@ -126,7 +140,7 @@ def fullname(self) -> str: @property def storage_key(self) -> str: - return f"{self.fullname}:{self._schedule.schedule_id()}" + return f"{self._key}:{self.fullname}:{self._schedule.schedule_id()}" @property def namespace(self) -> str: diff --git a/clients/python/tests/scheduler/test_runner.py b/clients/python/tests/scheduler/test_runner.py index 90511496..792b6cd3 100644 --- a/clients/python/tests/scheduler/test_runner.py +++ b/clients/python/tests/scheduler/test_runner.py @@ -102,7 +102,7 @@ def test_schedulerunner_tick_one_task_time_remaining( schedule_set = ScheduleRunner(app=task_app, run_storage=run_storage) schedule_set.add( - "valid", + "first-task", { "task": "test:valid", "schedule": timedelta(minutes=5), @@ -110,7 +110,7 @@ def test_schedulerunner_tick_one_task_time_remaining( ) # Last run was two minutes ago. 
with freeze_time("2025-01-24 14:23:00 UTC"): - run_storage.set("test:valid:300", datetime(2025, 1, 24, 14, 28, 0, tzinfo=UTC)) + run_storage.set("first-task:test:valid:300", datetime(2025, 1, 24, 14, 28, 0, tzinfo=UTC)) namespace = task_app.taskregistry.get("test") with freeze_time("2025-01-24 14:25:00 UTC"), patch.object(namespace, "send_task") as mock_send: @@ -118,7 +118,7 @@ def test_schedulerunner_tick_one_task_time_remaining( assert sleep_time == 180 assert mock_send.call_count == 0 - last_run = run_storage.read("test:valid:300") + last_run = run_storage.read("first-task:test:valid:300") assert last_run == datetime(2025, 1, 24, 14, 23, 0, tzinfo=UTC) @@ -137,7 +137,7 @@ def test_schedulerunner_tick_one_task_spawned( # Last run was 5 minutes from the freeze_time below run_storage.read_many.return_value = { - "test:valid:300": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC), + "valid:test:valid:300": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC), } run_storage.set.return_value = True @@ -155,7 +155,7 @@ def test_schedulerunner_tick_one_task_spawned( assert run_storage.set.call_count == 1 # set() is called with the correct next_run time run_storage.set.assert_called_with( - "test:valid:300", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC) + "valid:test:valid:300", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC) ) @@ -175,7 +175,7 @@ def test_schedulerunner_tick_create_checkin( # Last run was 5 minutes from the freeze_time below run_storage.read_many.return_value = { - "test:valid:300": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC), + "important-task:test:valid:300": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC), } run_storage.set.return_value = True mock_capture_checkin.return_value = "checkin-id" @@ -234,8 +234,8 @@ def test_schedulerunner_tick_key_exists_no_spawn( with freeze_time("2025-01-24 14:30:00 UTC"): # Set a key into run_storage to simulate another scheduler running - run_storage.delete("test:valid:300") - assert run_storage.set("test:valid:300", 
datetime.now(tz=UTC) + timedelta(minutes=2)) + run_storage.delete("valid:test:valid:300") + assert run_storage.set("valid:test:valid:300", datetime.now(tz=UTC) + timedelta(minutes=2)) # Our scheduler would wakeup and tick again. # The key exists in run_storage so we should not spawn a task. @@ -295,7 +295,7 @@ def test_schedulerunner_tick_one_task_multiple_ticks_crontab( assert sleep_time == 60 # Remove key to simulate expiration - run_storage.delete("test:valid:*/2_*_*_*_*") + run_storage.delete("valid:test:valid:*/2_*_*_*_*") with freeze_time("2025-01-24 14:26:00 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 120 @@ -336,7 +336,7 @@ def test_schedulerunner_tick_multiple_tasks( assert mock_send.call_count == 2 # Remove the redis key, as the ttl in redis doesn't respect freeze_time() - run_storage.delete("test:second:120") + run_storage.delete("second:test:second:120") with freeze_time("2025-01-24 14:27:01 UTC"): sleep_time = schedule_set.tick() # two minutes left on the 5 min task @@ -373,7 +373,7 @@ def test_schedulerunner_tick_fast_and_slow( called = extract_sent_tasks(mock_send) assert called == ["valid"] - run_storage.delete("test:valid:30") + run_storage.delete("valid:test:valid:30") with freeze_time("2025-01-24 14:25:30 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 30 @@ -381,7 +381,7 @@ def test_schedulerunner_tick_fast_and_slow( called = extract_sent_tasks(mock_send) assert called == ["valid", "valid"] - run_storage.delete("test:valid:30") + run_storage.delete("valid:test:valid:30") with freeze_time("2025-01-24 14:26:00 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 30 @@ -389,7 +389,7 @@ def test_schedulerunner_tick_fast_and_slow( called = extract_sent_tasks(mock_send) assert called == ["valid", "valid", "second", "valid"] - run_storage.delete("test:valid:30") + run_storage.delete("valid:test:valid:30") with freeze_time("2025-01-24 14:26:30 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 30 @@ 
-397,12 +397,12 @@ def test_schedulerunner_tick_fast_and_slow( called = extract_sent_tasks(mock_send) assert called == ["valid", "valid", "second", "valid", "valid"] - run_storage.delete("test:valid:30") + run_storage.delete("valid:test:valid:30") with freeze_time("2025-01-24 14:27:00 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 30 - assert run_storage.read("test:valid:30") + assert run_storage.read("valid:test:valid:30") called = extract_sent_tasks(mock_send) assert called == [ "valid", @@ -423,13 +423,13 @@ def test_scheduleentry_storage_key(task_app: TaskbrokerApp) -> None: runner = ScheduleRunner(app=task_app, run_storage=run_storage) runner.add("valid", {"task": "test:valid", "schedule": timedelta(minutes=5)}) entry = runner._entries[0] - assert entry.storage_key == "test:valid:300" + assert entry.storage_key == "valid:test:valid:300" assert entry.fullname == "test:valid" runner2 = ScheduleRunner(app=task_app, run_storage=run_storage) - runner2.add("valid", {"task": "test:valid", "schedule": crontab(minute="*/2")}) + runner2.add("my-schedule", {"task": "test:valid", "schedule": crontab(minute="*/2")}) entry2 = runner2._entries[0] - assert entry2.storage_key == "test:valid:*/2_*_*_*_*" + assert entry2.storage_key == "my-schedule:test:valid:*/2_*_*_*_*" def test_schedulerunner_schedule_change_spawns_immediately( @@ -440,7 +440,7 @@ def test_schedulerunner_schedule_change_spawns_immediately( should not block spawning. The task should run immediately on the new schedule. 
""" # Simulate the old scheduler having stored state with a 3-hour schedule - old_storage_key = "test:valid:10800" # 3 hours = 10800 seconds + old_storage_key = "valid:test:valid:10800" # 3 hours = 10800 seconds with freeze_time("2025-01-24 12:00:00 UTC"): # Old key with a 3h TTL would normally block for up to 3 more hours run_storage.set(old_storage_key, datetime(2025, 1, 24, 15, 0, 0, tzinfo=UTC)) @@ -459,29 +459,77 @@ def test_schedulerunner_schedule_change_spawns_immediately( def test_runstorage_read_many_backwards_compat(run_storage: RunStorage) -> None: """ - read_many() should fall back to the legacy key (old format without schedule_id suffix) - when the new-format key has no data. When the new key exists it should take precedence. + read_many() falls back through three key formats during a rolling deploy: + + new: "{entry_key}:{fullname}:{schedule_id}" (e.g. "my-entry:test:valid:300") + compat: "{fullname}:{schedule_id}" (e.g. "test:valid:300") + legacy: "{fullname}" (e.g. "test:valid") + + The new key takes precedence, then compat, then legacy. 
""" with freeze_time("2025-01-24 14:25:00 UTC"): - now = datetime.now(tz=UTC) - # Write state under the old legacy key format (no schedule_id suffix) + legacy_time = datetime.now(tz=UTC) + # Write state under the original legacy key (no schedule_id, no entry_key) run_storage._redis.set( run_storage._make_key("test:valid"), - now.isoformat(), + legacy_time.isoformat(), ex=300, ) - # New-format key doesn't exist yet — should fall back to legacy value - result = run_storage.read_many(["test:valid:300"]) - assert result["test:valid:300"] == now + # Only legacy exists — should fall back all the way + first_result = run_storage.read_many(["my-entry:test:valid:300"]) + assert first_result["my-entry:test:valid:300"] == legacy_time - # Once a new-format key is written, it wins over the legacy key with freeze_time("2025-01-24 14:26:00 UTC"): - new_time = datetime.now(tz=UTC) + compat_time = datetime.now(tz=UTC) + # Write state under the compat key format (schedule_id, no entry_key) run_storage._redis.set( run_storage._make_key("test:valid:300"), + compat_time.isoformat(), + ex=300, + ) + + # Compat key exists and wins over legacy + second_result = run_storage.read_many(["my-entry:test:valid:300"]) + assert second_result["my-entry:test:valid:300"] == compat_time + + with freeze_time("2025-01-24 14:27:00 UTC"): + new_time = datetime.now(tz=UTC) + # Write state under the new key format (entry_key + fullname + schedule_id) + run_storage._redis.set( + run_storage._make_key("my-entry:test:valid:300"), new_time.isoformat(), ex=300, ) - result2 = run_storage.read_many(["test:valid:300"]) - assert result2["test:valid:300"] == new_time + + # New key wins over both compat and legacy + latest_result = run_storage.read_many(["my-entry:test:valid:300"]) + assert latest_result["my-entry:test:valid:300"] == new_time + + +def test_schedulerunner_two_schedules_same_task( + task_app: TaskbrokerApp, run_storage: RunStorage +) -> None: + """ + If a task has two different schedules, each schedule 
should have distinct + storage keys that include the schedule timing. + """ + schedule_set = ScheduleRunner(app=task_app, run_storage=run_storage) + schedule_set.add("first", {"task": "test:valid", "schedule": timedelta(minutes=5)}) + schedule_set.add("second", {"task": "test:valid", "schedule": timedelta(minutes=10)}) + + first_entry, second_entry = schedule_set._entries + assert first_entry.storage_key == "first:test:valid:300" + assert second_entry.storage_key == "second:test:valid:600" + + namespace = task_app.taskregistry.get("test") + with patch.object(namespace, "send_task") as mock_send: + with freeze_time("2025-01-24 14:25:00 UTC"): + sleep_time = schedule_set.tick() + # Both tasks are due as there are no storage keys set + assert mock_send.call_count == 2 + assert sleep_time == 300 + + # Each entry has its own Redis key — neither blocks the other + assert run_storage.read("first:test:valid:300") is not None + assert run_storage.read("second:test:valid:600") is not None From 1d14b61a22ced5c2ed178a17f05dd89e45057a3a Mon Sep 17 00:00:00 2001 From: Mark Story Date: Thu, 16 Apr 2026 15:20:42 -0400 Subject: [PATCH 2/2] Fix compiler warning from new rust --- src/kafka/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index bd70ab5a..8de67229 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -1000,7 +1000,7 @@ mod tests { self.pipe .write() .unwrap() - .extend(take(&mut self.buffer.write().unwrap() as &mut Vec).into_iter()); + .extend(take(&mut self.buffer.write().unwrap() as &mut Vec)); Ok(Some(())) }