Skip to content

Commit be6fd7e

Browse files
Let Python Worker report its build_id during registration
Accept a `build_id` keyword on `Worker(...)` and forward it through `Client.register_worker` so the server's per-build-id rollout snapshot for the task queue can attribute active, draining, and stale worker counts to the right cohort. An empty or whitespace-only `build_id` is rejected at construction rather than silently dropped.
1 parent 1dd4ec6 commit be6fd7e

2 files changed

Lines changed: 52 additions & 0 deletions

File tree

src/durable_workflow/worker.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ def __init__(
242242
workflows: Iterable[type] = (),
243243
activities: Iterable[Callable[..., Any]] = (),
244244
worker_id: str | None = None,
245+
build_id: str | None = None,
245246
poll_timeout: float = 35.0,
246247
max_concurrent_workflow_tasks: int = 10,
247248
max_concurrent_activity_tasks: int = 10,
@@ -258,6 +259,12 @@ def __init__(
258259
}
259260
self.activities = {_activity_name(a): a for a in activities}
260261
self.worker_id = worker_id or f"py-worker-{uuid.uuid4().hex[:8]}"
262+
if build_id is not None:
263+
if not isinstance(build_id, str) or build_id.strip() == "":
264+
raise ValueError("build_id must be a non-empty string when provided")
265+
self.build_id: str | None = build_id
266+
else:
267+
self.build_id = None
261268
_guard_worker_workflow_fingerprints(self.worker_id, self.workflow_definition_fingerprints)
262269
if max_concurrent_workflow_tasks < 1:
263270
raise ValueError("max_concurrent_workflow_tasks must be at least 1")
@@ -346,6 +353,7 @@ async def _register(self) -> None:
346353
supported_activity_types=list(self.activities),
347354
max_concurrent_workflow_tasks=self.max_concurrent_workflow_tasks,
348355
max_concurrent_activity_tasks=self.max_concurrent_activity_tasks,
356+
build_id=self.build_id,
349357
)
350358
log.info("worker %s registered on %s", self.worker_id, self.task_queue)
351359

tests/test_worker.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,50 @@ async def test_register_advertises_custom_concurrency_limits(self, mock_client:
187187
assert call_kwargs["max_concurrent_workflow_tasks"] == 3
188188
assert call_kwargs["max_concurrent_activity_tasks"] == 7
189189

190+
@pytest.mark.asyncio
191+
async def test_register_forwards_build_id_when_configured(
192+
self, mock_client: AsyncMock
193+
) -> None:
194+
worker = Worker(
195+
mock_client,
196+
task_queue="q1",
197+
workflows=[TestWorkflow],
198+
activities=[echo_activity],
199+
worker_id="w-build",
200+
build_id="release-2026.04.22-a1",
201+
)
202+
assert worker.build_id == "release-2026.04.22-a1"
203+
204+
await worker._register()
205+
206+
call_kwargs = mock_client.register_worker.call_args.kwargs
207+
assert call_kwargs["build_id"] == "release-2026.04.22-a1"
208+
209+
@pytest.mark.asyncio
210+
async def test_register_omits_build_id_when_not_configured(
211+
self, mock_client: AsyncMock
212+
) -> None:
213+
worker = Worker(
214+
mock_client,
215+
task_queue="q1",
216+
workflows=[TestWorkflow],
217+
activities=[echo_activity],
218+
worker_id="w-no-build",
219+
)
220+
assert worker.build_id is None
221+
222+
await worker._register()
223+
224+
call_kwargs = mock_client.register_worker.call_args.kwargs
225+
assert call_kwargs["build_id"] is None
226+
227+
def test_constructor_rejects_empty_build_id(self, mock_client: AsyncMock) -> None:
228+
with pytest.raises(ValueError, match="build_id"):
229+
Worker(mock_client, task_queue="q1", build_id="")
230+
231+
with pytest.raises(ValueError, match="build_id"):
232+
Worker(mock_client, task_queue="q1", build_id=" ")
233+
190234
def test_constructor_rejects_non_positive_concurrency_limits(
191235
self, mock_client: AsyncMock
192236
) -> None:

0 commit comments

Comments
 (0)