Skip to content

Commit 69751bb

Browse files
committed
poe lint/fmt
1 parent eb92f48 commit 69751bb

File tree

6 files changed

+78
-30
lines changed

6 files changed

+78
-30
lines changed

resource_locking/lock_manager_workflow.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,22 @@
44

55
from temporalio import workflow
66

7-
from resource_locking.shared import (
8-
AcquireRequest,
9-
AcquireResponse,
10-
)
7+
from resource_locking.shared import AcquireRequest, AcquireResponse
8+
119

1210
# Internal to this workflow, we'll associate randomly generated release signal names with each acquire request.
1311
@dataclass
1412
class InternalAcquireRequest(AcquireRequest):
1513
release_signal: Optional[str]
1614

15+
1716
@dataclass
1817
class LockManagerWorkflowInput:
1918
# Key is resource, value is current lock holder for the resource (None if not locked)
2019
resources: dict[str, Optional[InternalAcquireRequest]]
2120
waiters: list[InternalAcquireRequest]
2221

22+
2323
@workflow.defn
2424
class LockManagerWorkflow:
2525
@workflow.init
@@ -28,7 +28,7 @@ def __init__(self, input: LockManagerWorkflowInput):
2828
self.waiters = input.waiters
2929
self.release_signal_to_resource: dict[str, str] = {}
3030
for resource, holder in self.resources.items():
31-
if holder is not None:
31+
if holder is not None and holder.release_signal is not None:
3232
self.release_signal_to_resource[holder.release_signal] = resource
3333

3434
@workflow.signal
@@ -47,7 +47,9 @@ async def add_resources(self, resources: list[str]):
4747

4848
@workflow.signal
4949
async def acquire_resource(self, request: AcquireRequest):
50-
internal_request = InternalAcquireRequest(workflow_id=request.workflow_id, release_signal=None)
50+
internal_request = InternalAcquireRequest(
51+
workflow_id=request.workflow_id, release_signal=None
52+
)
5153

5254
for resource, holder in self.resources.items():
5355
# Naively give out the first free resource, if we have one
@@ -61,7 +63,9 @@ async def acquire_resource(self, request: AcquireRequest):
6163
f"workflow_id={request.workflow_id} is waiting for a resource"
6264
)
6365

