Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 99 additions & 5 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
import ipaddress
import json
import logging
import os
import os.path
import socket
import sys
import tempfile
import threading
import time
import traceback
from collections import defaultdict
from functools import wraps
from typing import Any, DefaultDict, Iterable, List, Mapping, Optional
Expand Down Expand Up @@ -391,13 +395,103 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]:
def launch(source: Source, args: List[str]) -> None:
    """Parse *args*, run *source*, and print each AirbyteMessage to stdout.

    A daemon heartbeat thread writes periodic diagnostics to stderr so that a
    stalled stdout pipe (the orchestrator no longer reading) or an internal
    deadlock can be observed from the container logs even when stdout is wedged.
    """
    source_entrypoint = AirbyteEntrypoint(source)
    parsed_args = source_entrypoint.parse_args(args)

    # Heartbeat state — shared with the background heartbeat thread. Only the
    # main thread writes these; the heartbeat thread merely reads them, which
    # is safe under CPython for simple rebinds of ints/bools/floats.
    _HEARTBEAT_INTERVAL_S = 30.0
    messages_written = 0
    bytes_written = 0  # NOTE: counts str length, not encoded bytes — approximate.
    print_blocked = False
    print_blocked_since = 0.0
    heartbeat_stop = threading.Event()

    def _heartbeat() -> None:
        """Emit periodic status to stderr to diagnose stdout pipe blocking and deadlocks.

        Writes directly to fd 2 (stderr) which the Kubernetes container
        runtime collects independently of the orchestrator reading stdout.

        When a stall is detected (message count frozen for 3+ intervals = 90s),
        a full thread dump is emitted to help diagnose deadlocks in the
        concurrent source worker pool.
        """
        from airbyte_cdk.sources.concurrent_source.queue_registry import get_queue

        start = time.monotonic()
        stderr_fd = 2
        last_msgs = 0
        stall_count = 0
        _STALL_THRESHOLD = 3  # intervals before triggering thread dump (3 * 30s = 90s)
        _DUMP_REPEAT_INTERVAL = 10  # re-dump every 10 intervals (~5 min) during ongoing stall

        # Event.wait returns False on timeout, True once the main thread sets
        # the event in its finally block — that ends the loop.
        while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S):
            now = time.monotonic()
            elapsed = now - start

            # Detect stall: same message count for multiple consecutive intervals.
            if messages_written == last_msgs and messages_written > 0:
                stall_count += 1
            else:
                stall_count = 0
            last_msgs = messages_written

            blocked_str = "YES" if print_blocked else "NO"
            blocked_dur = (
                f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else ""
            )

            # Include queue stats if a concurrent source is active.
            queue_stats = ""
            q = get_queue()
            if q is not None:
                try:
                    queue_stats = f" queue_size={q.qsize()} queue_full={q.full()}"
                except Exception:
                    pass  # Queue methods are best-effort

            line = (
                f"STDOUT_HEARTBEAT: t={elapsed:.0f}s "
                f"msgs={messages_written} bytes={bytes_written} "
                f"print_blocked={blocked_str}{blocked_dur}"
                f"{queue_stats}\n"
            )

            # Dump all thread stacks when the stall is first detected, then
            # periodically while the stall persists.
            if stall_count == _STALL_THRESHOLD or (
                stall_count > _STALL_THRESHOLD
                and (stall_count - _STALL_THRESHOLD) % _DUMP_REPEAT_INTERVAL == 0
            ):
                line += "=== THREAD DUMP (stall detected) ===\n"
                thread_names = {t.ident: t.name for t in threading.enumerate()}
                for thread_id, frame in sys._current_frames().items():
                    thread_name = thread_names.get(thread_id, "unknown")
                    line += f"\nThread {thread_name} ({thread_id}):\n"
                    line += "".join(traceback.format_stack(frame))
                line += "=== END THREAD DUMP ===\n"

            try:
                # os.write on fd 2 bypasses sys.stderr buffering/locks entirely.
                os.write(stderr_fd, line.encode())
            except OSError:
                pass  # Best-effort diagnostic — if stderr (fd 2) is broken, silently give up.

    heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True)
    heartbeat_thread.start()

    # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs
    # Refer to: https://github.com/airbytehq/oncall/issues/6235
    try:
        with PRINT_BUFFER:
            for message in source_entrypoint.run(parsed_args):
                # Simply printing creates issues for the concurrent CDK because
                # Python uses two different instructions to print: one for the
                # message and one for the line break. Appending "\n" to the
                # message ensures both are written in a single call.
                data = f"{message}\n"
                # Record the timestamp BEFORE raising the flag so the heartbeat
                # thread can never observe print_blocked=True together with a
                # stale print_blocked_since from a previous message.
                print_blocked_since = time.monotonic()
                print_blocked = True
                print(data, end="")
                # Deliberately not cleared in a finally: if print() raises
                # (e.g. broken stdout pipe), leaving the flag set preserves the
                # diagnostic signal in the final heartbeats.
                print_blocked = False
                messages_written += 1
                bytes_written += len(data)
    finally:
        # Stop the heartbeat on success, error, or generator close.
        heartbeat_stop.set()


