Skip to content

Commit 0c66e2f

Browse files
authored
Add nexus multiple args sample (#244)
* Add nexus multiple args sample * Autoformat * Ugh, StrEnum only 3.11 * Remove pointless comment * Add snipsync
1 parent 2f36e1a commit 0c66e2f

File tree

15 files changed

+317
-1
lines changed

15 files changed

+317
-1
lines changed

nexus_multiple_args/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
This sample shows how to map a Nexus operation to a handler workflow that takes multiple input arguments. The Nexus operation receives a single input object but unpacks it into multiple arguments when starting the workflow.
2+
3+
### Sample directory structure
4+
5+
- [service.py](./service.py) - shared Nexus service definition
6+
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code
7+
- [handler](./handler) - Nexus operation handlers, together with a workflow used by the Nexus operation, and a worker that polls for both workflow and Nexus tasks.
8+
9+
### Instructions
10+
11+
Start a Temporal server. (See the main samples repo [README](../README.md)).
12+
13+
Run the following:
14+
15+
```
16+
temporal operator namespace create --namespace nexus-multiple-args-handler-namespace
17+
temporal operator namespace create --namespace nexus-multiple-args-caller-namespace
18+
19+
temporal operator nexus endpoint create \
20+
--name nexus-multiple-args-nexus-endpoint \
21+
--target-namespace nexus-multiple-args-handler-namespace \
22+
--target-task-queue nexus-multiple-args-handler-task-queue
23+
```
24+
25+
In one terminal, run the Temporal worker in the handler namespace:
26+
```
27+
uv run nexus_multiple_args/handler/worker.py
28+
```
29+
30+
In another terminal, run the Temporal worker in the caller namespace and start the caller workflow:
31+
```
32+
uv run nexus_multiple_args/caller/app.py
33+
```

nexus_multiple_args/__init__.py

Whitespace-only changes.

nexus_multiple_args/caller/__init__.py

Whitespace-only changes.

nexus_multiple_args/caller/app.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import asyncio
2+
import uuid
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker
7+
8+
from nexus_multiple_args.caller.workflows import CallerWorkflow
9+
10+
NAMESPACE = "nexus-multiple-args-caller-namespace"
11+
TASK_QUEUE = "nexus-multiple-args-caller-task-queue"
12+
13+
14+
async def execute_caller_workflow(
15+
client: Optional[Client] = None,
16+
) -> tuple[str, str]:
17+
client = client or await Client.connect(
18+
"localhost:7233",
19+
namespace=NAMESPACE,
20+
)
21+
22+
async with Worker(
23+
client,
24+
task_queue=TASK_QUEUE,
25+
workflows=[CallerWorkflow],
26+
):
27+
# Execute workflow with English language
28+
result1 = await client.execute_workflow(
29+
CallerWorkflow.run,
30+
args=["Nexus", "en"],
31+
id=str(uuid.uuid4()),
32+
task_queue=TASK_QUEUE,
33+
)
34+
35+
# Execute workflow with Spanish language
36+
result2 = await client.execute_workflow(
37+
CallerWorkflow.run,
38+
args=["Nexus", "es"],
39+
id=str(uuid.uuid4()),
40+
task_queue=TASK_QUEUE,
41+
)
42+
43+
return result1, result2
44+
45+
46+
if __name__ == "__main__":
47+
loop = asyncio.new_event_loop()
48+
try:
49+
results = loop.run_until_complete(execute_caller_workflow())
50+
for result in results:
51+
print(result)
52+
except KeyboardInterrupt:
53+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from temporalio import workflow
2+
3+
with workflow.unsafe.imports_passed_through():
4+
from nexus_multiple_args.service import HelloInput, MyNexusService
5+
6+
NEXUS_ENDPOINT = "nexus-multiple-args-nexus-endpoint"
7+
8+
9+
# This is a workflow that calls a nexus operation with multiple arguments.
10+
@workflow.defn
11+
class CallerWorkflow:
12+
# An __init__ method is always optional on a workflow class. Here we use it to set the
13+
# nexus client, but that could alternatively be done in the run method.
14+
def __init__(self):
15+
self.nexus_client = workflow.create_nexus_client(
16+
service=MyNexusService,
17+
endpoint=NEXUS_ENDPOINT,
18+
)
19+
20+
# The workflow run method demonstrates calling a nexus operation with multiple arguments
21+
# packed into an input object.
22+
@workflow.run
23+
async def run(self, name: str, language: str) -> str:
24+
# Start the nexus operation and wait for the result in one go, using execute_operation.
25+
# The multiple arguments (name and language) are packed into a HelloInput object.
26+
result = await self.nexus_client.execute_operation(
27+
MyNexusService.hello,
28+
HelloInput(name=name, language=language),
29+
)
30+
return result.message

nexus_multiple_args/handler/__init__.py

Whitespace-only changes.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from __future__ import annotations
2+
3+
import uuid
4+
5+
import nexusrpc
6+
from temporalio import nexus
7+
8+
from nexus_multiple_args.handler.workflows import HelloHandlerWorkflow
9+
from nexus_multiple_args.service import HelloInput, HelloOutput, MyNexusService
10+
11+
12+
# @@@SNIPSTART samples-python-nexus-handler-multiargs
13+
@nexusrpc.handler.service_handler(service=MyNexusService)
14+
class MyNexusServiceHandler:
15+
"""
16+
Service handler that demonstrates multiple argument handling in Nexus operations.
17+
"""
18+
19+
# This is a nexus operation that is backed by a Temporal workflow.
20+
# The key feature here is that it demonstrates how to map a single input object
21+
# (HelloInput) to a workflow that takes multiple individual arguments.
22+
@nexus.workflow_run_operation
23+
async def hello(
24+
self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput
25+
) -> nexus.WorkflowHandle[HelloOutput]:
26+
"""
27+
Start a workflow with multiple arguments unpacked from the input object.
28+
"""
29+
return await ctx.start_workflow(
30+
HelloHandlerWorkflow.run,
31+
args=[
32+
input.name, # First argument: name
33+
input.language, # Second argument: language
34+
],
35+
id=str(uuid.uuid4()),
36+
)
37+
38+
39+
# @@@SNIPEND
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
import logging
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker
7+
8+
from nexus_multiple_args.handler.service_handler import MyNexusServiceHandler
9+
from nexus_multiple_args.handler.workflows import HelloHandlerWorkflow
10+
11+
interrupt_event = asyncio.Event()
12+
13+
NAMESPACE = "nexus-multiple-args-handler-namespace"
14+
TASK_QUEUE = "nexus-multiple-args-handler-task-queue"
15+
16+
17+
async def main(client: Optional[Client] = None):
18+
logging.basicConfig(level=logging.INFO)
19+
20+
client = client or await Client.connect(
21+
"localhost:7233",
22+
namespace=NAMESPACE,
23+
)
24+
25+
# Start the worker, passing the Nexus service handler instance, in addition to the
26+
# workflow classes that are started by your nexus operations, and any activities
27+
# needed. This Worker will poll for both workflow tasks and Nexus tasks.
28+
async with Worker(
29+
client,
30+
task_queue=TASK_QUEUE,
31+
workflows=[HelloHandlerWorkflow],
32+
nexus_service_handlers=[MyNexusServiceHandler()],
33+
):
34+
logging.info("Worker started, ctrl+c to exit")
35+
await interrupt_event.wait()
36+
logging.info("Shutting down")
37+
38+
39+
if __name__ == "__main__":
40+
loop = asyncio.new_event_loop()
41+
try:
42+
loop.run_until_complete(main())
43+
except KeyboardInterrupt:
44+
interrupt_event.set()
45+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from temporalio import workflow
2+
3+
with workflow.unsafe.imports_passed_through():
4+
from nexus_multiple_args.service import HelloOutput
5+
6+
7+
# This is the workflow that is started by the `hello` nexus operation.
8+
# It demonstrates handling multiple arguments passed from the Nexus service.
9+
@workflow.defn
10+
class HelloHandlerWorkflow:
11+
@workflow.run
12+
async def run(self, name: str, language: str) -> HelloOutput:
13+
"""
14+
Handle the hello workflow with multiple arguments.
15+
16+
This method receives the individual arguments (name and language)
17+
that were unpacked from the HelloInput in the service handler.
18+
"""
19+
if language == "en":
20+
message = f"Hello {name} 👋"
21+
elif language == "fr":
22+
message = f"Bonjour {name} 👋"
23+
elif language == "de":
24+
message = f"Hallo {name} 👋"
25+
elif language == "es":
26+
message = f"¡Hola! {name} 👋"
27+
elif language == "tr":
28+
message = f"Merhaba {name} 👋"
29+
else:
30+
raise ValueError(f"Unsupported language: {language}")
31+
32+
return HelloOutput(message=message)

nexus_multiple_args/service.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""
2+
This is a Nexus service definition that demonstrates multiple argument handling.
3+
4+
A service definition defines a Nexus service as a named collection of operations, each
5+
with input and output types. It does not implement operation handling: see the service
6+
handler and operation handlers in nexus_multiple_args.handler.service_handler for that.
7+
8+
A Nexus service definition is used by Nexus callers (e.g. a Temporal workflow) to create
9+
type-safe clients, and it is used by Nexus handlers to validate that they implement
10+
correctly-named operation handlers with the correct input and output types.
11+
12+
The service defined in this file features one operation: hello, where hello
13+
demonstrates handling multiple arguments through a single input object.
14+
"""
15+
16+
from dataclasses import dataclass
17+
18+
import nexusrpc
19+
20+
21+
@dataclass
22+
class HelloInput:
23+
name: str
24+
language: str
25+
26+
27+
@dataclass
28+
class HelloOutput:
29+
message: str
30+
31+
32+
@nexusrpc.service
33+
class MyNexusService:
34+
hello: nexusrpc.Operation[HelloInput, HelloOutput]

0 commit comments

Comments
 (0)