Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 112 additions & 77 deletions supervision/utils/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections import deque
from collections.abc import Callable, Generator
from dataclasses import dataclass
from queue import Queue
from queue import Empty, Full, Queue

import cv2
import numpy as np
Expand Down Expand Up @@ -204,89 +204,93 @@ def process_video(
writer_buffer: int = 32,
show_progress: bool = False,
progress_message: str = "Processing video",
skip_on_error: bool = False,
) -> None:
"""
Process video frames asynchronously using a threaded pipeline.

This function orchestrates a three-stage pipeline to optimize video processing
throughput:

1. Reader thread: Continuously reads frames from the source video file and
enqueues them into a bounded queue (`frame_read_queue`). The queue size is
limited by the `prefetch` parameter to control memory usage.
2. Main thread (Processor): Dequeues frames from `frame_read_queue`, applies the
user-defined `callback` function to process each frame, then enqueues the
processed frames into another bounded queue (`frame_write_queue`) for writing.
The processing happens in the main thread, simplifying use of stateful objects
without synchronization.
3. Writer thread: Dequeues processed frames from `frame_write_queue` and writes
them sequentially to the output video file.

Args:
source_path (str): Path to the input video file.
target_path (str): Path where the processed video will be saved.
callback (Callable[[numpy.ndarray, int], numpy.ndarray]): Function called for
each frame, accepting the frame as a numpy array and its zero-based index,
returning the processed frame.
max_frames (int | None): Optional maximum number of frames to process.
If None, the entire video is processed (default).
prefetch (int): Maximum number of frames buffered by the reader thread.
Controls memory use; default is 32.
writer_buffer (int): Maximum number of frames buffered before writing.
Controls output buffer size; default is 32.
show_progress (bool): Whether to display a tqdm progress bar during processing.
Default is False.
progress_message (str): Description shown in the progress bar.

Returns:
None

Example:
```python
import cv2
import supervision as sv
from rfdetr import RFDETRMedium

model = RFDETRMedium()
"""Process video frames using a three-stage threaded pipeline with controlled
memory usage.

def callback(frame, frame_index):
return model.predict(frame)
Reads frames in a background thread, processes them via user callback in the main
thread, and writes results in another background thread. Uses bounded queues to
limit memory.

process_video(
source_path="source.mp4",
target_path="target.mp4",
callback=frame_callback,
)
```
Args:
source_path: Path to the input video file.
target_path: Path where the processed video will be saved.
callback: Function called for each frame. Receives frame (`numpy.ndarray`,
shape `(H, W, 3)`) and zero-based frame index; must return processed frame
of the same shape.
max_frames: Maximum number of frames to process. If `None`, processes entire
video.
prefetch: Maximum number of raw frames kept in memory before processing.
writer_buffer: Maximum number of processed frames kept in memory before writing.
show_progress: Whether to display a tqdm progress bar.
progress_message: Text shown in the progress bar when enabled.
skip_on_error: If `True`, silently skip frames where callback raises an
exception. If `False` (default), exception is logged and re-raised after
cleanup.

Raises:
RuntimeError: When source video cannot be opened.
Exception: Any unhandled exception raised by the callback (unless
`skip_on_error=True`).
"""
video_info = VideoInfo.from_video_path(video_path=source_path)
total_frames = (
min(video_info.total_frames, max_frames)
if max_frames is not None
if max_frames is not None and video_info.total_frames is not None
else video_info.total_frames
)

frame_read_queue: Queue[tuple[int, np.ndarray] | None] = Queue(maxsize=prefetch)
frame_write_queue: Queue[np.ndarray | None] = Queue(maxsize=writer_buffer)

stop_event = threading.Event()

def reader_thread() -> None:
frame_generator = get_video_frames_generator(
source_path=source_path,
end=max_frames,
)
for frame_index, frame in enumerate(frame_generator):
frame_read_queue.put((frame_index, frame))
frame_read_queue.put(None)
video = cv2.VideoCapture(source_path)
try:
if not video.isOpened():
raise RuntimeError(f"Cannot open video: {source_path}")

frame_generator = get_video_frames_generator(
source_path=source_path,
end=max_frames,
)

for frame_index, frame in enumerate(frame_generator):
if stop_event.is_set():
break

# non-blocking put with small timeout + backoff prevents tight CPU loop
while not stop_event.is_set():
try:
frame_read_queue.put((frame_index, frame), timeout=0.1)
break
except Full:
time.sleep(0.01) # light backoff
finally:
video.release()
# best-effort sentinel, never block forever during shutdown
try:
frame_read_queue.put(None, timeout=0.1)
except Full:
pass

def writer_thread(video_sink: VideoSink) -> None:
while True:
frame = frame_write_queue.get()
if frame is None:
break
video_sink.write_frame(frame=frame)
while not stop_event.is_set():
try:
frame = frame_write_queue.get(timeout=0.1)
if frame is None:
break
video_sink.write_frame(frame=frame)
except Empty:
continue

# Reader is non-daemon
reader_worker = threading.Thread(target=reader_thread, daemon=False)

reader_worker = threading.Thread(target=reader_thread, daemon=True)
with VideoSink(target_path=target_path, video_info=video_info) as video_sink:
# Writer remains daemon
writer_worker = threading.Thread(
target=writer_thread,
args=(video_sink,),
Expand All @@ -302,23 +306,54 @@ def writer_thread(video_sink: VideoSink) -> None:
desc=progress_message,
)

raised: BaseException | None = None

try:
while True:
read_item = frame_read_queue.get()
if read_item is None:
break
try:
read_item = frame_read_queue.get(timeout=0.5)
if read_item is None:
break
frame_index, frame = read_item
except Empty:
if stop_event.is_set():
break
continue

try:
processed_frame = callback(frame, frame_index)
frame_write_queue.put(processed_frame)
progress_bar.update(1)
except Exception as exc:
print(f"Error processing frame {frame_index}: {exc}")
if not skip_on_error:
raised = exc
break
# else: skip this frame silently

frame_index, frame = read_item
processed_frame = callback(frame, frame_index)

frame_write_queue.put(processed_frame)
progress_bar.update(1)
finally:
frame_write_queue.put(None)
reader_worker.join()
writer_worker.join()
stop_event.set()

# best-effort sentinel for writer, never block shutdown
try:
frame_write_queue.put(None, timeout=0.1)
except Full:
pass

# Give threads reasonable time to finish cleanly
reader_worker.join(timeout=10.0)
if reader_worker.is_alive():
print("Reader thread did not finish in time")

writer_worker.join(timeout=5.0)
if writer_worker.is_alive():
print("Writer thread did not finish in time")

progress_bar.close()

if raised is not None:
raise raised


class FPSMonitor:
"""
Expand Down