Skip to content

Commit f4f9589

Browse files
committed
Nexus samples
1 parent 5d64065 commit f4f9589

23 files changed

+2971
-2000
lines changed

hello_nexus/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Nexus
2+
3+
Temporal Nexus is a feature of the Temporal platform designed to connect durable executions across team, namespace,
4+
region, and cloud boundaries. It promotes a more modular architecture for sharing a subset of your team’s capabilities
5+
via well-defined service API contracts for other teams to use. These can abstract underlying Temporal primitives such as
6+
Workflows, or execute arbitrary code.
7+
8+
Learn more at [temporal.io/nexus](https://temporal.io/nexus).
9+
10+
The samples in this directory form an introduction to Nexus.
11+
12+
### Samples
13+
14+
- [basic](./basic) - Nexus service definition, operation handlers, and calling workflows.
15+
- [without_service_definition](./without_service_definition) - A Nexus service implementation without a service definition

hello_nexus/__init__.py

Whitespace-only changes.

hello_nexus/basic/README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
This sample shows how to define a Nexus service, implement the operation handlers, and
2+
call the operations from a workflow.
3+
4+
### Sample directory structure
5+
6+
- [service.py](./service.py) - shared Nexus service definition
7+
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code
8+
- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow and Nexus tasks.
9+
10+
11+
### Instructions
12+
13+
Start a Temporal server.
14+
15+
Run the following:
16+
17+
```
18+
temporal operator namespace create --namespace my-target-namespace
19+
temporal operator namespace create --namespace my-caller-namespace
20+
21+
temporal operator nexus endpoint create \
22+
--name my-nexus-endpoint \
23+
--target-namespace my-target-namespace \
24+
--target-task-queue my-target-task-queue \
25+
--description-file ./hello_nexus/basic/service_description.md
26+
```
27+
28+
In one terminal, run the Temporal worker in the handler namespace:
29+
```
30+
uv run hello_nexus/basic/handler/worker.py
31+
```
32+
33+
In another terminal, run the Temporal worker in the caller namespace and start the caller
34+
workflow:
35+
```
36+
uv run hello_nexus/basic/caller/app.py
37+
```

hello_nexus/basic/__init__.py

Whitespace-only changes.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from . import app as app

hello_nexus/basic/caller/app.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
import uuid
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
7+
8+
from hello_nexus.basic.caller.workflows import CallerWorkflow
9+
from hello_nexus.basic.service import MyOutput
10+
11+
NAMESPACE = "my-caller-namespace"
12+
TASK_QUEUE = "my-caller-task-queue"
13+
14+
15+
async def execute_caller_workflow(
16+
client: Optional[Client] = None,
17+
) -> tuple[MyOutput, MyOutput]:
18+
client = client or await Client.connect(
19+
"localhost:7233",
20+
namespace=NAMESPACE,
21+
)
22+
23+
async with Worker(
24+
client,
25+
task_queue=TASK_QUEUE,
26+
workflows=[CallerWorkflow],
27+
# TODO(dan): isinstance(op, nexusrpc.contract.Operation) is failing under the
28+
# sandbox in temporalio/worker/_interceptor.py
29+
workflow_runner=UnsandboxedWorkflowRunner(),
30+
):
31+
return await client.execute_workflow(
32+
CallerWorkflow.run,
33+
arg="world",
34+
id=str(uuid.uuid4()),
35+
task_queue=TASK_QUEUE,
36+
)
37+
38+
39+
if __name__ == "__main__":
40+
loop = asyncio.new_event_loop()
41+
try:
42+
results = loop.run_until_complete(execute_caller_workflow())
43+
for output in results:
44+
print(output.message)
45+
except KeyboardInterrupt:
46+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from temporalio import workflow
2+
from temporalio.workflow import NexusServiceClient
3+
4+
from hello_nexus.basic.service import MyInput, MyNexusService, MyOutput
5+
6+
NEXUS_ENDPOINT = "my-nexus-endpoint"
7+
8+
9+
@workflow.defn
10+
class CallerWorkflow:
11+
def __init__(self):
12+
self.nexus_service_client = NexusServiceClient(
13+
MyNexusService,
14+
endpoint=NEXUS_ENDPOINT,
15+
)
16+
17+
@workflow.run
18+
async def run(self, name: str) -> tuple[MyOutput, MyOutput]:
19+
sync_result: MyOutput = await self.nexus_service_client.execute_operation(
20+
MyNexusService.my_sync_operation,
21+
MyInput(name),
22+
)
23+
wf_result: MyOutput = await self.nexus_service_client.execute_operation(
24+
MyNexusService.my_workflow_run_operation,
25+
MyInput(name),
26+
)
27+
return sync_result, wf_result

hello_nexus/basic/handler/__init__.py

Whitespace-only changes.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from __future__ import annotations
2+
3+
4+
class MyDBClient:
5+
"""
6+
This class represents a resource that your Nexus operation handlers may need when they
7+
are handling Nexus requests, but which is only available when the Nexus worker is
8+
started. Notice that:
9+
10+
(a) The user's service handler class __init__ constructor takes a MyDBClient instance
11+
(see hello_nexus.handler.MyNexusService)
12+
13+
(b) The user is responsible for instantiating the service handler class when they
14+
start the worker (see hello_nexus.handler.worker), so they can pass any
15+
necessary resources (such as this database client) to the service handler.
16+
"""
17+
18+
@classmethod
19+
def connect(cls) -> MyDBClient:
20+
return cls()
21+
22+
def execute(self, query: str) -> str:
23+
return "query-result"
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""
2+
This file demonstrates how to define operation handlers by implementing the `start`
3+
method only, using the "shorthand" decorators sync_operation_handler and
4+
workflow_run_operation_handler.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import uuid
10+
11+
import temporalio.common
12+
import temporalio.nexus.handler
13+
from nexusrpc.handler import (
14+
StartOperationContext,
15+
service_handler,
16+
sync_operation_handler,
17+
)
18+
from temporalio.client import WorkflowHandle
19+
20+
from hello_nexus.basic.handler.db_client import MyDBClient
21+
from hello_nexus.basic.handler.workflows import WorkflowStartedByNexusOperation
22+
from hello_nexus.basic.service import MyInput, MyNexusService, MyOutput
23+
24+
25+
@service_handler(service=MyNexusService)
26+
class MyNexusServiceHandler:
27+
# You can create an __init__ method accepting what is needed by your operation
28+
# handlers to handle requests. You typically instantiate your service handler class
29+
# when starting your worker. See hello_nexus/basic/handler/worker.py.
30+
def __init__(self, connected_db_client: MyDBClient):
31+
# `connected_db_client` is intended as an example of something that might be
32+
# required by your operation handlers when handling requests, but is only
33+
# available at worker-start time.
34+
self.connected_db_client = connected_db_client
35+
36+
# This is a nexus operation that is backed by a Temporal workflow. The start method
37+
# starts a workflow, and returns a nexus operation token that the handler can use to
38+
# obtain a workflow handle (for example if a cancel request is subsequently sent by
39+
# the caller). The Temporal server takes care of delivering the workflow result to the
40+
# calling workflow. The task queue defaults to the task queue being used by the Nexus
41+
# worker.
42+
@temporalio.nexus.handler.workflow_run_operation_handler
43+
async def my_workflow_run_operation(
44+
self, ctx: StartOperationContext, input: MyInput
45+
) -> WorkflowHandle[WorkflowStartedByNexusOperation, MyOutput]:
46+
# You could use self.connected_db_client here.
47+
return await temporalio.nexus.handler.start_workflow(
48+
ctx,
49+
WorkflowStartedByNexusOperation.run,
50+
input,
51+
id=str(uuid.uuid4()),
52+
)
53+
54+
# This is a sync operation. That means that unlike the workflow run operation above,
55+
# in this case the `start` method returns the final operation result. Sync operations
56+
# are free to make arbitrary network calls, or perform CPU-bound computations. Total
57+
# execution duration must not exceed 10s.
58+
@sync_operation_handler
59+
async def my_sync_operation(
60+
self, ctx: StartOperationContext, input: MyInput
61+
) -> MyOutput:
62+
# You could use self.connected_db_client here.
63+
return MyOutput(message=f"Hello {input.name} from sync operation!")

0 commit comments

Comments
 (0)