-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdisk_verification.py
More file actions
136 lines (111 loc) · 5.13 KB
/
disk_verification.py
File metadata and controls
136 lines (111 loc) · 5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# 警告:这个会删除上面的所有资料
# Mac用户如果遇到[Errno 16] Resource busy请先执行sudo diskutil unmountDisk /dev/diskX
# Mac和Linux用户可能需要使用root权限执行
# 如有错误请使用Issues
# 如想要改进代码可以提交PR
import os
import random
import sys
import argparse
from rich.progress import Progress, BarColumn, TimeRemainingColumn
from rich.console import Console
# Shared rich console; every function in this script prints through it.
console = Console()
def get_confirmation(device_path):
    """Show the data-loss warning for *device_path* and ask for confirmation.

    Returns:
        True only when the typed reply, lower-cased, is exactly ``'y'``;
        any other input (including an empty line) returns False.
    """
    banner_lines = (
        "[bold red]警告:此操作将永久擦除目标设备上的所有数据![/]",
        f"目标设备: [bold]{device_path}[/]",
    )
    for banner in banner_lines:
        console.print(banner)

    reply = input("确定要继续吗?(y/N): ")
    return reply.lower() == 'y'
def generate_random_data(chunk_size):
    """Return *chunk_size* pseudo-random bytes drawn from the global ``random`` RNG.

    The bytes come from the module-level generator, so calling
    ``random.seed(s)`` beforehand makes the produced sequence reproducible.

    Args:
        chunk_size: Number of bytes to produce. Zero or a negative value
            yields an empty ``bytes`` object.

    Returns:
        A ``bytes`` object of length ``max(chunk_size, 0)``.
    """
    if chunk_size <= 0:
        return b""
    # One large draw instead of one Python-level call per byte: the per-byte
    # generator needed ~1M interpreter round-trips for every 1 MiB block.
    return random.getrandbits(8 * chunk_size).to_bytes(chunk_size, "little")
def write_and_verify(device_path):
    """Fill *device_path* with pseudo-random data, then read it back and compare.

    The write pass writes 1 MiB blocks until the device reports an I/O error
    or accepts only part of a block. The verify pass then re-reads every
    fully written block and compares it with the data that was written.

    The global ``random`` generator is seeded with the same per-run seed
    before each pass, so ``generate_random_data`` reproduces during
    verification exactly the byte stream produced during writing.

    Args:
        device_path: Path of the device (or file) to overwrite.

    Returns:
        A tuple ``(successful_write, successful_read)``: the number of 1 MiB
        blocks fully written, and the number of those that read back intact.
    """
    chunk_size = 1024 * 1024  # 1MB
    max_blocks = 0
    successful_read = 0

    # Fresh seed per run; it is reused for the verify pass so both passes
    # generate the identical sequence of blocks.
    seed = int.from_bytes(os.urandom(16), "big")

    # Write phase
    random.seed(seed)
    with Progress(
        BarColumn(bar_width=None),
        "[progress.percentage]{task.percentage:>3.0f}%",
        "• 写入速度: [progress.speed]{task.speed}MB/s",
        TimeRemainingColumn(),
        console=console
    ) as progress:
        task = progress.add_task("[cyan]写入数据...", total=None)
        try:
            with open(device_path, 'wb+', buffering=0) as device:
                while True:
                    data = generate_random_data(chunk_size)
                    written = device.write(data)
                    # Unbuffered writes may be short at the end of the
                    # device; a partial block must not count as written,
                    # otherwise it would later fail verification.
                    if written is not None and written < len(data):
                        break
                    device.flush()
                    os.fsync(device.fileno())
                    max_blocks += 1
                    progress.update(task, advance=1,
                                    description=f"[cyan]写入数据... ({max_blocks}MB)")
        except IOError as e:
            # Expected way out of the loop once the device is full.
            console.print(f"\n[red]写入错误: {str(e)}[/]")

    successful_write = max_blocks
    console.print(f"[yellow]最大写入容量: {successful_write}MB[/]")

    # Verify phase
    # NOTE(review): reads may be served from the OS cache rather than the
    # device itself; confirm whether cache bypass is needed on the target OS.
    if successful_write > 0:
        console.print("\n[bold]开始验证数据完整性...[/]")
        current_line = []
        validation_errors = 0
        # Restart the generator so block i below equals block i written above.
        random.seed(seed)
        with Progress(
            BarColumn(bar_width=None),
            "[progress.percentage]{task.percentage:>3.0f}%",
            "• 剩余时间: [progress.remaining]",
            TimeRemainingColumn(),
            console=console
        ) as progress:
            task = progress.add_task("[green]验证数据...", total=successful_write)
            try:
                with open(device_path, 'rb+', buffering=0) as device:
                    for i in range(successful_write):
                        device.seek(i * chunk_size)
                        original_data = generate_random_data(chunk_size)
                        read_data = device.read(chunk_size)
                        if read_data == original_data:
                            current_line.append("[green]🟩[/]")
                            successful_read += 1
                        else:
                            current_line.append("[red]🟥[/]")
                            validation_errors += 1
                        # Emit one row of block markers every 50 blocks.
                        if len(current_line) >= 50:
                            console.print("".join(current_line))
                            current_line = []
                        progress.update(task, advance=1,
                                        description=f"[green]验证数据... ({i+1}/{successful_write}MB)")
            except IOError as e:
                console.print(f"\n[red]验证错误: {str(e)}[/]")

        # Print the markers of the last, incomplete row.
        if current_line:
            console.print("".join(current_line))
        console.print(f"\n[bold]验证结果:[/]")
        console.print(f"成功区块: [green]{successful_read} 🟩[/]")
        console.print(f"异常区块: [red]{validation_errors} 🟥[/]")

    return successful_write, successful_read
