Skip to content

Commit d582eb4

Browse files
Add polyglot parity for system repair-status and repair-pass
Add the Python SDK side of the operator task-repair surface so the polyglot parity matrix now covers the repair pair of the six-operation system surface.

- Client.repair_status() mirrors GET /system/repair, returning the policy/candidates snapshot that dw system:repair-status already renders.
- Client.repair_pass(run_ids=..., instance_id=...) mirrors POST /system/repair/pass with the same optional filters as dw system:repair-pass; an unfiltered call sends an empty JSON body, which the server treats as a full-scope sweep.

Both methods are operator surfaces; callers must be authenticated with admin scope on the control plane. The returned dicts are passed through as-is so callers can observe the full repair report (throttled flag, per-scope candidate totals, failure arrays) without the SDK drifting out of sync with the server shape.

Ship two shared control-plane parity fixtures (tests/fixtures/control-plane/system-repair-{status,pass}-parity.json) byte-identical with the CLI, covering the GET no-body and POST run_ids+instance_id bodies. The existing control-plane fixture contract test picks them up automatically, and TestSystemMaintenance pins the SDK projection to the fixture semantic body.

Verified:
- pytest tests/ --ignore=tests/integration -> 617 passed
- ruff check src/durable_workflow/client.py tests/test_client.py -> clean
- mypy src/durable_workflow/client.py -> no issues
- scripts/check-cli-parity.py --cli ../cli-231 -> byte-identical
1 parent 8187e4a commit d582eb4

4 files changed

Lines changed: 263 additions & 0 deletions

File tree

src/durable_workflow/client.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,6 +1368,53 @@ async def test_external_storage(
13681368
)
13691369
return StorageTestResult.from_dict(data)
13701370

1371+
# ── System maintenance ────────────────────────────────────────────
1372+
async def repair_status(self) -> dict[str, Any]:
    """Fetch the server's task repair policy and candidate snapshot.

    Mirrors ``dw system:repair-status`` by issuing ``GET /system/repair``.
    This is an operator surface: the caller must be authenticated with
    admin scope.

    Returns:
        The decoded JSON object, passed through unchanged.

    Raises:
        ServerError: If the decoded response is not a JSON object.
    """
    snapshot = await self._request("GET", "/system/repair", context="repair_status")
    if isinstance(snapshot, dict):
        return snapshot

    # Anything other than an object breaks the declared return type, so it
    # is reported as a server-side error instead of being handed back.
    # NOTE(review): the status is hard-coded to 200 — presumably
    # ``_request`` only returns for successful responses; confirm.
    raise ServerError(
        200,
        {
            "reason": "invalid_repair_status_response",
            "message": f"expected JSON object, got {type(snapshot).__name__}",
        },
    )
