-
-
Notifications
You must be signed in to change notification settings - Fork 6
feat: Add simpler API for creating external tasks #570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,7 @@ | |
| from taskbroker_client.metrics import MetricsBackend | ||
| from taskbroker_client.retry import Retry | ||
| from taskbroker_client.router import TaskRouter | ||
| from taskbroker_client.task import P, R, Task | ||
| from taskbroker_client.task import ExternalTask, P, R, Task | ||
| from taskbroker_client.types import ProducerFactory | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -73,6 +73,7 @@ def contains(self, name: str) -> bool: | |
|
|
||
| @property | ||
| def topic(self) -> str: | ||
| """The topic that a namespace is routed to.""" | ||
| return self.router.route_namespace(self.name) | ||
|
|
||
| def register( | ||
|
|
@@ -148,7 +149,7 @@ def _handle_produce_future(self, future: ProducerFuture, tags: dict[str, str]) - | |
| self.metrics.incr("taskworker.registry.send_task.success", tags=tags) | ||
|
|
||
| def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) -> None: | ||
| topic = self.router.route_namespace(self.name) | ||
| topic = self.topic | ||
|
|
||
| with sentry_sdk.start_span( | ||
| op=OP.QUEUE_PUBLISH, | ||
|
|
@@ -196,6 +197,83 @@ def _producer(self, topic: str) -> KafkaProducer: | |
| return self._producers[topic] | ||
|
|
||
|
|
||
| class ExternalNamespace(TaskNamespace): | ||
| """ | ||
| A task namespace for tasks defined in another (target) application. | ||
|
|
||
| The `application` parameter is the name of the target application which should | ||
| be different than the current host application. | ||
|
|
||
| Tasks registered here are ExternalTask instances that can only be dispatched | ||
| to Kafka, not called locally. | ||
| """ | ||
|
|
||
| @property | ||
| def topic(self) -> str: | ||
| return self.router.route_namespace(f"{self.application}:{self.name}") | ||
|
|
||
| def register( | ||
| self, | ||
| *, | ||
| name: str, | ||
| retry: Retry | None = None, | ||
| expires: int | datetime.timedelta | None = None, | ||
| processing_deadline_duration: int | datetime.timedelta | None = None, | ||
| at_most_once: bool = False, | ||
| wait_for_delivery: bool = False, | ||
| compression_type: CompressionType = CompressionType.PLAINTEXT, | ||
| ) -> Callable[[Callable[P, R]], ExternalTask[P, R]]: | ||
| """ | ||
| Register an external task stub. | ||
|
|
||
| The decorated function body is never called; only its signature matters | ||
| for type checking. dispatch via delay() or apply_async(). | ||
|
|
||
| Parameters | ||
| ---------- | ||
|
|
||
| name: str | ||
| The name of the task. This is serialized and must be stable across deploys. | ||
| retry: Retry | None | ||
| The retry policy for the task. If none and at_most_once is not enabled | ||
| the Task namespace default retry policy will be used. | ||
| expires: int | datetime.timedelta | ||
| The number of seconds a task activation is valid for. After this | ||
| duration the activation will be discarded and not executed. | ||
| at_most_once : bool | ||
| Enable at-most-once execution. Tasks with `at_most_once` cannot | ||
| define retry policies, and use a worker side idempotency key to | ||
| prevent processing deadline based retries. | ||
| wait_for_delivery: bool | ||
| If true, the task will wait for the delivery report to be received | ||
| before returning. | ||
| compression_type: CompressionType | ||
| The compression type to use to compress the task parameters. | ||
| """ | ||
|
|
||
| def wrapped(func: Callable[P, R]) -> ExternalTask[P, R]: | ||
| task_retry = retry | ||
| if not at_most_once: | ||
| task_retry = retry or self.default_retry | ||
| task: ExternalTask[P, R] = ExternalTask( | ||
| name=name, | ||
| func=func, | ||
| namespace=self, | ||
| retry=task_retry, | ||
| expires=expires or self.default_expires, | ||
| processing_deadline_duration=( | ||
| processing_deadline_duration or self.default_processing_deadline_duration | ||
| ), | ||
| at_most_once=at_most_once, | ||
| wait_for_delivery=wait_for_delivery, | ||
| compression_type=compression_type, | ||
| ) | ||
| self._registered_tasks[name] = task | ||
| return task | ||
|
|
||
| return wrapped | ||
|
|
||
|
|
||
| # TODO(mark) All of TaskRegistry could be folded into TaskworkerApp later. | ||
| class TaskRegistry: | ||
| """ | ||
|
|
@@ -214,6 +292,7 @@ def __init__( | |
| ) -> None: | ||
| self._application = application | ||
| self._namespaces: dict[str, TaskNamespace] = {} | ||
| self._external_namespaces: dict[str, ExternalNamespace] = {} | ||
| self._producer_factory = producer_factory | ||
| self._router = router | ||
| self._metrics = metrics | ||
|
|
@@ -264,3 +343,41 @@ def create_namespace( | |
| self._namespaces[name] = namespace | ||
|
|
||
| return namespace | ||
|
|
||
| def create_external_namespace( | ||
| self, | ||
| name: str, | ||
| application: str, | ||
| *, | ||
| retry: Retry | None = None, | ||
| expires: int | datetime.timedelta | None = None, | ||
| processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE, | ||
| ) -> ExternalNamespace: | ||
| """ | ||
| Create a namespace for tasks belonging to an external (target) application. | ||
|
|
||
| `application` is the target app name; tasks dispatched here will be routed | ||
| to that application's topic. | ||
| """ | ||
| key = f"{application}:{name}" | ||
| if key in self._external_namespaces: | ||
| raise ValueError(f"External task namespace with name {name} already exists.") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplicate error message omits application contextLow Severity The duplicate-check in |
||
| namespace = ExternalNamespace( | ||
| name=name, | ||
| application=application, | ||
| router=self._router, | ||
| metrics=self._metrics, | ||
| producer_factory=self._producer_factory, | ||
| retry=retry, | ||
| expires=expires, | ||
| processing_deadline_duration=processing_deadline_duration, | ||
| ) | ||
| self._external_namespaces[key] = namespace | ||
| return namespace | ||
|
|
||
| def get_external(self, application: str, name: str) -> ExternalNamespace: | ||
| """Fetch an external namespace by name.""" | ||
| key = f"{application}:{name}" | ||
| if key not in self._external_namespaces: | ||
| raise KeyError(f"No external task namespace in {application} named {name}") | ||
| return self._external_namespaces[key] | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicated register logic across namespace classes
Low Severity
ExternalNamespace.registeris a near-identical copy ofTaskNamespace.register— the only difference is instantiatingExternalTaskinstead ofTask. The retry-handling logic, default-resolution, and task-storage code are fully duplicated, meaning any future bug fix or behavior change needs to be applied in both places.Additional Locations (1)
clients/python/src/taskbroker_client/registry.py#L78-L140