Skip to content

Commit e4f2442

Browse files
authored
nexus sync operations (query & update) (#252)
1 parent 326204a commit e4f2442

File tree

16 files changed

+432
-22
lines changed

16 files changed

+432
-22
lines changed

hello_nexus/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ call the operations from a workflow.
1212

1313
Start a Temporal server. (See the main samples repo [README](../README.md)).
1414

15-
Run the following:
15+
Run the following to create the caller and handler namespaces, and the Nexus endpoint:
1616

1717
```
1818
temporal operator namespace create --namespace hello-nexus-basic-handler-namespace

hello_nexus/service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
type-safe clients, and it is used by Nexus handlers to validate that they implement
1010
correctly-named operation handlers with the correct input and output types.
1111
12-
The service defined in this file features two operations: echo and hello.
12+
The service defined in this file exposes two operations: my_sync_operation and
13+
my_workflow_run_operation.
1314
"""
1415

1516
from dataclasses import dataclass

message_passing/introduction/starter.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
GetLanguagesInput,
1010
GreetingWorkflow,
1111
Language,
12+
SetLanguageInput,
1213
)
1314

1415

@@ -28,20 +29,20 @@ async def main(client: Optional[Client] = None):
2829

2930
# 👉 Execute an Update
3031
previous_language = await wf_handle.execute_update(
31-
GreetingWorkflow.set_language, Language.CHINESE
32+
GreetingWorkflow.set_language, SetLanguageInput(language=Language.CHINESE)
3233
)
33-
current_language = await wf_handle.query(GreetingWorkflow.get_language)
34-
print(f"language changed: {previous_language.name} -> {current_language.name}")
34+
assert await wf_handle.query(GreetingWorkflow.get_language) == Language.CHINESE
35+
print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}")
3536

3637
# 👉 Start an Update and then wait for it to complete
3738
update_handle = await wf_handle.start_update(
3839
GreetingWorkflow.set_language_using_activity,
39-
Language.ARABIC,
40+
SetLanguageInput(language=Language.ARABIC),
4041
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
4142
)
4243
previous_language = await update_handle.result()
43-
current_language = await wf_handle.query(GreetingWorkflow.get_language)
44-
print(f"language changed: {previous_language.name} -> {current_language.name}")
44+
assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ARABIC
45+
print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}")
4546

4647
# 👉 Send a Signal
4748
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))

message_passing/introduction/workflows.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ class GetLanguagesInput:
1616
include_unsupported: bool
1717

1818

19+
@dataclass
20+
class SetLanguageInput:
21+
language: Language
22+
23+
1924
@dataclass
2025
class ApproveInput:
2126
name: str
@@ -74,29 +79,29 @@ def approve(self, input: ApproveInput) -> None:
7479
self.approver_name = input.name
7580

7681
@workflow.update
77-
def set_language(self, language: Language) -> Language:
82+
def set_language(self, input: SetLanguageInput) -> Language:
7883
# 👉 An Update handler can mutate the Workflow state and return a value.
79-
previous_language, self.language = self.language, language
84+
previous_language, self.language = self.language, input.language
8085
return previous_language
8186

8287
@set_language.validator
83-
def validate_language(self, language: Language) -> None:
84-
if language not in self.greetings:
88+
def validate_language(self, input: SetLanguageInput) -> None:
89+
if input.language not in self.greetings:
8590
# 👉 In an Update validator you raise any exception to reject the Update.
86-
raise ValueError(f"{language.name} is not supported")
91+
raise ValueError(f"{input.language.name} is not supported")
8792

8893
@workflow.update
89-
async def set_language_using_activity(self, language: Language) -> Language:
94+
async def set_language_using_activity(self, input: SetLanguageInput) -> Language:
9095
# 👉 This update handler is async, so it can execute an activity.
91-
if language not in self.greetings:
96+
if input.language not in self.greetings:
9297
# 👉 We use a lock so that, if this handler is executed multiple
9398
# times, each execution can schedule the activity only when the
9499
# previously scheduled activity has completed. This ensures that
95100
# multiple calls to set_language are processed in order.
96101
async with self.lock:
97102
greeting = await workflow.execute_activity(
98103
call_greeting_service,
99-
language,
104+
input.language,
100105
start_to_close_timeout=timedelta(seconds=10),
101106
)
102107
# 👉 The requested language might not be supported by the remote
@@ -108,10 +113,10 @@ async def set_language_using_activity(self, language: Language) -> Language:
108113
# this purpose.)
109114
if greeting is None:
110115
raise ApplicationError(
111-
f"Greeting service does not support {language.name}"
116+
f"Greeting service does not support {input.language.name}"
112117
)
113-
self.greetings[language] = greeting
114-
previous_language, self.language = self.language, language
118+
self.greetings[input.language] = greeting
119+
previous_language, self.language = self.language, input.language
115120
return previous_language
116121

117122
@workflow.query

nexus_sync_operations/README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
This sample shows how to create a Nexus service that is backed by a long-running workflow and
2+
exposes operations that execute updates and queries against that workflow. The long-running
3+
workflow, and the updates/queries are private implementation detail of the nexus service: the caller
4+
does not know how the operations are implemented.
5+
6+
### Sample directory structure
7+
8+
- [service.py](./service.py) - shared Nexus service definition
9+
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code
10+
- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks.
11+
12+
13+
### Instructions
14+
15+
Start a Temporal server. (See the main samples repo [README](../README.md)).
16+
17+
Run the following to create the caller and handler namespaces, and the Nexus endpoint:
18+
19+
```
20+
temporal operator namespace create --namespace nexus-sync-operations-handler-namespace
21+
temporal operator namespace create --namespace nexus-sync-operations-caller-namespace
22+
23+
temporal operator nexus endpoint create \
24+
--name nexus-sync-operations-nexus-endpoint \
25+
--target-namespace nexus-sync-operations-handler-namespace \
26+
--target-task-queue nexus-sync-operations-handler-task-queue \
27+
--description-file nexus_sync_operations/endpoint_description.md
28+
```
29+
30+
In one terminal, run the Temporal worker in the handler namespace:
31+
```
32+
uv run nexus_sync_operations/handler/worker.py
33+
```
34+
35+
In another terminal, run the Temporal worker in the caller namespace and start the caller
36+
workflow:
37+
```
38+
uv run nexus_sync_operations/caller/app.py
39+
```

nexus_sync_operations/__init__.py

Whitespace-only changes.

nexus_sync_operations/caller/__init__.py

Whitespace-only changes.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import asyncio
2+
import uuid
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker
7+
8+
from nexus_sync_operations.caller.workflows import CallerWorkflow
9+
10+
NAMESPACE = "nexus-sync-operations-caller-namespace"
11+
TASK_QUEUE = "nexus-sync-operations-caller-task-queue"
12+
13+
14+
async def execute_caller_workflow(
15+
client: Optional[Client] = None,
16+
) -> None:
17+
client = client or await Client.connect(
18+
"localhost:7233",
19+
namespace=NAMESPACE,
20+
)
21+
22+
async with Worker(
23+
client,
24+
task_queue=TASK_QUEUE,
25+
workflows=[CallerWorkflow],
26+
):
27+
log = await client.execute_workflow(
28+
CallerWorkflow.run,
29+
id=str(uuid.uuid4()),
30+
task_queue=TASK_QUEUE,
31+
)
32+
for line in log:
33+
print(line)
34+
35+
36+
if __name__ == "__main__":
37+
loop = asyncio.new_event_loop()
38+
try:
39+
loop.run_until_complete(execute_caller_workflow())
40+
except KeyboardInterrupt:
41+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
"""
2+
This is a workflow that calls nexus operations. The caller does not have information about how these
3+
operations are implemented by the nexus service.
4+
"""
5+
6+
from temporalio import workflow
7+
8+
from message_passing.introduction import Language
9+
from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput
10+
11+
with workflow.unsafe.imports_passed_through():
12+
from nexus_sync_operations.service import GreetingService
13+
14+
NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint"
15+
16+
17+
@workflow.defn
18+
class CallerWorkflow:
19+
@workflow.run
20+
async def run(self) -> list[str]:
21+
log = []
22+
nexus_client = workflow.create_nexus_client(
23+
service=GreetingService,
24+
endpoint=NEXUS_ENDPOINT,
25+
)
26+
27+
# Get supported languages
28+
supported_languages = await nexus_client.execute_operation(
29+
GreetingService.get_languages, GetLanguagesInput(include_unsupported=False)
30+
)
31+
log.append(f"supported languages: {supported_languages}")
32+
33+
# Set language
34+
previous_language = await nexus_client.execute_operation(
35+
GreetingService.set_language,
36+
SetLanguageInput(language=Language.ARABIC),
37+
)
38+
assert (
39+
await nexus_client.execute_operation(GreetingService.get_language, None)
40+
== Language.ARABIC
41+
)
42+
log.append(
43+
f"language changed: {previous_language.name} -> {Language.ARABIC.name}"
44+
)
45+
46+
return log
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py)
2+
- operation: `get_languages`
3+
- operation: `get_language`
4+
- operation: `set_language`

0 commit comments

Comments
 (0)