64-
async def allocate_resource(self, resource: str, internal_request: InternalAcquireRequest):
66+
async def allocate_resource(
67+
self, resource: str, internal_request: InternalAcquireRequest
68+
):
6569
self.resources[resource] = internal_request
6670
workflow.logger.info(
6771
f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
@@ -70,12 +74,19 @@ async def allocate_resource(self, resource: str, internal_request: InternalAcqui
7074
self.release_signal_to_resource[internal_request.release_signal] = resource
7175

7276
requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
73-
await requester.signal("assign_resource", AcquireResponse(release_signal_name=internal_request.release_signal, resource=resource))
77+
await requester.signal(
78+
"assign_resource",
79+
AcquireResponse(
80+
release_signal_name=internal_request.release_signal, resource=resource
81+
),
82+
)
7483

7584
@workflow.signal(dynamic=True)
7685
async def release_resource(self, signal_name, *args):
7786
if not signal_name in self.release_signal_to_resource:
78-
workflow.logger.warning(f"Ignoring unknown signal: {signal_name} was not a valid release signal.")
87+
workflow.logger.warning(
88+
f"Ignoring unknown signal: {signal_name} was not a valid release signal."
89+
)
7990
return
8091

8192
resource = self.release_signal_to_resource[signal_name]
@@ -85,6 +96,7 @@ async def release_resource(self, signal_name, *args):
8596
workflow.logger.warning(
8697
f"Ignoring request to release resource that is not locked: {resource}"
8798
)
99+
return
88100

89101
# Remove the current holder
90102
workflow.logger.info(
@@ -110,7 +122,9 @@ async def run(self, _: LockManagerWorkflowInput) -> None:
110122
timeout=timedelta(hours=12),
111123
)
112124

113-
workflow.continue_as_new(LockManagerWorkflowInput(
114-
resources=self.resources,
115-
waiters=self.waiters,
116-
))
125+
workflow.continue_as_new(
126+
LockManagerWorkflowInput(
127+
resources=self.resources,
128+
waiters=self.waiters,
129+
)
130+
)

resource_locking/resource_allocator.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,30 @@
11
from contextlib import asynccontextmanager
22
from datetime import timedelta
3-
from typing import Optional, AsyncGenerator
3+
from typing import Optional
44

5+
from temporalio import activity, workflow
56
from temporalio.client import Client
6-
from temporalio import workflow, activity
77
from temporalio.common import WorkflowIDConflictPolicy
88

9-
from resource_locking.lock_manager_workflow import LockManagerWorkflowInput, LockManagerWorkflow
10-
from resource_locking.shared import AcquireResponse, LOCK_MANAGER_WORKFLOW_ID, AcquireRequest, AcquiredResource
9+
from resource_locking.lock_manager_workflow import (
10+
LockManagerWorkflow,
11+
LockManagerWorkflowInput,
12+
)
13+
from resource_locking.shared import (
14+
LOCK_MANAGER_WORKFLOW_ID,
15+
AcquiredResource,
16+
AcquireRequest,
17+
AcquireResponse,
18+
)
19+
1120

1221
# Use this class in workflow code that that needs to run on locked resources.
1322
class ResourceAllocator:
1423
def __init__(self, client: Client):
1524
self.client = client
1625

1726
@activity.defn
18-
async def send_acquire_signal(self):
27+
async def send_acquire_signal(self) -> None:
1928
info = activity.info()
2029

2130
# This will start and signal the workflow if it isn't running, otherwise it will signal the current run.
@@ -29,16 +38,22 @@ async def send_acquire_signal(self):
2938
task_queue="default",
3039
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
3140
start_signal="acquire_resource",
32-
start_signal_args=[AcquireRequest(info.workflow_id)]
41+
start_signal_args=[AcquireRequest(info.workflow_id)],
3342
)
3443

3544
@classmethod
3645
@asynccontextmanager
37-
async def acquire_resource(cls, *, already_acquired_resource: Optional[AcquiredResource] = None, max_wait_time: timedelta = timedelta(minutes=5)):
46+
async def acquire_resource(
47+
cls,
48+
*,
49+
already_acquired_resource: Optional[AcquiredResource] = None,
50+
max_wait_time: timedelta = timedelta(minutes=5),
51+
):
3852
warn_when_workflow_has_timeouts()
3953

4054
resource = already_acquired_resource
4155
if resource is None:
56+
4257
async def assign_resource(input: AcquireResponse):
4358
workflow.set_signal_handler("assign_resource", None)
4459
nonlocal resource
@@ -49,12 +64,18 @@ async def assign_resource(input: AcquireResponse):
4964

5065
workflow.set_signal_handler("assign_resource", assign_resource)
5166

52-
await workflow.execute_activity(
53-
ResourceAllocator.send_acquire_signal,
67+
await workflow.start_activity(
68+
ResourceAllocator.send_acquire_signal, # type: ignore[arg-type]
5469
start_to_close_timeout=timedelta(seconds=10),
5570
)
5671

57-
await workflow.wait_condition(lambda: resource is not None, timeout=max_wait_time)
72+
await workflow.wait_condition(
73+
lambda: resource is not None, timeout=max_wait_time
74+
)
75+
76+
# Can't happen, but the typechecker doesn't know about workflow.wait_condition
77+
if resource is None:
78+
raise RuntimeError("resource was None when it can't be")
5879

5980
# During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
6081
# finally block will release the resource if an activity fails. This is why we asserted the lack of
@@ -67,6 +88,7 @@ async def assign_resource(input: AcquireResponse):
6788
handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)
6889
await handle.signal(resource.release_signal_name)
6990

91+
7092
def warn_when_workflow_has_timeouts():
7193
if has_timeout(workflow.info().run_timeout):
7294
workflow.logger.warning(
@@ -77,6 +99,7 @@ def warn_when_workflow_has_timeouts():
7799
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
78100
)
79101

102+
80103
def has_timeout(timeout: Optional[timedelta]) -> bool:
81104
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
82105
# continue_as_new call).

resource_locking/resource_locking_workflow.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
11
import asyncio
22
from dataclasses import dataclass, field
33
from datetime import timedelta
4-
from typing import Optional, Callable
4+
from typing import Callable, Optional
55

66
from temporalio import activity, workflow
77

88
from resource_locking.resource_allocator import ResourceAllocator
99
from resource_locking.shared import (
1010
LOCK_MANAGER_WORKFLOW_ID,
11+
AcquiredResource,
1112
AcquireRequest,
12-
AcquireResponse, AcquiredResource,
13+
AcquireResponse,
1314
)
1415

16+
1517
@dataclass
1618
class UseResourceActivityInput:
1719
resource: str
1820
iteration: str
1921

22+
2023
@activity.defn
2124
async def use_resource(input: UseResourceActivityInput) -> None:
2225
info = activity.info()
@@ -54,7 +57,9 @@ class FailWorkflowException(Exception):
5457
class ResourceLockingWorkflow:
5558
@workflow.run
5659
async def run(self, input: ResourceLockingWorkflowInput):
57-
async with ResourceAllocator.acquire_resource(already_acquired_resource=input.already_acquired_resource) as resource:
60+
async with ResourceAllocator.acquire_resource(
61+
already_acquired_resource=input.already_acquired_resource
62+
) as resource:
5863
for iteration in ["first", "second", "third"]:
5964
await workflow.execute_activity(
6065
use_resource,
@@ -76,7 +81,7 @@ async def run(self, input: ResourceLockingWorkflowInput):
7681
)
7782

7883
# By default, ResourceAllocator will release the resource when we return. We want to hold the resource
79-
# across continue-as-new for the sake of demonstration.
84+
# across continue-as-new for the sake of demonstration.b
8085
resource.autorelease = False
8186

8287
workflow.continue_as_new(next_input)

resource_locking/shared.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@
33

44
LOCK_MANAGER_WORKFLOW_ID = "lock_manager"
55

6+
67
@dataclass
78
class AcquireRequest:
89
workflow_id: str
910

11+
1012
@dataclass
1113
class AcquireResponse:
1214
release_signal_name: str
1315
resource: str
1416

17+
1518
@dataclass
1619
class AcquiredResource(AcquireResponse):
1720
autorelease: bool = field(default=True)

resource_locking/starter.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
from typing import Any
33

44
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
5+
from temporalio.common import WorkflowIDConflictPolicy
56

6-
from resource_locking.lock_manager_workflow import LockManagerWorkflow, LockManagerWorkflowInput
7+
from resource_locking.lock_manager_workflow import (
8+
LockManagerWorkflow,
9+
LockManagerWorkflowInput,
10+
)
711
from resource_locking.resource_locking_workflow import (
812
ResourceLockingWorkflow,
913
ResourceLockingWorkflowInput,
1014
)
1115
from resource_locking.shared import LOCK_MANAGER_WORKFLOW_ID
12-
from temporalio.common import WorkflowIDConflictPolicy
1316

1417

1518
async def main():

resource_locking/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from temporalio.client import Client
55
from temporalio.worker import Worker
66

7-
from resource_locking.resource_allocator import ResourceAllocator
87
from resource_locking.lock_manager_workflow import LockManagerWorkflow
8+
from resource_locking.resource_allocator import ResourceAllocator
99
from resource_locking.resource_locking_workflow import (
1010
ResourceLockingWorkflow,
1111
use_resource,

0 commit comments

Comments
 (0)