diff --git a/libs/openant-core/core/parser_adapter.py b/libs/openant-core/core/parser_adapter.py
index ef2f845..aa04b8a 100644
--- a/libs/openant-core/core/parser_adapter.py
+++ b/libs/openant-core/core/parser_adapter.py
@@ -79,7 +79,7 @@ def parse_repository(
     skip_tests: bool = True,
     name: str = None,
     diff_manifest: str | None = None,
-    fresh: bool = False,
+    library_mode: bool = False,
 ) -> ParseResult:
     """Parse a repository into an OpenAnt dataset.
 
@@ -93,9 +93,10 @@ def parse_repository(
         processing_level: "all", "reachable", "codeql", or "exploitable".
         skip_tests: If True, exclude test files from parsing (default: True).
         name: Dataset name override (default: derived from repo path basename).
-        fresh: If True, delete existing dataset.json before parsing so all
-            units are regenerated from scratch. Only dataset.json is deleted;
-            other artifacts in output_dir (e.g. analyzer outputs) are preserved.
+        library_mode: If True, also seed the reachability filter with the
+            public API surface, so a library with no main/route/CLI entry
+            point is still analysed. No effect when processing_level is
+            "all"; currently forwarded to the Python parser only.
 
     Returns:
         ParseResult with paths to generated files and stats.
@@ -108,18 +109,6 @@ def parse_repository(
     output_dir = os.path.abspath(output_dir)
     os.makedirs(output_dir, exist_ok=True)
 
-    if fresh:
-        dataset_path = os.path.join(output_dir, "dataset.json")
-        # Use try/except instead of exists()+remove() to avoid a TOCTOU race
-        # if a concurrent --fresh run removes the file between the two calls.
-        # Only dataset.json is deleted; other artifacts (analyzer outputs, etc.)
-        # in output_dir are preserved.
-        try:
-            os.remove(dataset_path)
-            print("[Parser] --fresh: deleted existing dataset.json", file=sys.stderr)
-        except FileNotFoundError:
-            pass
-
     # Detect language if auto
     if language == "auto":
         language = detect_language(repo_path)
@@ -127,7 +112,7 @@ def parse_repository(
 
     # Dispatch to the right parser
     if language == "python":
-        result = _parse_python(repo_path, output_dir, processing_level, skip_tests, name)
+        result = _parse_python(repo_path, output_dir, processing_level, skip_tests, name, library_mode)
     elif language == "javascript":
         result = _parse_javascript(repo_path, output_dir, processing_level, skip_tests, name)
     elif language == "go":
@@ -207,11 +192,34 @@ def _maybe_apply_diff_filter(
 # Reachability filter (shared by Python path; JS/Go handle it internally)
 # ---------------------------------------------------------------------------
 
+def _library_seed_ids(functions: dict) -> "set[str]":
+    """Public-API seed set for library-mode reachability.
+
+    A pure library exposes no main/route/CLI entry point, so the structural
+    detector finds nothing and the whole library is filtered out (0 reachable).
+    In library-mode the *public surface* IS the entry surface: seed every
+    exported/public function and let the forward BFS pull in its callees.
+
+    Public = exported AND not name-private. ``is_exported`` is honoured when the
+    parser sets it (C/Go/JS) and defaults True otherwise (python/ruby/php).
+    Name-private = leading underscore, except dunders (``__init__``, ...), which
+    callers invoke implicitly. Bias: over-seed (analyse more), never under-seed.
+    """
+    seeds: set[str] = set()
+    for func_id, fd in functions.items():
+        name = (fd.get("name") or func_id.rsplit(":", 1)[-1]).split(".")[-1]
+        dunder = len(name) > 4 and name.startswith("__") and name.endswith("__")
+        if fd.get("is_exported", True) and (dunder or not name.startswith("_")):
+            seeds.add(func_id)
+    return seeds
+
+
 def apply_reachability_filter(
     dataset: dict,
     output_dir: str,
     processing_level: str,
     extra_entry_points: "set[str] | None" = None,
+    library_mode: bool = False,
 ) -> dict:
     """Filter dataset units to only those reachable from entry points.
 
