diff --git a/clients/python/src/taskbroker_client/app.py b/clients/python/src/taskbroker_client/app.py index 63c6a15b..4c2a6207 100644 --- a/clients/python/src/taskbroker_client/app.py +++ b/clients/python/src/taskbroker_client/app.py @@ -8,7 +8,7 @@ from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE from taskbroker_client.imports import import_string from taskbroker_client.metrics import MetricsBackend -from taskbroker_client.registry import TaskNamespace, TaskRegistry +from taskbroker_client.registry import ExternalNamespace, TaskNamespace, TaskRegistry from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter from taskbroker_client.task import Task @@ -101,6 +101,30 @@ def create_namespace( app_feature=app_feature, ) + def create_external_namespace( + self, + name: str, + application: str, + *, + retry: Retry | None = None, + expires: int | datetime.timedelta | None = None, + processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE, + ) -> ExternalNamespace: + """ + Create a namespace for tasks belonging to an external (target) application. + + Tasks registered in external namespaces are routed using the host application's + task router. When routing is required for an external namespace the namespace + name sent to the router will be in the form of `{application}:{namespace_name}` + """ + return self._taskregistry.create_external_namespace( + name=name, + application=application, + retry=retry, + expires=expires, + processing_deadline_duration=processing_deadline_duration, + ) + def get_task(self, namespace: str, task: str) -> Task[Any, Any]: """Fetch a task by namespace and name.""" return self._taskregistry.get(namespace).get(task) diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index 4dbed929..ca67c497 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -16,7 +16,7 @@ from taskbroker_client.metrics import MetricsBackend from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter -from taskbroker_client.task import P, R, Task +from taskbroker_client.task import ExternalTask, P, R, Task from taskbroker_client.types import ProducerFactory logger = logging.getLogger(__name__) @@ -73,6 +73,7 @@ def contains(self, name: str) -> bool: @property def topic(self) -> str: + """The topic that a namespace is routed to.""" return self.router.route_namespace(self.name) def register( @@ -148,7 +149,7 @@ def _handle_produce_future(self, future: ProducerFuture, tags: dict[str, str]) - self.metrics.incr("taskworker.registry.send_task.success", tags=tags) def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) -> None: - topic = self.router.route_namespace(self.name) + topic = self.topic with sentry_sdk.start_span( op=OP.QUEUE_PUBLISH, @@ -196,6 +197,83 @@ def _producer(self, topic: str) -> KafkaProducer: return self._producers[topic] +class ExternalNamespace(TaskNamespace): + """ + A task namespace for tasks defined in another (target) application. + + The `application` parameter is the name of the target application which should + be different than the current host application. + + Tasks registered here are ExternalTask instances that can only be dispatched + to Kafka, not called locally. + """ + + @property + def topic(self) -> str: + return self.router.route_namespace(f"{self.application}:{self.name}") + + def register( + self, + *, + name: str, + retry: Retry | None = None, + expires: int | datetime.timedelta | None = None, + processing_deadline_duration: int | datetime.timedelta | None = None, + at_most_once: bool = False, + wait_for_delivery: bool = False, + compression_type: CompressionType = CompressionType.PLAINTEXT, + ) -> Callable[[Callable[P, R]], ExternalTask[P, R]]: + """ + Register an external task stub. + + The decorated function body is never called; only its signature matters + for type checking. dispatch via delay() or apply_async(). + + Parameters + ---------- + + name: str + The name of the task. This is serialized and must be stable across deploys. + retry: Retry | None + The retry policy for the task. If none and at_most_once is not enabled + the Task namespace default retry policy will be used. + expires: int | datetime.timedelta + The number of seconds a task activation is valid for. After this + duration the activation will be discarded and not executed. + at_most_once : bool + Enable at-most-once execution. Tasks with `at_most_once` cannot + define retry policies, and use a worker side idempotency key to + prevent processing deadline based retries. + wait_for_delivery: bool + If true, the task will wait for the delivery report to be received + before returning. + compression_type: CompressionType + The compression type to use to compress the task parameters. + """ + + def wrapped(func: Callable[P, R]) -> ExternalTask[P, R]: + task_retry = retry + if not at_most_once: + task_retry = retry or self.default_retry + task: ExternalTask[P, R] = ExternalTask( + name=name, + func=func, + namespace=self, + retry=task_retry, + expires=expires or self.default_expires, + processing_deadline_duration=( + processing_deadline_duration or self.default_processing_deadline_duration + ), + at_most_once=at_most_once, + wait_for_delivery=wait_for_delivery, + compression_type=compression_type, + ) + self._registered_tasks[name] = task + return task + + return wrapped + + # TODO(mark) All of TaskRegistry could be folded into TaskworkerApp later. class TaskRegistry: """ @@ -214,6 +292,7 @@ def __init__( ) -> None: self._application = application self._namespaces: dict[str, TaskNamespace] = {} + self._external_namespaces: dict[str, ExternalNamespace] = {} self._producer_factory = producer_factory self._router = router self._metrics = metrics @@ -264,3 +343,41 @@ def create_namespace( self._namespaces[name] = namespace return namespace + + def create_external_namespace( + self, + name: str, + application: str, + *, + retry: Retry | None = None, + expires: int | datetime.timedelta | None = None, + processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE, + ) -> ExternalNamespace: + """ + Create a namespace for tasks belonging to an external (target) application. + + `application` is the target app name; tasks dispatched here will be routed + to that application's topic. + """ + key = f"{application}:{name}" + if key in self._external_namespaces: + raise ValueError(f"External task namespace with name {name} already exists.") + namespace = ExternalNamespace( + name=name, + application=application, + router=self._router, + metrics=self._metrics, + producer_factory=self._producer_factory, + retry=retry, + expires=expires, + processing_deadline_duration=processing_deadline_duration, + ) + self._external_namespaces[key] = namespace + return namespace + + def get_external(self, application: str, name: str) -> ExternalNamespace: + """Fetch an external namespace by name.""" + key = f"{application}:{name}" + if key not in self._external_namespaces: + raise KeyError(f"No external task namespace in {application} named {name}") + return self._external_namespaces[key] diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 555053ac..5ec58e43 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -128,13 +128,17 @@ def apply_async( args=args, kwargs=kwargs, headers=headers, expires=expires, countdown=countdown ) if ALWAYS_EAGER: - self._func(*args, **kwargs) + self._call_func(*args, **kwargs) else: self._namespace.send_task( activation, wait_for_delivery=self.wait_for_delivery, ) + def _call_func(self, *args: Any, **kwargs: Any) -> None: + # Overridden in ExternalTask + self._func(*args, **kwargs) + def _signal_send(self, task: Task[Any, Any], args: Any, kwargs: Any) -> None: """ This method is a stub that test harnesses can monkey patch to capture tasks that @@ -256,3 +260,27 @@ def should_retry(self, state: RetryState, exc: Exception) -> bool: if not retry: return False return retry.should_retry(state, exc) + + +class ExternalTask(Task[P, R]): + """ + A task stub for tasks defined in another application. + + ExternalTask instances can be dispatched to Kafka via delay() or apply_async(), + but cannot be called directly. They route to the target application's topic. + """ + + def _call_func(self, *args: Any, **kwargs: Any) -> None: + """ + This method is called by delay() and apply_async() + """ + raise ValueError( + "External tasks cannot be called within an ALWAYS_EAGER block. " + "Use a mock object to ensure that tasks have delay() or apply_async() called instead." + ) + + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: + raise ValueError( + f"External tasks cannot be called locally. " + f"Use delay() or apply_async() to dispatch '{self.name}' to the target application." + ) diff --git a/clients/python/tests/test_app.py b/clients/python/tests/test_app.py index 17493a8a..ca1b6496 100644 --- a/clients/python/tests/test_app.py +++ b/clients/python/tests/test_app.py @@ -81,6 +81,53 @@ def test_create_namespace() -> None: app.get_namespace("invalid") +def test_create_external_namespace() -> None: + class ExternalRouter(TaskRouter): + def route_namespace(self, name: str) -> str: + if name == "other:test": + return "other-test" + if name == "ext:test": + return "ext-test" + if name == "test": + return "tester" + raise ValueError(f"unknown namespace {name}") + + app = TaskbrokerApp( + name="acme", producer_factory=producer_factory, router_class=ExternalRouter() + ) + + ns = app.create_namespace("test") + assert ns.application == "acme" + assert ns.name == "test" + assert ns.topic == "tester" + + ext = app.create_external_namespace(application="ext", name="test") + assert ext.application == "ext" + assert ext.name == "test" + assert ext.topic == "ext-test" + + retry = Retry(times=3) + other = app.create_external_namespace( + application="other", + name="test", + retry=retry, + expires=60 * 10, + processing_deadline_duration=60, + ) + assert other.default_retry == retry + assert other.default_processing_deadline_duration == 60 + assert other.default_expires == 60 * 10 + assert other.name == "test" + assert other.application == "other" + assert other.topic == "other-test" + + # external namespaces are stored separately from in-app namespaces + # and from each other. + assert app.taskregistry.get_external("ext", "test") == ext + assert app.taskregistry.get_external("other", "test") == other + assert app.taskregistry.get("test") == ns + + def test_get_task() -> None: app = TaskbrokerApp(name="acme", producer_factory=producer_factory, router_class=StubRouter()) ns = app.create_namespace(name="tests") diff --git a/clients/python/tests/test_external_task.py b/clients/python/tests/test_external_task.py new file mode 100644 index 00000000..c20d55bc --- /dev/null +++ b/clients/python/tests/test_external_task.py @@ -0,0 +1,285 @@ +from __future__ import annotations + +from concurrent.futures import Future +from unittest.mock import Mock, patch + +import pytest + +from taskbroker_client.metrics import NoOpMetricsBackend +from taskbroker_client.registry import ExternalNamespace, TaskRegistry +from taskbroker_client.retry import Retry +from taskbroker_client.router import DefaultRouter + +from .conftest import producer_factory + + +def make_external_namespace( + name: str = "process", + application: str = "launchpad", +) -> ExternalNamespace: + return ExternalNamespace( + name=name, + application=application, + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + retry=None, + ) + + +def test_external_task_raises_on_direct_call() -> None: + ns = make_external_namespace() + + @ns.register(name="do_something") + def do_something(x: int) -> None: + pass + + with pytest.raises(ValueError, match="External tasks cannot be called locally"): + do_something(1) + + +def test_external_task_always_eager() -> None: + ns = make_external_namespace() + + @ns.register(name="do_something") + def do_something(x: int) -> None: + pass + + with patch("taskbroker_client.task.ALWAYS_EAGER", True): + with pytest.raises(ValueError, match="External tasks cannot be called locally"): + do_something(1) + + with pytest.raises( + ValueError, match="External tasks cannot be called within an ALWAYS_EAGER block" + ): + do_something.delay(1) + + +def test_external_task_signature_type_preserved() -> None: + ns = make_external_namespace() + + @ns.register(name="run_types") + def run_types(*, name: str, cost: float, times: int) -> None: + pass + + mock_producer = Mock() + future: Future[None] = Future() + future.set_result(None) + mock_producer.produce.return_value = future + ns._producers[ns.topic] = mock_producer + + # This shouldn't trigger any type errors. + run_types.delay(name="bob", cost=2.3, times=1) + + assert mock_producer.produce.call_count == 1 + + +def test_external_task_delay_dispatches_to_kafka() -> None: + ns = make_external_namespace() + + @ns.register(name="do_something") + def do_something(x: int) -> None: + pass + + mock_producer = Mock() + future: Future[None] = Future() + future.set_result(None) + mock_producer.produce.return_value = future + ns._producers[ns.topic] = mock_producer + + do_something.delay(42) + + assert mock_producer.produce.call_count == 1 + call_args = mock_producer.produce.call_args + assert call_args[0][0].name == ns.topic + + +def test_external_task_apply_async_dispatches_to_kafka() -> None: + ns = make_external_namespace() + + @ns.register(name="do_something") + def do_something(x: int) -> None: + pass + + mock_producer = Mock() + future: Future[None] = Future() + future.set_result(None) + mock_producer.produce.return_value = future + ns._producers[ns.topic] = mock_producer + + do_something.apply_async(args=[42]) + + assert mock_producer.produce.call_count == 1 + call_args = mock_producer.produce.call_args + assert call_args[0][0].name == ns.topic + + +def test_external_namespace_stores_target_application_and_name() -> None: + ns = make_external_namespace(name="process", application="launchpad") + + assert ns.name == "process" + assert ns.application == "launchpad" + + +def test_external_namespace_topic_uses_prefixed_routing() -> None: + class CapturingRouter: + def __init__(self) -> None: + self.last_name: str | None = None + + def route_namespace(self, name: str) -> str: + if name == "launchpad:process": + return "taskworker-launchpad-process" + raise ValueError("Unkownn namespace name") + + router = CapturingRouter() + ns = ExternalNamespace( + name="process", + application="launchpad", + producer_factory=producer_factory, + router=router, + metrics=NoOpMetricsBackend(), + retry=None, + ) + assert ns.topic == "taskworker-launchpad-process" + + +def test_external_namespace_inherits_defaults() -> None: + retry = Retry(times=3) + ns = ExternalNamespace( + name="process", + application="launchpad", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + retry=retry, + expires=300, + processing_deadline_duration=60, + ) + + @ns.register(name="my_task") + def my_task() -> None: + pass + + activation = my_task.create_activation([], {}) + assert activation.expires == 300 + assert activation.processing_deadline_duration == 60 + assert my_task.retry == retry + + +def test_external_namespace_no_retry_for_at_most_once() -> None: + retry = Retry(times=3) + ns = ExternalNamespace( + name="process", + application="launchpad", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + retry=retry, + expires=300, + processing_deadline_duration=60, + ) + + @ns.register(name="my_task", at_most_once=True) + def my_task() -> None: + pass + + activation = my_task.create_activation([], {}) + assert activation.expires == 300 + assert activation.processing_deadline_duration == 60 + assert my_task.retry is None, "at_most_once tasks do not have retries" + assert my_task.at_most_once is True + + +def test_external_namespace_register_overrides_defaults() -> None: + override_retry = Retry(times=1) + ns = ExternalNamespace( + name="process", + application="launchpad", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + retry=Retry(times=5), + expires=300, + processing_deadline_duration=60, + ) + + @ns.register( + name="my_task", + retry=override_retry, + expires=60, + processing_deadline_duration=10, + ) + def my_task() -> None: + pass + + activation = my_task.create_activation([], {}) + assert activation.expires == 60 + assert activation.processing_deadline_duration == 10 + assert my_task.retry == override_retry + + +def test_registry_get_does_not_return_external_namespaces() -> None: + registry = TaskRegistry( + application="acme", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + ) + registry.create_external_namespace(name="process", application="launchpad") + + with pytest.raises(KeyError): + registry.get("process") + + with pytest.raises(KeyError): + registry.get_task("process", "any_task") + + +def test_registry_get_external_returns_external_namespace() -> None: + registry = TaskRegistry( + application="acme", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + ) + ns = registry.create_external_namespace(name="process", application="launchpad") + + assert registry.get_external("launchpad", "process") is ns + + +def test_registry_get_external_raises_for_unknown() -> None: + registry = TaskRegistry( + application="acme", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + ) + + with pytest.raises(KeyError, match="No external task namespace"): + registry.get_external("nope", "nope") + + +def test_registry_create_external_namespace_duplicate_raises() -> None: + registry = TaskRegistry( + application="acme", + producer_factory=producer_factory, + router=DefaultRouter(), + metrics=NoOpMetricsBackend(), + ) + registry.create_external_namespace(name="process", application="launchpad") + + with pytest.raises(ValueError, match="already exists"): + registry.create_external_namespace(name="process", application="launchpad") + + +def test_task_activation_targets_external_application() -> None: + """TaskActivation.application and .namespace point at the target app.""" + ns = make_external_namespace(name="process", application="launchpad") + + @ns.register(name="do_work") + def do_work(x: int) -> None: + pass + + activation = do_work.create_activation([1], {}) + assert activation.application == "launchpad" + assert activation.namespace == "process" + assert activation.taskname == "do_work" diff --git a/clients/python/tests/test_registry.py b/clients/python/tests/test_registry.py index 29f990d1..050e7d8f 100644 --- a/clients/python/tests/test_registry.py +++ b/clients/python/tests/test_registry.py @@ -10,8 +10,6 @@ ON_ATTEMPTS_EXCEEDED_DISCARD, ) -# from django.test.utils import override_settings -# from sentry.conf.types.kafka_definition import Topic from taskbroker_client.constants import MAX_PARAMETER_BYTES_BEFORE_COMPRESSION, CompressionType from taskbroker_client.metrics import NoOpMetricsBackend from taskbroker_client.registry import TaskNamespace, TaskRegistry