Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion clients/python/src/taskbroker_client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE
from taskbroker_client.imports import import_string
from taskbroker_client.metrics import MetricsBackend
from taskbroker_client.registry import TaskNamespace, TaskRegistry
from taskbroker_client.registry import ExternalNamespace, TaskNamespace, TaskRegistry
from taskbroker_client.retry import Retry
from taskbroker_client.router import TaskRouter
from taskbroker_client.task import Task
Expand Down Expand Up @@ -101,6 +101,30 @@ def create_namespace(
app_feature=app_feature,
)

def create_external_namespace(
    self,
    name: str,
    application: str,
    *,
    retry: Retry | None = None,
    expires: int | datetime.timedelta | None = None,
    processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE,
) -> ExternalNamespace:
    """
    Create a namespace for tasks that belong to an external (target) application.

    The work is delegated to the app's task registry. Tasks registered in an
    external namespace are routed with the host application's task router, and
    the namespace name handed to the router has the form
    `{application}:{namespace_name}`.
    """
    registry = self._taskregistry
    external_namespace = registry.create_external_namespace(
        name=name,
        application=application,
        retry=retry,
        expires=expires,
        processing_deadline_duration=processing_deadline_duration,
    )
    return external_namespace

def get_task(self, namespace: str, task: str) -> Task[Any, Any]:
    """Look up the namespace in the task registry, then fetch the named task from it."""
    task_namespace = self._taskregistry.get(namespace)
    return task_namespace.get(task)
Expand Down
121 changes: 119 additions & 2 deletions clients/python/src/taskbroker_client/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from taskbroker_client.metrics import MetricsBackend
from taskbroker_client.retry import Retry
from taskbroker_client.router import TaskRouter
from taskbroker_client.task import P, R, Task
from taskbroker_client.task import ExternalTask, P, R, Task
from taskbroker_client.types import ProducerFactory

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -73,6 +73,7 @@ def contains(self, name: str) -> bool:

@property
def topic(self) -> str:
    """The topic the router resolves for this namespace's name."""
    router = self.router
    return router.route_namespace(self.name)

def register(
Expand Down Expand Up @@ -148,7 +149,7 @@ def _handle_produce_future(self, future: ProducerFuture, tags: dict[str, str]) -
self.metrics.incr("taskworker.registry.send_task.success", tags=tags)

def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) -> None:
topic = self.router.route_namespace(self.name)
topic = self.topic