def _is_block_device_path(path):
    """Return True when *path* is acceptable as a target device path."""
    import stat  # local import keeps this block self-contained

    # Original check, kept unchanged so every previously accepted path
    # (e.g. /dev/disk2) is still accepted.
    if path.startswith('/dev/disk'):
        return True
    # Other device names (e.g. /dev/sdb on Linux) are accepted only when the
    # path really is a block device, which excludes /dev/null and friends.
    if not path.startswith('/dev/'):
        return False
    try:
        return stat.S_ISBLK(os.stat(path).st_mode)
    except OSError:
        return False


def main():
    """Command-line entry point: parse arguments, confirm, write and verify.

    Exits with status 1 when the device path is rejected, when nothing could
    be written, or when an unexpected error occurs; exits with status 0 when
    the user declines the confirmation prompt.
    """
    parser = argparse.ArgumentParser(description='USB存储设备容量验证工具')
    parser.add_argument('device', help='目标设备路径(例如:/dev/disk2)')
    args = parser.parse_args()

    if not _is_block_device_path(args.device):
        console.print("[red]错误:请指定正确的块设备路径[/]")
        sys.exit(1)

    if not get_confirmation(args.device):
        console.print("[yellow]操作已取消[/]")
        sys.exit(0)

    try:
        written, verified = write_and_verify(args.device)
        console.print(f"\n[bold]最终结果:[/]")
        console.print(f"成功写入: [cyan]{written}MB[/]")
        console.print(f"成功验证: [green]{verified}MB[/]")
        if written == 0:
            # 0 written == 0 verified must not be reported as a pass: it
            # means the device could not be written at all.
            console.print("[red bold]错误:未能写入任何数据,无法验证设备[/]")
            sys.exit(1)
        if written != verified:
            console.print("[red bold]警告:设备可能存在扩容问题或坏块![/]")
        else:
            console.print("[green bold]✓ 设备容量验证通过[/]")
    except Exception as e:
        # Top-level boundary: report the error and signal failure.
        console.print(f"[red bold]发生错误: {str(e)}[/]")
        sys.exit(1)
# Run the command-line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()