From 1e2cb83170c024c423a1b53b57facfc0eaa6645c Mon Sep 17 00:00:00 2001 From: Evgeny Kiriyak <224408464+evkir@users.noreply.github.com> Date: Thu, 28 May 2026 08:37:33 +0300 Subject: [PATCH 1/4] fix(recon): whitelist nmap flags, reject unknown --- cyberai/agents/recon/nmap_tool.py | 47 +++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/cyberai/agents/recon/nmap_tool.py b/cyberai/agents/recon/nmap_tool.py index ceda9cd..99379c8 100644 --- a/cyberai/agents/recon/nmap_tool.py +++ b/cyberai/agents/recon/nmap_tool.py @@ -1,12 +1,55 @@ +import shlex import subprocess -from typing import Dict, Any +from typing import Any, Dict, List + +from cyberai.core.security.input_sanitizer import sanitize_target + +# Whitelist of nmap flags the toolkit is allowed to pass through. +# Anything outside this set is rejected — prevents abuse like +# -oN /etc/cron.d/x, --script=, or arbitrary file writes. +ALLOWED_FLAGS = { + "-sV", "-sC", "-sT", "-sS", "-sU", "-sn", + "-T0", "-T1", "-T2", "-T3", "-T4", "-T5", + "-Pn", "-A", "-O", + "-p", "--top-ports", "-oX", +} + +# Flags that consume the next token as a value (port spec, count, etc.). +_VALUE_FLAGS = {"-p", "--top-ports", "-oX"} + + +def validate_flags(flags: str) -> List[str]: + """Parse a flag string via shlex and reject anything not whitelisted. + + Returns the validated token list. Raises ValueError on the first + unknown flag so a malicious flag string never reaches subprocess. + """ + tokens = shlex.split(flags) + safe: List[str] = [] + i = 0 + while i < len(tokens): + tok = tokens[i] + if tok not in ALLOWED_FLAGS: + raise ValueError(f"Rejected nmap flag: {tok!r}") + safe.append(tok) + if tok in _VALUE_FLAGS and i + 1 < len(tokens): + safe.append(tokens[i + 1]) + i += 1 + i += 1 + return safe def run_nmap(target: str, flags: str = "-sV -T4 --top-ports 1000") -> Dict[str, Any]: """ Run nmap against target, return parsed results. Requires nmap installed on system. """ - cmd = ["nmap", "-oX", "-"] + flags.split() + [target] + safe_target = sanitize_target(target) + try: + safe_flags = validate_flags(flags) + except ValueError as exc: + return {"target": target, "error": f"unsafe nmap flags: {exc}"} + + cmd = ["nmap", "-oX", "-"] + safe_flags + [safe_target] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=120 From 182e583299915898b9a33825b60b2d347951d8fc Mon Sep 17 00:00:00 2001 From: Evgeny Kiriyak <224408464+evkir@users.noreply.github.com> Date: Thu, 28 May 2026 08:38:34 +0300 Subject: [PATCH 2/4] refactor(recon): merge nmap_tool and nmap_wrapper into one --- cyberai/agents/recon/nmap_wrapper.py | 82 ---------------------------- 1 file changed, 82 deletions(-) delete mode 100644 cyberai/agents/recon/nmap_wrapper.py diff --git a/cyberai/agents/recon/nmap_wrapper.py b/cyberai/agents/recon/nmap_wrapper.py deleted file mode 100644 index 26ee41d..0000000 --- a/cyberai/agents/recon/nmap_wrapper.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -Graceful nmap timeout wrapper for ReconAgent. -Returns partial results instead of crashing on timeout. -""" -import subprocess -import logging - -logger = logging.getLogger("cyberai.recon.nmap_wrapper") - - -def run_nmap_safe( - target: str, - flags: str = "-sV -T4 --top-ports 1000", - timeout: int = 120, -) -> dict: - """ - Run nmap with timeout. On timeout returns partial/empty result - instead of raising — pipeline continues with what we have. - - Returns: - dict with 'ports', 'raw', 'error' (if any), 'timed_out' flag - """ - cmd = ["nmap"] + flags.split() + [target] - logger.info(f"[nmap] running: {' '.join(cmd)} (timeout={timeout}s)") - - result = { - "target": target, - "ports": [], - "services": {}, - "raw": "", - "timed_out": False, - "error": None, - } - - try: - proc = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=timeout, - ) - result["raw"] = proc.stdout - result["ports"] = _parse_ports(proc.stdout) - result["services"] = _parse_services(proc.stdout) - - except subprocess.TimeoutExpired: - logger.warning(f"[nmap] timeout after {timeout}s for {target} — returning partial") - result["timed_out"] = True - result["error"] = f"nmap timed out after {timeout}s" - - except FileNotFoundError: - logger.error("[nmap] nmap not found — install with: apt install nmap") - result["error"] = "nmap binary not found" - - except Exception as e: - logger.error(f"[nmap] unexpected error: {e}") - result["error"] = str(e) - - return result - - -def _parse_ports(output: str) -> list[int]: - ports = [] - for line in output.splitlines(): - if "/tcp" in line and "open" in line: - try: - ports.append(int(line.split("/")[0].strip())) - except ValueError: - pass - return ports - - -def _parse_services(output: str) -> dict[str, str]: - services = {} - for line in output.splitlines(): - if "/tcp" in line and "open" in line: - parts = line.split() - if len(parts) >= 3: - port = parts[0].split("/")[0] - service = parts[2] if len(parts) > 2 else "unknown" - services[port] = service - return services From 2bab94c46cc1db9ba3b6383ffc01e8cc8f670037 Mon Sep 17 00:00:00 2001 From: Evgeny Kiriyak <224408464+evkir@users.noreply.github.com> Date: Thu, 28 May 2026 08:39:39 +0300 Subject: [PATCH 3/4] feat(recon): nmap result caching by target+flags hash --- cyberai/agents/recon/nmap_tool.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/cyberai/agents/recon/nmap_tool.py b/cyberai/agents/recon/nmap_tool.py index 99379c8..014d1e4 100644 --- a/cyberai/agents/recon/nmap_tool.py +++ b/cyberai/agents/recon/nmap_tool.py @@ -3,6 +3,8 @@ from typing import Any, Dict, List from cyberai.core.security.input_sanitizer import sanitize_target +from cyberai.core.cache import FileCache +from pathlib import Path # Whitelist of nmap flags the toolkit is allowed to pass through. # Anything outside this set is rejected — prevents abuse like @@ -17,6 +19,18 @@ # Flags that consume the next token as a value (port spec, count, etc.). _VALUE_FLAGS = {"-p", "--top-ports", "-oX"} +# Dedicated 1-hour cache for nmap results, keyed by target+flags. +# Avoids re-scanning the same target repeatedly within a session. +NMAP_CACHE_TTL = 3600 # 1 hour +_nmap_cache = FileCache( + cache_dir=Path.home() / ".cyberai" / "nmap-cache", + ttl=NMAP_CACHE_TTL, +) + + +def _cache_key(target: str, flags: str) -> str: + return f"nmap:{target}:{flags}" + def validate_flags(flags: str) -> List[str]: """Parse a flag string via shlex and reject anything not whitelisted. @@ -49,18 +63,28 @@ def run_nmap(target: str, flags: str = "-sV -T4 --top-ports 1000") -> Dict[str, except ValueError as exc: return {"target": target, "error": f"unsafe nmap flags: {exc}"} + cache_key = _cache_key(safe_target, flags) + cached = _nmap_cache.get(cache_key) + if cached is not None: + cached["cached"] = True + return cached + cmd = ["nmap", "-oX", "-"] + safe_flags + [safe_target] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=120 ) - return { + parsed = { "target": target, "raw": result.stdout, "stderr": result.stderr, "returncode": result.returncode, "ports": _parse_ports(result.stdout), + "cached": False, } + if result.returncode == 0: + _nmap_cache.set(cache_key, parsed) + return parsed except subprocess.TimeoutExpired: return {"target": target, "error": "nmap timeout after 120s"} except FileNotFoundError: From 1d2f53dcec0352496781034eb417f7f7ba40a4c7 Mon Sep 17 00:00:00 2001 From: Evgeny Kiriyak <224408464+evkir@users.noreply.github.com> Date: Thu, 28 May 2026 08:41:27 +0300 Subject: [PATCH 4/4] test(recon): nmap flag whitelist + cache hit/miss --- tests/unit/test_nmap_tool.py | 92 ++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 tests/unit/test_nmap_tool.py diff --git a/tests/unit/test_nmap_tool.py b/tests/unit/test_nmap_tool.py new file mode 100644 index 0000000..afebab7 --- /dev/null +++ b/tests/unit/test_nmap_tool.py @@ -0,0 +1,92 @@ +"""Unit tests for nmap flag whitelist and result caching (day 10).""" +from __future__ import annotations + +from unittest.mock import patch, MagicMock + +import pytest + +from cyberai.agents.recon import nmap_tool +from cyberai.agents.recon.nmap_tool import validate_flags, run_nmap + + +# ── flag whitelist ──────────────────────────────────────────────────── + +def test_allowed_flags_pass(): + assert validate_flags("-sV -T4 --top-ports 1000") == [ + "-sV", "-T4", "--top-ports", "1000" + ] + + +def test_value_flag_keeps_its_argument(): + assert validate_flags("-p 80,443 -sV") == ["-p", "80,443", "-sV"] + + +@pytest.mark.parametrize("bad", [ + "-sV; rm -rf /", + "-oN /etc/cron.d/x", + "--script=http-vuln", + "-sV && curl evil.com", + "--unsafe-flag", +]) +def test_unknown_flags_rejected(bad): + with pytest.raises(ValueError): + validate_flags(bad) + + +def test_run_nmap_rejects_unsafe_flags_gracefully(): + """Unsafe flags must not crash — run_nmap returns an error dict.""" + result = run_nmap("scanme.test", flags="-sV; rm -rf /") + assert "error" in result + assert "unsafe" in result["error"].lower() + + +# ── caching ─────────────────────────────────────────────────────────── + +@pytest.fixture(autouse=True) +def _clean_cache(): + nmap_tool._nmap_cache.clear() + yield + nmap_tool._nmap_cache.clear() + + +def _fake_proc(stdout: str = "", rc: int = 0) -> MagicMock: + proc = MagicMock() + proc.stdout = stdout + proc.stderr = "" + proc.returncode = rc + return proc + + +def test_cache_miss_then_hit(): + """First call runs nmap; second identical call comes from cache.""" + fake = _fake_proc(stdout="", rc=0) + with patch.object(nmap_tool.subprocess, "run", return_value=fake) as m: + first = run_nmap("scanme.test", flags="-sV") + second = run_nmap("scanme.test", flags="-sV") + + assert first["cached"] is False + assert second["cached"] is True + # subprocess.run called only once — second served from cache + assert m.call_count == 1 + + +def test_failed_scan_not_cached(): + """A non-zero return code must not be cached.""" + fake = _fake_proc(stdout="", rc=1) + with patch.object(nmap_tool.subprocess, "run", return_value=fake) as m: + run_nmap("scanme.test", flags="-sV") + run_nmap("scanme.test", flags="-sV") + + # both calls hit subprocess — nothing was cached + assert m.call_count == 2 + + +def test_different_flags_different_cache(): + """Different flags must not collide in the cache.""" + fake = _fake_proc(stdout="", rc=0) + with patch.object(nmap_tool.subprocess, "run", return_value=fake) as m: + run_nmap("scanme.test", flags="-sV") + run_nmap("scanme.test", flags="-sV -Pn") + + # different flag strings -> two real scans + assert m.call_count == 2