-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
156 lines (127 loc) · 5.33 KB
/
script.py
File metadata and controls
156 lines (127 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3
"""
===================================================================
File Name: script.py
Author: Armoghan-ul-Mohmin
Date: 2025-03-10
Description:
This script extracts frames from a given video file and saves
them as individual image files in a specified directory.
It supports multithreading for faster processing and displays
video metadata in a table format using Rich. The script also
provides a progress bar to track frame extraction.
Usage:
python3 script.py <video> <dir> [--threads N]
Arguments:
<video> Path to the video file.
<dir> Directory where extracted frames will be saved.
Optional:
--threads N Number of threads to use (default: 4).
Dependencies:
- OpenCV (cv2)
- Rich (for terminal visualization)
- argparse (for command-line parsing)
- concurrent.futures (for multithreading)
- pathlib (for cross-platform path handling)
Example:
python3 script.py sample.mp4 frames --threads 8
===================================================================
"""
import cv2
import os
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.console import Console
from rich.progress import Progress
from rich.table import Table
# Module-wide Rich console; the functions below use it to print the metadata
# table and the status/error messages.
console = Console()
def save_frame(frame, frame_count, dir):
    """
    Saves a single frame as a JPEG image file.

    The file is named ``frame_<index>.jpg`` with the index zero-padded to at
    least four digits and is placed inside ``dir``.

    Args:
        frame (numpy.ndarray): The image frame data.
        frame_count (int): The frame index number, used to build the file name.
        dir (str): The directory where frames will be saved.

    Raises:
        OSError: If OpenCV reports that the image could not be written.
    """
    frame_path = os.path.join(dir, f"frame_{frame_count:04d}.jpg")
    # cv2.imwrite reports failure through its boolean return value instead of
    # raising, so an ignored result would silently drop the frame.
    if not cv2.imwrite(frame_path, frame):
        raise OSError(f"Failed to write frame {frame_count} to '{frame_path}'")
def extract_frames(video, dir, threads=4):
    """
    Extracts frames from a video file and saves them as images.

    Prints a table of video metadata, then reads the video frame by frame and
    hands each frame to a thread pool that writes it to ``dir`` via
    ``save_frame``. Errors are reported on the console rather than raised.

    Args:
        video (str): Path to the video file.
        dir (str): Path to the output directory (created if missing).
        threads (int): Number of threads for frame extraction.
    """
    cap = None
    try:
        # Validate video file existence
        if not Path(video).is_file():
            console.print(f"[bold red]Error: Video file '{video}' not found![/bold red]")
            return

        # Create the output directory if it doesn't exist
        Path(dir).mkdir(parents=True, exist_ok=True)

        # Open the video file
        cap = cv2.VideoCapture(video)
        if not cap.isOpened():
            console.print("[bold red]Error: Failed to open video file![/bold red]")
            return

        # Retrieve video metadata
        fps = cap.get(cv2.CAP_PROP_FPS) or 30  # Default to 30 FPS if unavailable
        reported_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # A non-positive count means the container did not report one; keep it
        # as None instead of inventing a value so the table and the progress
        # bar do not show misleading numbers.
        total_frames = reported_frames if reported_frames > 0 else None
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        duration = total_frames / fps if total_frames is not None and fps > 0 else None

        # Get codec information: the FOURCC value is unpacked one byte at a
        # time, lowest byte first, into a four-character string.
        codec = int(cap.get(cv2.CAP_PROP_FOURCC))
        codec_str = "".join(chr((codec >> 8 * i) & 0xFF) for i in range(4)) if codec else "Unknown"

        # Prepare metadata for display
        metadata = {
            "FPS": fps,
            "Total Frames": total_frames if total_frames is not None else "Unknown",
            "Duration": f"{duration:.2f} sec" if duration is not None else "Unknown",
            "Resolution": f"{width}x{height}",
            "Codec": codec_str,
        }

        # Display video metadata
        table = Table(title="Video Information", show_header=True, header_style="bold cyan")
        table.add_column("Property", style="dim", width=20)
        table.add_column("Value", style="bold yellow")
        for key, value in metadata.items():
            table.add_row(key, str(value))
        console.print(table)

        # Extract frames
        frame_count = 0
        # Each queued task keeps its decoded frame alive until it is written.
        # Capping the number of in-flight saves keeps memory bounded when
        # decoding is faster than writing.
        max_pending = 2 * max(1, threads or 1)
        pending = []
        with Progress() as progress:
            # total=None makes Rich render an indeterminate bar.
            task = progress.add_task("[cyan]Extracting frames...", total=total_frames)
            with ThreadPoolExecutor(max_workers=threads) as executor:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break  # Stop when the video ends

                    if len(pending) >= max_pending:
                        # Wait for the oldest save before queuing more;
                        # result() also re-raises any error from the worker.
                        pending.pop(0).result()
                        progress.update(task, advance=1)

                    # Submit frame saving task
                    pending.append(executor.submit(save_frame, frame, frame_count, dir))
                    frame_count += 1

                # Wait for the remaining saves to finish
                for future in as_completed(pending):
                    future.result()
                    progress.update(task, advance=1)

        console.print(f"\n[bold green]Success! Extracted {frame_count} frames from '{video}'.[/bold green]")
    except Exception as e:
        # Top-level boundary for the CLI: report the failure instead of
        # showing a traceback.
        console.print(f"[bold red]Error: {e}[/bold red]")
    finally:
        # Release the video file on every exit path, including errors.
        if cap is not None:
            cap.release()
def main():
    """
    Main function to parse command-line arguments and start frame extraction.

    Reads the positional ``video`` and ``dir`` arguments and the optional
    ``--threads`` value from the command line. A ``--threads`` value below 1
    is rejected with an argparse usage error (exit status 2) before any
    extraction work starts.
    """
    parser = argparse.ArgumentParser(description="Extract frames from a video file")
    parser.add_argument("video", help="Path to the video file")
    parser.add_argument("dir", help="Directory where extracted frames will be saved")
    parser.add_argument("--threads", type=int, default=4, help="Number of threads (default: 4)")
    args = parser.parse_args()

    # Fail fast on an unusable worker count instead of creating the output
    # directory and opening the video first.
    if args.threads < 1:
        parser.error("--threads must be a positive integer")

    # Start frame extraction
    extract_frames(args.video, args.dir, args.threads)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()