diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 57820f005..729e809b2 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -7,10 +7,14 @@ import ipaddress import json import logging +import os import os.path import socket import sys import tempfile +import threading +import time +import traceback from collections import defaultdict from functools import wraps from typing import Any, DefaultDict, Iterable, List, Mapping, Optional @@ -391,13 +395,103 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) + + # Heartbeat state — shared with the background heartbeat thread. + _HEARTBEAT_INTERVAL_S = 30.0 + messages_written = 0 + bytes_written = 0 + print_blocked = False + print_blocked_since = 0.0 + heartbeat_stop = threading.Event() + + def _heartbeat() -> None: + """Emit periodic status to stderr to diagnose stdout pipe blocking and deadlocks. + + Writes directly to fd 2 (stderr) which the Kubernetes container + runtime collects independently of the orchestrator reading stdout. + + When a stall is detected (message count frozen for 3+ intervals = 90s), + a full thread dump is emitted to help diagnose deadlocks in the + concurrent source worker pool. 
+ """ + from airbyte_cdk.sources.concurrent_source.queue_registry import get_queue + + start = time.monotonic() + stderr_fd = 2 + last_msgs = 0 + stall_count = 0 + _STALL_THRESHOLD = 3 # intervals before triggering thread dump (3 * 30s = 90s) + _DUMP_REPEAT_INTERVAL = 10 # re-dump every 10 intervals (~5 min) during ongoing stall + + while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S): + now = time.monotonic() + elapsed = now - start + + # Detect stall: same message count for multiple consecutive intervals + if messages_written == last_msgs and messages_written > 0: + stall_count += 1 + else: + stall_count = 0 + last_msgs = messages_written + + blocked_str = "YES" if print_blocked else "NO" + blocked_dur = ( + f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else "" + ) + + # Include queue stats if concurrent source is active + queue_stats = "" + q = get_queue() + if q is not None: + try: + queue_stats = f" queue_size={q.qsize()} queue_full={q.full()}" + except Exception: + pass # Queue methods are best-effort + + line = ( + f"STDOUT_HEARTBEAT: t={elapsed:.0f}s " + f"msgs={messages_written} bytes={bytes_written} " + f"print_blocked={blocked_str}{blocked_dur}" + f"{queue_stats}\n" + ) + + # Dump all thread stacks when stall is detected, then periodically during ongoing stall + if stall_count == _STALL_THRESHOLD or ( + stall_count > _STALL_THRESHOLD + and (stall_count - _STALL_THRESHOLD) % _DUMP_REPEAT_INTERVAL == 0 + ): + line += "=== THREAD DUMP (stall detected) ===\n" + thread_names = {t.ident: t.name for t in threading.enumerate()} + for thread_id, frame in sys._current_frames().items(): + thread_name = thread_names.get(thread_id, "unknown") + line += f"\nThread {thread_name} ({thread_id}):\n" + line += "".join(traceback.format_stack(frame)) + line += "=== END THREAD DUMP ===\n" + + try: + os.write(stderr_fd, line.encode()) + except OSError: + pass # Best-effort diagnostic — if stderr (fd 2) is broken, silently give up. 
+ + heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True) + heartbeat_thread.start() + # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs # Refer to: https://github.com/airbytehq/oncall/issues/6235 - with PRINT_BUFFER: - for message in source_entrypoint.run(parsed_args): - # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and - # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time - print(f"{message}\n", end="") + try: + with PRINT_BUFFER: + for message in source_entrypoint.run(parsed_args): + # simply printing is creating issues for concurrent CDK as Python uses two different instructions to print: one for the message and + # the other for the break line. Adding `\n` to the message ensures that both are printed at the same time + data = f"{message}\n" + print_blocked = True + print_blocked_since = time.monotonic() + print(data, end="") + print_blocked = False + messages_written += 1 + bytes_written += len(data) + finally: + heartbeat_stop.set() def _init_internal_request_filter() -> None: diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..6ffbdb0cd 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -12,6 +12,7 @@ from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import ( PartitionGenerationCompletedSentinel, ) +from airbyte_cdk.sources.concurrent_source.queue_registry import register_queue, unregister_queue from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager from airbyte_cdk.sources.message import
InMemoryMessageRepository, MessageRepository @@ -106,29 +107,34 @@ def read( streams: List[AbstractStream], ) -> Iterator[AirbyteMessage]: self._logger.info("Starting syncing") - concurrent_stream_processor = ConcurrentReadProcessor( - streams, - PartitionEnqueuer(self._queue, self._threadpool), - self._threadpool, - self._logger, - self._slice_logger, - self._message_repository, - PartitionReader( - self._queue, - PartitionLogger(self._slice_logger, self._logger, self._message_repository), - ), - ) + # Register queue so the heartbeat thread can report queue stats for deadlock diagnosis + register_queue(self._queue) + try: + concurrent_stream_processor = ConcurrentReadProcessor( + streams, + PartitionEnqueuer(self._queue, self._threadpool), + self._threadpool, + self._logger, + self._slice_logger, + self._message_repository, + PartitionReader( + self._queue, + PartitionLogger(self._slice_logger, self._logger, self._message_repository), + ), + ) - # Enqueue initial partition generation tasks - yield from self._submit_initial_partition_generators(concurrent_stream_processor) + # Enqueue initial partition generation tasks + yield from self._submit_initial_partition_generators(concurrent_stream_processor) - # Read from the queue until all partitions were generated and read - yield from self._consume_from_queue( - self._queue, - concurrent_stream_processor, - ) - self._threadpool.check_for_errors_and_shutdown() - self._logger.info("Finished syncing") + # Read from the queue until all partitions were generated and read + yield from self._consume_from_queue( + self._queue, + concurrent_stream_processor, + ) + self._threadpool.check_for_errors_and_shutdown() + self._logger.info("Finished syncing") + finally: + unregister_queue() def _submit_initial_partition_generators( self, concurrent_stream_processor: ConcurrentReadProcessor diff --git a/airbyte_cdk/sources/concurrent_source/queue_registry.py b/airbyte_cdk/sources/concurrent_source/queue_registry.py new file mode 100644 
index 000000000..322c50518 --- /dev/null +++ b/airbyte_cdk/sources/concurrent_source/queue_registry.py @@ -0,0 +1,41 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Module-level registry for the concurrent source queue. + +The heartbeat thread in entrypoint.py needs to report queue stats (size, full/empty) +to help diagnose deadlocks. Since the queue is created deep inside ConcurrentSource, +this registry provides a lightweight way to expose it without threading the queue +object through the entire call chain. + +Usage: + # In ConcurrentSource.read(): + register_queue(self._queue) + + # In the heartbeat thread: + q = get_queue() + if q is not None: + print(f"queue_size={q.qsize()} queue_full={q.full()}") +""" + +from queue import Queue +from typing import Optional + +from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem + +_queue: Optional[Queue[QueueItem]] = None + + +def register_queue(queue: Queue[QueueItem]) -> None: + """Register the concurrent source queue for heartbeat monitoring.""" + global _queue + _queue = queue + + +def get_queue() -> Optional[Queue[QueueItem]]: + """Return the registered queue, or None if no concurrent source is active.""" + return _queue + + +def unregister_queue() -> None: + """Clear the registered queue.""" + global _queue + _queue = None \ No newline at end of file