-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscanner.py
More file actions
190 lines (150 loc) · 6.92 KB
/
Copy pathscanner.py
File metadata and controls
190 lines (150 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""
scanner.py —
Security scanner: validates file magic bytes against claimed type and
searches for embedded malicious patterns (scripts, executables, polyglots).
Any file that fails is deleted immediately and the event is logged to DB.
⚠️ LIMITATIONS — This is a lightweight pre-screening mechanism, NOT a full
security analysis. It inspects only the first 8 KB of file content and
relies on pattern matching. It CANNOT detect:
- Polymorphic or encrypted payloads that avoid known signatures
- Logic bombs or time-delayed exploits
- Steganographic content hidden in valid image/video data
- Exploits targeting specific decoder vulnerabilities
- Memory-corruption triggers that require full file parsing
This scanner should be treated as a first line of defense that blocks
the most obvious threats, not as a guarantee of file safety.
"""
import os
import logging
logger = logging.getLogger(__name__)
# ── Magic-byte signatures ──────────────────────────────────────────────────────
# Each entry: signature_bytes -> (offset, human_name)
IMAGE_MAGIC: list[tuple[bytes, int, str]] = [
(b"\xff\xd8\xff", 0, "JPEG"),
(b"\x89PNG\r\n\x1a\n", 0, "PNG"),
(b"GIF87a", 0, "GIF"),
(b"GIF89a", 0, "GIF"),
(b"BM", 0, "BMP"),
# WEBP: "RIFF" at 0, "WEBP" at 8
]
VIDEO_MAGIC: list[tuple[bytes, int, str]] = [
# MKV / WEBM
(b"\x1a\x45\xdf\xa3", 0, "MKV/WEBM"),
# FLV
(b"FLV\x01", 0, "FLV"),
]
# Patterns that are ALWAYS suspicious regardless of claimed type
MALICIOUS_PATTERNS: list[tuple[bytes, str]] = [
(b"<?php", "Embedded PHP code"),
(b"<%@", "Embedded server-side script"),
(b"<script", "Embedded JavaScript"),
(b"#!/bin/", "Embedded shell script"),
(b"#!/usr/bin/", "Embedded shell script"),
(b"\x7fELF", "ELF executable"),
(b"MZ\x90\x00", "Windows PE executable"), # more specific than bare MZ
]
# Read this many bytes for the scan (8 KB is enough for headers)
SCAN_BYTES = 8192
# ── Internal helpers ───────────────────────────────────────────────────────────
def _read_head(path: str) -> bytes:
size = os.path.getsize(path)
with open(path, "rb") as fh:
return fh.read(min(SCAN_BYTES, size))
def _read_tail(path: str, n: int = 16) -> bytes:
"""Read the last *n* bytes of a file (for EOF marker checks)."""
size = os.path.getsize(path)
with open(path, "rb") as fh:
fh.seek(max(0, size - n))
return fh.read()
def _check_image(head: bytes) -> tuple[bool, str]:
"""Return (ok, reason). WEBP needs an extra check."""
for sig, offset, name in IMAGE_MAGIC:
if head[offset:offset + len(sig)] == sig:
return True, ""
# WEBP: b"RIFF" at 0 AND b"WEBP" at 8
if head[:4] == b"RIFF" and len(head) >= 12 and head[8:12] == b"WEBP":
return True, ""
return False, "File header does not match any known image format"
def _check_video(head: bytes) -> tuple[bool, str]:
"""Return (ok, reason)."""
# MP4 / MOV: 4-byte length + b"ftyp" at offset 4
if len(head) >= 8 and head[4:8] == b"ftyp":
return True, ""
# AVI: b"RIFF" at 0, b"AVI " at 8
if head[:4] == b"RIFF" and len(head) >= 12 and head[8:12] == b"AVI ":
return True, ""
for sig, offset, name in VIDEO_MAGIC:
if head[offset:offset + len(sig)] == sig:
return True, ""
return False, "File header does not match any known video format"
def _check_malicious(head: bytes) -> tuple[bool, str]:
"""Return (safe, threat_description)."""
lower = head.lower()
for pattern, label in MALICIOUS_PATTERNS:
if pattern.lower() in lower:
return False, label
# Embedded ZIP in the middle of the data (possible polyglot / zip-bomb)
# Skip first 64 bytes to avoid false positives on formats that legitimately
# begin with a local file header (e.g. ODT, DOCX are ZIP-based, but those
# should never be sent here as images/videos).
if b"PK\x03\x04" in head[64:]:
return False, "Embedded ZIP archive (possible polyglot file)"
return True, ""
def _check_eof(tail: bytes, file_size: int, media_type: str, head: bytes) -> tuple[bool, str]:
"""Validate EOF markers by reading the actual end of the file."""
if media_type == "photo":
# Only validate JPEG EOF — PNG/GIF/BMP have no reliable trailing marker
if head[:3] == b"\xff\xd8\xff":
if tail[-2:] != b"\xff\xd9":
return False, "JPEG file does not end with valid EOF marker"
return True, ""
# ── Public API ─────────────────────────────────────────────────────────────────
def scan_file(file_path: str, media_type: str) -> tuple[bool, str]:
"""
Full security scan.
Parameters
----------
file_path : absolute path to the downloaded file
media_type : "photo" | "video"
Returns
-------
(True, "") — file is safe
(False, reason_string) — file is dangerous; caller must delete it
NOTE: This is a lightweight pre-screening mechanism. See module
docstring for detailed limitations.
"""
try:
if not os.path.exists(file_path):
return False, "File not found after download"
if os.path.getsize(file_path) == 0:
return False, "File is empty"
head = _read_head(file_path)
file_size = os.path.getsize(file_path)
# 1. Magic-byte validation
if media_type == "photo":
ok, reason = _check_image(head)
elif media_type == "video":
ok, reason = _check_video(head)
else:
ok, reason = False, f"Unknown media_type: {media_type!r}"
if not ok:
logger.warning(f"[SCAN] Magic mismatch — {file_path}: {reason}")
return False, reason
# 2. Malicious-pattern scan
safe, threat = _check_malicious(head)
if not safe:
logger.warning(f"[SCAN] Threat detected — {file_path}: {threat}")
return False, threat
# 3. EOF marker validation — read actual tail of file
if file_size > SCAN_BYTES:
tail = _read_tail(file_path, 32)
safe, threat = _check_eof(tail, file_size, media_type, head)
if not safe:
logger.warning(f"[SCAN] EOF check — {file_path}: {threat}")
return False, threat
logger.info(f"[SCAN] OK — {file_path}")
return True, ""
except OSError as exc:
msg = f"Cannot read file for scanning: {exc}"
logger.error(f"[SCAN] {msg}")
return False, msg