1388+
1389+
async def repair_pass(
1390+
self,
1391+
*,
1392+
run_ids: list[str] | None = None,
1393+
instance_id: str | None = None,
1394+
) -> dict[str, Any]:
1395+
"""Run one task repair sweep on the server.
1396+
1397+
Mirrors ``dw system:repair-pass``. Without filters the server runs
1398+
a full-scope pass; pass ``run_ids`` or ``instance_id`` to narrow
1399+
the sweep. Operator surface; requires admin scope.
1400+
"""
1401+
body: dict[str, Any] = {}
1402+
if run_ids:
1403+
body["run_ids"] = list(run_ids)
1404+
if instance_id is not None:
1405+
body["instance_id"] = instance_id
1406+
1407+
data = await self._request("POST", "/system/repair/pass", json=body, context="repair_pass")
1408+
if not isinstance(data, dict):
1409+
raise ServerError(
1410+
200,
1411+
{
1412+
"reason": "invalid_repair_pass_response",
1413+
"message": f"expected JSON object, got {type(data).__name__}",
1414+
},
1415+
)
1416+
return data
1417+
13711418
# ── Task queues ────────────────────────────────────────────────────
13721419
async def list_task_queues(self) -> TaskQueueList:
13731420
"""List task queues with server-side admission status.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "system.repair.pass",
5+
"request": {
6+
"method": "POST",
7+
"path": "/system/repair/pass",
8+
"body": {
9+
"run_ids": [
10+
"wf-run-1",
11+
"wf-run-2"
12+
],
13+
"instance_id": "chk-42"
14+
}
15+
},
16+
"semantic_body": {
17+
"namespace": "orders-prod",
18+
"instance_id": "chk-42",
19+
"repaired_existing_tasks": 1,
20+
"repaired_missing_tasks": 0,
21+
"dispatched_tasks": 1,
22+
"selected_existing_task_candidates": 1,
23+
"selected_missing_task_candidates": 0,
24+
"existing_task_failures": [],
25+
"missing_run_failures": []
26+
},
27+
"response_body": {
28+
"throttled": false,
29+
"selected_existing_task_candidates": 1,
30+
"selected_missing_task_candidates": 0,
31+
"repaired_existing_tasks": 1,
32+
"repaired_missing_tasks": 0,
33+
"dispatched_tasks": 1,
34+
"selected_command_contract_candidates": 0,
35+
"backfilled_command_contracts": 0,
36+
"command_contract_backfill_unavailable": 0,
37+
"command_contract_failures": [],
38+
"existing_task_failures": [],
39+
"missing_run_failures": []
40+
},
41+
"cli": {
42+
"argv": {
43+
"--run-id": [
44+
"wf-run-1",
45+
"wf-run-2"
46+
],
47+
"--instance-id": "chk-42",
48+
"--json": true
49+
}
50+
},
51+
"sdk_python": {
52+
"method": "repair_pass",
53+
"args": {
54+
"run_ids": [
55+
"wf-run-1",
56+
"wf-run-2"
57+
],
58+
"instance_id": "chk-42"
59+
}
60+
}
61+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "system.repair.status",
5+
"request": {
6+
"method": "GET",
7+
"path": "/system/repair"
8+
},
9+
"semantic_body": {
10+
"namespace": "orders-prod",
11+
"total_candidates": 3,
12+
"existing_task_candidates": 2,
13+
"missing_task_candidates": 1,
14+
"scan_pressure": false,
15+
"scan_limit": 100,
16+
"scan_strategy": "oldest_first",
17+
"scope_keys": [
18+
"wf-polyglot-231:wf-run-1",
19+
"wf-polyglot-231:wf-run-2"
20+
]
21+
},
22+
"response_body": {
23+
"policy": {
24+
"redispatch_after_seconds": 120,
25+
"loop_throttle_seconds": 15,
26+
"scan_limit": 100,
27+
"scan_strategy": "oldest_first",
28+
"failure_backoff_max_seconds": 900
29+
},
30+
"candidates": {
31+
"total_candidates": 3,
32+
"existing_task_candidates": 2,
33+
"missing_task_candidates": 1,
34+
"scan_pressure": false,
35+
"oldest_task_candidate_created_at": "2026-04-22T05:30:00Z",
36+
"oldest_missing_run_started_at": "2026-04-22T05:25:00Z",
37+
"scopes": [
38+
{
39+
"scope_key": "wf-polyglot-231:wf-run-1",
40+
"existing_task_candidates": 2,
41+
"missing_task_candidates": 0,
42+
"total_candidates": 2,
43+
"selected_total_candidates": 2,
44+
"scan_limited_by_global_policy": false
45+
},
46+
{
47+
"scope_key": "wf-polyglot-231:wf-run-2",
48+
"existing_task_candidates": 0,
49+
"missing_task_candidates": 1,
50+
"total_candidates": 1,
51+
"selected_total_candidates": 1,
52+
"scan_limited_by_global_policy": false
53+
}
54+
]
55+
}
56+
},
57+
"cli": {
58+
"argv": {
59+
"--json": true
60+
}
61+
},
62+
"sdk_python": {
63+
"method": "repair_status",
64+
"args": {}
65+
}
66+
}

tests/test_client.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,95 @@ async def test_external_storage_probe_matches_polyglot_fixture(self, client: Cli
963963
assert result.large_payload.reference_uri == fixture["semantic_body"]["large_payload_reference_uri"]
964964

965965

966+
class TestSystemMaintenance:
    """Parity tests for ``Client.repair_status`` and ``Client.repair_pass``.

    The fixture-driven tests load a shared control-plane parity fixture,
    replay its ``sdk_python`` call against a mocked HTTP transport, and
    check the outgoing request, the pass-through result, and the fixture's
    ``semantic_body`` projection.
    """

    @staticmethod
    def _load_fixture(filename: str, operation: str) -> dict:
        """Load a control-plane parity fixture and confirm its operation name."""
        fixture_path = Path(__file__).parent / "fixtures" / "control-plane" / filename
        # Decode explicitly as UTF-8 so the read does not depend on the
        # platform's locale encoding.
        fixture = json.loads(fixture_path.read_text(encoding="utf-8"))
        assert fixture["operation"] == operation
        return fixture

    @pytest.mark.asyncio
    async def test_repair_status_matches_polyglot_fixture(self, client: Client) -> None:
        """``repair_status`` sends the fixture's GET and returns its body as-is."""
        fixture = self._load_fixture("system-repair-status-parity.json", "system.repair.status")
        sdk = fixture["sdk_python"]

        resp = _mock_response(200, fixture["response_body"])

        with patch.object(
            client._http, "request", new_callable=AsyncMock, return_value=resp
        ) as mock:
            result = await client.repair_status(**sdk["args"])

        # Request shape: method, "/api"-prefixed path, and no JSON body.
        assert mock.call_args.args[0] == fixture["request"]["method"]
        assert mock.call_args.args[1] == f"/api{fixture['request']['path']}"
        assert mock.call_args.kwargs.get("json") is None

        # The SDK passes the server payload through untouched.
        assert result == fixture["response_body"]

        # Semantic projection shared with the other parity consumers.
        semantic = fixture["semantic_body"]
        candidates = result["candidates"]
        assert candidates["total_candidates"] == semantic["total_candidates"]
        assert candidates["existing_task_candidates"] == semantic["existing_task_candidates"]
        assert candidates["missing_task_candidates"] == semantic["missing_task_candidates"]
        assert candidates["scan_pressure"] is semantic["scan_pressure"]
        assert result["policy"]["scan_limit"] == semantic["scan_limit"]
        assert result["policy"]["scan_strategy"] == semantic["scan_strategy"]
        assert [scope["scope_key"] for scope in candidates["scopes"]] == semantic["scope_keys"]

    @pytest.mark.asyncio
    async def test_repair_pass_matches_polyglot_fixture(self, client: Client) -> None:
        """``repair_pass`` sends the fixture's POST body and returns its body as-is."""
        fixture = self._load_fixture("system-repair-pass-parity.json", "system.repair.pass")
        sdk = fixture["sdk_python"]

        resp = _mock_response(200, fixture["response_body"])

        with patch.object(
            client._http, "request", new_callable=AsyncMock, return_value=resp
        ) as mock:
            result = await client.repair_pass(**sdk["args"])

        # Request shape: method, "/api"-prefixed path, and the filter body.
        assert mock.call_args.args[0] == fixture["request"]["method"]
        assert mock.call_args.args[1] == f"/api{fixture['request']['path']}"
        sent_body = mock.call_args.kwargs.get("json")
        assert sent_body == fixture["request"]["body"]

        # The SDK passes the server payload through untouched.
        assert result == fixture["response_body"]

        semantic = fixture["semantic_body"]
        # The semantic instance_id is a request-side value; it is not part
        # of the response body, so it is checked against what was sent.
        assert sent_body["instance_id"] == semantic["instance_id"]
        assert (
            result["selected_existing_task_candidates"]
            == semantic["selected_existing_task_candidates"]
        )
        assert (
            result["selected_missing_task_candidates"]
            == semantic["selected_missing_task_candidates"]
        )
        assert result["repaired_existing_tasks"] == semantic["repaired_existing_tasks"]
        assert result["repaired_missing_tasks"] == semantic["repaired_missing_tasks"]
        assert result["dispatched_tasks"] == semantic["dispatched_tasks"]
        assert result["existing_task_failures"] == semantic["existing_task_failures"]
        assert result["missing_run_failures"] == semantic["missing_run_failures"]

    @pytest.mark.asyncio
    async def test_repair_pass_sends_empty_body_without_filters(self, client: Client) -> None:
        """An unfiltered ``repair_pass`` still posts a JSON body, and it is ``{}``."""
        resp = _mock_response(200, {
            "throttled": False,
            "selected_existing_task_candidates": 0,
            "selected_missing_task_candidates": 0,
            "repaired_existing_tasks": 0,
            "repaired_missing_tasks": 0,
            "dispatched_tasks": 0,
            "selected_command_contract_candidates": 0,
            "backfilled_command_contracts": 0,
            "command_contract_backfill_unavailable": 0,
            "command_contract_failures": [],
            "existing_task_failures": [],
            "missing_run_failures": [],
        })

        with patch.object(
            client._http, "request", new_callable=AsyncMock, return_value=resp
        ) as mock:
            await client.repair_pass()

        assert mock.call_args.args[0] == "POST"
        assert mock.call_args.args[1] == "/api/system/repair/pass"
        # An explicit empty dict, not a missing/None body.
        assert mock.call_args.kwargs.get("json") == {}
1053+
1054+
9661055
class TestTaskQueues:
9671056
@pytest.mark.asyncio
9681057
async def test_list_task_queues_matches_polyglot_fixture(self, client: Client) -> None:

0 commit comments

Comments
 (0)