@@ -277,6 +285,11 @@ def _load_module(name, filename):
     entry_points = detector.detect_entry_points()
     if extra_entry_points:
         entry_points = entry_points | extra_entry_points
+    # Library-mode (opt-in): the public API is the entry surface. Union-only —
+    # never demotes a structurally-detected app entry point, so an app scan with
+    # the flag on can only gain reachable units, never lose one.
+    if library_mode:
+        entry_points = entry_points | _library_seed_ids(functions)
 
     units = dataset.get("units", [])
     original_count = len(units)
@@ -374,7 +387,7 @@ def _load_module(name, filename):
 # Python parser
 # ---------------------------------------------------------------------------
 
-def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_tests: bool = True, name: str = None) -> ParseResult:
+def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_tests: bool = True, name: str = None, library_mode: bool = False) -> ParseResult:
     """Invoke the Python parser.
 
The Python parser has a clean `parse_repository()` function that we can @@ -402,7 +415,8 @@ def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_t # Apply reachability filter if processing_level requires it if processing_level != "all": - dataset = _apply_reachability_filter(dataset, output_dir, processing_level) + dataset = _apply_reachability_filter(dataset, output_dir, processing_level, + library_mode=library_mode) # Write outputs write_json(dataset_path, dataset) diff --git a/libs/openant-core/tests/test_library_mode_reachability.py b/libs/openant-core/tests/test_library_mode_reachability.py new file mode 100644 index 0000000..6db8a16 --- /dev/null +++ b/libs/openant-core/tests/test_library_mode_reachability.py @@ -0,0 +1,122 @@ +"""Library-mode reachability seeding (BUG-005). + +A pure library exposes no main/route/CLI entry point, so the structural detector +finds nothing and `apply_reachability_filter` drops EVERY unit — the library +(including any vulnerable sink it contains) is never analysed. Library-mode seeds +the public API surface so the forward BFS pulls in the rest. + +These tests pin: (1) the mode-OFF baseline, (2) the public API becomes +reachable when ON (and its private callee comes along via the call edge), (3) a +truly-unreferenced private function stays out, and — adversarially — (4) turning +the mode ON for an APP can only ADD reachable units, never remove one (union-only +seed merge), so existing app scans are never degraded. + +NOTE: stacked on PR #75. On master a no-entry-point library blacks out (0 units), +which is the bug this PR fixes. PR #75's zero-seed fallback already prevents that +blackout — bluntly — by returning ALL units unfiltered when no entry point is +detected. So the mode-OFF baseline here is "all units unfiltered" (#75), and +library-mode ON refines it to the precise public-API-reachable subset. 
+"""
+
+import json
+import sys
+from pathlib import Path
+
+_CORE_ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(_CORE_ROOT))
+
+from core.parser_adapter import apply_reachability_filter, parse_repository
+
+
+def _run(tmp_path, functions, call_graph, *, library_mode, entry_types=None):
+    """Write a call_graph.json + dataset and run the filter; return kept unit ids."""
+    entry_types = entry_types or {}
+    reverse = {}
+    for caller, callees in call_graph.items():
+        for callee in callees:
+            reverse.setdefault(callee, []).append(caller)
+    # functions carry name (+ optional unit_type to trip the structural detector)
+    fns = {fid: {"name": fid.split(":")[-1].split(".")[-1],
+                 "unit_type": entry_types.get(fid, "function")} for fid in functions}
+    (tmp_path / "call_graph.json").write_text(json.dumps(
+        {"functions": fns, "call_graph": call_graph, "reverse_call_graph": reverse}))
+    dataset = {"units": [{"id": fid, "unit_type": entry_types.get(fid, "function")}
+                         for fid in functions]}
+    out = apply_reachability_filter(dataset, str(tmp_path), "reachable",
+                                    library_mode=library_mode)
+    return {u["id"] for u in out["units"]}
+
+
+# library: public_api() -> _sink() (no structural entry point)
+_LIB_FNS = ["lib.py:public_api", "lib.py:_sink"]
+_LIB_CG = {"lib.py:public_api": ["lib.py:_sink"]}
+
+
+def test_library_mode_off_returns_all_unfiltered(tmp_path):
+    """Mode off (stacked on #75): a no-entry-point library is NOT blacked out —
+    #75's zero-seed fallback returns all units unfiltered. Library-mode ON refines
+    this to the public-API-reachable subset (see precision test below)."""
+    kept = _run(tmp_path, _LIB_FNS, _LIB_CG, library_mode=False)
+    assert kept == set(_LIB_FNS), f"expected #75 all-unfiltered fallback, got {kept}"
+
+
+def test_library_public_api_reachable_when_mode_on(tmp_path):
+    """Mode on: the public API is seeded, and its private callee comes along."""
+    kept = _run(tmp_path, _LIB_FNS, _LIB_CG, library_mode=True)
+    assert "lib.py:public_api" in kept, f"public API not seeded: {kept}"
+    assert "lib.py:_sink" in kept, f"private callee of the public API not reached: {kept}"
+
+
+def test_unreferenced_private_stays_out(tmp_path):
+    """Precision: a private function nothing calls is NOT seeded (only the public
+    surface is) — so library-mode doesn't blanket-seed every unit."""
+    fns = _LIB_FNS + ["lib.py:_orphan"]
+    kept = _run(tmp_path, fns, _LIB_CG, library_mode=True)
+    assert "lib.py:_orphan" not in kept, f"unreferenced private wrongly seeded: {kept}"
+
+
+# app: main() is a route_handler entry; helper() is its callee; _dead() is unreferenced
+_APP_FNS = ["app.py:main", "app.py:helper", "app.py:_dead"]
+_APP_CG = {"app.py:main": ["app.py:helper"]}
+_APP_ENTRY = {"app.py:main": "route_handler"}
+
+
+def test_app_baseline_mode_off(tmp_path):
+    """App with a real entry point: normal reachable set when mode off."""
+    kept = _run(tmp_path, _APP_FNS, _APP_CG, library_mode=False, entry_types=_APP_ENTRY)
+    assert kept == {"app.py:main", "app.py:helper"}, f"app baseline changed: {kept}"
+
+
+def test_app_mode_on_is_additive_only(tmp_path):
+    """Adversarial: turning library-mode ON for an app can only ADD reachable units
+    (union-only seed merge) — it must never drop one the app scan already had."""
+    off = _run(tmp_path, _APP_FNS, _APP_CG, library_mode=False, entry_types=_APP_ENTRY)
+    on = _run(tmp_path, _APP_FNS, _APP_CG, library_mode=True, entry_types=_APP_ENTRY)
+    assert off <= on, f"library-mode REMOVED app units: off={off} on={on}"
+    assert off == {"app.py:main", "app.py:helper"}
+
+
+def test_parse_repository_wiring(tmp_path):
+    """Integration guard: library_mode must flow parse_repository -> _parse_python ->
+    apply_reachability_filter. (A unit test on the filter alone missed a wiring bug
+    where `_parse_python` referenced library_mode before it was threaded.)"""
+    repo = tmp_path / "repo"
+    repo.mkdir()
+    (repo / "lib.py").write_text(
+        "def public_api(x):\n return _sink(x)\n\ndef _sink(x):\n return eval(x)\n")
+
+    def _kept(library_mode):
+        out = tmp_path / f"out_{library_mode}"
+        out.mkdir()
+        parse_repository(repo_path=str(repo), output_dir=str(out), language="python",
+                         processing_level="reachable", library_mode=library_mode)
+        ds = json.loads((out / "dataset.json").read_text())
+        return {u.get("id") for u in ds.get("units", [])}
+
+    # Stacked on #75: mode off returns all units unfiltered (zero-seed fallback),
+    # not a blackout. Mode on refines to the public-API-reachable subset.
+    assert _kept(False) == {"lib.py:public_api", "lib.py:_sink"}, \
+        "mode off: expected #75 all-unfiltered fallback"
+    on = _kept(True)
+    assert any(i.endswith(":public_api") for i in on), f"public api not analysed: {on}"
+    assert any(i.endswith(":_sink") for i in on), f"eval sink not analysed: {on}"