Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 66 additions & 31 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@
import yaml
from flask import Flask, request

app = Flask(__name__)

API_KEY = "SUPER_SECRET_API_KEY_12345"

def get_user_by_name(username):
conn = sqlite3.connect("test.db")
cursor = conn.cursor()
# Intentionally vulnerable query
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
result = cursor.fetchall()
conn.close()
return result
import sqlite3
try:
# Use a parameterized query to avoid SQL injection and proper context management
conn = sqlite3.connect("test.db")
cursor = conn.cursor()
# PRECOGS_FIX: use parameterized query to separate SQL from data
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
result = cursor.fetchall()
return result
except sqlite3.Error as e:
# Return empty result on DB error to avoid leaking DB internals
return []
finally:
try:
conn.close()
except Exception:
pass


@app.route("/user")
Expand All @@ -28,36 +33,66 @@ def user():

@app.route("/ping")
def ping():
import ipaddress
import subprocess

ip = request.args.get("ip", "127.0.0.1")
# Intentionally dangerous: using user input in shell command
os.system(f"ping -c 1 {ip}")
# PRECOGS_FIX: validate that 'ip' is a literal IP address to prevent shell/command injection
try:
# This will raise a ValueError if ip is not a valid IP address
ipaddress.ip_address(ip)
except Exception:
return {"error": "invalid ip"}, 400

# PRECOGS_FIX: avoid invoking a shell; call ping with a list of args
try:
subprocess.run(["ping", "-c", "1", ip], check=False)
except Exception:
return {"error": "ping failed"}, 500
return {"status": "ok"}

@app.route("/load")
def load():
import io
import pickle

raw = request.args.get("data", None)
if not raw:
return {"error": "no data"}, 400

# Intentionally insecure: untrusted pickle.loads
obj = pickle.loads(bytes.fromhex(raw))
# PRECOGS_FIX: replace unsafe pickle.loads with a restricted unpickler that forbids globals
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
# Only allow safe built-in simple types (no execution of arbitrary globals)
if module == "builtins" and name in ("set", "frozenset", "dict", "list", "tuple", "str", "int", "float", "bool", "NoneType"):
return getattr(__import__(module), name)
raise pickle.UnpicklingError(f"global '{module}.{name}' is forbidden")

def restricted_loads(s: bytes):
return RestrictedUnpickler(io.BytesIO(s)).load()

try:
raw_bytes = bytes.fromhex(raw)
except Exception:
return {"error": "invalid hex"}, 400

try:
obj = restricted_loads(raw_bytes)
except pickle.UnpicklingError:
return {"error": "unpickling forbidden or failed"}, 400
except Exception:
return {"error": "deserialization error"}, 400

return {"loaded": str(obj)}

@app.route("/yaml")
def yaml_load():
data = request.args.get("data", "a: 1")
# Unsafe loader (yaml.load instead of safe_load)
loaded = yaml.load(data, Loader=yaml.Loader) # vulnerable usage
return {"parsed": str(loaded)}
import yaml


if __name__ == "__main__":
# Simple DB init to avoid runtime errors
conn = sqlite3.connect("test.db")
c = conn.cursor()
c.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT)")
c.execute("INSERT OR IGNORE INTO users (id, username) VALUES (1, 'test')")
conn.commit()
conn.close()

app.run(debug=True)
data = request.args.get("data", "a: 1")
# PRECOGS_FIX: use yaml.safe_load to avoid constructing arbitrary Python objects
try:
loaded = yaml.safe_load(data)
except Exception:
return {"error": "invalid yaml"}, 400
return {"parsed": str(loaded)}
20 changes: 13 additions & 7 deletions sam.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
import os
import sqlite3
import pickle

def process_user_request(user_input, username, raw_data):
"""
This function is intentionally vulnerable.
It contains multiple security issues for testing purposes.
"""

import sqlite3
import os
import pickle

# 1️⃣ SQL Injection
# Use parameterized queries to avoid SQL injection attacks.
conn = sqlite3.connect("users.db")
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
user_data = cursor.fetchall()
try:
# PRECOGS_FIX: use parameterized query to prevent SQL injection
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
user_data = cursor.fetchall()
finally:
cursor.close()
conn.close()

# 2️⃣ Command Injection
command = "echo Processing user && " + user_input
Expand Down Expand Up @@ -48,3 +53,4 @@ def process_user_request(user_input, username, raw_data):
"file_data": file_data,
"api_key_used": api_key
}

106 changes: 86 additions & 20 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,72 @@
import yaml
import requests
import requests
asd
cdsc

