Skip to content

Commit 0b9cd8a

Browse files
Validate SDK durable timeout budgets
1 parent 93dafbf commit 0b9cd8a

2 files changed

Lines changed: 142 additions & 0 deletions

File tree

src/durable_workflow/workflow.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,13 @@ class ChildWorkflowRetryPolicy(ActivityRetryPolicy):
267267
ChildWorkflowRetryPolicyInput = ChildWorkflowRetryPolicy | ActivityRetryPolicy | Mapping[str, Any]
268268

269269

270+
def _validate_positive_timeout(name: str, value: int | None) -> None:
271+
if value is None:
272+
return
273+
if value < 1:
274+
raise ValueError(f"{name} must be >= 1 second")
275+
276+
270277
@dataclass
271278
class ScheduleActivity:
272279
"""Command requesting an activity task.
@@ -296,6 +303,8 @@ def to_server_command(
296303
size_warning: serializer.PayloadSizeWarningConfig | None = serializer.DEFAULT_PAYLOAD_SIZE_WARNING,
297304
warning_context: PayloadWarningContext = None,
298305
) -> dict[str, Any]:
306+
self._validate_timeouts()
307+
299308
command: dict[str, Any] = {
300309
"type": "schedule_activity",
301310
"activity_type": self.activity_type,
@@ -328,6 +337,38 @@ def to_server_command(
328337
command["heartbeat_timeout"] = self.heartbeat_timeout
329338
return command
330339

340+
def _validate_timeouts(self) -> None:
341+
timeouts = {
342+
"start_to_close_timeout": self.start_to_close_timeout,
343+
"schedule_to_start_timeout": self.schedule_to_start_timeout,
344+
"schedule_to_close_timeout": self.schedule_to_close_timeout,
345+
"heartbeat_timeout": self.heartbeat_timeout,
346+
}
347+
for name, value in timeouts.items():
348+
_validate_positive_timeout(name, value)
349+
350+
if (
351+
self.heartbeat_timeout is not None
352+
and self.start_to_close_timeout is not None
353+
and self.heartbeat_timeout > self.start_to_close_timeout
354+
):
355+
raise ValueError("heartbeat_timeout must be <= start_to_close_timeout")
356+
357+
if self.schedule_to_close_timeout is None:
358+
return
359+
360+
if (
361+
self.start_to_close_timeout is not None
362+
and self.start_to_close_timeout > self.schedule_to_close_timeout
363+
):
364+
raise ValueError("start_to_close_timeout must be <= schedule_to_close_timeout")
365+
366+
if (
367+
self.schedule_to_start_timeout is not None
368+
and self.schedule_to_start_timeout > self.schedule_to_close_timeout
369+
):
370+
raise ValueError("schedule_to_start_timeout must be <= schedule_to_close_timeout")
371+
331372

332373
@dataclass
333374
class StartTimer:
@@ -555,6 +596,8 @@ def to_server_command(
555596
size_warning: serializer.PayloadSizeWarningConfig | None = serializer.DEFAULT_PAYLOAD_SIZE_WARNING,
556597
warning_context: PayloadWarningContext = None,
557598
) -> dict[str, Any]:
599+
self._validate_timeouts()
600+
558601
cmd: dict[str, Any] = {
559602
"type": "start_child_workflow",
560603
"workflow_type": self.workflow_type,
@@ -587,6 +630,17 @@ def to_server_command(
587630
cmd["run_timeout_seconds"] = self.run_timeout_seconds
588631
return cmd
589632

633+
def _validate_timeouts(self) -> None:
634+
_validate_positive_timeout("execution_timeout_seconds", self.execution_timeout_seconds)
635+
_validate_positive_timeout("run_timeout_seconds", self.run_timeout_seconds)
636+
637+
if (
638+
self.execution_timeout_seconds is not None
639+
and self.run_timeout_seconds is not None
640+
and self.run_timeout_seconds > self.execution_timeout_seconds
641+
):
642+
raise ValueError("run_timeout_seconds must be <= execution_timeout_seconds")
643+
590644

591645
@dataclass
592646
class RecordVersionMarker:

tests/test_replay.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,61 @@ def test_schedule_activity_server_command_includes_retry_policy_and_timeouts(sel
220220
assert server_cmd["schedule_to_close_timeout"] == 300
221221
assert server_cmd["heartbeat_timeout"] == 15
222222

223+
@pytest.mark.parametrize(
224+
("field", "value", "message"),
225+
[
226+
("start_to_close_timeout", 0, "start_to_close_timeout must be >= 1 second"),
227+
("schedule_to_start_timeout", 0, "schedule_to_start_timeout must be >= 1 second"),
228+
("schedule_to_close_timeout", 0, "schedule_to_close_timeout must be >= 1 second"),
229+
("heartbeat_timeout", 0, "heartbeat_timeout must be >= 1 second"),
230+
],
231+
)
232+
def test_schedule_activity_rejects_non_positive_timeout_budgets(
233+
self,
234+
field: str,
235+
value: int,
236+
message: str,
237+
) -> None:
238+
cmd = ScheduleActivity(
239+
activity_type="charge-card",
240+
arguments=[],
241+
**{field: value},
242+
)
243+
244+
with pytest.raises(ValueError, match=message):
245+
cmd.to_server_command("default-queue")
246+
247+
@pytest.mark.parametrize(
248+
("kwargs", "message"),
249+
[
250+
(
251+
{"start_to_close_timeout": 10, "heartbeat_timeout": 11},
252+
"heartbeat_timeout must be <= start_to_close_timeout",
253+
),
254+
(
255+
{"start_to_close_timeout": 301, "schedule_to_close_timeout": 300},
256+
"start_to_close_timeout must be <= schedule_to_close_timeout",
257+
),
258+
(
259+
{"schedule_to_start_timeout": 301, "schedule_to_close_timeout": 300},
260+
"schedule_to_start_timeout must be <= schedule_to_close_timeout",
261+
),
262+
],
263+
)
264+
def test_schedule_activity_rejects_incoherent_timeout_envelopes(
265+
self,
266+
kwargs: dict[str, int],
267+
message: str,
268+
) -> None:
269+
cmd = ScheduleActivity(
270+
activity_type="charge-card",
271+
arguments=[],
272+
**kwargs,
273+
)
274+
275+
with pytest.raises(ValueError, match=message):
276+
cmd.to_server_command("default-queue")
277+
223278

224279
class TestTwoActivities:
225280
def test_first_schedules(self) -> None:
@@ -728,6 +783,39 @@ def test_server_command_shape(self) -> None:
728783
assert sc["execution_timeout_seconds"] == 600
729784
assert sc["run_timeout_seconds"] == 120
730785

786+
@pytest.mark.parametrize(
787+
("field", "value", "message"),
788+
[
789+
("execution_timeout_seconds", 0, "execution_timeout_seconds must be >= 1 second"),
790+
("run_timeout_seconds", 0, "run_timeout_seconds must be >= 1 second"),
791+
],
792+
)
793+
def test_server_command_rejects_non_positive_child_timeout_budgets(
794+
self,
795+
field: str,
796+
value: int,
797+
message: str,
798+
) -> None:
799+
cmd = StartChildWorkflow(
800+
workflow_type="sub",
801+
arguments=[],
802+
**{field: value},
803+
)
804+
805+
with pytest.raises(ValueError, match=message):
806+
cmd.to_server_command("default-q")
807+
808+
def test_server_command_rejects_child_run_timeout_larger_than_execution_timeout(self) -> None:
809+
cmd = StartChildWorkflow(
810+
workflow_type="sub",
811+
arguments=[],
812+
execution_timeout_seconds=120,
813+
run_timeout_seconds=121,
814+
)
815+
816+
with pytest.raises(ValueError, match="run_timeout_seconds must be <= execution_timeout_seconds"):
817+
cmd.to_server_command("default-q")
818+
731819
def test_server_command_defaults(self) -> None:
732820
cmd = StartChildWorkflow(workflow_type="sub", arguments=[])
733821
sc = cmd.to_server_command("default-q")

0 commit comments

Comments
 (0)