diff --git a/src/hexgraph/engine/targets/filesystem.py b/src/hexgraph/engine/targets/filesystem.py index ff03eb5..2eebeae 100644 --- a/src/hexgraph/engine/targets/filesystem.py +++ b/src/hexgraph/engine/targets/filesystem.py @@ -33,7 +33,10 @@ def record_manifest(firmware: Target, *, method: str, root_rel: str, files: list "child_target_id": f.get("child_target_id"), # F07: keep the container-format tag so packed_containers() can flag un-recursed # nested filesystems (omitted entirely for ordinary files, so the manifest stays lean). - **({"container": f["container"]} if f.get("container") else {})} + **({"container": f["container"]} if f.get("container") else {}), + # F08: this path is byte-identical to (and reuses the target of) an earlier ELF — the + # row was deduped, not cloned. Present only on the duplicate paths. + **({"dedup_of": f["dedup_of"]} if f.get("dedup_of") else {})} for f in files ], } diff --git a/src/hexgraph/engine/targets/reveal.py b/src/hexgraph/engine/targets/reveal.py index 5dce93a..ce05af3 100644 --- a/src/hexgraph/engine/targets/reveal.py +++ b/src/hexgraph/engine/targets/reveal.py @@ -92,6 +92,23 @@ def reveal_dir(session: Session, project_id: str, firmware_target_id: str, prefi norm = (prefix or "").strip().strip("/") project = session.get(Project, project_id) + + def _under(rel: str) -> bool: + # Match the dir prefix: the whole tree ("" matches all), an exact dir + # ("usr/sbin" matches "usr/sbin/telnetd"), or an exact file path. Avoid a + # bare substring match ("usr/sb" must NOT match "usr/sbnet/x"). + rel = (rel or "").strip("/") + return norm == "" or rel == norm or rel.startswith(norm + "/") + + # F08: a binary deduped to a shared target has no row of its own at its alternate path(s) — + # only a `dedup_of` ref in the manifest. Build the path→target map from the manifest so + # revealing a directory still reveals every target that lives under it, including via a deduped + # path whose keeper's own name sits in a different directory. + fs = (fw.metadata_json or {}).get("filesystem") or {} + ids_under_prefix = { + f.get("child_target_id") for f in fs.get("files", []) + if f.get("child_target_id") and _under(f.get("rel")) + } children = ( session.query(Target) .filter(Target.project_id == project_id, Target.parent_id == firmware_target_id) @@ -101,11 +118,7 @@ def reveal_dir(session: Session, project_id: str, firmware_target_id: str, prefi for c in children: if c.visible: continue - name = (c.name or "").strip("/") - # Match the dir prefix: the whole tree ("" matches all), an exact dir - # ("usr/sbin" matches "usr/sbin/telnetd"), or an exact file path. Avoid a - # bare substring match ("usr/sb" must NOT match "usr/sbnet/x"). - if norm == "" or name == norm or name.startswith(norm + "/"): + if c.id in ids_under_prefix or _under(c.name): # any manifest path under prefix, or its own name c.visible = True _materialize_on_reveal(session, project, c) revealed_ids.append(c.id) diff --git a/src/hexgraph/engine/targets/unpack.py b/src/hexgraph/engine/targets/unpack.py index e0bb3a8..0148233 100644 --- a/src/hexgraph/engine/targets/unpack.py +++ b/src/hexgraph/engine/targets/unpack.py @@ -13,6 +13,7 @@ from hexgraph.engine.graph.edges import add_edge from hexgraph.engine.targets.filesystem import persistent_base, record_manifest from hexgraph.engine.targets.ingest import ingest_file +from hexgraph.engine.targets.targets import file_sha256 from hexgraph.sandbox.executor import Executor, get_executor @@ -45,6 +46,12 @@ def unpack_firmware( root = base / root_rel files = manifest.get("files", []) + # F08: a firmware re-packs the SAME binary at several paths (the FIT inner image == the top-level + # cpio, a busybox hard-link farm, a package shipped in two layers). Register each unique-bytes ELF + # ONCE and point every later byte-identical path at that same target (a `dedup_of` ref in the + # manifest) — otherwise one image's hundreds of duplicates each mint a hidden target + contains + # edge, doubling the graph for no information. (merge_duplicates folds any that slip through.) + seen_sha: dict[str, str] = {} for entry in files: if not entry.get("is_elf"): continue @@ -54,6 +61,14 @@ def unpack_firmware( if not host_path.is_file(): continue + digest = file_sha256(str(host_path)) + keeper = seen_sha.get(digest) + if keeper is not None: + # byte-identical to an already-registered sibling — reuse it, don't clone the row/edge. + entry["child_target_id"] = keeper + entry["dedup_of"] = keeper + continue + child = ingest_file(session, project, host_path, name=entry["rel"], parent=parent, visible=False) add_edge( session, project_id=project.id, @@ -62,6 +77,7 @@ def unpack_firmware( created_by_tool="unpack", attrs={"path": entry["rel"]}, ) entry["child_target_id"] = child.id + seen_sha[digest] = child.id children.append(child) if parent.kind != TargetKind.firmware_image: diff --git a/tests/test_unpack_dedup.py b/tests/test_unpack_dedup.py new file mode 100644 index 0000000..3a2eccb --- /dev/null +++ b/tests/test_unpack_dedup.py @@ -0,0 +1,75 @@ +"""F08: a firmware that re-packs the SAME binary at several paths (a FIT inner image == the +top-level cpio, a busybox hard-link farm, a package shipped in two layers) must register the bytes +ONCE — not mint a duplicate hidden target per path. unpack_firmware dedups byte-identical extracted +ELFs by sha256 and records a `dedup_of` ref on the duplicate manifest paths. No-Docker fake executor. +""" + +from pathlib import Path + +from hexgraph.db.models import Target +from hexgraph.db.session import session_scope +from hexgraph.engine.targets.ingest import create_project, ingest_file +from hexgraph.engine.targets.unpack import unpack_firmware + +ELF_DUP = b"\x7fELF\x01\x01\x01" + b"the very same bytes, packed at two paths" + b"\x00" * 8 +ELF_UNIQUE = b"\x7fELF\x01\x01\x01" + b"a different, unique binary" + b"\x00" * 8 + + +class _FakeDupExecutor: + """Lays the SAME ELF at two paths + a distinct third (binwalk-style, root == /out).""" + + def run_json_probe(self, probe, artifact, *, outdir=None, **kw): + assert probe == "unpack_probe.py" + out = Path(outdir) + for rel, data in (("boot/svc", ELF_DUP), ("pkg/svc", ELF_DUP), ("bin/busybox", ELF_UNIQUE)): + (out / rel).parent.mkdir(parents=True, exist_ok=True) + (out / rel).write_bytes(data) + return { + "method": "fake", "root": "/out", + "files": [ + {"rel": "boot/svc", "container_path": "/out/boot/svc", "size": len(ELF_DUP), "is_elf": True}, + {"rel": "pkg/svc", "container_path": "/out/pkg/svc", "size": len(ELF_DUP), "is_elf": True}, + {"rel": "bin/busybox", "container_path": "/out/bin/busybox", "size": len(ELF_UNIQUE), "is_elf": True}, + ], + } + + +def test_byte_identical_children_are_deduped(hg_home, tmp_path): + fw_src = tmp_path / "firmware.bin" + fw_src.write_bytes(b"FAKEFW" + b"\x00" * 64) + with session_scope() as session: + project = create_project(session, name="fw") + firmware = ingest_file(session, project, fw_src, name="firmware.bin") + children = unpack_firmware(session, project, firmware, runner=_FakeDupExecutor()) + + # the two byte-identical paths collapse to ONE child; busybox is its own -> 2 children, not 3. + assert len(children) == 2 + elf_targets = session.query(Target).filter(Target.parent_id == firmware.id).all() + assert len(elf_targets) == 2 # no duplicate row was minted + + files = {f["rel"]: f for f in firmware.metadata_json["filesystem"]["files"]} + keeper_id = files["boot/svc"]["child_target_id"] # first occurrence keeps the row + assert files["pkg/svc"]["child_target_id"] == keeper_id # the dup path reuses that target + assert files["pkg/svc"]["dedup_of"] == keeper_id # ...and is flagged as a dedup + assert "dedup_of" not in files["boot/svc"] # the keeper is not a dup + assert "dedup_of" not in files["bin/busybox"] # nor is the distinct binary + + +def test_reveal_dir_finds_a_binary_via_its_deduped_path(hg_home, tmp_path): + # F08 regression guard: the shared binary's keeper is named "boot/svc", but it also lives at + # "pkg/svc" (deduped, no row of its own). Revealing the "pkg" directory must still reveal it — + # reveal_dir consults the manifest path map, not just live Target.name. + from hexgraph.engine.targets.reveal import reveal_dir + + fw_src = tmp_path / "firmware.bin" + fw_src.write_bytes(b"FAKEFW" + b"\x00" * 64) + with session_scope() as session: + project = create_project(session, name="fw") + firmware = ingest_file(session, project, fw_src, name="firmware.bin") + unpack_firmware(session, project, firmware, runner=_FakeDupExecutor()) + + files = {f["rel"]: f for f in firmware.metadata_json["filesystem"]["files"]} + keeper_id = files["boot/svc"]["child_target_id"] + res = reveal_dir(session, project.id, firmware.id, "pkg") # the dir only the DEDUPED path is in + assert res["revealed"] == 1 and res["target_ids"] == [keeper_id] + assert session.get(Target, keeper_id).visible is True