Skip to content

Commit 28cbbc2

Browse files
Expose worker roster parity in Python SDK
1 parent 97a5e7e commit 28cbbc2

6 files changed

Lines changed: 414 additions & 0 deletions

File tree

src/durable_workflow/client.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ def _route_for_metrics(path: str) -> str:
7272
parts[1] = "{schedule_id}"
7373
elif parts[0] == "search-attributes" and len(parts) >= 2:
7474
parts[1] = "{name}"
75+
elif parts[0] == "workers" and len(parts) >= 2:
76+
parts[1] = "{worker_id}"
7577
elif parts[:2] == ["bridge-adapters", "webhook"] and len(parts) >= 3:
7678
parts[2] = "{adapter}"
7779
elif (
@@ -383,6 +385,58 @@ class TaskQueueList:
383385
task_queues: list[TaskQueueDescription]
384386

385387

388+
@dataclass
389+
class WorkerDescription:
390+
"""Current server view of one registered worker."""
391+
392+
worker_id: str
393+
task_queue: str | None = None
394+
runtime: str | None = None
395+
namespace: str | None = None
396+
sdk_version: str | None = None
397+
build_id: str | None = None
398+
status: str | None = None
399+
max_concurrent_workflow_tasks: int | None = None
400+
max_concurrent_activity_tasks: int | None = None
401+
supported_workflow_types: list[str] | None = None
402+
supported_activity_types: list[str] | None = None
403+
last_heartbeat_at: str | None = None
404+
registered_at: str | None = None
405+
updated_at: str | None = None
406+
raw: dict[str, Any] | None = None
407+
408+
@classmethod
409+
def from_dict(cls, data: dict[str, Any], *, worker_id: str | None = None) -> WorkerDescription:
410+
workflow_types = data.get("supported_workflow_types")
411+
activity_types = data.get("supported_activity_types")
412+
413+
return cls(
414+
worker_id=data.get("worker_id", worker_id or ""),
415+
task_queue=data.get("task_queue"),
416+
runtime=data.get("runtime"),
417+
namespace=data.get("namespace"),
418+
sdk_version=data.get("sdk_version"),
419+
build_id=data.get("build_id"),
420+
status=data.get("status"),
421+
max_concurrent_workflow_tasks=data.get("max_concurrent_workflow_tasks"),
422+
max_concurrent_activity_tasks=data.get("max_concurrent_activity_tasks"),
423+
supported_workflow_types=workflow_types if isinstance(workflow_types, list) else None,
424+
supported_activity_types=activity_types if isinstance(activity_types, list) else None,
425+
last_heartbeat_at=data.get("last_heartbeat_at"),
426+
registered_at=data.get("registered_at"),
427+
updated_at=data.get("updated_at"),
428+
raw=data,
429+
)
430+
431+
432+
@dataclass
433+
class WorkerList:
434+
"""Registered worker roster for one namespace."""
435+
436+
namespace: str | None
437+
workers: list[WorkerDescription]
438+
439+
386440
@dataclass
387441
class ScheduleSpec:
388442
"""Calendar or interval rules for a scheduled workflow."""
@@ -1063,6 +1117,70 @@ async def delete_search_attribute(self, name: str) -> dict[str, Any]:
10631117
)
10641118
return data
10651119

