From 25792db437a127e34f489299fb606f8639f0b1bf Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 26 Dec 2025 16:49:32 -0800 Subject: [PATCH 01/10] Add gRPC Server to Taskworker --- src/sentry/conf/server.py | 1 + src/sentry/conf/types/kafka_definition.py | 1 + src/sentry/runner/commands/run.py | 9 ++ src/sentry/taskworker/client/client.py | 5 + src/sentry/taskworker/registry.py | 1 + src/sentry/taskworker/tasks/examples.py | 13 +-- src/sentry/taskworker/worker.py | 110 ++++++++++++++++++++-- src/sentry/taskworker/workerchild.py | 8 ++ 8 files changed, 136 insertions(+), 12 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 8d4396231d65..f5f35932d5f4 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -2630,6 +2630,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: "taskworker-usage-dlq": "default", "taskworker-workflows-engine": "default", "taskworker-workflows-engine-dlq": "default", + "test-topic": "default", } diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index 151e1e0f83bb..33b2f5b21fab 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -109,6 +109,7 @@ class Topic(Enum): TASKWORKER_USAGE_DLQ = "taskworker-usage-dlq" TASKWORKER_WORKFLOWS_ENGINE = "taskworker-workflows-engine" TASKWORKER_WORKFLOWS_ENGINE_DLQ = "taskworker-workflows-engine-dlq" + TEST_TOPIC = "test-topic" class ConsumerDefinition(TypedDict, total=False): diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 4f4c859a67f8..1cc4d0604181 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -300,6 +300,12 @@ def worker(ignore_unknown_queues: bool, **options: Any) -> None: help="The number of seconds before touching the health check file", default=taskworker_constants.DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, ) +@click.option( + "--grpc-port", + help="Port for the gRPC server to listen on. Will try subsequent ports if unavailable.", + default=50052, + type=int, +) @log_options() @configuration def taskworker(**options: Any) -> None: @@ -324,6 +330,7 @@ def run_taskworker( processing_pool_name: str, health_check_file_path: str | None, health_check_sec_per_touch: float, + grpc_port: int, **options: Any, ) -> None: """ @@ -347,6 +354,7 @@ def run_taskworker( processing_pool_name=processing_pool_name, health_check_file_path=health_check_file_path, health_check_sec_per_touch=health_check_sec_per_touch, + grpc_port=grpc_port, **options, ) exitcode = worker.start() @@ -419,6 +427,7 @@ def taskbroker_send_tasks( KAFKA_CLUSTERS["default"]["common"]["bootstrap.servers"] = bootstrap_servers if kafka_topic and namespace: + print(f"overriding {namespace} to route to {kafka_topic}") options.set("taskworker.route.overrides", {namespace: kafka_topic}) try: diff --git a/src/sentry/taskworker/client/client.py b/src/sentry/taskworker/client/client.py index dd34684d473e..ad64c53fbf1f 100644 --- a/src/sentry/taskworker/client/client.py +++ b/src/sentry/taskworker/client/client.py @@ -317,8 +317,11 @@ def update_task( ) with metrics.timer("taskworker.update_task.rpc", tags={"host": processing_result.host}): + print("calling set task status...") response = self._host_to_stubs[processing_result.host].SetTaskStatus(request) + print(f"done!") except grpc.RpcError as err: + print(f"err: {err}") metrics.incr( "taskworker.client.rpc_error", tags={"method": "SetTaskStatus", "status": err.code().name}, @@ -333,6 +336,8 @@ def update_task( self._check_consecutive_unavailable_errors() raise + print("done updating task.") + self._num_consecutive_unavailable_errors = 0 self._temporary_unavailable_hosts.pop(processing_result.host, None) if response.HasField("task"): diff --git a/src/sentry/taskworker/registry.py b/src/sentry/taskworker/registry.py index 9cca426baf46..ea73bb7d6955 100644 --- a/src/sentry/taskworker/registry.py +++ b/src/sentry/taskworker/registry.py @@ -153,6 +153,7 @@ def _handle_produce_future(self, future: ProducerFuture, tags: dict[str, str]) - def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) -> None: topic = self.router.route_namespace(self.name) + print(f"sending task to topic {topic} in namespace {self.name}") with sentry_sdk.start_span( op=OP.QUEUE_PUBLISH, diff --git a/src/sentry/taskworker/tasks/examples.py b/src/sentry/taskworker/tasks/examples.py index 285035bf9aca..5c0c12dd5522 100644 --- a/src/sentry/taskworker/tasks/examples.py +++ b/src/sentry/taskworker/tasks/examples.py @@ -52,8 +52,9 @@ def will_retry(failure: str) -> None: @exampletasks.register(name="examples.simple_task") def simple_task(*args: list[Any], **kwargs: dict[str, Any]) -> None: - sleep(0.1) - logger.debug("simple_task complete") + sleep(60) + # logger.debug("simple_task complete") + print("simple_task HELLO!") @exampletasks.register( @@ -90,7 +91,7 @@ def timed_task(sleep_seconds: float | str, *args: list[Any], **kwargs: dict[str, logger.debug("timed_task complete") -@exampletasks.register(name="examples.simple_task", compression_type=CompressionType.ZSTD) -def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None: - sleep(0.1) - logger.debug("simple_task_compressed complete") +# @exampletasks.register(name="examples.simple_task", compression_type=CompressionType.ZSTD) +# def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None: +# sleep(0.1) +# logger.debug("simple_task_compressed complete") diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index 16e13255e216..d4a041bbe45a 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -3,6 +3,7 @@ import logging import multiprocessing import queue +import random import signal import threading import time @@ -14,6 +15,7 @@ import grpc from sentry_protos.taskbroker.v1.taskbroker_pb2 import FetchNextTask +from sentry_protos.taskworker.v1 import taskworker_pb2, taskworker_pb2_grpc from sentry import options from sentry.taskworker.app import import_app @@ -36,6 +38,54 @@ logger = logging.getLogger("sentry.taskworker.worker") +class WorkerServicer(taskworker_pb2_grpc.WorkerServicer): + """ + gRPC servicer that receives task activations pushed from the broker + """ + + def __init__(self, worker: TaskWorker) -> None: + self.worker = worker + + def AddTask( + self, + request: taskworker_pb2.AddTaskRequest, + context: grpc.ServicerContext, + ) -> taskworker_pb2.AddTaskResponse: + """Handle incoming task activation""" + logger.info( + "taskworker.grpc.task_received", + extra={ + "task_id": request.task.id if request.task else None, + "callback_url": request.callback_url, + }, + ) + + # Create InflightTaskActivation from the pushed task + inflight = InflightTaskActivation( + activation=request.task, + host=request.callback_url, + receive_timestamp=time.monotonic(), + ) + + # Push the task to the worker queue + added = self.worker._push_task(inflight) + + return taskworker_pb2.AddTaskResponse(added=added) + + def GetQueueSize( + self, + request: taskworker_pb2.GetQueueSizeRequest, + context: grpc.ServicerContext, + ) -> taskworker_pb2.GetQueueSizeResponse: + print("queue length being asked for") + + # Read the shared counter + with self.worker._child_tasks_count.get_lock(): + length = self.worker._child_tasks_count.value + + return taskworker_pb2.GetQueueSizeResponse(length=length) + + class TaskWorker: """ A TaskWorker fetches tasks from a taskworker RPC host and handles executing task activations. @@ -63,6 +113,7 @@ def __init__( process_type: str = "spawn", health_check_file_path: str | None = None, health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, + grpc_port: int = 50052, **kwargs: dict[str, Any], ) -> None: self.options = kwargs @@ -94,6 +145,7 @@ def __init__( self._child_tasks: multiprocessing.Queue[InflightTaskActivation] = self.mp_context.Queue( maxsize=child_tasks_queue_maxsize ) + self._child_tasks_count = self.mp_context.Value("i", 0) self._processed_tasks: multiprocessing.Queue[ProcessingResult] = self.mp_context.Queue( maxsize=result_queue_maxsize ) @@ -106,13 +158,14 @@ def __init__( self._setstatus_backoff_seconds = 0 self._processing_pool_name: str = processing_pool_name or "unknown" + self._grpc_port: int = grpc_port def start(self) -> int: """ - Run the worker main loop + Run the worker gRPC server - Once started a Worker will loop until it is killed, or - completes its max_task_count when it shuts down. + Once started a Worker will run a gRPC server that receives task activations + until it is killed or shuts down. """ self.start_result_thread() self.start_spawn_children_thread() @@ -126,11 +179,21 @@ def signal_handler(*args: Any) -> None: signal.signal(signal.SIGTERM, signal_handler) try: - while True: - self.run_once() + # Start gRPC server + server = grpc.server(ThreadPoolExecutor(max_workers=10)) + taskworker_pb2_grpc.add_WorkerServicer_to_server(WorkerServicer(self), server) + server.add_insecure_port(f"[::]:{self._grpc_port}") + server.start() + logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) + + # Wait for shutdown signal + server.wait_for_termination() + except KeyboardInterrupt: + server.stop(grace=5) self.shutdown() - raise + + return 0 def run_once(self) -> None: """Access point for tests to run a single worker loop""" @@ -169,6 +232,35 @@ def shutdown(self) -> None: logger.info("taskworker.worker.shutdown.complete") + def _push_task(self, inflight: InflightTaskActivation) -> bool: + """ + Push a task to child tasks queue. Returns False if the task could not be added. + """ + try: + start_time = time.monotonic() + self._child_tasks.put(inflight, block=False) + with self._child_tasks_count.get_lock(): + self._child_tasks_count.value += 1 + metrics.distribution( + "taskworker.worker.child_task.put.duration", + time.monotonic() - start_time, + tags={"processing_pool": self._processing_pool_name}, + ) + except queue.Full: + metrics.incr( + "taskworker.worker.child_tasks.put.full", + tags={"processing_pool": self._processing_pool_name}, + ) + logger.warning( + "taskworker.add_task.child_task_queue_full", + extra={ + "task_id": inflight.activation.id, + "processing_pool": self._processing_pool_name, + }, + ) + return False + return True + def _add_task(self) -> bool: """ Add a task to child tasks queue. Returns False if no new task was fetched. @@ -194,6 +286,8 @@ def _add_task(self) -> bool: try: start_time = time.monotonic() self._child_tasks.put(inflight) + with self._child_tasks_count.get_lock(): + self._child_tasks_count.value += 1 metrics.distribution( "taskworker.worker.child_task.put.duration", time.monotonic() - start_time, @@ -237,6 +331,7 @@ def result_thread() -> None: try: result = self._processed_tasks.get(timeout=1.0) + print(f"processed tasks: {result}") executor.submit(self._send_result, result, fetch_next) except queue.Empty: metrics.incr( @@ -273,6 +368,8 @@ def _send_result(self, result: ProcessingResult, fetch: bool = True) -> bool: try: start_time = time.monotonic() self._child_tasks.put(next) + with self._child_tasks_count.get_lock(): + self._child_tasks_count.value += 1 metrics.distribution( "taskworker.worker.child_task.put.duration", time.monotonic() - start_time, @@ -352,6 +449,7 @@ def spawn_children_thread() -> None: self._max_child_task_count, self._processing_pool_name, self._process_type, + self._child_tasks_count, ), ) process.start() diff --git a/src/sentry/taskworker/workerchild.py b/src/sentry/taskworker/workerchild.py index f274bb9bea4d..3fb1bd8dccbf 100644 --- a/src/sentry/taskworker/workerchild.py +++ b/src/sentry/taskworker/workerchild.py @@ -100,6 +100,7 @@ def child_process( max_task_count: int | None, processing_pool_name: str, process_type: str, + child_tasks_count: Any, ) -> None: """ The entrypoint for spawned worker children. @@ -145,6 +146,7 @@ def run_worker( max_task_count: int | None, processing_pool_name: str, process_type: str, + child_tasks_count: Any, ) -> None: processed_task_count = 0 @@ -178,6 +180,9 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: try: inflight = child_tasks.get(timeout=1.0) + # Decrement the counter after successfully getting a task + with child_tasks_count.get_lock(): + child_tasks_count.value -= 1 except queue.Empty: metrics.incr( "taskworker.worker.child_task_queue_empty", @@ -236,6 +241,8 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: ) continue + print(f"Task func is {task_func}") + set_current_task(inflight.activation) next_state = TASK_ACTIVATION_STATUS_FAILURE @@ -468,4 +475,5 @@ def record_task_execution( max_task_count, processing_pool_name, process_type, + child_tasks_count, ) From 2672abb624f79869663e030a9990380d4fafcc88 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 2 Jan 2026 14:05:30 -0800 Subject: [PATCH 02/10] Return Queue Size on `PushTask` --- src/sentry/taskworker/worker.py | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index d4a041bbe45a..af356fbb982d 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -3,7 +3,6 @@ import logging import multiprocessing import queue -import random import signal import threading import time @@ -38,7 +37,7 @@ logger = logging.getLogger("sentry.taskworker.worker") -class WorkerServicer(taskworker_pb2_grpc.WorkerServicer): +class WorkerServicer(taskworker_pb2_grpc.WorkerServiceServicer): """ gRPC servicer that receives task activations pushed from the broker """ @@ -46,12 +45,13 @@ class WorkerServicer(taskworker_pb2_grpc.WorkerServicer): def __init__(self, worker: TaskWorker) -> None: self.worker = worker - def AddTask( + def PushTask( self, - request: taskworker_pb2.AddTaskRequest, + request: taskworker_pb2.PushTaskRequest, context: grpc.ServicerContext, - ) -> taskworker_pb2.AddTaskResponse: - """Handle incoming task activation""" + ) -> taskworker_pb2.PushTaskResponse: + """Handle incoming task activation.""" + logger.info( "taskworker.grpc.task_received", extra={ @@ -60,7 +60,7 @@ def AddTask( }, ) - # Create InflightTaskActivation from the pushed task + # Create `InflightTaskActivation` from the pushed task inflight = InflightTaskActivation( activation=request.task, host=request.callback_url, @@ -70,20 +70,11 @@ def AddTask( # Push the task to the worker queue added = self.worker._push_task(inflight) - return taskworker_pb2.AddTaskResponse(added=added) - - def GetQueueSize( - self, - request: taskworker_pb2.GetQueueSizeRequest, - context: grpc.ServicerContext, - ) -> taskworker_pb2.GetQueueSizeResponse: - print("queue length being asked for") - # Read the shared counter with self.worker._child_tasks_count.get_lock(): - length = self.worker._child_tasks_count.value + queue_size = self.worker._child_tasks_count.value - return taskworker_pb2.GetQueueSizeResponse(length=length) + return taskworker_pb2.PushTaskResponse(added=added, queue_size=queue_size) class TaskWorker: @@ -181,7 +172,7 @@ def signal_handler(*args: Any) -> None: try: # Start gRPC server server = grpc.server(ThreadPoolExecutor(max_workers=10)) - taskworker_pb2_grpc.add_WorkerServicer_to_server(WorkerServicer(self), server) + taskworker_pb2_grpc.add_WorkerServiceServicer_to_server(WorkerServicer(self), server) server.add_insecure_port(f"[::]:{self._grpc_port}") server.start() logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) @@ -331,7 +322,6 @@ def result_thread() -> None: try: result = self._processed_tasks.get(timeout=1.0) - print(f"processed tasks: {result}") executor.submit(self._send_result, result, fetch_next) except queue.Empty: metrics.incr( From b3f90113fb9e9f3472ff14ea71d1f8901fac172b Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 2 Jan 2026 17:07:26 -0800 Subject: [PATCH 03/10] Pass Address Into `SetTaskStatus` Request --- src/sentry/taskworker/client/client.py | 11 ++++++----- src/sentry/taskworker/tasks/examples.py | 6 ++---- src/sentry/taskworker/worker.py | 1 + 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/sentry/taskworker/client/client.py b/src/sentry/taskworker/client/client.py index ad64c53fbf1f..4e043a044e6f 100644 --- a/src/sentry/taskworker/client/client.py +++ b/src/sentry/taskworker/client/client.py @@ -137,10 +137,12 @@ def __init__( health_check_settings: HealthCheckSettings | None = None, rpc_secret: str | None = None, grpc_config: str | None = None, + port: int = 50052, ) -> None: assert len(hosts) > 0, "You must provide at least one RPC host to connect to" self._hosts = hosts self._rpc_secret = rpc_secret + self._port = port self._grpc_options: list[tuple[str, Any]] = [ ("grpc.max_receive_message_length", MAX_ACTIVATION_SIZE) @@ -304,6 +306,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=fetch_next_task, + address=f"http://127.0.0.1:{self._port}", ) try: @@ -317,11 +320,11 @@ def update_task( ) with metrics.timer("taskworker.update_task.rpc", tags={"host": processing_result.host}): - print("calling set task status...") + logger.debug("calling set task status...") response = self._host_to_stubs[processing_result.host].SetTaskStatus(request) - print(f"done!") + logger.debug("Done setting task status") except grpc.RpcError as err: - print(f"err: {err}") + logger.warning("Failed to perform RPC - %s", err) metrics.incr( "taskworker.client.rpc_error", tags={"method": "SetTaskStatus", "status": err.code().name}, @@ -336,8 +339,6 @@ def update_task( self._check_consecutive_unavailable_errors() raise - print("done updating task.") - self._num_consecutive_unavailable_errors = 0 self._temporary_unavailable_hosts.pop(processing_result.host, None) if response.HasField("task"): diff --git a/src/sentry/taskworker/tasks/examples.py b/src/sentry/taskworker/tasks/examples.py index 5c0c12dd5522..b14a3916f5fe 100644 --- a/src/sentry/taskworker/tasks/examples.py +++ b/src/sentry/taskworker/tasks/examples.py @@ -4,7 +4,6 @@ from time import sleep from typing import Any -from sentry.taskworker.constants import CompressionType from sentry.taskworker.namespaces import exampletasks from sentry.taskworker.retry import LastAction, NoRetriesRemainingError, Retry, RetryTaskError from sentry.taskworker.retry import retry_task as retry_task_helper @@ -52,9 +51,8 @@ def will_retry(failure: str) -> None: @exampletasks.register(name="examples.simple_task") def simple_task(*args: list[Any], **kwargs: dict[str, Any]) -> None: - sleep(60) - # logger.debug("simple_task complete") - print("simple_task HELLO!") + sleep(10) + logger.info("Simple task complete!") @exampletasks.register( diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index af356fbb982d..5aa7de7636df 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -124,6 +124,7 @@ def __init__( ), rpc_secret=app.config["rpc_secret"], grpc_config=options.get("taskworker.grpc_service_config"), + port=grpc_port, ) if process_type == "fork": self.mp_context = multiprocessing.get_context("fork") From bca37a740d06090dce0b5e35046792ac299160d0 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 5 Jan 2026 13:39:30 -0800 Subject: [PATCH 04/10] Publish Built Image? --- .github/workflows/self-hosted.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/self-hosted.yml b/.github/workflows/self-hosted.yml index 2fcf4b90fb4f..e21bac1ecfc8 100644 --- a/.github/workflows/self-hosted.yml +++ b/.github/workflows/self-hosted.yml @@ -91,6 +91,7 @@ jobs: SOURCE_COMMIT=${{ github.sha }} TARGETARCH=${{ matrix.platform }} ghcr: true + publish_on_pr: true tag_nightly: false tag_latest: false From 72e3acf4655cd25a2044a39ffe3d2effcc9458f0 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 5 Jan 2026 15:04:09 -0800 Subject: [PATCH 05/10] Use Repository for Protos Package (Temporary) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 999897110421..0d32abb3558f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,7 +85,7 @@ dependencies = [ "sentry-forked-email-reply-parser>=0.5.12.post1", "sentry-kafka-schemas>=2.1.16", "sentry-ophio>=1.1.3", - "sentry-protos>=0.4.10", + "sentry-protos @ git+https://github.com/getsentry/sentry-protos.git@george/push-broker-worker#subdirectory=py", # "sentry-protos>=0.4.10", "sentry-redis-tools>=0.5.0", "sentry-relay>=0.9.22", "sentry-sdk[http2]>=2.47.0", From 8a82b97c9e6eeb1fc03f239e98324a23c7c9fc86 Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com> Date: Mon, 5 Jan 2026 23:05:19 +0000 Subject: [PATCH 06/10] :snowflake: re-freeze requirements --- uv.lock | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/uv.lock b/uv.lock index ad7fc05460da..477011512986 100644 --- a/uv.lock +++ b/uv.lock @@ -273,7 +273,7 @@ wheels = [ [[package]] name = "devservices" -version = "1.2.3" +version = "1.2.4" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "packaging", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -283,7 +283,7 @@ dependencies = [ { name = "supervisor", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/devservices-1.2.3-py3-none-any.whl", hash = "sha256:19beb1dabb533c5dcbd021d6a34e3f357e5c868670f0dfe8945911d3965a6494" }, + { url = "https://pypi.devinfra.sentry.io/wheels/devservices-1.2.4-py3-none-any.whl", hash = "sha256:637055d5dae3dd01899ba066d511aa14537d791ed7f997292bc6a72dd5ddf416" }, ] [[package]] @@ -2196,7 +2196,7 @@ requires-dist = [ { name = "sentry-forked-email-reply-parser", specifier = ">=0.5.12.post1" }, { name = "sentry-kafka-schemas", specifier = ">=2.1.16" }, { name = "sentry-ophio", specifier = ">=1.1.3" }, - { name = "sentry-protos", specifier = ">=0.4.10" }, + { name = "sentry-protos", git = "https://github.com/getsentry/sentry-protos.git?subdirectory=py&rev=george%2Fpush-broker-worker" }, { name = "sentry-redis-tools", specifier = ">=0.5.0" }, { name = "sentry-relay", specifier = ">=0.9.22" }, { name = "sentry-sdk", extras = ["http2"], specifier = ">=2.47.0" }, @@ -2227,7 +2227,7 @@ requires-dist = [ dev = [ { name = "black", specifier = ">=25.1.0" }, { name = "covdefaults", specifier = ">=2.3.0" }, - { name = "devservices", specifier = ">=1.2.3" }, + { name = "devservices", specifier = ">=1.2.4" }, { name = "docker", specifier = ">=7.1.0" }, { name = "ephemeral-port-reserve", specifier = ">=1.1.4" }, { name = "flake8", specifier = ">=7.3.0" }, @@ -2402,15 +2402,12 @@ wheels = [ [[package]] name = "sentry-protos" version = "0.4.10" -source = { registry = "https://pypi.devinfra.sentry.io/simple" } +source = { git = "https://github.com/getsentry/sentry-protos.git?subdirectory=py&rev=george%2Fpush-broker-worker#50006eba5f52efdca34d83a0096543aab90223aa" } dependencies = [ { name = "grpc-stubs", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpcio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] -wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.4.10-py3-none-any.whl", hash = "sha256:fe32f66f6d074978fb3b72be20932cb8354a49a119fac9861c4a876b9f476b2e" }, -] [[package]] name = "sentry-redis-tools" From 185b48aab5bb4159825f17f634138d9fb1983f25 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 5 Jan 2026 15:17:04 -0800 Subject: [PATCH 07/10] Add Git to Self Hosted Dockerfile for Custom Sentry Protos Pkg --- self-hosted/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/self-hosted/Dockerfile b/self-hosted/Dockerfile index 2d14e98e41ef..4eea48f56f95 100644 --- a/self-hosted/Dockerfile +++ b/self-hosted/Dockerfile @@ -46,6 +46,7 @@ RUN set -x \ # uwsgi-dogstatsd && buildDeps=" \ gcc \ + git \ libpcre2-dev \ wget \ zlib1g-dev \ From 05eefdc49fb1b1b08e280e77f7160888d2a618ed Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 5 Jan 2026 16:54:02 -0800 Subject: [PATCH 08/10] Have Taskworker Add/Remove Itself to Taskbroker --- src/sentry/taskworker/client/client.py | 67 ++++++++++++++++++++++++++ src/sentry/taskworker/worker.py | 47 ++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/src/sentry/taskworker/client/client.py b/src/sentry/taskworker/client/client.py index 4e043a044e6f..0a74948690ee 100644 --- a/src/sentry/taskworker/client/client.py +++ b/src/sentry/taskworker/client/client.py @@ -12,8 +12,10 @@ import grpc from google.protobuf.message import Message from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( + AddWorkerRequest, FetchNextTask, GetTaskRequest, + RemoveWorkerRequest, SetTaskStatusRequest, ) from sentry_protos.taskbroker.v1.taskbroker_pb2_grpc import ConsumerServiceStub @@ -348,3 +350,68 @@ def update_task( receive_timestamp=time.monotonic(), ) return None + + def add_worker(self, host: str, address: str) -> None: + """ + Register this worker with a taskbroker. + + Sends an AddWorker message to notify the broker that this worker + is available to receive tasks. + """ + # Ensure we have a connection to this host + if host not in self._host_to_stubs: + self._host_to_stubs[host] = self._connect_to_host(host) + + request = AddWorkerRequest(address=address) + + try: + with metrics.timer("taskworker.add_worker.rpc", tags={"host": host}): + self._host_to_stubs[host].AddWorker(request) + logger.info( + "taskworker.client.add_worker.success", + extra={"host": host, "address": address}, + ) + metrics.incr("taskworker.client.add_worker.success", tags={"host": host}) + except grpc.RpcError as err: + logger.warning( + "taskworker.client.add_worker.failed", + extra={"host": host, "error": str(err), "status": err.code().name}, + ) + metrics.incr( + "taskworker.client.rpc_error", + tags={"method": "AddWorker", "status": err.code().name}, + ) + + def remove_worker(self, host: str, address: str) -> None: + """ + Unregister this worker from a taskbroker. + + Sends a RemoveWorker message to notify the broker that this worker + is shutting down and should no longer receive tasks. + """ + if host not in self._host_to_stubs: + logger.warning( + "taskworker.client.remove_worker.unknown_host", + extra={"host": host}, + ) + return + + request = RemoveWorkerRequest(address=address) + + try: + with metrics.timer("taskworker.remove_worker.rpc", tags={"host": host}): + self._host_to_stubs[host].RemoveWorker(request) + logger.info( + "taskworker.client.remove_worker.success", + extra={"host": host, "address": address}, + ) + metrics.incr("taskworker.client.remove_worker.success", tags={"host": host}) + except grpc.RpcError as err: + logger.warning( + "taskworker.client.remove_worker.failed", + extra={"host": host, "error": str(err), "status": err.code().name}, + ) + metrics.incr( + "taskworker.client.rpc_error", + tags={"method": "RemoveWorker", "status": err.code().name}, + ) diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index 5aa7de7636df..2d520bcd8d7c 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -2,6 +2,7 @@ import logging import multiprocessing +import os import queue import signal import threading @@ -77,6 +78,11 @@ def PushTask( return taskworker_pb2.PushTaskResponse(added=added, queue_size=queue_size) +def get_host() -> str: + pod_ip = os.environ.get("POD_IP") + return pod_ip if pod_ip else "localhost" + + class TaskWorker: """ A TaskWorker fetches tasks from a taskworker RPC host and handles executing task activations. @@ -178,6 +184,9 @@ def signal_handler(*args: Any) -> None: server.start() logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) + # Register this worker with all connected taskbrokers + self._register_with_brokers() + # Wait for shutdown signal server.wait_for_termination() @@ -191,12 +200,50 @@ def run_once(self) -> None: """Access point for tests to run a single worker loop""" self._add_task() + def _register_with_brokers(self) -> None: + """ + Register this worker with all connected taskbrokers. + + Sends an AddWorker message to each broker to notify them that + this worker is available to receive tasks. + """ + address = f"http://{get_host()}:{self._grpc_port}" + + logger.info( + "taskworker.worker.registering_with_brokers", + extra={"broker_count": len(self.client._hosts), "address": address}, + ) + + for host in self.client._hosts: + self.client.add_worker(host, address) + + def _unregister_from_brokers(self) -> None: + """ + Unregister this worker from all connected taskbrokers. + + Sends a RemoveWorker message to each broker to notify them that + this worker is shutting down and should no longer receive tasks. + """ + address = f"http://{get_host()}:{self._grpc_port}" + + logger.info( + "taskworker.worker.unregistering_from_brokers", + extra={"broker_count": len(self.client._hosts), "address": address}, + ) + + for host in self.client._hosts: + self.client.remove_worker(host, address) + def shutdown(self) -> None: """ Shutdown cleanly Activate the shutdown event and drain results before terminating children. """ logger.info("taskworker.worker.shutdown.start") + + # Unregister this worker from all connected taskbrokers + self._unregister_from_brokers() + self._shutdown_event.set() logger.info("taskworker.worker.shutdown.spawn_children") From 91a5080a9932cc81159d2d91c2d4e895ed8c5df1 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 6 Jan 2026 13:23:04 -0800 Subject: [PATCH 09/10] Randomize Simple Task Time to Match Ingest Transactions Save Event --- src/sentry/taskworker/tasks/examples.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/sentry/taskworker/tasks/examples.py b/src/sentry/taskworker/tasks/examples.py index b14a3916f5fe..41ede877b5c1 100644 --- a/src/sentry/taskworker/tasks/examples.py +++ b/src/sentry/taskworker/tasks/examples.py @@ -1,6 +1,8 @@ from __future__ import annotations import logging +import random +import time from time import sleep from typing import Any @@ -51,7 +53,16 @@ def will_retry(failure: str) -> None: @exampletasks.register(name="examples.simple_task") def simple_task(*args: list[Any], **kwargs: dict[str, Any]) -> None: - sleep(10) + logger.info("Starting simple task...") + mu = 0.7 + sigma = 0.8 + + sleep_duration = random.lognormvariate(mu, sigma) + + final_sleep_time = max(0.20, min(sleep_duration, 20)) + + time.sleep(final_sleep_time) + logger.info("Simple task complete!") @@ -93,3 +104,4 @@ def timed_task(sleep_seconds: float | str, *args: list[Any], **kwargs: dict[str, # def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None: # sleep(0.1) # logger.debug("simple_task_compressed complete") +# logger.debug("simple_task_compressed complete") From af8388d444e6eb308616f7f2a3999f1df7f2204d Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 9 Jan 2026 16:29:37 -0800 Subject: [PATCH 10/10] Remove Queue Size Counter --- build-local.sh | 29 +++++++++++++++++++++++++ src/sentry/taskworker/client/client.py | 17 +++++++++++++-- src/sentry/taskworker/tasks/examples.py | 10 ++------- src/sentry/taskworker/worker.py | 20 +---------------- src/sentry/taskworker/workerchild.py | 8 ------- 5 files changed, 47 insertions(+), 37 deletions(-) create mode 100755 build-local.sh diff --git a/build-local.sh b/build-local.sh new file mode 100755 index 000000000000..764fe1f873a6 --- /dev/null +++ b/build-local.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Accept image name as first argument +if [ $# -eq 0 ]; then + echo "Usage: $0 " >&2 + exit 1 +fi + +IMAGE_NAME="$1" + +echo "Building: ${IMAGE_NAME}" >&2 + +# Build frontend assets +pnpm install --frozen-lockfile --production +python3 -m tools.fast_editable --path . +python3 -m sentry.build.main + +# Build Docker image +docker build \ + -f self-hosted/Dockerfile \ + -t "${IMAGE_NAME}" \ + --platform linux/amd64 \ + --build-arg SOURCE_COMMIT="$(git rev-parse HEAD)" \ + --build-arg TARGETARCH=amd64 \ + . + +# Output the image name for use in other scripts +echo "${IMAGE_NAME}" diff --git a/src/sentry/taskworker/client/client.py b/src/sentry/taskworker/client/client.py index 0a74948690ee..1c57fbd71869 100644 --- a/src/sentry/taskworker/client/client.py +++ b/src/sentry/taskworker/client/client.py @@ -322,9 +322,22 @@ def update_task( ) with metrics.timer("taskworker.update_task.rpc", tags={"host": processing_result.host}): - logger.debug("calling set task status...") + logger.debug( + "Calling set task status", + extra={ + "task_id": processing_result.task_id, + "status": processing_result.status, + "host": processing_result.host, + "receive_timestamp": processing_result.receive_timestamp, + }, + ) + start_time = time.time() response = self._host_to_stubs[processing_result.host].SetTaskStatus(request) - logger.debug("Done setting task status") + duration_ms = (time.time() - start_time) * 1000 + logger.debug( + "Done setting task status", + extra={"duration_ms": duration_ms}, + ) except grpc.RpcError as err: logger.warning("Failed to perform RPC - %s", err) metrics.incr( diff --git a/src/sentry/taskworker/tasks/examples.py b/src/sentry/taskworker/tasks/examples.py index 41ede877b5c1..ecba27343507 100644 --- a/src/sentry/taskworker/tasks/examples.py +++ b/src/sentry/taskworker/tasks/examples.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import random import time from time import sleep from typing import Any @@ -54,14 +53,9 @@ def will_retry(failure: str) -> None: @exampletasks.register(name="examples.simple_task") def simple_task(*args: list[Any], **kwargs: dict[str, Any]) -> None: logger.info("Starting simple task...") - mu = 0.7 - sigma = 0.8 - sleep_duration = random.lognormvariate(mu, sigma) - - final_sleep_time = max(0.20, min(sleep_duration, 20)) - - time.sleep(final_sleep_time) + sleep_time = 0.1 + time.sleep(sleep_time) logger.info("Simple task complete!") diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index 2d520bcd8d7c..b02dcbf0125d 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -52,15 +52,6 @@ def PushTask( context: grpc.ServicerContext, ) -> taskworker_pb2.PushTaskResponse: """Handle incoming task activation.""" - - logger.info( - "taskworker.grpc.task_received", - extra={ - "task_id": request.task.id if request.task else None, - "callback_url": request.callback_url, - }, - ) - # Create `InflightTaskActivation` from the pushed task inflight = InflightTaskActivation( activation=request.task, @@ -72,8 +63,7 @@ def PushTask( added = self.worker._push_task(inflight) # Read the shared counter - with self.worker._child_tasks_count.get_lock(): - queue_size = self.worker._child_tasks_count.value + queue_size = self.worker._child_tasks.qsize() return taskworker_pb2.PushTaskResponse(added=added, queue_size=queue_size) @@ -143,7 +133,6 @@ def __init__( self._child_tasks: multiprocessing.Queue[InflightTaskActivation] = self.mp_context.Queue( maxsize=child_tasks_queue_maxsize ) - self._child_tasks_count = self.mp_context.Value("i", 0) self._processed_tasks: multiprocessing.Queue[ProcessingResult] = self.mp_context.Queue( maxsize=result_queue_maxsize ) @@ -278,8 +267,6 @@ def _push_task(self, inflight: InflightTaskActivation) -> bool: try: start_time = time.monotonic() self._child_tasks.put(inflight, block=False) - with self._child_tasks_count.get_lock(): - self._child_tasks_count.value += 1 metrics.distribution( "taskworker.worker.child_task.put.duration", time.monotonic() - start_time, @@ -325,8 +312,6 @@ def _add_task(self) -> bool: try: start_time = time.monotonic() self._child_tasks.put(inflight) - with self._child_tasks_count.get_lock(): - self._child_tasks_count.value += 1 metrics.distribution( "taskworker.worker.child_task.put.duration", time.monotonic() - start_time, @@ -406,8 +391,6 @@ def _send_result(self, result: ProcessingResult, fetch: bool = True) -> bool: try: start_time = time.monotonic() self._child_tasks.put(next) - with self._child_tasks_count.get_lock(): - self._child_tasks_count.value += 1 metrics.distribution( "taskworker.worker.child_task.put.duration", time.monotonic() - start_time, @@ -487,7 +470,6 @@ def spawn_children_thread() -> None: self._max_child_task_count, self._processing_pool_name, self._process_type, - self._child_tasks_count, ), ) process.start() diff --git a/src/sentry/taskworker/workerchild.py b/src/sentry/taskworker/workerchild.py index 3fb1bd8dccbf..f274bb9bea4d 100644 --- a/src/sentry/taskworker/workerchild.py +++ b/src/sentry/taskworker/workerchild.py @@ -100,7 +100,6 @@ def child_process( max_task_count: int | None, processing_pool_name: str, process_type: str, - child_tasks_count: Any, ) -> None: """ The entrypoint for spawned worker children. @@ -146,7 +145,6 @@ def run_worker( max_task_count: int | None, processing_pool_name: str, process_type: str, - child_tasks_count: Any, ) -> None: processed_task_count = 0 @@ -180,9 +178,6 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: try: inflight = child_tasks.get(timeout=1.0) - # Decrement the counter after successfully getting a task - with child_tasks_count.get_lock(): - child_tasks_count.value -= 1 except queue.Empty: metrics.incr( "taskworker.worker.child_task_queue_empty", @@ -241,8 +236,6 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: ) continue - print(f"Task func is {task_func}") - set_current_task(inflight.activation) next_state = TASK_ACTIVATION_STATUS_FAILURE @@ -475,5 +468,4 @@ def record_task_execution( max_task_count, processing_pool_name, process_type, - child_tasks_count, )