From 18956f906b7deb08bb8ee59b4ec285871c26548c Mon Sep 17 00:00:00 2001 From: Psychevus Date: Sat, 26 Jul 2025 06:31:30 +0330 Subject: [PATCH] Add module-level parallel execution --- README.md | 2 + main.py | 85 ++++++++++++++++++++++++++++++++++++------ tests/test_pipeline.py | 34 +++++++++++++++++ 3 files changed, 109 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index dc46dd1..6411c70 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ * Docker container and AWS Lambda compatible * Extensible plugin system for custom modules * Opt-in pipeline mode to feed Subfinder → Httpx → Nuclei → Nmap → TestSSL +* `--parallel` flag to run independent modules concurrently for speed --- @@ -141,6 +142,7 @@ class DirBuster(Module): Run with: ```bash python main.py example.com --tools dirbuster +python main.py example.com --parallel ``` --- diff --git a/main.py b/main.py index b63037b..067a0f9 100644 --- a/main.py +++ b/main.py @@ -85,6 +85,7 @@ def pipeline( use_pipeline: bool = False, *, show_summary: bool = False, + parallel: bool = False, ) -> List[Finding]: findings: List[Finding] = [] @@ -96,11 +97,53 @@ def pipeline( "ssl_issues": 0, } + def _run_host_tool(tool_cls: type[Module], host_list: List[str], name: str) -> List[Finding]: + local: List[Finding] = [] + for host in host_list: + try: + res = tool_cls().run(host) + except Exception as exc: # noqa: BLE001 + logger.error("%s error: %s", name, exc) + continue + local.extend(res) + if name == "nmap": + xml_path = res[0].data.get("xml") + if xml_path: + summary["open_ports"] += _count_nmap_ports(xml_path) + else: + json_path = res[0].data.get("report") + if json_path: + summary["ssl_issues"] += _count_testssl_issues(json_path) + return local + # Data passed between modules hosts: List[str] = [target] urls: List[str] = [] + processed: set[str] = set() + + if parallel and not use_pipeline: + with ThreadPoolExecutor() as pool: + futs = {} + for name in selected: + cls = Module.registry.get(name) + if not cls: + logger.warning("⚠️ Unknown tool '%s' – skipping", name) + continue + futs[pool.submit(cls().run, target)] = (name, cls) + for fut, (name, cls) in futs.items(): + try: + res = fut.result(timeout=getattr(cls, "TIMEOUT", 60)) + findings.extend(res) + except Exception as exc: # noqa: BLE001 + logger.error("%s error: %s", name, exc) + if show_summary: + _print_summary(summary) + return findings + for tool_name in selected: + if tool_name in processed: + continue tool_cls = Module.registry.get(tool_name) if not tool_cls: logger.warning("⚠️ Unknown tool '%s' – skipping", tool_name) @@ -126,17 +169,34 @@ def pipeline( findings.extend(res) summary["nuclei"] += len(res) elif use_pipeline and tool_name in {"nmap", "testssl"}: - for h in hosts: - res = tool_cls().run(h) - findings.extend(res) - if tool_name == "nmap": - xml_path = res[0].data.get("xml") - if xml_path: - summary["open_ports"] += _count_nmap_ports(xml_path) - else: - json_path = res[0].data.get("report") - if json_path: - summary["ssl_issues"] += _count_testssl_issues(json_path) + if parallel and tool_name in {"nmap", "testssl"}: + others = {"nmap", "testssl"} - {tool_name} + batch = [tool_name] + [ + o for o in others if o in selected and o not in processed + ] + processed.update(batch) + with ThreadPoolExecutor() as pool: + futs = { + pool.submit(_run_host_tool, Module.registry[name], hosts, name): name + for name in batch + } + for fut, name in futs.items(): + res = fut.result() + findings.extend(res) + continue + else: + processed.add(tool_name) + for h in hosts: + res = tool_cls().run(h) + findings.extend(res) + if tool_name == "nmap": + xml_path = res[0].data.get("xml") + if xml_path: + summary["open_ports"] += _count_nmap_ports(xml_path) + else: + json_path = res[0].data.get("report") + if json_path: + summary["ssl_issues"] += _count_testssl_issues(json_path) else: findings.extend(tool_cls().run(target)) except Exception as exc: # noqa: BLE001 @@ -226,7 +286,7 @@ def cli() -> None: parser.add_argument( "--parallel", action="store_true", - help="Run scans for multiple targets concurrently", + help="Run scans and independent modules concurrently", ) parser.add_argument( "--no-summary", @@ -254,6 +314,7 @@ def run_one(t: str) -> List[Finding]: args.tools, use_pipeline=args.pipeline, show_summary=not args.no_summary, + parallel=args.parallel, ) prefix = safe_filename_component(t) write_json(res, args.out, prefix=prefix) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index d051b38..0c86e02 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -85,3 +85,37 @@ def run(self, target: str): 'open_ports': 1, 'ssl_issues': 1, } + + +def test_parallel_execution(monkeypatch): + import time + from modules.base import Module, Finding + import importlib + import main + + importlib.reload(main) + monkeypatch.setattr(Module, 'registry', {}) + monkeypatch.setattr(main.Module, 'registry', Module.registry) + + class A(Module): + name = 'a' + TIMEOUT = 1 + + def run(self, target: str): + time.sleep(0.2) + return [Finding(tool=self.name, data={})] + + class B(Module): + name = 'b' + TIMEOUT = 1 + + def run(self, target: str): + time.sleep(0.2) + return [Finding(tool=self.name, data={})] + + start = time.perf_counter() + res = pipeline('example.com', ['a', 'b'], parallel=True) + duration = time.perf_counter() - start + assert len(res) == 2 + assert duration < 0.35 +