def process_video(
    source_path: str,
    target_path: str,
    callback: Callable[[np.ndarray, int], np.ndarray],
    max_frames: int | None = None,
    prefetch: int = 32,
    writer_buffer: int = 32,
    show_progress: bool = False,
    progress_message: str = "Processing video",
    skip_on_error: bool = False,
) -> None:
    """Process a video through a three-stage threaded pipeline.

    A reader thread decodes frames into a bounded queue, the calling thread runs
    `callback` on each frame, and a writer thread saves the results. Because the
    callback runs in the calling thread, stateful objects can be used inside it
    without extra synchronization. Both queues are bounded, so at most
    `prefetch` raw frames and `writer_buffer` processed frames are buffered.

    Every frame that was successfully processed is written before the function
    returns or raises; the output is flushed even when the callback fails.

    Args:
        source_path: Path to the input video file.
        target_path: Path where the processed video will be saved.
        callback: Function called for each frame with the frame and its
            zero-based index. It returns the processed frame, which is written
            using the source video's `VideoInfo`.
        max_frames: Maximum number of frames to process. If `None`, the entire
            video is processed.
        prefetch: Maximum number of raw frames buffered ahead of the callback.
        writer_buffer: Maximum number of processed frames buffered ahead of the
            writer.
        show_progress: Whether to display a tqdm progress bar.
        progress_message: Description shown in the progress bar.
        skip_on_error: If `True`, a frame whose callback raises is dropped from
            the output, a warning is logged, and processing continues. If
            `False` (default), the exception propagates once the pipeline has
            been shut down.

    Raises:
        Exception: Whatever `callback` raises (unless `skip_on_error=True`), or
            the first exception raised inside the reader or writer thread.

    Example:
        ```python
        from supervision.utils.video import process_video

        def callback(frame, frame_index):
            return frame  # replace with real per-frame processing

        process_video(
            source_path="source.mp4",
            target_path="target.mp4",
            callback=callback,
        )
        ```
    """
    # Function-scope stdlib imports keep this block self-contained.
    import logging
    import threading
    from queue import Empty, Full, Queue

    logger = logging.getLogger(__name__)
    # Seconds a queue operation waits before re-checking whether its peer died.
    poll_interval = 0.1

    video_info = VideoInfo.from_video_path(video_path=source_path)
    total_frames = video_info.total_frames
    if max_frames is not None and total_frames is not None:
        total_frames = min(total_frames, max_frames)

    frame_read_queue: Queue[tuple[int, np.ndarray] | None] = Queue(maxsize=prefetch)
    frame_write_queue: Queue[np.ndarray | None] = Queue(maxsize=writer_buffer)
    # Set by the main thread only, to tell the reader to stop producing.
    stop_event = threading.Event()
    # Exceptions captured in worker threads, re-raised in the calling thread.
    worker_errors: list[BaseException] = []

    def put_unless(
        target: Queue, item: object, should_abort: Callable[[], bool]
    ) -> bool:
        """Enqueue `item`, waiting for space; return False if aborted first."""
        while not should_abort():
            try:
                target.put(item, timeout=poll_interval)
            except Full:
                continue
            return True
        return False

    def reader_thread() -> None:
        try:
            frame_generator = get_video_frames_generator(
                source_path=source_path,
                end=max_frames,
            )
            for indexed_frame in enumerate(frame_generator):
                if not put_unless(frame_read_queue, indexed_frame, stop_event.is_set):
                    return
        except Exception as exc:
            worker_errors.append(exc)
        finally:
            # The sentinel is never dropped merely because the queue is full:
            # losing it would leave the consumer polling forever. It is skipped
            # only when the consumer has already asked the reader to stop.
            put_unless(frame_read_queue, None, stop_event.is_set)

    def writer_thread(video_sink: VideoSink) -> None:
        try:
            # Drain until the sentinel so no processed frame is discarded.
            while True:
                frame = frame_write_queue.get()
                if frame is None:
                    return
                video_sink.write_frame(frame=frame)
        except Exception as exc:
            worker_errors.append(exc)

    reader_worker = threading.Thread(target=reader_thread, daemon=True)
    with VideoSink(target_path=target_path, video_info=video_info) as video_sink:
        writer_worker = threading.Thread(
            target=writer_thread,
            args=(video_sink,),
            daemon=True,
        )

        def writer_gone() -> bool:
            # Before the sentinel is sent, a dead writer means it failed.
            return not writer_worker.is_alive()

        progress_bar = tqdm(
            total=total_frames,
            disable=not show_progress,
            desc=progress_message,
        )

        reader_worker.start()
        writer_worker.start()

        try:
            while True:
                try:
                    read_item = frame_read_queue.get(timeout=poll_interval)
                except Empty:
                    if writer_gone():
                        break
                    continue
                if read_item is None:
                    break

                frame_index, frame = read_item
                # Only the callback is guarded: queue and progress-bar failures
                # must never be mistaken for a skippable frame error.
                try:
                    processed_frame = callback(frame, frame_index)
                except Exception as exc:
                    if not skip_on_error:
                        raise
                    logger.warning(
                        "Skipping frame %d: callback raised %r", frame_index, exc
                    )
                    progress_bar.update(1)
                    continue

                if not put_unless(frame_write_queue, processed_frame, writer_gone):
                    break
                progress_bar.update(1)
        finally:
            stop_event.set()
            # Wait for queue space so the writer flushes everything queued
            # before it; give up only if the writer is already dead.
            put_unless(frame_write_queue, None, writer_gone)
            # Join without a timeout: the sink must not be closed while the
            # writer thread can still be writing to it.
            reader_worker.join()
            writer_worker.join()
            progress_bar.close()

        if worker_errors:
            raise worker_errors[0]