def _init_internal_request_filter() -> None:
Expand Down
48 changes: 27 additions & 21 deletions airbyte_cdk/sources/concurrent_source/concurrent_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.queue_registry import register_queue, unregister_queue
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
Expand Down Expand Up @@ -106,29 +107,34 @@ def read(
streams: List[AbstractStream],
) -> Iterator[AirbyteMessage]:
self._logger.info("Starting syncing")
concurrent_stream_processor = ConcurrentReadProcessor(
streams,
PartitionEnqueuer(self._queue, self._threadpool),
self._threadpool,
self._logger,
self._slice_logger,
self._message_repository,
PartitionReader(
self._queue,
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
),
)
# Register queue so the heartbeat thread can report queue stats for deadlock diagnosis
register_queue(self._queue)
try:
concurrent_stream_processor = ConcurrentReadProcessor(
streams,
PartitionEnqueuer(self._queue, self._threadpool),
self._threadpool,
self._logger,
self._slice_logger,
self._message_repository,
PartitionReader(
self._queue,
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
),
)

# Enqueue initial partition generation tasks
yield from self._submit_initial_partition_generators(concurrent_stream_processor)
# Enqueue initial partition generation tasks
yield from self._submit_initial_partition_generators(concurrent_stream_processor)

# Read from the queue until all partitions were generated and read
yield from self._consume_from_queue(
self._queue,
concurrent_stream_processor,
)
self._threadpool.check_for_errors_and_shutdown()
self._logger.info("Finished syncing")
# Read from the queue until all partitions were generated and read
yield from self._consume_from_queue(
self._queue,
concurrent_stream_processor,
)
self._threadpool.check_for_errors_and_shutdown()
self._logger.info("Finished syncing")
finally:
unregister_queue()

def _submit_initial_partition_generators(
self, concurrent_stream_processor: ConcurrentReadProcessor
Expand Down
41 changes: 41 additions & 0 deletions airbyte_cdk/sources/concurrent_source/queue_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Module-level registry for the concurrent source queue.

The heartbeat thread in entrypoint.py needs to report queue stats (size, full/empty)
to help diagnose deadlocks. Since the queue is created deep inside ConcurrentSource,
this registry provides a lightweight way to expose it without threading the queue
object through the entire call chain.

Usage:
# In ConcurrentSource.read():
register_queue(self._queue)

# In the heartbeat thread:
q = get_queue()
if q is not None:
print(f"queue_size={q.qsize()} queue_full={q.full()}")
"""

from queue import Queue
from typing import Optional

from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem

# The currently registered concurrent-source queue; None whenever no
# concurrent sync is active (set/cleared by register_queue/unregister_queue).
_queue: Optional[Queue[QueueItem]] = None


def register_queue(queue: Queue[QueueItem]) -> None:
    """Make *queue* visible to the entrypoint heartbeat thread via get_queue()."""
    global _queue
    _queue = queue


def get_queue() -> Optional[Queue[QueueItem]]:
    """Return the queue registered via register_queue(), or None when no sync is running."""
    return _queue


def unregister_queue() -> None:
    """Drop the registered queue reference so stale stats are never reported."""
    global _queue
    _queue = None
Loading