Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/hexgraph/engine/targets/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ def record_manifest(firmware: Target, *, method: str, root_rel: str, files: list
"child_target_id": f.get("child_target_id"),
# F07: keep the container-format tag so packed_containers() can flag un-recursed
# nested filesystems (omitted entirely for ordinary files, so the manifest stays lean).
**({"container": f["container"]} if f.get("container") else {})}
**({"container": f["container"]} if f.get("container") else {}),
# F08: this path is byte-identical to (and reuses the target of) an earlier ELF — the
# row was deduped, not cloned. Present only on the duplicate paths.
**({"dedup_of": f["dedup_of"]} if f.get("dedup_of") else {})}
for f in files
],
}
Expand Down
23 changes: 18 additions & 5 deletions src/hexgraph/engine/targets/reveal.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,23 @@ def reveal_dir(session: Session, project_id: str, firmware_target_id: str, prefi

norm = (prefix or "").strip().strip("/")
project = session.get(Project, project_id)

def _under(rel: str) -> bool:
    """Report whether manifest/target path ``rel`` falls under the requested prefix.

    ``norm`` is the slash-stripped prefix taken from the enclosing scope. A path
    matches when the prefix is empty (whole tree), when it equals the prefix
    exactly (a file or a directory named directly), or when it sits beneath the
    prefix as a directory ("usr/sbin" covers "usr/sbin/telnetd"). The trailing
    "/" in the last test is what keeps "usr/sb" from matching "usr/sbnet/x".
    """
    candidate = (rel or "").strip("/")
    if norm == "":
        return True
    if candidate == norm:
        return True
    return candidate.startswith(norm + "/")

# F08: a binary deduped to a shared target has no row of its own at its alternate path(s) —
# only a `dedup_of` ref in the manifest. Build the path→target map from the manifest so
# revealing a directory still reveals every target that lives under it, including via a deduped
# path whose keeper's own name sits in a different directory.
fs = (fw.metadata_json or {}).get("filesystem") or {}
ids_under_prefix = {
f.get("child_target_id") for f in fs.get("files", [])
if f.get("child_target_id") and _under(f.get("rel"))
}
children = (
session.query(Target)
.filter(Target.project_id == project_id, Target.parent_id == firmware_target_id)
Expand All @@ -101,11 +118,7 @@ def reveal_dir(session: Session, project_id: str, firmware_target_id: str, prefi
for c in children:
if c.visible:
continue
name = (c.name or "").strip("/")
# Match the dir prefix: the whole tree ("" matches all), an exact dir
# ("usr/sbin" matches "usr/sbin/telnetd"), or an exact file path. Avoid a
# bare substring match ("usr/sb" must NOT match "usr/sbnet/x").
if norm == "" or name == norm or name.startswith(norm + "/"):
if c.id in ids_under_prefix or _under(c.name): # any manifest path under prefix, or its own name
c.visible = True
_materialize_on_reveal(session, project, c)
revealed_ids.append(c.id)
Expand Down
16 changes: 16 additions & 0 deletions src/hexgraph/engine/targets/unpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from hexgraph.engine.graph.edges import add_edge
from hexgraph.engine.targets.filesystem import persistent_base, record_manifest
from hexgraph.engine.targets.ingest import ingest_file
from hexgraph.engine.targets.targets import file_sha256
from hexgraph.sandbox.executor import Executor, get_executor


Expand Down Expand Up @@ -45,6 +46,12 @@ def unpack_firmware(
root = base / root_rel
files = manifest.get("files", [])

# F08: a firmware re-packs the SAME binary at several paths (the FIT inner image == the top-level
# cpio, a busybox hard-link farm, a package shipped in two layers). Register each unique-bytes ELF
# ONCE and point every later byte-identical path at that same target (a `dedup_of` ref in the
# manifest) — otherwise one image's hundreds of duplicates each mint a hidden target + contains
# edge, doubling the graph for no information. (merge_duplicates folds any that slip through.)
seen_sha: dict[str, str] = {}
for entry in files:
if not entry.get("is_elf"):
continue
Expand All @@ -54,6 +61,14 @@ def unpack_firmware(
if not host_path.is_file():
continue

digest = file_sha256(str(host_path))
keeper = seen_sha.get(digest)
if keeper is not None:
# byte-identical to an already-registered sibling — reuse it, don't clone the row/edge.
entry["child_target_id"] = keeper
entry["dedup_of"] = keeper
continue
Comment thread
branover marked this conversation as resolved.

child = ingest_file(session, project, host_path, name=entry["rel"], parent=parent, visible=False)
add_edge(
session, project_id=project.id,
Expand All @@ -62,6 +77,7 @@ def unpack_firmware(
created_by_tool="unpack", attrs={"path": entry["rel"]},
Comment thread
branover marked this conversation as resolved.
)
entry["child_target_id"] = child.id
seen_sha[digest] = child.id
children.append(child)

if parent.kind != TargetKind.firmware_image:
Expand Down
75 changes: 75 additions & 0 deletions tests/test_unpack_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""F08: a firmware that re-packs the SAME binary at several paths (a FIT inner image == the
top-level cpio, a busybox hard-link farm, a package shipped in two layers) must register the bytes
ONCE — not mint a duplicate hidden target per path. unpack_firmware dedups byte-identical extracted
ELFs by sha256 and records a `dedup_of` ref on the duplicate manifest paths. No-Docker fake executor.
"""

from pathlib import Path

from hexgraph.db.models import Target
from hexgraph.db.session import session_scope
from hexgraph.engine.targets.ingest import create_project, ingest_file
from hexgraph.engine.targets.unpack import unpack_firmware

# Two minimal fake ELF payloads: the ELF magic and ident bytes, a distinguishing body, then
# zero padding. ELF_DUP is the one laid down at two paths; ELF_UNIQUE differs in its body.
ELF_DUP = b"".join([b"\x7fELF\x01\x01\x01", b"the very same bytes, packed at two paths", b"\x00" * 8])
ELF_UNIQUE = b"".join([b"\x7fELF\x01\x01\x01", b"a different, unique binary", b"\x00" * 8])


class _FakeDupExecutor:
    """Fake unpack executor: one ELF at two paths plus a distinct third (binwalk-style, root == /out)."""

    def run_json_probe(self, probe, artifact, *, outdir=None, **kw):
        """Write the three fake binaries under ``outdir`` and return the matching probe manifest.

        Only the ``unpack_probe.py`` probe is supported. ``artifact`` and any
        extra keyword arguments are accepted and ignored.
        """
        assert probe == "unpack_probe.py"
        # Insertion order fixes both the write order and the manifest order.
        layout = {"boot/svc": ELF_DUP, "pkg/svc": ELF_DUP, "bin/busybox": ELF_UNIQUE}
        dest_root = Path(outdir)
        manifest_rows = []
        for rel_path, payload in layout.items():
            destination = dest_root / rel_path
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_bytes(payload)
            manifest_rows.append(
                {
                    "rel": rel_path,
                    "container_path": "/out/" + rel_path,
                    "size": len(payload),
                    "is_elf": True,
                }
            )
        return {"method": "fake", "root": "/out", "files": manifest_rows}


def test_byte_identical_children_are_deduped(hg_home, tmp_path):
    """Unpacking an image with the same ELF at two paths registers that ELF exactly once."""
    image_path = tmp_path / "firmware.bin"
    image_path.write_bytes(b"FAKEFW" + b"\x00" * 64)
    with session_scope() as session:
        project = create_project(session, name="fw")
        firmware = ingest_file(session, project, image_path, name="firmware.bin")
        registered = unpack_firmware(session, project, firmware, runner=_FakeDupExecutor())

        # boot/svc and pkg/svc share bytes and fold into one child; busybox stands alone -> 2, not 3.
        assert len(registered) == 2
        # ...and the database agrees: no extra row was created for the duplicate path.
        child_rows = session.query(Target).filter(Target.parent_id == firmware.id).all()
        assert len(child_rows) == 2

        manifest = {}
        for row in firmware.metadata_json["filesystem"]["files"]:
            manifest[row["rel"]] = row
        first_seen, duplicate, distinct = manifest["boot/svc"], manifest["pkg/svc"], manifest["bin/busybox"]

        keeper_id = first_seen["child_target_id"]  # the first occurrence owns the row
        assert duplicate["child_target_id"] == keeper_id  # the duplicate path points at the same target
        assert duplicate["dedup_of"] == keeper_id  # and carries the dedup marker
        assert "dedup_of" not in first_seen  # the keeper itself is unmarked
        assert "dedup_of" not in distinct  # as is the unrelated binary


def test_reveal_dir_finds_a_binary_via_its_deduped_path(hg_home, tmp_path):
    """Revealing a directory that holds only a deduped path still reveals the shared target."""
    # F08 regression guard. The keeper row is named "boot/svc"; the same bytes also sit at
    # "pkg/svc", which has no row of its own. Revealing "pkg" must reach the keeper through the
    # manifest's path map rather than through Target.name alone.
    from hexgraph.engine.targets.reveal import reveal_dir

    image_path = tmp_path / "firmware.bin"
    image_path.write_bytes(b"FAKEFW" + b"\x00" * 64)
    with session_scope() as session:
        project = create_project(session, name="fw")
        firmware = ingest_file(session, project, image_path, name="firmware.bin")
        unpack_firmware(session, project, firmware, runner=_FakeDupExecutor())

        manifest = {row["rel"]: row for row in firmware.metadata_json["filesystem"]["files"]}
        keeper_id = manifest["boot/svc"]["child_target_id"]

        # "pkg" contains nothing but the deduped path.
        outcome = reveal_dir(session, project.id, firmware.id, "pkg")
        assert outcome["revealed"] == 1 and outcome["target_ids"] == [keeper_id]

        keeper = session.get(Target, keeper_id)
        assert keeper.visible is True
Loading