Skip to content

Commit 794f0e8

Browse files
SushisourceTHardy98
authored andcommitted
Update worker versioning sample to use deployments (#246)
1 parent e3a04f2 commit 794f0e8

File tree

12 files changed

+487
-240
lines changed

12 files changed

+487
-240
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
__pycache__
44
.vscode
55
.DS_Store
6+
.claude

worker_versioning/README.md

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,26 @@
1-
# Worker Versioning Sample
1+
## Worker Versioning
22

3-
This sample shows you how you can use the [Worker Versioning](https://docs.temporal.io/workers#worker-versioning)
4-
feature to deploy incompatible changes to workflow code more easily.
3+
This sample demonstrates how to use Temporal's Worker Versioning feature to safely deploy updates to workflow and activity code. It shows the difference between auto-upgrading and pinned workflows, and how to manage worker deployments with different build IDs.
54

6-
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from the root directory:
5+
The sample creates multiple worker versions (1.0, 1.1, and 2.0) within one deployment and demonstrates:
6+
- **Auto-upgrading workflows**: Automatically and controllably migrate to newer worker versions
7+
- **Pinned workflows**: Stay on the original worker version throughout their lifecycle
8+
- **Compatible vs incompatible changes**: How to make safe updates using `workflow.patched`
79

8-
uv run worker_versioning/example.py
10+
### Steps to run this sample:
911

10-
This will add some Build IDs to a Task Queue, and will also run Workers with those versions to show how you can
11-
mark add versions, mark them as compatible (or not) with one another, and run Workers at specific versions. You'll
12-
see that only the workers only process Workflow Tasks assigned versions they are compatible with.
12+
1) Run a [Temporal service](https://github.com/temporalio/samples-python/tree/main/#how-to-use).
13+
Ensure that you're using at least Server version 1.28.0 (CLI version 1.4.0).
14+
15+
2) Start the main application (this will guide you through the sample):
16+
```bash
17+
uv run worker_versioning/app.py
18+
```
19+
20+
3) Follow the prompts to start workers in separate terminals:
21+
- When prompted, run: `uv run worker_versioning/workerv1.py`
22+
- When prompted, run: `uv run worker_versioning/workerv1_1.py`
23+
- When prompted, run: `uv run worker_versioning/workerv2.py`
24+
25+
The sample will show how auto-upgrading workflows migrate to newer workers while pinned workflows
26+
remain on their original version.

worker_versioning/activities.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
1+
from dataclasses import dataclass
2+
13
from temporalio import activity
24

35

6+
@dataclass
7+
class IncompatibleActivityInput:
8+
"""Input for the incompatible activity."""
9+
10+
called_by: str
11+
more_data: str
12+
13+
414
@activity.defn
5-
async def greet(inp: str) -> str:
6-
return f"Hi from {inp}"
15+
async def some_activity(called_by: str) -> str:
16+
"""Basic activity for the workflow."""
17+
return f"some_activity called by {called_by}"
718

819

920
@activity.defn
10-
async def super_greet(inp: str, some_number: int) -> str:
11-
return f"Hi from {inp} with {some_number}"
21+
async def some_incompatible_activity(input_data: IncompatibleActivityInput) -> str:
22+
"""Incompatible activity that takes different input."""
23+
return f"some_incompatible_activity called by {input_data.called_by} with {input_data.more_data}"