1120+
# ── Workers ───────────────────────────────────────────────────────
1121+
async def list_workers(
1122+
self,
1123+
*,
1124+
task_queue: str | None = None,
1125+
status: str | None = None,
1126+
) -> WorkerList:
1127+
"""List registered workers in the current namespace."""
1128+
params: dict[str, str] = {}
1129+
if task_queue is not None:
1130+
params["task_queue"] = task_queue
1131+
if status is not None:
1132+
params["status"] = status
1133+
1134+
path = "/workers"
1135+
if params:
1136+
path = f"{path}?{urlencode(params)}"
1137+
1138+
data = await self._request("GET", path)
1139+
if not isinstance(data, dict):
1140+
raise ServerError(
1141+
200,
1142+
{
1143+
"reason": "invalid_worker_response",
1144+
"message": f"expected JSON object, got {type(data).__name__}",
1145+
},
1146+
)
1147+
items = data.get("workers", [])
1148+
1149+
return WorkerList(
1150+
namespace=data.get("namespace"),
1151+
workers=[
1152+
WorkerDescription.from_dict(item)
1153+
for item in items
1154+
if isinstance(item, dict)
1155+
],
1156+
)
1157+
1158+
async def describe_worker(self, worker_id: str) -> WorkerDescription:
1159+
"""Return runtime, capacity, heartbeat, and type support for one worker."""
1160+
data = await self._request("GET", f"/workers/{quote(worker_id, safe='')}", context=worker_id)
1161+
if not isinstance(data, dict):
1162+
raise ServerError(
1163+
200,
1164+
{
1165+
"reason": "invalid_worker_response",
1166+
"message": f"expected JSON object, got {type(data).__name__}",
1167+
},
1168+
)
1169+
return WorkerDescription.from_dict(data, worker_id=worker_id)
1170+
1171+
async def deregister_worker(self, worker_id: str) -> dict[str, Any]:
1172+
"""Remove a stale or retired worker from the server roster."""
1173+
data = await self._request("DELETE", f"/workers/{quote(worker_id, safe='')}", context=worker_id)
1174+
if not isinstance(data, dict):
1175+
raise ServerError(
1176+
200,
1177+
{
1178+
"reason": "invalid_worker_response",
1179+
"message": f"expected JSON object, got {type(data).__name__}",
1180+
},
1181+
)
1182+
return data
1183+
10661184
# ── Workflows ──────────────────────────────────────────────────────
10671185
async def start_workflow(
10681186
self,
@@ -1701,6 +1819,7 @@ async def register_worker(
17011819
max_concurrent_activity_tasks: int | None = None,
17021820
runtime: str = "python",
17031821
sdk_version: str | None = None,
1822+
build_id: str | None = None,
17041823
) -> Any:
17051824
"""Register this process with the server as a worker for ``task_queue``.
17061825
@@ -1725,6 +1844,8 @@ async def register_worker(
17251844
}
17261845
if workflow_definition_fingerprints is not None:
17271846
body["workflow_definition_fingerprints"] = workflow_definition_fingerprints
1847+
if build_id is not None:
1848+
body["build_id"] = build_id
17281849
if max_concurrent_workflow_tasks is not None:
17291850
body["max_concurrent_workflow_tasks"] = max_concurrent_workflow_tasks
17301851
if max_concurrent_activity_tasks is not None:
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "worker.deregister",
5+
"request": {
6+
"method": "DELETE",
7+
"path": "/workers/polyglot-worker-231"
8+
},
9+
"semantic_body": {
10+
"worker_id": "polyglot-worker-231",
11+
"outcome": "deregistered"
12+
},
13+
"response_body": {
14+
"worker_id": "polyglot-worker-231",
15+
"outcome": "deregistered"
16+
},
17+
"cli": {
18+
"argv": {
19+
"worker-id": "polyglot-worker-231",
20+
"--json": true
21+
}
22+
},
23+
"sdk_python": {
24+
"method": "deregister_worker",
25+
"args": {
26+
"worker_id": "polyglot-worker-231"
27+
}
28+
}
29+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "worker.describe",
5+
"request": {
6+
"method": "GET",
7+
"path": "/workers/polyglot-worker-231"
8+
},
9+
"semantic_body": {
10+
"worker_id": "polyglot-worker-231",
11+
"namespace": "orders-prod",
12+
"task_queue": "external-workflows",
13+
"runtime": "external",
14+
"status": "active"
15+
},
16+
"response_body": {
17+
"worker_id": "polyglot-worker-231",
18+
"namespace": "orders-prod",
19+
"task_queue": "external-workflows",
20+
"runtime": "external",
21+
"sdk_version": "durable-worker/1.2.3",
22+
"build_id": "build-231",
23+
"status": "active",
24+
"max_concurrent_workflow_tasks": 4,
25+
"max_concurrent_activity_tasks": 8,
26+
"supported_workflow_types": [
27+
"orders.fulfillment"
28+
],
29+
"supported_activity_types": [
30+
"inventory.sync"
31+
],
32+
"last_heartbeat_at": "2026-04-22T09:30:00Z",
33+
"registered_at": "2026-04-22T09:00:00Z",
34+
"updated_at": "2026-04-22T09:30:00Z"
35+
},
36+
"cli": {
37+
"argv": {
38+
"worker-id": "polyglot-worker-231",
39+
"--json": true
40+
}
41+
},
42+
"sdk_python": {
43+
"method": "describe_worker",
44+
"args": {
45+
"worker_id": "polyglot-worker-231"
46+
}
47+
}
48+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "worker.list",
5+
"request": {
6+
"method": "GET",
7+
"path": "/workers",
8+
"query": {
9+
"task_queue": "external-workflows",
10+
"status": "active"
11+
}
12+
},
13+
"semantic_body": {
14+
"namespace": "orders-prod",
15+
"worker_ids": [
16+
"polyglot-worker-231",
17+
"batch-worker-231"
18+
],
19+
"task_queue": "external-workflows",
20+
"status": "active"
21+
},
22+
"response_body": {
23+
"namespace": "orders-prod",
24+
"workers": [
25+
{
26+
"worker_id": "polyglot-worker-231",
27+
"namespace": "orders-prod",
28+
"task_queue": "external-workflows",
29+
"runtime": "external",
30+
"sdk_version": "durable-worker/1.2.3",
31+
"build_id": "build-231",
32+
"status": "active",
33+
"last_heartbeat_at": "2026-04-22T09:30:00Z"
34+
},
35+
{
36+
"worker_id": "batch-worker-231",
37+
"namespace": "orders-prod",
38+
"task_queue": "external-workflows",
39+
"runtime": "worker-runtime",
40+
"sdk_version": "polyglot-sdk/2.0.0",
41+
"build_id": "build-232",
42+
"status": "active",
43+
"last_heartbeat_at": "2026-04-22T09:31:00Z"
44+
}
45+
]
46+
},
47+
"cli": {
48+
"argv": {
49+
"--task-queue": "external-workflows",
50+
"--status": "active",
51+
"--json": true
52+
}
53+
},
54+
"sdk_python": {
55+
"method": "list_workers",
56+
"kwargs": {
57+
"task_queue": "external-workflows",
58+
"status": "active"
59+
}
60+
}
61+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "worker.register",
5+
"request": {
6+
"method": "POST",
7+
"path": "/worker/register",
8+
"body": {
9+
"worker_id": "polyglot-worker-231",
10+
"task_queue": "external-workflows",
11+
"runtime": "external",
12+
"sdk_version": "durable-worker/1.2.3",
13+
"build_id": "build-231",
14+
"supported_workflow_types": [
15+
"orders.fulfillment"
16+
],
17+
"supported_activity_types": [
18+
"inventory.sync"
19+
],
20+
"max_concurrent_workflow_tasks": 4,
21+
"max_concurrent_activity_tasks": 8
22+
}
23+
},
24+
"semantic_body": {
25+
"worker_id": "polyglot-worker-231",
26+
"task_queue": "external-workflows",
27+
"runtime": "external",
28+
"status": "active"
29+
},
30+
"response_body": {
31+
"worker_id": "polyglot-worker-231",
32+
"task_queue": "external-workflows",
33+
"runtime": "external",
34+
"status": "active",
35+
"registered": true
36+
},
37+
"cli": {
38+
"argv": {
39+
"worker-id": "polyglot-worker-231",
40+
"--task-queue": "external-workflows",
41+
"--runtime": "external",
42+
"--sdk-version": "durable-worker/1.2.3",
43+
"--build-id": "build-231",
44+
"--workflow-type": [
45+
"orders.fulfillment"
46+
],
47+
"--activity-type": [
48+
"inventory.sync"
49+
],
50+
"--max-workflow-tasks": "4",
51+
"--max-activity-tasks": "8",
52+
"--json": true
53+
}
54+
},
55+
"sdk_python": {
56+
"method": "register_worker",
57+
"kwargs": {
58+
"worker_id": "polyglot-worker-231",
59+
"task_queue": "external-workflows",
60+
"runtime": "external",
61+
"sdk_version": "durable-worker/1.2.3",
62+
"build_id": "build-231",
63+
"supported_workflow_types": [
64+
"orders.fulfillment"
65+
],
66+
"supported_activity_types": [
67+
"inventory.sync"
68+
],
69+
"max_concurrent_workflow_tasks": 4,
70+
"max_concurrent_activity_tasks": 8
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)