Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 183 additions & 3 deletions src/sentry/taskworker/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
import contextlib
import logging
import threading
import traceback as _traceback
from collections.abc import MutableMapping
from contextlib import contextmanager
from typing import Any, Generator

import orjson
import sentry_sdk
from arroyo.backends.kafka import KafkaProducer
from django.conf import settings
from django.core.cache.backends.base import BaseCache
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskError
from sentry_sdk import capture_exception
from taskbroker_client.metrics import MetricsBackend, Tags
from taskbroker_client.router import TaskRouter as LibraryRouter
Expand Down Expand Up @@ -181,9 +184,11 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag
ctx = ViewerContext.deserialize(orjson.loads(raw))
except (orjson.JSONDecodeError, TypeError, KeyError, AttributeError):
logger.exception("Failed to deserialize viewer context header")
# Only `expected=True` when dispatch actually sent the header. That distinguishes
# the noise case (system task with no VC, header genuinely absent) from the bug
# case (header sent but deserialization failed → ctx is None).

# Only `expected=True` when dispatch actually sent the header.
# That distinguishes the noise case (system task with no VC, header
# genuinely absent) from the bug case (header sent but deserialization
# failed → ctx is None).
observe_viewer_context_propagation(
"task_execute",
ctx=ctx,
Expand All @@ -194,6 +199,181 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag
return viewer_context_scope(ctx)


def _qualified_type(exc: BaseException) -> str:
cls = type(exc)
return f"{cls.__module__}.{cls.__qualname__}"


def _task_meta_field(task_meta: object, field: str) -> str | None:
value = getattr(task_meta, field, None)
if value is not None:
return str(value)

activation = getattr(task_meta, "activation", None)
if activation is not None:
value = getattr(activation, field, None)
if value is not None:
return str(value)

return None


def _log_value(value: object) -> str:
    """
    JSON-encode *value* as a quoted string (``None`` becomes ``""``) so it can
    be embedded safely in a single-line ``key=value`` log message.
    """
    text = str(value) if value is not None else ""
    return orjson.dumps(text).decode()


def _smart_truncate(text: str, max_chars: int) -> str:
"""
Truncates a string from the middle, preserving the head and tail.
Ideal for tracebacks where the root cause (head) and final exception (tail) are most important.
Note: The output may contain newlines and is meant for payloads (e.g., TaskError),
not for structured log lines.
"""
if len(text) <= max_chars:
return text

marker = "\n... <TRACEBACK TRUNCATED> ...\n"
if max_chars <= len(marker) + 2:
return text[:max_chars]

keep_each = (max_chars - len(marker)) // 2
return text[:keep_each] + marker + text[-keep_each:]


def _get_root_cause(exc: BaseException) -> BaseException:
"""
Extracts the deepest root cause of an exception (chained exceptions).
Includes cycle detection to prevent infinite loops, and respects
Python's __suppress_context__ (e.g., raise ... from None).
"""
root = exc
seen: set[int] = set()
while True:
next_exc = root.__cause__
if next_exc is None and not root.__suppress_context__:
next_exc = root.__context__

if next_exc is None or id(next_exc) in seen:
return root

seen.add(id(next_exc))
root = next_exc


def _safe_capture_exception(exc: BaseException) -> None:
    """
    Best-effort report of *exc* to Sentry. Any SDK failure is swallowed so
    this helper never raises — error-hook plumbing must not be able to mask
    the original task exception.
    """
    with contextlib.suppress(Exception):
        sentry_sdk.capture_exception(exc)


class TaskErrorCaptureHook:
    """
    On a task exception:
    1. Ship a Sentry event tagged with task identifiers.
    2. Emit a structured log line containing the task context, the exception,
       and the root cause (if chained). We omit the full traceback from the logs
       to prevent multiline parsing issues and massive log volumes, relying on Sentry
       as the source of truth for stack traces.
    3. Return a size-bounded TaskError envelope (with a smartly truncated traceback)
       for the SetTaskStatus RPC.

    Redundant if taskbroker_client already captures internally; harmless.
    Never raises — a hook bug must not mask the original task exception.
    """

    # Character limits. The proto contract is "bounded"; the exact number is
    # backpressure against runaway payloads reaching the broker's logs and
    # Kafka, not an exact wire guarantee.
    MAX_MESSAGE_CHARS = 2_000
    MAX_TRACEBACK_CHARS = 8_000

    def on_exception(self, task_meta: object, exc: BaseException) -> TaskError | None:
        """
        Handle a failed task: capture to Sentry, emit one structured log line,
        and return a bounded TaskError envelope (or None if envelope building
        fails). Each step is independently guarded so one failing step does
        not prevent the others, and the hook as a whole never raises.

        task_meta: opaque task metadata; ``id``/``taskname``/``namespace`` are
            read via _task_meta_field (falls back to ``task_meta.activation``).
        exc: the exception raised by the task body.
        """
        try:
            exc_type = _qualified_type(exc)
            task_id = _task_meta_field(task_meta, "id")
            taskname = _task_meta_field(task_meta, "taskname")
            namespace = _task_meta_field(task_meta, "namespace")
            exception_message = str(exc)[: self.MAX_MESSAGE_CHARS]

            # Always extract and log the root cause. If no chain exists, these will
            # duplicate the outer exception fields. This ensures a stable log schema.
            root_exc = _get_root_cause(exc)
            root_type = _qualified_type(root_exc)
            root_message = str(root_exc)[: self.MAX_MESSAGE_CHARS]

            # Step 1: Sentry capture inside an isolation scope so the tags do
            # not leak into unrelated events.
            try:
                with sentry_sdk.isolation_scope() as scope:
                    scope.set_tag("taskname", taskname or "")
                    scope.set_tag("namespace", namespace or "")
                    scope.set_tag("task_id", task_id or "")
                    sentry_sdk.capture_exception(exc)
            except Exception as capture_exc:
                # Report the hook-internal failure itself, then keep going —
                # logging and envelope building must still happen.
                _safe_capture_exception(capture_exc)
                try:
                    logger.error(
                        "taskworker.error_hook.capture_failed "
                        f"task_id={_log_value(task_id)} "
                        f"internal_error_type={_log_value(_qualified_type(capture_exc))} "
                        f"internal_error_message={_log_value(str(capture_exc))}"
                    )
                except Exception:
                    pass

            # Step 2: one single-line structured log entry.
            try:
                # Emit a clean, single-line log without raw exc_info tracebacks
                # to remain perfectly parseable by log aggregators like Vector.
                logger.error(
                    "taskworker.task_failed "
                    f"task_id={_log_value(task_id)} "
                    f"taskname={_log_value(taskname)} "
                    f"namespace={_log_value(namespace)} "
                    f"exception_type={_log_value(exc_type)} "
                    f"exception_message={_log_value(exception_message)} "
                    f"root_cause_type={_log_value(root_type)} "
                    f"root_cause_message={_log_value(root_message)}"
                )
            except Exception:
                pass

            # Step 3: bounded TaskError envelope for the SetTaskStatus RPC.
            try:
                tb_string = "".join(_traceback.format_exception(exc))
                tb_truncated = _smart_truncate(tb_string, self.MAX_TRACEBACK_CHARS)

                return TaskError(
                    exception_type=exc_type,
                    exception_message=exception_message,
                    traceback=tb_truncated,
                )
            except Exception as env_exc:
                _safe_capture_exception(env_exc)
                try:
                    logger.error(
                        "taskworker.error_hook.envelope_failed "
                        f"task_id={_log_value(task_id)} "
                        f"internal_error_type={_log_value(_qualified_type(env_exc))} "
                        f"internal_error_message={_log_value(str(env_exc))}"
                    )
                except Exception:
                    pass
                return None
        except Exception as hook_exc:
            # Last line of defense for the "never raises" contract; note the
            # log here omits task_id since it may not have been computed yet.
            _safe_capture_exception(hook_exc)
            try:
                logger.error(
                    "taskworker.error_hook.failed "
                    f"internal_error_type={_log_value(_qualified_type(hook_exc))} "
                    f"internal_error_message={_log_value(str(hook_exc))}"
                )
            except Exception:
                pass
            return None


_producer_local = threading.local()


Expand Down
3 changes: 3 additions & 0 deletions src/sentry/taskworker/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
make_producer,
)

from sentry.taskworker.adapters import TaskErrorCaptureHook

# Module-level taskbroker application wiring Sentry-specific backends
# (metrics, routing, at-most-once storage, context hooks) into the runtime.
# error_hook=TaskErrorCaptureHook() reports task failures to Sentry/logs.
app = TaskbrokerApp(
    name="sentry",
    producer_factory=make_producer,
    metrics_class=SentryMetricsBackend(),
    router_class=SentryRouter(),
    at_most_once_store=DjangoCacheAtMostOnceStore(cache),
    context_hooks=[ViewerContextHook()],
    error_hook=TaskErrorCaptureHook(),
)
app.set_config(
{
Expand Down
Loading
Loading