with sentry_sdk.start_span(
op=OP.QUEUE_PUBLISH,
Expand Down Expand Up @@ -196,6 +197,83 @@ def _producer(self, topic: str) -> KafkaProducer:
return self._producers[topic]


class ExternalNamespace(TaskNamespace):
    """
    Namespace holding stubs for tasks that are defined in another (target) application.

    `application` names the target application and should differ from the
    current host application.

    Every task registered here is an ExternalTask: it can be dispatched to
    Kafka but cannot be called locally.
    """

    @property
    def topic(self) -> str:
        """The topic the router resolves for `{application}:{name}`."""
        qualified_name = f"{self.application}:{self.name}"
        return self.router.route_namespace(qualified_name)

    def register(
        self,
        *,
        name: str,
        retry: Retry | None = None,
        expires: int | datetime.timedelta | None = None,
        processing_deadline_duration: int | datetime.timedelta | None = None,
        at_most_once: bool = False,
        wait_for_delivery: bool = False,
        compression_type: CompressionType = CompressionType.PLAINTEXT,
    ) -> Callable[[Callable[P, R]], ExternalTask[P, R]]:
        """
        Build a decorator that registers an external task stub.

        The body of the decorated function is never executed; its signature
        exists only for type checking. Dispatch the task with delay() or
        apply_async().

        Parameters
        ----------

        name: str
            The name of the task. This is serialized and must be stable across deploys.
        retry: Retry | None
            The retry policy for the task. When omitted and at_most_once is
            not enabled, the namespace default retry policy is used.
        expires: int | datetime.timedelta
            The number of seconds a task activation is valid for. After this
            duration the activation will be discarded and not executed.
            Falls back to the namespace default when not provided.
        processing_deadline_duration: int | datetime.timedelta
            The processing deadline for the task. Falls back to the namespace
            default when not provided.
        at_most_once : bool
            Enable at-most-once execution. Tasks with `at_most_once` cannot
            define retry policies, and use a worker side idempotency key to
            prevent processing deadline based retries.
        wait_for_delivery: bool
            If true, the task will wait for the delivery report to be received
            before returning.
        compression_type: CompressionType
            The compression type to use to compress the task parameters.
        """

        def decorator(func: Callable[P, R]) -> ExternalTask[P, R]:
            # at_most_once tasks never inherit the namespace default retry policy.
            if at_most_once:
                effective_retry = retry
            else:
                effective_retry = retry or self.default_retry

            effective_expires = expires or self.default_expires
            effective_deadline = (
                processing_deadline_duration or self.default_processing_deadline_duration
            )

            stub: ExternalTask[P, R] = ExternalTask(
                name=name,
                func=func,
                namespace=self,
                retry=effective_retry,
                expires=effective_expires,
                processing_deadline_duration=effective_deadline,
                at_most_once=at_most_once,
                wait_for_delivery=wait_for_delivery,
                compression_type=compression_type,
            )
            self._registered_tasks[name] = stub
            return stub

        return decorator
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated register logic across namespace classes

Low Severity

ExternalNamespace.register is a near-identical copy of TaskNamespace.register — the only difference is instantiating ExternalTask instead of Task. The retry-handling logic, default-resolution, and task-storage code are fully duplicated, meaning any future bug fix or behavior change needs to be applied in both places.

Additional Locations (1)
Fix in Cursor Fix in Web



# TODO(mark) All of TaskRegistry could be folded into TaskworkerApp later.
class TaskRegistry:
"""
Expand All @@ -214,6 +292,7 @@ def __init__(
) -> None:
self._application = application
self._namespaces: dict[str, TaskNamespace] = {}
self._external_namespaces: dict[str, ExternalNamespace] = {}
self._producer_factory = producer_factory
self._router = router
self._metrics = metrics
Expand Down Expand Up @@ -264,3 +343,41 @@ def create_namespace(
self._namespaces[name] = namespace

return namespace

def create_external_namespace(
    self,
    name: str,
    application: str,
    *,
    retry: Retry | None = None,
    expires: int | datetime.timedelta | None = None,
    processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE,
) -> ExternalNamespace:
    """
    Create a namespace for tasks belonging to an external (target) application.

    `application` is the target app name. External namespaces are stored
    separately from in-app namespaces and are keyed by `{application}:{name}`,
    so the same namespace name may be created for several target applications.

    Raises
    ------
    ValueError
        If an external namespace with the same application and name has
        already been created.
    """
    key = f"{application}:{name}"
    if key in self._external_namespaces:
        # The collision is on the application/name pair, so report both parts.
        raise ValueError(
            f"External task namespace in {application} named {name} already exists."
        )

    namespace = ExternalNamespace(
        name=name,
        application=application,
        router=self._router,
        metrics=self._metrics,
        producer_factory=self._producer_factory,
        retry=retry,
        expires=expires,
        processing_deadline_duration=processing_deadline_duration,
    )
    self._external_namespaces[key] = namespace
    return namespace

def get_external(self, application: str, name: str) -> ExternalNamespace:
    """
    Fetch an external namespace by target application and namespace name.

    Raises KeyError when no external namespace was created for that pair.
    """
    lookup_key = f"{application}:{name}"
    if lookup_key in self._external_namespaces:
        return self._external_namespaces[lookup_key]
    raise KeyError(f"No external task namespace in {application} named {name}")
30 changes: 29 additions & 1 deletion clients/python/src/taskbroker_client/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,17 @@ def apply_async(
args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown
)
if ALWAYS_EAGER:
self._func(*args, **kwargs)
self._call_func(*args, **kwargs)
else:
self._namespace.send_task(
activation,
wait_for_delivery=self.wait_for_delivery,
)

def _call_func(self, *args: Any, **kwargs: Any) -> None:
    """Run the wrapped function in-process; the result is discarded."""
    # Kept as a separate hook so that ExternalTask can override it.
    wrapped_func = self._func
    wrapped_func(*args, **kwargs)

def _signal_send(self, task: Task[Any, Any], args: Any, kwargs: Any) -> None:
"""
This method is a stub that test harnesses can monkey patch to capture tasks that
Expand Down Expand Up @@ -256,3 +260,27 @@ def should_retry(self, state: RetryState, exc: Exception) -> bool:
if not retry:
return False
return retry.should_retry(state, exc)


class ExternalTask(Task[P, R]):
    """
    Stub for a task whose implementation lives in another application.

    An ExternalTask can be dispatched to Kafka with delay() or apply_async(),
    but it can never be executed in this process. Dispatch is routed to the
    target application's topic.
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Always raises ValueError: external tasks cannot be called directly."""
        message = (
            "External tasks cannot be called locally. "
            f"Use delay() or apply_async() to dispatch '{self.name}' to the target application."
        )
        raise ValueError(message)

    def _call_func(self, *args: Any, **kwargs: Any) -> None:
        """
        Always raises ValueError: there is no local implementation to run
        when dispatch is attempted in ALWAYS_EAGER mode.
        """
        message = (
            "External tasks cannot be called within an ALWAYS_EAGER block. "
            "Use a mock object to ensure that tasks have delay() or apply_async() called instead."
        )
        raise ValueError(message)
Comment thread
cursor[bot] marked this conversation as resolved.
47 changes: 47 additions & 0 deletions clients/python/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,53 @@ def test_create_namespace() -> None:
app.get_namespace("invalid")


def test_create_external_namespace() -> None:
    """External namespaces route via `{application}:{name}` and are stored apart from in-app ones."""
    topics_by_namespace = {
        "other:test": "other-test",
        "ext:test": "ext-test",
        "test": "tester",
    }

    class ExternalRouter(TaskRouter):
        def route_namespace(self, name: str) -> str:
            resolved = topics_by_namespace.get(name)
            if resolved is None:
                raise ValueError(f"unknown namespace {name}")
            return resolved

    app = TaskbrokerApp(
        name="acme", producer_factory=producer_factory, router_class=ExternalRouter()
    )

    # An in-app namespace is routed by its bare name.
    ns = app.create_namespace("test")
    assert (ns.application, ns.name, ns.topic) == ("acme", "test", "tester")

    # An external namespace is routed by "{application}:{name}".
    ext = app.create_external_namespace(application="ext", name="test")
    assert (ext.application, ext.name, ext.topic) == ("ext", "test", "ext-test")

    # Explicit defaults are carried onto the namespace.
    retry = Retry(times=3)
    ten_minutes = 60 * 10
    other = app.create_external_namespace(
        application="other",
        name="test",
        retry=retry,
        expires=ten_minutes,
        processing_deadline_duration=60,
    )
    assert other.default_retry == retry
    assert other.default_processing_deadline_duration == 60
    assert other.default_expires == ten_minutes
    assert (other.name, other.application, other.topic) == ("test", "other", "other-test")

    # external namespaces are stored separately from in-app namespaces
    # and from each other.
    registry = app.taskregistry
    assert registry.get_external("ext", "test") == ext
    assert registry.get_external("other", "test") == other
    assert registry.get("test") == ns


def test_get_task() -> None:
app = TaskbrokerApp(name="acme", producer_factory=producer_factory, router_class=StubRouter())
ns = app.create_namespace(name="tests")
Expand Down
Loading
Loading