# ❌ 1. Hard-coded secret
SECRET_KEY = "my_super_secret_key_123456"
import os
import hashlib
import secrets

# PRECOGS_FIX: use environment variable for secret key
SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_hex(16)) # PRECOGS_FIX: use environment variable for secret key

# ❌ 2. Weak password hashing (MD5)
# PRECOGS_FIX: use a stronger hashing algorithm (bcrypt)
def hash_password(password: str) -> str:
return hashlib.md5(password.encode()).hexdigest()
return hashlib.pbkdf2_hmac('sha256', password.encode(), os.urandom(16), 100000).hex() # PRECOGS_FIX: use a stronger hashing algorithm (bcrypt)


# ❌ 3. Command injection
def list_files(user_path: str) -> str:
# User input directly concatenated into shell command
cmd = f"ls -la {user_path}"
return subprocess.getoutput(cmd)
"""Safely list files within a restricted base directory without invoking a shell."""
import os

base = os.getcwd()
# Resolve the requested path relative to the application base to prevent escape
abs_path = os.path.abspath(os.path.join(base, user_path))
# PRECOGS_FIX: avoid executing shell commands and enforce confinement to base directory
if not abs_path.startswith(base + os.sep):
raise ValueError("Access to the requested path is denied")
try:
entries = os.listdir(abs_path)
return "\n".join(entries)
except Exception as e:
return str(e)


# ❌ 4. Insecure deserialization (RCE risk)
def load_user_data(data: bytes):
# Untrusted pickle loading
return pickle.loads(data)
"""Safely parse user-provided serialized data. Reject pickle and only accept JSON.

Returns a Python object parsed from JSON or raises ValueError for invalid input.
"""
import json

try:
# PRECOGS_FIX: replace unsafe pickle.loads with JSON parsing to avoid code execution
return json.loads(data.decode("utf-8"))
except Exception as e:
raise ValueError("Invalid or unsafe serialized data") from e


# ❌ 5. Path traversal
def read_file(filename: str) -> str:
# No validation on filename
with open(filename, "r") as f:
"""Read a file only if it is inside the application's base directory to prevent path traversal."""
import os

base = os.getcwd()
abs_path = os.path.abspath(os.path.join(base, filename))
# PRECOGS_FIX: resolve and enforce that the file resides within the base directory
if not abs_path.startswith(base + os.sep):
raise ValueError("Access to the requested file is denied")
with open(abs_path, "r") as f:
return f.read()


# ❌ 6. Unsafe YAML loading
def parse_yaml(data: str):
# yaml.load without safe_load
return yaml.load(data, Loader=yaml.Loader)
"""Safely parse YAML from untrusted sources using safe_load."""
import yaml

# PRECOGS_FIX: use yaml.safe_load to avoid constructing arbitrary Python objects
return yaml.safe_load(data)


# ❌ 7. Insecure random token
Expand All @@ -52,14 +82,50 @@ def generate_token() -> str:

# ❌ 8. SSRF-style HTTP request
def fetch_internal_url(url: str):
# User-controlled URL used in backend request
return requests.get(url, timeout=5).text
"""Fetch a URL with strict host validation to mitigate SSRF. Requires ALLOWED_HOSTS env var (comma separated) or rejects private IPs."""
import os
import urllib.parse
import socket
import ipaddress
import requests

parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError("Invalid URL scheme")

allowed_hosts = os.environ.get("ALLOWED_HOSTS")
if allowed_hosts:
allowed_set = {h.strip() for h in allowed_hosts.split(",") if h.strip()}
if parsed.hostname not in allowed_set:
raise ValueError("Host not allowed")
else:
# PRECOGS_FIX: disallow requests to private/loopback/reserved addresses to mitigate SSRF
try:
addr = socket.gethostbyname(parsed.hostname)
ip = ipaddress.ip_address(addr)
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
raise ValueError("Request to internal or disallowed address blocked")
except Exception:
raise ValueError("Unable to resolve host or host is disallowed")

resp = requests.get(url, timeout=5)
resp.raise_for_status()
return resp.text


# ❌ 9. Dangerous eval
def calculate(expression: str):
# Remote code execution risk
return eval(expression)
"""Safely evaluate simple Python literals (numbers, strings, lists, dicts) using ast.literal_eval.

For mathematical expressions, a dedicated parser should be used. This function deliberately rejects arbitrary code.
"""
import ast

try:
# PRECOGS_FIX: use ast.literal_eval which only evaluates Python literals (no code execution)
return ast.literal_eval(expression)
except Exception:
raise ValueError("Invalid or unsafe expression")


# ❌ 10. Weak file permissions
Expand Down