worker_versioning/app.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""Main application for the worker versioning sample."""
2+
3+
import asyncio
4+
import logging
5+
import uuid
6+
7+
from temporalio.client import Client
8+
9+
TASK_QUEUE = "worker-versioning"
10+
DEPLOYMENT_NAME = "my-deployment"
11+
12+
logging.basicConfig(level=logging.INFO)
13+
14+
15+
async def main() -> None:
16+
client = await Client.connect("localhost:7233")
17+
18+
# Wait for v1 worker and set as current version
19+
logging.info(
20+
"Waiting for v1 worker to appear. Run `python worker_versioning/workerv1.py` in another terminal"
21+
)
22+
await wait_for_worker_and_make_current(client, "1.0")
23+
24+
# Start auto-upgrading and pinned workflows. Importantly, note that when we start the workflows,
25+
# we are using a workflow type name which does *not* include the version number. We defined them
26+
# with versioned names so we could show changes to the code, but here when the client invokes
27+
# them, we're demonstrating that the client remains version-agnostic.
28+
auto_upgrade_workflow_id = "worker-versioning-versioning-autoupgrade_" + str(
29+
uuid.uuid4()
30+
)
31+
auto_upgrade_execution = await client.start_workflow(
32+
"AutoUpgrading",
33+
id=auto_upgrade_workflow_id,
34+
task_queue=TASK_QUEUE,
35+
)
36+
37+
pinned_workflow_id = "worker-versioning-versioning-pinned_" + str(uuid.uuid4())
38+
pinned_execution = await client.start_workflow(
39+
"Pinned",
40+
id=pinned_workflow_id,
41+
task_queue=TASK_QUEUE,
42+
)
43+
44+
logging.info("Started auto-upgrading workflow: %s", auto_upgrade_execution.id)
45+
logging.info("Started pinned workflow: %s", pinned_execution.id)
46+
47+
# Signal both workflows a few times to drive them
48+
await advance_workflows(auto_upgrade_execution, pinned_execution)
49+
50+
# Now wait for the v1.1 worker to appear and become current
51+
logging.info(
52+
"Waiting for v1.1 worker to appear. Run `python worker_versioning/workerv1_1.py` in another terminal"
53+
)
54+
await wait_for_worker_and_make_current(client, "1.1")
55+
56+
# Once it has, we will continue to advance the workflows.
57+
# The auto-upgrade workflow will now make progress on the new worker, while the pinned one will
58+
# keep progressing on the old worker.
59+
await advance_workflows(auto_upgrade_execution, pinned_execution)
60+
61+
# Finally we'll start the v2 worker, and again it'll become the new current version
62+
logging.info(
63+
"Waiting for v2 worker to appear. Run `python worker_versioning/workerv2.py` in another terminal"
64+
)
65+
await wait_for_worker_and_make_current(client, "2.0")
66+
67+
# Once it has we'll start one more new workflow, another pinned one, to demonstrate that new
68+
# pinned workflows start on the current version.
69+
pinned_workflow_2_id = "worker-versioning-versioning-pinned-2_" + str(uuid.uuid4())
70+
pinned_execution_2 = await client.start_workflow(
71+
"Pinned",
72+
id=pinned_workflow_2_id,
73+
task_queue=TASK_QUEUE,
74+
)
75+
logging.info("Started pinned workflow v2: %s", pinned_execution_2.id)
76+
77+
# Now we'll conclude all workflows. You should be able to see in your server UI that the pinned
78+
# workflow always stayed on 1.0, while the auto-upgrading workflow migrated.
79+
for handle in [auto_upgrade_execution, pinned_execution, pinned_execution_2]:
80+
await handle.signal("do_next_signal", "conclude")
81+
await handle.result()
82+
83+
logging.info("All workflows completed")
84+
85+
86+
async def advance_workflows(auto_upgrade_execution, pinned_execution):
87+
"""Signal both workflows a few times to drive them."""
88+
for i in range(3):
89+
await auto_upgrade_execution.signal("do_next_signal", "do-activity")
90+
await pinned_execution.signal("do_next_signal", "some-signal")
91+
92+
93+
async def wait_for_worker_and_make_current(client: Client, build_id: str) -> None:
94+
import temporalio.api.workflowservice.v1 as wsv1
95+
from temporalio.common import WorkerDeploymentVersion
96+
97+
target_version = WorkerDeploymentVersion(
98+
deployment_name=DEPLOYMENT_NAME, build_id=build_id
99+
)
100+
101+
while True:
102+
try:
103+
describe_request = wsv1.DescribeWorkerDeploymentRequest(
104+
namespace=client.namespace,
105+
deployment_name=DEPLOYMENT_NAME,
106+
)
107+
response = await client.workflow_service.describe_worker_deployment(
108+
describe_request
109+
)
110+
111+
for version_summary in response.worker_deployment_info.version_summaries:
112+
if (
113+
version_summary.deployment_version.deployment_name
114+
== target_version.deployment_name
115+
and version_summary.deployment_version.build_id
116+
== target_version.build_id
117+
):
118+
break
119+
else:
120+
await asyncio.sleep(1)
121+
continue
122+
123+
break
124+
125+
except Exception:
126+
await asyncio.sleep(1)
127+
continue
128+
129+
# Once the version is available, set it as current
130+
set_request = wsv1.SetWorkerDeploymentCurrentVersionRequest(
131+
namespace=client.namespace,
132+
deployment_name=DEPLOYMENT_NAME,
133+
build_id=target_version.build_id,
134+
)
135+
await client.workflow_service.set_worker_deployment_current_version(set_request)
136+
137+
138+
if __name__ == "__main__":
139+
asyncio.run(main())

worker_versioning/example.py

Lines changed: 0 additions & 120 deletions
This file was deleted.

worker_versioning/workerv1.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Worker v1 for the worker versioning sample."""
2+
3+
import asyncio
4+
import logging
5+
6+
from temporalio.client import Client
7+
from temporalio.common import WorkerDeploymentVersion
8+
from temporalio.worker import Worker, WorkerDeploymentConfig
9+
10+
from worker_versioning.activities import some_activity, some_incompatible_activity
11+
from worker_versioning.app import DEPLOYMENT_NAME, TASK_QUEUE
12+
from worker_versioning.workflows import AutoUpgradingWorkflowV1, PinnedWorkflowV1
13+
14+
logging.basicConfig(level=logging.INFO)
15+
16+
17+
async def main() -> None:
18+
"""Run worker v1."""
19+
client = await Client.connect("localhost:7233")
20+
21+
# Create worker v1
22+
worker = Worker(
23+
client,
24+
task_queue=TASK_QUEUE,
25+
workflows=[AutoUpgradingWorkflowV1, PinnedWorkflowV1],
26+
activities=[some_activity, some_incompatible_activity],
27+
deployment_config=WorkerDeploymentConfig(
28+
version=WorkerDeploymentVersion(
29+
deployment_name=DEPLOYMENT_NAME, build_id="1.0"
30+
),
31+
use_worker_versioning=True,
32+
),
33+
)
34+
35+
logging.info("Starting worker v1 (build 1.0)")
36+
await worker.run()
37+
38+
39+
if __name__ == "__main__":
40+
asyncio.run(main())

0 commit comments

Comments
 (0)