diff --git a/.gitignore b/.gitignore index bdb7b94c6..c948f2700 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,4 @@ build/ dist/ .coverage* **/license.json -assets/*_ghidra +**/assets/*_ghidra/ diff --git a/disassemblers/ofrak_angr/CHANGELOG.md b/disassemblers/ofrak_angr/CHANGELOG.md index 0eaf2e74c..4924036fa 100644 --- a/disassemblers/ofrak_angr/CHANGELOG.md +++ b/disassemblers/ofrak_angr/CHANGELOG.md @@ -3,7 +3,13 @@ All notable changes to `ofrak-angr` will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased 1.2.0](https://github.com/redballoonsecurity/ofrak/tree/master) +## [Unreleased 2.0.0](https://github.com/redballoonsecurity/ofrak/tree/master) + +### Added +- Add `AngrAutoLoadProject` / `AngrCustomLoadProject` tags and `AngrCustomLoadAnalyzer` for custom binary loading with `ProgramAttributes` metadata ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) + +### Changed +- **Breaking:** `AngrAnalyzer` now targets `AngrAutoLoadProject` instead of `AngrAnalysisResource`; code that manually tagged resources with `AngrAnalysisResource` should use `AngrAutoLoadProject` or `AngrCustomLoadProject` ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) ### Fixed - Pin Angr dependencies (`networkx` and `msgspec`) ([#676](https://github.com/redballoonsecurity/ofrak/pull/676)) diff --git a/disassemblers/ofrak_angr/setup.py b/disassemblers/ofrak_angr/setup.py index f96bbc18f..40b9e0965 100644 --- a/disassemblers/ofrak_angr/setup.py +++ b/disassemblers/ofrak_angr/setup.py @@ -21,7 +21,7 @@ def run(self): setuptools.setup( name="ofrak_angr", - version="1.1.0", + version="2.0.0rc1", description="OFRAK angr Components", packages=setuptools.find_packages("src"), package_dir={"": "src"}, diff --git a/disassemblers/ofrak_angr/src/ofrak_angr/components/angr_analyzer.py 
b/disassemblers/ofrak_angr/src/ofrak_angr/components/angr_analyzer.py index 2e8acbf2a..4fc2c0a93 100644 --- a/disassemblers/ofrak_angr/src/ofrak_angr/components/angr_analyzer.py +++ b/disassemblers/ofrak_angr/src/ofrak_angr/components/angr_analyzer.py @@ -1,20 +1,35 @@ import logging from io import BytesIO from dataclasses import dataclass, field -from typing import Any, List, Optional +from typing import Any, List, Optional, Tuple from ofrak.component.analyzer import Analyzer from ofrak.model.component_model import ComponentConfig -from ofrak.model.resource_model import ResourceAttributeDependency from ofrak.resource import Resource +import archinfo import angr.project +from ofrak.core.architecture import ProgramAttributes from ofrak.core.elf.model import Elf, ElfHeader, ElfType -from ofrak_angr.components.identifiers import AngrAnalysisResource -from ofrak_angr.model import AngrAnalysis +from ofrak.core.memory_region import ( + MemoryRegion, + get_effective_memory_permissions, + get_memory_region_permissions, +) +from ofrak_type.architecture import InstructionSet +from ofrak_type.bit_width import BitWidth +from ofrak_type.endianness import Endianness +from ofrak_type.memory_permissions import MemoryPermissions +from ofrak_angr.model import ( + AngrAnalysis, + AngrAnalysisResource, + AngrAutoLoadProject, + AngrCustomLoadProject, +) from ofrak.component.modifier import Modifier from ofrak.core import CodeRegion from ofrak import ResourceFilter +from ofrak_type.error import NotFoundError LOGGER = logging.getLogger(__file__) @@ -32,53 +47,168 @@ class AngrAnalyzerConfig(ComponentConfig): ) +def _run_angr_analysis( + load_data: BytesIO, project_args: dict, config: AngrAnalyzerConfig +) -> AngrAnalysis: + """ + Create an angr project, run CFG analysis, and execute post-analysis hook. 
+ """ + project = angr.project.Project(load_data, load_options=project_args) + cfg = angr.analyses.analysis.AnalysisFactory(project, config.cfg_analyzer)( + **config.cfg_analyzer_args + ) + exec(config.post_cfg_analysis_hook) + return AngrAnalysis(project) + + +_ANGR_ARCH_MAP = { + (InstructionSet.X86, BitWidth.BIT_32): "X86", + (InstructionSet.X86, BitWidth.BIT_64): "AMD64", + (InstructionSet.ARM, BitWidth.BIT_32): "ARMEL", + (InstructionSet.AARCH64, BitWidth.BIT_64): "AARCH64", + (InstructionSet.MIPS, BitWidth.BIT_32): "MIPS32", + (InstructionSet.MIPS, BitWidth.BIT_64): "MIPS64", + (InstructionSet.PPC, BitWidth.BIT_32): "PPC32", + (InstructionSet.PPC, BitWidth.BIT_64): "PPC64", + (InstructionSet.AVR, BitWidth.BIT_16): "AVR8", + (InstructionSet.SPARC, BitWidth.BIT_32): "SPARC32", + (InstructionSet.SPARC, BitWidth.BIT_64): "SPARC64", +} + +_ENDIANNESS_TO_ARCHINFO = { + Endianness.BIG_ENDIAN: archinfo.Endness.BE, + Endianness.LITTLE_ENDIAN: archinfo.Endness.LE, +} + + +def _resolve_angr_arch( + program_attrs: ProgramAttributes, +) -> Optional[archinfo.Arch]: + """ + Resolve ProgramAttributes to an archinfo.Arch with correct endianness. + """ + arch_name = _ANGR_ARCH_MAP.get((program_attrs.isa, program_attrs.bit_width)) + if arch_name is None: + return None + endness = _ENDIANNESS_TO_ARCHINFO.get(program_attrs.endianness) + try: + return archinfo.arch_from_id(arch_name, endness=endness) + except archinfo.ArchNotFound: + raise NotFoundError( + f"angr does not support architecture {program_attrs.isa.name} " + f"{program_attrs.bit_width.value}-bit {program_attrs.endianness.name}" + ) + + class AngrAnalyzer(Analyzer[AngrAnalyzerConfig, AngrAnalysis]): """ - Runs angr's automated binary analysis engine to build control flow graphs (CFG), identify functions, and analyze - program structure. Use for initial comprehensive analysis of binaries with angr. Configurable CFG analyzer and - post-analysis hooks. Creates AngrAnalysis state for other angr components to use. 
+ Runs angr's automated binary analysis engine to build control flow graphs (CFG), identify + functions, and analyze program structure. Use for auto-loadable formats (ELF, PE, Ihex) where + angr can automatically determine the binary format. Creates AngrAnalysis state for other angr + components to use. """ id = b"AngrAnalyzer" - targets = (AngrAnalysisResource,) + targets = (AngrAutoLoadProject,) outputs = (AngrAnalysis,) async def analyze( self, resource: Resource, config: AngrAnalyzerConfig = AngrAnalyzerConfig() ) -> AngrAnalysis: resource_data = await resource.get_data() + return _run_angr_analysis(BytesIO(resource_data), config.project_args, config) + + +class AngrCustomLoadAnalyzer(Analyzer[AngrAnalyzerConfig, AngrAnalysis]): + """ + Runs angr analysis on binaries that angr cannot auto-load (raw blobs, custom formats). + Consumes entry_points and base_address from ProgramAttributes to configure angr's loader. + Use for custom loading scenarios where the binary format is not natively supported by angr. + """ - project = angr.project.Project(BytesIO(resource_data), load_options=config.project_args) + id = b"AngrCustomLoadAnalyzer" + targets = (AngrCustomLoadProject,) + outputs = (AngrAnalysis,) + + async def analyze( + self, resource: Resource, config: AngrAnalyzerConfig = AngrAnalyzerConfig() + ) -> AngrAnalysis: + main_opts: dict = {} + try: + program_attrs = resource.get_attributes(ProgramAttributes) + except NotFoundError: + program_attrs = None - # Let's use angr to perform its own full analysis on the binary, and - # maintain its results for the CR / CB / BB unpackers to re-use - cfg = angr.analyses.analysis.AnalysisFactory(project, config.cfg_analyzer)( - **config.cfg_analyzer_args + if program_attrs is not None: + if program_attrs.entry_points: + # angr's CLE loader only accepts a single entry_point; additional + # entry points are typically discovered by CFGFast's heuristics. 
+ main_opts["entry_point"] = program_attrs.entry_points[0] + if program_attrs.base_address is not None: + main_opts["base_addr"] = program_attrs.base_address + angr_arch = _resolve_angr_arch(program_attrs) + if angr_arch is not None: + main_opts["arch"] = angr_arch + + regions = list( + await resource.get_children_as_view( + MemoryRegion, r_filter=ResourceFilter.with_tags(MemoryRegion) + ) ) - # Run any user-defined analysis here - exec(config.post_cfg_analysis_hook) - - return AngrAnalysis(project) - - def _create_dependencies( - self, - resource: Resource, - resource_dependencies: Optional[List[ResourceAttributeDependency]] = None, - ): - """ - Override - [Analyzer._create_dependencies][ofrak.component.component_analyzer.Analyzer._create_dependencies] - to avoid the creation and tracking of dependencies between the angr analysis, - resource, and attributes. - - Practically speaking, this means that users of angr components should group their - work into three discrete, ordered steps: - - Step 1. Unpacking, Analysis - Step 2. Modification - Step 3. 
Packing - """ + if regions: + regions.sort(key=lambda r: r.virtual_address) + combined_data = bytearray() + segments = [] + code_regions: List[Tuple[int, int]] = [] + for region in regions: + perms = get_memory_region_permissions(region.resource) + if perms is not None and perms.permissions == MemoryPermissions.NONE: + continue + region_data = await region.resource.get_data() + file_offset = len(combined_data) + vaddr = region.virtual_address + size = region.size + segments.append((file_offset, vaddr, len(region_data))) + combined_data.extend(region_data) + + effective = get_effective_memory_permissions(region.resource) + if effective.value & MemoryPermissions.X.value: + code_regions.append((vaddr, vaddr + size)) + + if not segments: + raise ValueError("No accessible memory regions for analysis") + + if not code_regions: + raise ValueError("No executable memory regions for analysis") + + main_opts["backend"] = "blob" + main_opts["segments"] = segments + if "base_addr" not in main_opts: + main_opts["base_addr"] = segments[0][1] + + load_data = BytesIO(bytes(combined_data)) + else: + code_regions = [] + load_data = BytesIO(await resource.get_data()) + + # User-supplied main_opts take priority over ProgramAttributes values + project_args = dict(config.project_args) + if main_opts: + project_args["main_opts"] = {**main_opts, **project_args.get("main_opts", {})} + + # Restrict CFGFast to executable regions to avoid scanning sparse gaps + if code_regions and "regions" not in config.cfg_analyzer_args: + cfg_args = dict(config.cfg_analyzer_args) + cfg_args["regions"] = code_regions + config = AngrAnalyzerConfig( + cfg_analyzer=config.cfg_analyzer, + cfg_analyzer_args=cfg_args, + project_args=config.project_args, + post_cfg_analysis_hook=config.post_cfg_analysis_hook, + ) + + return _run_angr_analysis(load_data, project_args, config) class AngrCodeRegionModifier(Modifier): diff --git a/disassemblers/ofrak_angr/src/ofrak_angr/components/blocks/unpackers.py 
b/disassemblers/ofrak_angr/src/ofrak_angr/components/blocks/unpackers.py index a4091dc08..7356bd348 100644 --- a/disassemblers/ofrak_angr/src/ofrak_angr/components/blocks/unpackers.py +++ b/disassemblers/ofrak_angr/src/ofrak_angr/components/blocks/unpackers.py @@ -17,8 +17,7 @@ from ofrak.resource import Resource from ofrak.service.resource_service_i import ResourceFilter from ofrak_angr.components.angr_analyzer import AngrAnalyzerConfig, AngrCodeRegionModifier -from ofrak_angr.components.identifiers import AngrAnalysisResource -from ofrak_angr.model import AngrAnalysis +from ofrak_angr.model import AngrAnalysis, AngrAnalysisResource LOGGER = logging.getLogger(__name__) diff --git a/disassemblers/ofrak_angr/src/ofrak_angr/components/identifiers.py b/disassemblers/ofrak_angr/src/ofrak_angr/components/identifiers.py index 31ee5c948..23bee9d57 100644 --- a/disassemblers/ofrak_angr/src/ofrak_angr/components/identifiers.py +++ b/disassemblers/ofrak_angr/src/ofrak_angr/components/identifiers.py @@ -1,17 +1,27 @@ from ofrak.component.identifier import Identifier +from ofrak.core import Elf, Ihex, Pe from ofrak.core.program import Program from ofrak.resource import Resource -from ofrak_angr.model import AngrAnalysisResource +from ofrak_angr.model import AngrAutoLoadProject, AngrCustomLoadProject + + +_ANGR_AUTO_LOADABLE_FORMATS = [Elf, Ihex, Pe] class AngrAnalysisIdentifier(Identifier): """ - Tags Program resources for angr analysis. Enables angr-based components to run on the resource. Automatically - identifies programs that should be analyzed with angr. + Tags Program resources for angr analysis. Auto-loadable formats (ELF, PE, Ihex) get + AngrAutoLoadProject tag, others get AngrCustomLoadProject. Enables angr-based components + to run on the resource. 
""" id = b"AngrAnalysisIdentifier" targets = (Program,) async def identify(self, resource: Resource, config=None): - resource.add_tag(AngrAnalysisResource) + for tag in _ANGR_AUTO_LOADABLE_FORMATS: + if resource.has_tag(tag): + resource.add_tag(AngrAutoLoadProject) + return + + resource.add_tag(AngrCustomLoadProject) diff --git a/disassemblers/ofrak_angr/src/ofrak_angr/model.py b/disassemblers/ofrak_angr/src/ofrak_angr/model.py index 3b7726f48..fe988b585 100644 --- a/disassemblers/ofrak_angr/src/ofrak_angr/model.py +++ b/disassemblers/ofrak_angr/src/ofrak_angr/model.py @@ -11,3 +11,11 @@ class AngrAnalysis(ResourceAttributes): class AngrAnalysisResource(ResourceView): pass + + +class AngrAutoLoadProject(AngrAnalysisResource): + pass + + +class AngrCustomLoadProject(AngrAnalysisResource): + pass diff --git a/disassemblers/ofrak_angr/tests/test_unpackers.py b/disassemblers/ofrak_angr/tests/test_unpackers.py index 09f4aae92..15a4e58d7 100755 --- a/disassemblers/ofrak_angr/tests/test_unpackers.py +++ b/disassemblers/ofrak_angr/tests/test_unpackers.py @@ -3,6 +3,7 @@ Requirements Mapping: - REQ1.2 +- REQ2.2 - REQ2.3 """ from typing import Dict @@ -20,10 +21,23 @@ ComplexBlockUnpackerTestCase, ComplexBlockUnpackerUnpackAndVerifyPattern, ) +from pytest_ofrak.patterns.program_metadata import ( + custom_binary_resource, # noqa: F401 + setup_program_flat, + setup_program_with_code_region, + add_rodata_region, + add_distant_rw_region, + assert_complex_block_at_vaddr, +) from ofrak import OFRAKContext from ofrak import ResourceFilter, ResourceAttributeValueFilter from ofrak.model.viewable_tag_model import AttributesType from ofrak.core.addressable import Addressable +from ofrak_angr.components.angr_analyzer import ( + AngrAnalyzerConfig, + AngrCustomLoadAnalyzer, +) +from ofrak_angr.model import AngrAnalysis, AngrCustomLoadProject class TestAngrCodeRegionUnpackAndVerify(CodeRegionUnpackAndVerifyPattern): @@ -195,3 +209,71 @@ async def test_basic_block_no_exit(ofrak_context: 
OFRAKContext, busybox_resource await complexblock_0x4d8768.unpack() # In the past, unpacking that ComplexBlock would fail because it contains a BasicBlock that doens't have an exit address + + +async def test_angr_custom_load_single_region(custom_binary_resource): + """Test angr custom loading with a single CodeRegion segment (REQ2.2).""" + base_address = 0x400000 + text_vaddr = base_address + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=base_address, text_vaddr=text_vaddr + ) + assert custom_binary_resource.has_tag(AngrCustomLoadProject) + + await custom_binary_resource.run(AngrCustomLoadAnalyzer) + + angr_analysis = custom_binary_resource.get_attributes(AngrAnalysis) + assert angr_analysis.project.loader.main_object.min_addr == base_address + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) + + +async def test_angr_custom_loader_with_memory_regions(custom_binary_resource): + """Test angr custom loading with multiple MemoryRegion segments (REQ2.2).""" + text_vaddr = 0x400130 + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=0x100000, text_vaddr=text_vaddr + ) + await add_rodata_region(custom_binary_resource, rodata_vaddr=0x40A0A0) + assert custom_binary_resource.has_tag(AngrCustomLoadProject) + + await custom_binary_resource.run(AngrCustomLoadAnalyzer) + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) + + +async def test_angr_custom_load_distant_rw_region(custom_binary_resource): + """Test that a distant RW region doesn't cause CFGFast to hang (REQ2.2).""" + text_vaddr = 0x400130 + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=0x100000, text_vaddr=text_vaddr + ) + await add_distant_rw_region(custom_binary_resource, vaddr=0x80000000) + assert custom_binary_resource.has_tag(AngrCustomLoadProject) + + await 
custom_binary_resource.run(AngrCustomLoadAnalyzer) + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) + + +async def test_angr_custom_load_flat(custom_binary_resource): + """Test angr flat-blob loading path (no MemoryRegion children) (REQ2.2).""" + base_address = 0x400000 + await setup_program_flat(custom_binary_resource, base_address=base_address) + assert custom_binary_resource.has_tag(AngrCustomLoadProject) + + angr_config = AngrAnalyzerConfig( + project_args={ + "auto_load_libs": False, + "main_opts": { + "backend": "blob", + }, + } + ) + await custom_binary_resource.run(AngrCustomLoadAnalyzer, angr_config) + + angr_analysis = custom_binary_resource.get_attributes(AngrAnalysis) + assert angr_analysis.project.loader.main_object.min_addr == base_address diff --git a/disassemblers/ofrak_binary_ninja/CHANGELOG.md b/disassemblers/ofrak_binary_ninja/CHANGELOG.md new file mode 100644 index 000000000..1f2b41cf2 --- /dev/null +++ b/disassemblers/ofrak_binary_ninja/CHANGELOG.md @@ -0,0 +1,17 @@ +# Changelog +All notable changes to `ofrak-binary-ninja` will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+ +## [Unreleased 0.2.0](https://github.com/redballoonsecurity/ofrak/tree/master) + +### Added +- Add `BinaryNinjaAutoLoadProject` / `BinaryNinjaCustomLoadProject` tags and `BinaryNinjaCustomLoadAnalyzer` for custom binary loading with `ProgramAttributes` metadata ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) + +### Changed +- **Breaking:** `BinaryNinjaAnalysisResource` moved from `ofrak_binary_ninja.components.identifiers` to `ofrak_binary_ninja.model` ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) +- **Breaking:** `BinaryNinjaAnalyzer` now targets `BinaryNinjaAutoLoadProject` instead of `BinaryNinjaAnalysisResource`; code that manually tagged resources with `BinaryNinjaAnalysisResource` should use `BinaryNinjaAutoLoadProject` or `BinaryNinjaCustomLoadProject` ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) + +## 0.1.0 - 2022-01-25 +### Added +Initial release. Hello world! diff --git a/disassemblers/ofrak_binary_ninja/setup.py b/disassemblers/ofrak_binary_ninja/setup.py index 1fc11903c..be73a6f12 100644 --- a/disassemblers/ofrak_binary_ninja/setup.py +++ b/disassemblers/ofrak_binary_ninja/setup.py @@ -20,7 +20,7 @@ def run(self): setuptools.setup( name="ofrak_binary_ninja", - version="0.1.0", + version="0.2.0rc1", author="Red Balloon Security", author_email="ofrak@redballoonsecurity.com", description="OFRAK Binary Ninja Components", diff --git a/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/binary_ninja_analyzer.py b/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/binary_ninja_analyzer.py index bd711a353..e3c15ddd8 100644 --- a/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/binary_ninja_analyzer.py +++ b/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/binary_ninja_analyzer.py @@ -1,15 +1,27 @@ import logging +import tempfile from dataclasses import dataclass from typing import Optional, List -from binaryninja import open_view, 
BinaryViewType +from binaryninja import BinaryView, open_view, BinaryViewType, SegmentFlag +from ofrak import ResourceFilter from ofrak.component.analyzer import Analyzer +from ofrak.core.architecture import ProgramAttributes +from ofrak.core.memory_region import ( + MemoryRegion, + get_memory_region_permissions, + get_effective_memory_permissions, +) from ofrak.model.component_model import ComponentConfig -from ofrak.model.resource_model import ResourceAttributeDependency -from ofrak_binary_ninja.components.identifiers import BinaryNinjaAnalysisResource -from ofrak_binary_ninja.model import BinaryNinjaAnalysis +from ofrak_binary_ninja.model import ( + BinaryNinjaAnalysis, + BinaryNinjaAutoLoadProject, + BinaryNinjaCustomLoadProject, +) from ofrak.resource import Resource +from ofrak_type.error import NotFoundError +from ofrak_type.memory_permissions import MemoryPermissions LOGGER = logging.getLogger(__file__) @@ -21,13 +33,14 @@ class BinaryNinjaAnalyzerConfig(ComponentConfig): class BinaryNinjaAnalyzer(Analyzer[Optional[BinaryNinjaAnalyzerConfig], BinaryNinjaAnalysis]): """ - Opens and analyzes binaries with Binary Ninja, either from scratch or from a pre-analyzed BNDB file. Creates - BinaryNinjaAnalysis state containing the BinaryView for use by other Binary Ninja components. Use for initial - comprehensive analysis with Binary Ninja's powerful analysis engine. + Opens and analyzes binaries with Binary Ninja, either from scratch or from a pre-analyzed + BNDB file. Use for auto-loadable formats (ELF, PE, Ihex) where Binary Ninja can automatically + determine the binary format. Creates BinaryNinjaAnalysis state containing the BinaryView for + use by other Binary Ninja components. 
""" id = b"BinaryNinjaAnalyzer" - targets = (BinaryNinjaAnalysisResource,) + targets = (BinaryNinjaAutoLoadProject,) outputs = (BinaryNinjaAnalysis,) async def analyze( @@ -36,28 +49,148 @@ async def analyze( if not config: async with resource.temp_to_disk(delete=False) as temp_path: bv = open_view(temp_path) + else: + bv = BinaryViewType.get_view_of_file(config.bndb_file) + assert bv is not None + + return BinaryNinjaAnalysis(bv) + + +class BinaryNinjaCustomLoadAnalyzer( + Analyzer[Optional[BinaryNinjaAnalyzerConfig], BinaryNinjaAnalysis] +): + """ + Opens and analyzes binaries with Binary Ninja for formats that Binary Ninja cannot + auto-load. When MemoryRegion children are present, creates user segments at their + specified virtual addresses with per-region permissions. Otherwise falls back to + loading the entire binary as a flat blob with rebase support. + """ - return BinaryNinjaAnalysis(bv) + id = b"BinaryNinjaCustomLoadAnalyzer" + targets = (BinaryNinjaCustomLoadProject,) + outputs = (BinaryNinjaAnalysis,) + + async def analyze( + self, resource: Resource, config: Optional[BinaryNinjaAnalyzerConfig] = None + ) -> BinaryNinjaAnalysis: + try: + program_attrs = resource.get_attributes(ProgramAttributes) + except NotFoundError: + program_attrs = None + + regions = list( + await resource.get_children_as_view( + MemoryRegion, r_filter=ResourceFilter.with_tags(MemoryRegion) + ) + ) + + if regions and not config: + bv = await self._load_with_regions(resource, regions, program_attrs) + elif not config: + bv = await self._load_flat(resource, program_attrs) else: bv = BinaryViewType.get_view_of_file(config.bndb_file) assert bv is not None - return BinaryNinjaAnalysis(bv) - def _create_dependencies( + return BinaryNinjaAnalysis(bv) + + async def _load_with_regions( self, resource: Resource, - resource_dependencies: Optional[List[ResourceAttributeDependency]] = None, - ): + regions: List[MemoryRegion], + program_attrs: Optional[ProgramAttributes], + ) -> BinaryView: """ 
- Override - [Analyzer._create_dependencies][ofrak.component.component_analyzer.Analyzer._create_dependencies] - to avoid the creation and tracking of dependencies between the BinaryNinja analysis, - resource, and attributes. + Load binary with explicit MemoryRegion segments at their virtual addresses. + """ + regions.sort(key=lambda r: r.virtual_address) + + combined_data = bytearray() + segment_info = [] + for region in regions: + perms = get_memory_region_permissions(region.resource) + if perms is not None and perms.permissions == MemoryPermissions.NONE: + continue + region_data = await region.resource.get_data() + file_offset = len(combined_data) + effective = get_effective_memory_permissions(region.resource) + flags = self._get_segment_flags(effective) + segment_info.append((file_offset, region.virtual_address, region.size, flags)) + combined_data.extend(region_data) + + if not segment_info: + raise ValueError("No accessible memory regions for analysis") + + if not any(flags & SegmentFlag.SegmentExecutable for _, _, _, flags in segment_info): + raise ValueError("No executable memory regions for analysis") + + # delete=False: Binary Ninja retains a reference to the file during analysis + with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp: + tmp.write(combined_data) + temp_path = tmp.name + + bv = open_view(temp_path, update_analysis=False) + + for seg in list(bv.segments): + bv.remove_auto_segment(seg.start, seg.length) + + for file_offset, vaddr, size, flags in segment_info: + bv.add_user_segment(vaddr, size, file_offset, size, flags) - Practically speaking, this means that users of BinaryNinja components should group their - work into three discrete, ordered steps: + if program_attrs is not None and program_attrs.entry_points: + for entry_addr in program_attrs.entry_points: + bv.add_entry_point(entry_addr) + LOGGER.info(f"Added entry point at 0x{entry_addr:x}") - Step 1. Unpacking, Analysis - Step 2. Modification - Step 3. 
Packing + bv.update_analysis_and_wait() + return bv + + async def _load_flat( + self, resource: Resource, program_attrs: Optional[ProgramAttributes] + ) -> BinaryView: + """ + Load binary as a flat blob with optional rebase. + """ + async with resource.temp_to_disk(delete=False) as temp_path: + bv = open_view(temp_path, update_analysis=False) + + if program_attrs is not None: + # Rebase before adding entry points (entry addresses are absolute). + # rebase() returns a new BinaryView; the original becomes invalid. + if program_attrs.base_address is not None: + current_base = bv.start + if current_base != program_attrs.base_address: + new_bv = bv.rebase(program_attrs.base_address) + if new_bv is not None: + bv = new_bv + LOGGER.info( + f"Rebased from 0x{current_base:x} to " + f"0x{program_attrs.base_address:x}" + ) + else: + raise RuntimeError( + f"Failed to rebase from 0x{current_base:x} to " + f"0x{program_attrs.base_address:x}" + ) + + if program_attrs.entry_points: + for entry_addr in program_attrs.entry_points: + bv.add_entry_point(entry_addr) + LOGGER.info(f"Added entry point at 0x{entry_addr:x}") + + bv.update_analysis_and_wait() + return bv + + @staticmethod + def _get_segment_flags(perms: MemoryPermissions) -> int: + """ + Convert MemoryPermissions to Binary Ninja SegmentFlags. 
""" + flags = 0 + if perms.value & MemoryPermissions.R.value: + flags |= SegmentFlag.SegmentReadable + if perms.value & MemoryPermissions.W.value: + flags |= SegmentFlag.SegmentWritable + if perms.value & MemoryPermissions.X.value: + flags |= SegmentFlag.SegmentExecutable + return flags diff --git a/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/blocks/unpackers.py b/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/blocks/unpackers.py index 55ab9a7d5..3e45bb0f1 100644 --- a/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/blocks/unpackers.py +++ b/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/blocks/unpackers.py @@ -17,8 +17,7 @@ from ofrak.model.component_model import ComponentConfig from ofrak.resource import Resource from ofrak.service.resource_service_i import ResourceFilter -from ofrak_binary_ninja.components.identifiers import BinaryNinjaAnalysisResource -from ofrak_binary_ninja.model import BinaryNinjaAnalysis +from ofrak_binary_ninja.model import BinaryNinjaAnalysisResource, BinaryNinjaAnalysis LOGGER = logging.getLogger(__name__) @@ -150,6 +149,9 @@ def _binary_ninja_get_complex_blocks( end_ea = literal_pool_search_addr else: literal_pool_search_addr += 1 + # Clip to code region boundary (NOP scanning and literal pool + # extension can push end_ea past the region) + end_ea = min(end_ea, region_end_vaddr) yield start_ea, end_ea, name diff --git a/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/identifiers.py b/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/identifiers.py index 70c92d93e..80481fa0e 100644 --- a/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/identifiers.py +++ b/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/components/identifiers.py @@ -1,21 +1,27 @@ from ofrak.component.identifier import Identifier +from ofrak.core import Elf, Ihex, Pe from ofrak.core.program import Program from ofrak.resource 
import Resource -from ofrak.resource_view import ResourceView +from ofrak_binary_ninja.model import BinaryNinjaAutoLoadProject, BinaryNinjaCustomLoadProject -class BinaryNinjaAnalysisResource(ResourceView): - pass +_BINARY_NINJA_AUTO_LOADABLE_FORMATS = [Elf, Ihex, Pe] class BinaryNinjaAnalysisIdentifier(Identifier): """ - Tags Program resources for Binary Ninja analysis. Enables Binary Ninja-based components to run on the resource. - Automatically identifies programs that should be analyzed with Binary Ninja. + Tags Program resources for Binary Ninja analysis. Auto-loadable formats (ELF, PE, Ihex) get + BinaryNinjaAutoLoadProject tag, others get BinaryNinjaCustomLoadProject. Enables Binary + Ninja-based components to run on the resource. """ id = b"BinaryNinjaAnalysisIdentifier" targets = (Program,) async def identify(self, resource: Resource, config=None): - resource.add_tag(BinaryNinjaAnalysisResource) + for tag in _BINARY_NINJA_AUTO_LOADABLE_FORMATS: + if resource.has_tag(tag): + resource.add_tag(BinaryNinjaAutoLoadProject) + return + + resource.add_tag(BinaryNinjaCustomLoadProject) diff --git a/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/model.py b/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/model.py index d58fe1da9..1eb865ed9 100644 --- a/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/model.py +++ b/disassemblers/ofrak_binary_ninja/src/ofrak_binary_ninja/model.py @@ -2,8 +2,21 @@ from binaryninja.binaryview import BinaryView from ofrak.model.resource_model import ResourceAttributes +from ofrak.resource_view import ResourceView @dataclass(**ResourceAttributes.DATACLASS_PARAMS) class BinaryNinjaAnalysis(ResourceAttributes): binaryview: BinaryView + + +class BinaryNinjaAnalysisResource(ResourceView): + pass + + +class BinaryNinjaAutoLoadProject(BinaryNinjaAnalysisResource): + pass + + +class BinaryNinjaCustomLoadProject(BinaryNinjaAnalysisResource): + pass diff --git 
a/disassemblers/ofrak_binary_ninja/tests/test_binary_ninja_analyzer.py b/disassemblers/ofrak_binary_ninja/tests/test_binary_ninja_analyzer.py index f420ae4ea..a38398ac1 100644 --- a/disassemblers/ofrak_binary_ninja/tests/test_binary_ninja_analyzer.py +++ b/disassemblers/ofrak_binary_ninja/tests/test_binary_ninja_analyzer.py @@ -1,5 +1,8 @@ """ Test the functionality of the BinaryNinjaAnalyzer component. + +Requirements Mapping: +- REQ2.2 """ from dataclasses import dataclass from typing import Tuple @@ -8,8 +11,21 @@ from ofrak import OFRAKContext from ofrak.core.filesystem import File -from ofrak_binary_ninja.components.binary_ninja_analyzer import BinaryNinjaAnalyzer +from ofrak_binary_ninja.components.binary_ninja_analyzer import ( + BinaryNinjaAnalyzer, + BinaryNinjaCustomLoadAnalyzer, +) +from ofrak_binary_ninja.model import BinaryNinjaCustomLoadProject from ofrak_binary_ninja.model import BinaryNinjaAnalysis +from ofrak_type.memory_permissions import MemoryPermissions +from pytest_ofrak.patterns.program_metadata import ( + custom_binary_resource, # noqa: F401 + setup_program_flat, + setup_program_with_code_region, + add_rodata_region, + add_distant_rw_region, + assert_complex_block_at_vaddr, +) from test_ofrak.unit.component.analyzer.analyzer_test_case import PopulatedAnalyzerTestCase @@ -45,3 +61,65 @@ async def test_binary_ninja_analyzer(test_case: PopulatedBinaryNinjaAnalyzerTest await test_case.resource.identify() analysis = await test_case.resource.analyze(BinaryNinjaAnalysis) assert isinstance(analysis, BinaryNinjaAnalysis) + + +async def test_binary_ninja_custom_load_single_region(custom_binary_resource): + """Test Binary Ninja custom loading with a single CodeRegion segment (REQ2.2).""" + base_address = 0x400000 + text_vaddr = base_address + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=base_address, text_vaddr=text_vaddr + ) + assert custom_binary_resource.has_tag(BinaryNinjaCustomLoadProject) + + await 
custom_binary_resource.run(BinaryNinjaCustomLoadAnalyzer) + + binja_analysis = custom_binary_resource.get_attributes(BinaryNinjaAnalysis) + assert binja_analysis.binaryview.start == base_address + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) + + +async def test_binary_ninja_custom_loader_with_memory_regions(custom_binary_resource): + """Test Binary Ninja custom loading with multiple MemoryRegion segments (REQ2.2).""" + text_vaddr = 0x400130 + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=0x100000, text_vaddr=text_vaddr + ) + await add_rodata_region( + custom_binary_resource, rodata_vaddr=0x40A0A0, permissions=MemoryPermissions.R + ) + assert custom_binary_resource.has_tag(BinaryNinjaCustomLoadProject) + + await custom_binary_resource.run(BinaryNinjaCustomLoadAnalyzer) + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) + + +async def test_binary_ninja_custom_load_distant_rw_region(custom_binary_resource): + """Test that a distant RW region doesn't cause issues in Binary Ninja (REQ2.2).""" + text_vaddr = 0x400130 + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=0x100000, text_vaddr=text_vaddr + ) + await add_distant_rw_region(custom_binary_resource, vaddr=0x80000000) + assert custom_binary_resource.has_tag(BinaryNinjaCustomLoadProject) + + await custom_binary_resource.run(BinaryNinjaCustomLoadAnalyzer) + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) + + +async def test_binary_ninja_custom_load_flat(custom_binary_resource): + """Test Binary Ninja flat-blob loading path (no MemoryRegion children) (REQ2.2).""" + base_address = 0x400000 + await setup_program_flat(custom_binary_resource, base_address=base_address) + assert custom_binary_resource.has_tag(BinaryNinjaCustomLoadProject) + + await 
custom_binary_resource.run(BinaryNinjaCustomLoadAnalyzer) + + binja_analysis = custom_binary_resource.get_attributes(BinaryNinjaAnalysis) + assert binja_analysis.binaryview.start == base_address diff --git a/disassemblers/ofrak_ghidra/CHANGELOG.md b/disassemblers/ofrak_ghidra/CHANGELOG.md index 32dcf7cd2..4aa1b55d2 100644 --- a/disassemblers/ofrak_ghidra/CHANGELOG.md +++ b/disassemblers/ofrak_ghidra/CHANGELOG.md @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## [Unreleased 0.2.0](https://github.com/redballoonsecurity/ofrak/tree/master) ### Added +- Support `ProgramAttributes` `entry_points` field for passing entry points to Ghidra custom loader ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) +- Support `MemoryRegionPermissions` attribute for fine-grained memory region permission control ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) - Add OFRAK requirements, requirement to test mapping, test specifications ([#656](https://github.com/redballoonsecurity/ofrak/pull/656)) ### Changed diff --git a/disassemblers/ofrak_ghidra/setup.py b/disassemblers/ofrak_ghidra/setup.py index c0e57e4df..896095275 100644 --- a/disassemblers/ofrak_ghidra/setup.py +++ b/disassemblers/ofrak_ghidra/setup.py @@ -21,7 +21,7 @@ def run(self): setuptools.setup( name="ofrak_ghidra", - version="0.2.0rc4", + version="0.2.0rc5", author="Red Balloon Security", author_email="ofrak@redballoonsecurity.com", description="OFRAK Ghidra Components", diff --git a/disassemblers/ofrak_ghidra/src/ofrak_ghidra/components/ghidra_analyzer.py b/disassemblers/ofrak_ghidra/src/ofrak_ghidra/components/ghidra_analyzer.py index 1d98beb8d..32374e4d3 100644 --- a/disassemblers/ofrak_ghidra/src/ofrak_ghidra/components/ghidra_analyzer.py +++ b/disassemblers/ofrak_ghidra/src/ofrak_ghidra/components/ghidra_analyzer.py @@ -12,6 +12,8 @@ from ofrak import ResourceFilter from ofrak.core import CodeRegion, MemoryRegion, NamedProgramSection, 
ProgramAttributes, Program +from ofrak.core.memory_region import get_memory_region_permissions, get_effective_memory_permissions +from ofrak_type.memory_permissions import MemoryPermissions from ofrak.component.analyzer import Analyzer from ofrak.component.modifier import Modifier from ofrak.model.component_model import ComponentConfig @@ -176,6 +178,7 @@ async def _do_ghidra_import( use_binary_loader: bool, processor: Optional[ArchInfo] = None, blocks: Optional[List[MemoryRegion]] = None, + entry_points: Optional[List[int]] = None, ): args = [ ghidra_project, @@ -200,7 +203,7 @@ async def _do_ghidra_import( if blocks is not None: args.extend(["-scriptPath", (";".join(self._script_directories))]) args.extend(["-preScript", "CreateMemoryBlocks.java"]) - args.extend(await self._build_create_memory_args(blocks)) + args.extend(await self._build_create_memory_args(blocks, entry_points)) cmd_str = " ".join([GHIDRA_HEADLESS_EXEC] + args) LOGGER.debug(f"Running command: {cmd_str}") @@ -323,6 +326,7 @@ def _build_ghidra_server_args(self) -> List[str]: return args + # TODO(#710): Deduplicate with _arch_info_to_processor_id in ofrak_pyghidra @lru_cache(maxsize=None) def _arch_info_to_processor_id(self, processor: ArchInfo): families: Dict[InstructionSet, str] = { @@ -363,9 +367,12 @@ def _arch_info_to_processor_id(self, processor: ArchInfo): # default_proc_id found, and the ArchoInfo doesn't contain any info to narrow it down further, so just break early to return the default break - for name_elem in language.iter(tag="external_name"): - name = name_elem.attrib["name"].lower() - + names = [ + name_elem.attrib["name"].lower() + for name_elem in language.iter(tag="external_name") + ] + names.append(proc_id.split(":")[-1]) + for name in names: if not processor.sub_isa and not processor.processor: if name.endswith("_any"): return proc_id @@ -376,6 +383,19 @@ def _arch_info_to_processor_id(self, processor: ArchInfo): if processor.processor and processor.processor.value.lower() == 
name: return proc_id + # Suspect: character-set matching (not substring matching) can + # produce false positives. Ported from ofrak_pyghidra for parity. + # See #710. + if processor.sub_isa and all( + char in processor.sub_isa.value.lower() for char in name.lower() + ): + return proc_id + + if processor.processor and all( + char in processor.processor.value.lower() for char in name.lower() + ): + return proc_id + processors_rejected.add(proc_id) if default_proc_id_found: @@ -389,26 +409,35 @@ def _arch_info_to_processor_id(self, processor: ArchInfo): f"{processor}. Considered the following specs:\n{', '.join(processors_rejected)}" ) - async def _build_create_memory_args(self, blocks: List[MemoryRegion]) -> List[str]: + async def _build_create_memory_args( + self, blocks: List[MemoryRegion], entry_points: Optional[List[int]] = None + ) -> List[str]: args: List[str] = [] + has_blocks = False + has_executable = False for i, block in enumerate(blocks): + perms = get_memory_region_permissions(block.resource) + if perms is not None and perms.permissions == MemoryPermissions.NONE: + continue + + has_blocks = True block_info: List[str] = [ str(block.virtual_address), str(block.size), ] - if block.resource.has_tag(CodeRegion): - block_info.append("rx") - else: - block_info.append("rw") + effective = get_effective_memory_permissions(block.resource) + if effective.value & MemoryPermissions.X.value: + has_executable = True + block_info.append(effective.as_str()) if block.resource.has_tag(NamedProgramSection): named_section = await block.resource.view_as(NamedProgramSection) if " " in named_section.name or "!" 
in named_section.name: raise ValueError( f"Bad character in section name {named_section.name} which interferes with " - f"encoding arguments to CreateMemoryRegions.java" + f"encoding arguments to CreateMemoryBlocks.java" ) block_info.append(named_section.name) else: @@ -423,6 +452,16 @@ async def _build_create_memory_args(self, blocks: List[MemoryRegion]) -> List[st args.append("!".join(block_info)) + if not has_blocks: + raise ValueError("No accessible memory regions for analysis") + + if not has_executable: + raise ValueError("No executable memory regions for analysis") + + if entry_points: + entry_strs = [f"0x{ep:x}" for ep in entry_points] + args.append(f"entry:{','.join(entry_strs)}") + return args @@ -525,10 +564,23 @@ class GhidraCustomLoadAnalyzer(GhidraProjectAnalyzer): async def analyze( self, resource: Resource, config: Optional[GhidraProjectConfig] = None ) -> GhidraProject: - arch_info: ArchInfo = await resource.analyze(ProgramAttributes) + program_attrs = await resource.analyze(ProgramAttributes) mem_blocks = await self._get_memory_blocks(await resource.view_as(Program)) use_existing = config.use_existing if config is not None else False + entry_points: Optional[List[int]] = None + if program_attrs.entry_points: + entry_points = list(program_attrs.entry_points) + + # Extract ArchInfo fields (avoids polluting lru_cache with extra fields) + arch_info = ArchInfo( + program_attrs.isa, + program_attrs.sub_isa, + program_attrs.bit_width, + program_attrs.endianness, + program_attrs.processor, + ) + async with self._prepare_ghidra_project(resource) as (ghidra_project, full_fname): program_name = await self._do_ghidra_import( ghidra_project, @@ -537,6 +589,7 @@ async def analyze( use_binary_loader=True, processor=arch_info, blocks=mem_blocks, + entry_points=entry_points, ) await self._do_ghidra_analyze_and_serve( ghidra_project, diff --git a/disassemblers/ofrak_ghidra/src/ofrak_ghidra/ghidra_scripts/CreateMemoryBlocks.java 
b/disassemblers/ofrak_ghidra/src/ofrak_ghidra/ghidra_scripts/CreateMemoryBlocks.java index df2c24ee6..c93b09f9a 100644 --- a/disassemblers/ofrak_ghidra/src/ofrak_ghidra/ghidra_scripts/CreateMemoryBlocks.java +++ b/disassemblers/ofrak_ghidra/src/ofrak_ghidra/ghidra_scripts/CreateMemoryBlocks.java @@ -56,13 +56,42 @@ public void run() throws Exception { Memory mem = currentProgram.getMemory(); FileBytes fileBytes = mem.getAllFileBytes().get(0); + SymbolTable symbolTable = currentProgram.getSymbolTable(); // remove existing memory blocks for (MemoryBlock block : mem.getBlocks()){ mem.removeBlock(block, TaskMonitor.DUMMY); } + // Collect explicit entry points from arguments (format: "entry:0x1000,0x2000") + List explicitEntryPoints = new ArrayList<>(); + + for (String arg : args) { + if (arg.startsWith("entry:")) { + String entryList = arg.substring(6); // Remove "entry:" prefix + for (String entryStr : entryList.split(",")) { + try { + long entryAddr; + if (entryStr.startsWith("0x") || entryStr.startsWith("0X")) { + entryAddr = Long.parseUnsignedLong(entryStr.substring(2), 16); + } else { + entryAddr = Long.parseUnsignedLong(entryStr); + } + explicitEntryPoints.add(entryAddr); + } catch (NumberFormatException e) { + println("Warning: Failed to parse entry point: " + entryStr); + } + } + } + } + + boolean hasExplicitEntryPoints = !explicitEntryPoints.isEmpty(); + for (String memRegionRaw : args) { + // Skip entry point argument + if (memRegionRaw.startsWith("entry:")) { + continue; + } String[] memRegionInfo = memRegionRaw.split("!"); @@ -76,7 +105,7 @@ public void run() throws Exception { try { if (offset >= 0){ - block = mem.createInitializedBlock(name, toAddr(address), fileBytes, offset, size, true); + block = mem.createInitializedBlock(name, toAddr(address), fileBytes, offset, size, false); } else { block = mem.createUninitializedBlock(name, toAddr(address), size, false); } @@ -88,12 +117,9 @@ public void run() throws Exception { continue; } - SymbolTable 
symbolTable = currentProgram.getSymbolTable(); - - // This section is brittle: there need to be instructions at this address in order to work - // So we can't just mark a section as executable and have Ghidra greedily disassemble it all - // TODO: Add argument for entry points to mark actual starts of code - if (permissions.contains("x")){ + // Only add block start as entry point if no explicit entry points provided + // and the block is executable + if (!hasExplicitEntryPoints && permissions.contains("x")){ markAsCode(currentProgram, block.getStart()); @@ -109,6 +135,23 @@ public void run() throws Exception { } } + + // Add explicit entry points + int entryIndex = 0; + for (Long entryAddr : explicitEntryPoints) { + Address addr = toAddr(entryAddr); + markAsCode(currentProgram, addr); + + try { + String labelName = entryIndex == 0 ? ENTRY_NAME : ENTRY_NAME + "_" + entryIndex; + symbolTable.createLabel(addr, labelName, SourceType.IMPORTED); + symbolTable.addExternalEntryPoint(addr); + entryIndex++; + } + catch (InvalidInputException e) { + e.printStackTrace(); + } + } } private void markAsCode(Program program, Address address) { diff --git a/disassemblers/ofrak_ghidra/src/ofrak_ghidra/ghidra_scripts/GetComplexBlocks.java b/disassemblers/ofrak_ghidra/src/ofrak_ghidra/ghidra_scripts/GetComplexBlocks.java index a0013ae4c..0af33da57 100644 --- a/disassemblers/ofrak_ghidra/src/ofrak_ghidra/ghidra_scripts/GetComplexBlocks.java +++ b/disassemblers/ofrak_ghidra/src/ofrak_ghidra/ghidra_scripts/GetComplexBlocks.java @@ -129,26 +129,23 @@ else if (function.equals(getFunctionContaining(lastInsn.getAddress()))) { endAddr = lastInsn.getAddress().add(lastInsn.getLength()); } - // Note we can't get the literal pool after the last function in the section. 
- if (getFunctionAfter(function) == null) { - this.size = endAddr.getOffset() - this.loadAddress; - return; - } - - if (nextFuncAddr.subtract(end) > 0) { - this.size = endAddr.getOffset() - this.loadAddress; - return; - } + // Extend with trailing data items only when there is a next function + // within the code region to bound the search. + if (nextFunc != null && nextFuncAddr.subtract(end) <= 0) { + Data data = getDataAt(endAddr); - Data data = getDataAt(endAddr); + if (data == null) { + data = getDataAfter(endAddr); + } - if (data == null) { - data = getDataAfter(endAddr); + while (data != null && nextFuncAddr.subtract(data.getAddress()) > 0) { + endAddr = data.getAddress().add(data.getLength()); + data = getDataAfter(data); + } } - - while (data != null && nextFuncAddr.subtract(data.getAddress()) > 0) { - endAddr = data.getAddress().add(data.getLength()); - data = getDataAfter(data); + // Clip to code region boundary + if (endAddr.subtract(end) > 0) { + endAddr = end; } this.size = endAddr.getOffset() - this.loadAddress; } diff --git a/disassemblers/ofrak_ghidra/tests/test_ghidra_program_analyzer.py b/disassemblers/ofrak_ghidra/tests/test_ghidra_program_analyzer.py index 3e3397f9e..f5ea5f235 100644 --- a/disassemblers/ofrak_ghidra/tests/test_ghidra_program_analyzer.py +++ b/disassemblers/ofrak_ghidra/tests/test_ghidra_program_analyzer.py @@ -1,8 +1,10 @@ """ Test the Ghidra program analyzer components. 
+ +Requirements Mapping: +- REQ2.2 """ import os.path -import os.path import tempfile from typing import Dict, Type @@ -20,6 +22,7 @@ SegmentInjectorModifierConfig, ) from ofrak.resource import Resource +from ofrak_ghidra.components.ghidra_analyzer import GhidraCustomLoadAnalyzer from ofrak_ghidra.ghidra_model import GhidraProject, GhidraCustomLoadProject from ofrak_patch_maker.model import PatchRegionConfig from ofrak_patch_maker.patch_maker import PatchMaker @@ -35,7 +38,20 @@ BinFileType, Segment, ) -from ofrak_type import BitWidth, Endianness, InstructionSet, MemoryPermissions, Range +from ofrak_type import ( + BitWidth, + Endianness, + InstructionSet, + MemoryPermissions, + Range, +) +from pytest_ofrak.patterns.program_metadata import ( + custom_binary_resource, # noqa: F401 + setup_program_with_code_region, + add_rodata_region, + add_distant_rw_region, + assert_complex_block_at_vaddr, +) async def test_ghidra_project_analyzer(hello_world_elf_resource: Resource): @@ -213,3 +229,33 @@ async def _make_dummy_program(resource: Resource, arch_info): SegmentInjectorModifier, SegmentInjectorModifierConfig.from_fem(fem), ) + + +async def test_ghidra_custom_loader_with_program_metadata(custom_binary_resource): + """Test Ghidra custom loading with ProgramAttributes + MemoryRegions (REQ2.2).""" + text_vaddr = 0x400130 + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=0x100000, text_vaddr=text_vaddr + ) + await add_rodata_region(custom_binary_resource, rodata_vaddr=0x40A0A0) + assert custom_binary_resource.has_tag(GhidraCustomLoadProject) + + await custom_binary_resource.run(GhidraCustomLoadAnalyzer) + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) + + +async def test_ghidra_custom_load_distant_rw_region(custom_binary_resource): + """Test that a distant RW region doesn't cause issues in Ghidra (REQ2.2).""" + text_vaddr = 0x400130 + text_section = await 
setup_program_with_code_region( + custom_binary_resource, base_address=0x100000, text_vaddr=text_vaddr + ) + await add_distant_rw_region(custom_binary_resource, vaddr=0x80000000) + assert custom_binary_resource.has_tag(GhidraCustomLoadProject) + + await custom_binary_resource.run(GhidraCustomLoadAnalyzer) + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) diff --git a/disassemblers/ofrak_pyghidra/CHANGELOG.md b/disassemblers/ofrak_pyghidra/CHANGELOG.md index 96fdd94f7..eeff61a7a 100644 --- a/disassemblers/ofrak_pyghidra/CHANGELOG.md +++ b/disassemblers/ofrak_pyghidra/CHANGELOG.md @@ -3,9 +3,11 @@ All notable changes to `ofrak-pyghidra` will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased 0.2.0rc5](https://github.com/redballoonsecurity/ofrak/tree/master) +## [Unreleased 0.2.0rc6](https://github.com/redballoonsecurity/ofrak/tree/master) ### Added +- Support `ProgramAttributes` `entry_points` and `base_address` fields for passing program metadata to PyGhidra ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) +- Support `MemoryRegionPermissions` attribute for fine-grained memory region permission control ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) - Add a PyGhidra custom load analyzer to allow for loading programs with a custom layout ([#677](https://github.com/redballoonsecurity/ofrak/pull/677)) - Add detailed logging output and progress indicators to standalone analysis script ([#672](https://github.com/redballoonsecurity/ofrak/pull/672)) @@ -15,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix redundant re-analysis of complex blocks in the standalone analysis script ([#672](https://github.com/redballoonsecurity/ofrak/pull/672)) ### Changed +- `PyGhidraAutoAnalyzer` no longer falls back 
to custom loading for non-auto-loadable formats; use `PyGhidraCustomLoadAnalyzer` instead ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) - Reduce the decompilation time of PyGhidra by reusing cached unpacking results. ([#623](https://github.com/redballoonsecurity/ofrak/pull/623)) - Improve `ofrak_pyghidra` decompilation: more strings and symbol names for cross-references in decompilation. ([#633](https://github.com/redballoonsecurity/ofrak/pull/633)) - Improve unpacking logic, error messages, and testing for `ofrak_pyghidra` auto analyzer ([#637](https://github.com/redballoonsecurity/ofrak/pull/637)) diff --git a/disassemblers/ofrak_pyghidra/setup.py b/disassemblers/ofrak_pyghidra/setup.py index 16afe05d8..da8a8e1c1 100644 --- a/disassemblers/ofrak_pyghidra/setup.py +++ b/disassemblers/ofrak_pyghidra/setup.py @@ -21,7 +21,7 @@ def run(self): setuptools.setup( name="ofrak_pyghidra", - version="0.2.0rc5", + version="0.2.0rc6", author="Red Balloon Security", author_email="ofrak@redballoonsecurity.com", description="OFRAK PyGhidra Components", diff --git a/disassemblers/ofrak_pyghidra/src/ofrak_pyghidra/components/pyghidra_components.py b/disassemblers/ofrak_pyghidra/src/ofrak_pyghidra/components/pyghidra_components.py index 83ebe1611..c8cfc4b7d 100644 --- a/disassemblers/ofrak_pyghidra/src/ofrak_pyghidra/components/pyghidra_components.py +++ b/disassemblers/ofrak_pyghidra/src/ofrak_pyghidra/components/pyghidra_components.py @@ -1,14 +1,19 @@ from dataclasses import dataclass from tempfile312 import mkdtemp +import hashlib import os -from typing import Dict +from typing import Dict, Optional from xml.etree import ElementTree from ofrak.component.analyzer import Analyzer from ofrak.core.architecture import ProgramAttributes from ofrak.core.complex_block import ComplexBlock from ofrak.core.decompilation import DecompilationAnalysis -from ofrak.core.memory_region import MemoryRegion +from ofrak.core.memory_region import ( + MemoryRegion, + 
get_memory_region_permissions, + get_effective_memory_permissions, +) from ofrak.service.data_service_i import DataServiceInterface from ofrak.service.resource_service_i import ResourceFilter, ResourceServiceInterface from ofrak_type import ArchInfo, Endianness, InstructionSet @@ -32,6 +37,7 @@ ) from ofrak_pyghidra.standalone.pyghidra_analysis import unpack, decompile_all_functions from ofrak_type.error import NotFoundError +from ofrak_type.memory_permissions import MemoryPermissions _GHIDRA_AUTO_LOADABLE_FORMATS = [Elf, Ihex, Pe] @@ -129,7 +135,9 @@ def __init__( super().__init__(resource_factory, data_service, resource_service) self.analysis_store = analysis_store - async def analyze(self, resource: Resource, config: PyGhidraAnalyzerConfig = None): + async def analyze( + self, resource: Resource, config: Optional[PyGhidraAnalyzerConfig] = None + ) -> PyGhidraAutoLoadProject: tempdir = mkdtemp(prefix="rbs-pyghidra-bin") await resource.identify() # useful for checking tags later try: @@ -151,23 +159,10 @@ async def analyze(self, resource: Resource, config: PyGhidraAnalyzerConfig = Non ) return PyGhidraAutoLoadProject() - program_attrs = resource.get_attributes(ProgramAttributes) - # Guess that the base address is the min start address of any memory region - regions = await resource.get_children_as_view( - MemoryRegion, r_filter=ResourceFilter.with_tags(MemoryRegion) - ) - base_address = min(code_region.virtual_address for code_region in regions) - - self.analysis_store.store_analysis( - resource.get_id(), - unpack( - program_file, - decomp, - language=_arch_info_to_processor_id(program_attrs), - base_address=base_address, - ), + raise ValueError( + f"Resource {resource.get_id()!r} has PyGhidraAutoLoadProject tag but no " + f"recognized auto-loadable format tag" ) - return PyGhidraAutoLoadProject() class PyGhidraCustomLoadAnalyzer(Analyzer[None, PyGhidraCustomLoadProject]): @@ -194,37 +189,72 @@ def __init__( super().__init__(resource_factory, data_service, 
resource_service) self.analysis_store = analysis_store - async def analyze(self, resource: Resource, config: PyGhidraAnalyzerConfig): + async def analyze( + self, resource: Resource, config: Optional[PyGhidraAnalyzerConfig] = None + ) -> PyGhidraCustomLoadProject: + try: + program_attrs = resource.get_attributes(ProgramAttributes) + except NotFoundError: + program_attrs = None + if config is None: - try: - program_attrs = resource.get_attributes(ProgramAttributes) - language = _arch_info_to_processor_id(program_attrs) - except NotFoundError: - language = None + language = ( + _arch_info_to_processor_id(program_attrs) if program_attrs is not None else None + ) decomp = False else: decomp = config.decomp language = config.language + entry_points = None + base_address = None + if program_attrs is not None: + if program_attrs.entry_points: + entry_points = list(program_attrs.entry_points) + base_address = program_attrs.base_address + # Prepare memory regions data - regions = await resource.get_children_as_view( - MemoryRegion, r_filter=ResourceFilter.with_tags(MemoryRegion) + regions = sorted( + await resource.get_children_as_view( + MemoryRegion, r_filter=ResourceFilter.with_tags(MemoryRegion) + ), + key=lambda r: r.virtual_address, ) + md5_hash = hashlib.md5() memory_regions = [] for region in regions: + perms = get_memory_region_permissions(region.resource) + if perms is not None and perms.permissions == MemoryPermissions.NONE: + continue + region_data = await region.resource.get_data() - memory_regions.append( - { - "virtual_address": region.virtual_address, - "size": region.size, - "data": region_data, - } - ) + md5_hash.update(region_data) + region_dict = { + "virtual_address": region.virtual_address, + "size": region.size, + "data": region_data, + "permissions": get_effective_memory_permissions(region.resource).value, + } + memory_regions.append(region_dict) + + if not memory_regions: + raise ValueError("No accessible memory regions for analysis") + + if not 
any(r["permissions"] & MemoryPermissions.X.value for r in memory_regions): + raise ValueError("No executable memory regions for analysis") self.analysis_store.store_analysis( resource.get_id(), - unpack(None, decomp, language=language, memory_regions=memory_regions), + unpack( + None, + decomp, + language=language, + base_address=base_address, + memory_regions=memory_regions, + entry_points=entry_points, + file_hash=md5_hash.digest().hex(), + ), ) return PyGhidraCustomLoadProject() diff --git a/disassemblers/ofrak_pyghidra/src/ofrak_pyghidra/standalone/pyghidra_analysis.py b/disassemblers/ofrak_pyghidra/src/ofrak_pyghidra/standalone/pyghidra_analysis.py index 1d3c428dd..85246d5be 100644 --- a/disassemblers/ofrak_pyghidra/src/ofrak_pyghidra/standalone/pyghidra_analysis.py +++ b/disassemblers/ofrak_pyghidra/src/ofrak_pyghidra/standalone/pyghidra_analysis.py @@ -3,15 +3,17 @@ import hashlib import traceback from typing import Any, Dict, Optional, Union, List + import pyghidra import argparse import time import re import json -import logging from tempfile312 import mkdtemp from tqdm import tqdm +from ofrak_type.memory_permissions import MemoryPermissions + LOGGER = logging.getLogger("ofrak_pyghidra") @@ -26,13 +28,50 @@ def _parse_offset(java_object): return int(str(java_object.getOffsetAsBigInteger())) +def _register_entry_points(flat_api, entry_points: List[int]) -> None: + """ + Register entry points in the current Ghidra program. + + Marks each address as code and adds it as a labeled external entry point so that + Ghidra's auto-analysis will discover functions starting at these addresses. 
+ """ + from ghidra.program.model.symbol import SourceType + from ghidra.util.exception import DuplicateNameException + + program = flat_api.getCurrentProgram() + default_space = program.getAddressFactory().getDefaultAddressSpace() + symbol_table = program.getSymbolTable() + + code_prop = program.getAddressSetPropertyMap("CodeMap") + if code_prop is None: + try: + code_prop = program.createAddressSetPropertyMap("CodeMap") + except DuplicateNameException: + code_prop = program.getAddressSetPropertyMap("CodeMap") + + for i, entry_addr in enumerate(entry_points): + try: + addr = default_space.getAddress(entry_addr) + # Mark as code (matches Java CreateMemoryBlocks.markAsCode) + if code_prop is not None: + code_prop.add(addr, addr) + label_name = "entry" if i == 0 else f"entry_{i}" + symbol_table.createLabel(addr, label_name, SourceType.IMPORTED) + symbol_table.addExternalEntryPoint(addr) + LOGGER.info(f"Added entry point at 0x{entry_addr:x}") + except Exception as e: + LOGGER.warning(f"Failed to add entry point at 0x{entry_addr:x}: {e}") + + def unpack( - program_file: str, + program_file: Optional[str], decompiled: bool, language: Optional[str] = None, base_address: Union[str, int, None] = None, memory_regions: Optional[List[Dict[str, Any]]] = None, + entry_points: Optional[List[int]] = None, show_progress: bool = False, + file_hash: Optional[str] = None, ): try: LOGGER.info("Analyzing program. This might take a while.") @@ -44,10 +83,17 @@ def unpack( program_file = os.path.join(tempdir, "program") with open(program_file, "wb") as f: f.write(b"\x00") - with pyghidra.open_program(program_file, language=language) as flat_api: - LOGGER.info("Analysis completed. 
Caching analysis to JSON") + # Defer auto-analysis until after program modifications are complete + needs_pre_analysis_setup = ( + bool(memory_regions) or bool(entry_points) or base_address is not None + ) + with pyghidra.open_program( + program_file, language=language, analyze=not needs_pre_analysis_setup + ) as flat_api: + LOGGER.info("Program loaded. Caching analysis to JSON") # Java packages must be imported after pyghidra.start or pyghidra.open_program from ghidra.app.decompiler import DecompInterface, DecompileOptions + from ghidra.program.util import GhidraProgramUtilities from ghidra.util.task import TaskMonitor from ghidra.program.model.block import BasicBlockModel from ghidra.program.model.symbol import RefType @@ -65,6 +111,11 @@ def unpack( memory.removeBlock(block, TaskMonitor.DUMMY) for region in memory_regions: + # Safety net: the component already filters NONE-permission regions, + # but this function is also callable standalone. + permissions = region.get("permissions") + if permissions is not None and permissions == MemoryPermissions.NONE.value: + continue addr = default_space.getAddress(region["virtual_address"]) data_bytes = region["data"] block_name = f"region_{region['virtual_address']:x}" @@ -82,27 +133,29 @@ def unpack( False, # overlay ) - # Mark as executable block = memory.getBlock(addr) - block.setExecute(True) - block.setRead(True) + if permissions is not None: + block.setRead(bool(permissions & MemoryPermissions.R.value)) + block.setWrite(bool(permissions & MemoryPermissions.W.value)) + block.setExecute(bool(permissions & MemoryPermissions.X.value)) + else: + is_executable = region.get("executable", True) + block.setRead(True) + block.setWrite(not is_executable) + block.setExecute(is_executable) except Exception as e: - logging.warning( + LOGGER.warning( f"Failed to create memory block at 0x{region['virtual_address']:x}: {e}" ) - # Analyze all - analysis_mgr = program.getOptions("Analyzers") - flat_api.analyzeAll(program) - # If 
base_address is provided, rebase the program - if base_address is not None: - # Convert base_address to int if it's a string + + # Rebase only when memory_regions are absent (regions use absolute addresses) + if base_address is not None and not memory_regions: if isinstance(base_address, str): if base_address.startswith("0x"): base_address = int(base_address, 16) else: base_address = int(base_address) - # Rebase the program to the specified base address program = flat_api.getCurrentProgram() address_factory = program.getAddressFactory() new_base_addr = address_factory.getDefaultAddressSpace().getAddress( @@ -111,6 +164,18 @@ def unpack( program.setImageBase(new_base_addr, True) LOGGER.info(f"Rebased program address to {hex(base_address)}") + if entry_points: + _register_entry_points(flat_api, entry_points) + + if needs_pre_analysis_setup: + flat_api.analyzeAll(flat_api.getCurrentProgram()) + # Mark as analyzed so that subsequent sessions opening the cached + # pyghidra project do not re-run analysis (which can clobber results). 
+ if hasattr(GhidraProgramUtilities, "markProgramAnalyzed"): + GhidraProgramUtilities.markProgramAnalyzed(flat_api.getCurrentProgram()) + else: + GhidraProgramUtilities.setAnalyzedFlag(flat_api.getCurrentProgram(), True) + main_dictionary: Dict[str, Any] = {} code_regions = _unpack_program(flat_api) main_dictionary["metadata"] = {} @@ -119,10 +184,13 @@ def unpack( main_dictionary["metadata"]["path"] = program_file if base_address is not None: main_dictionary["metadata"]["base_address"] = base_address - with open(program_file, "rb") as fh: - data = fh.read() - md5_hash = hashlib.md5(data) - main_dictionary["metadata"]["hash"] = md5_hash.digest().hex() + if file_hash is not None: + main_dictionary["metadata"]["hash"] = file_hash + else: + with open(program_file, "rb") as fh: + data = fh.read() + md5_hash = hashlib.md5(data) + main_dictionary["metadata"]["hash"] = md5_hash.digest().hex() LOGGER.info(f"Program contains {len(code_regions)} code regions") for code_region in code_regions: @@ -143,6 +211,7 @@ def unpack( if len(func_cbs) == 0: continue + region_end = code_region["virtual_address"] + code_region["size"] for func, cb in tqdm(func_cbs, unit="CB", smoothing=0, disable=not show_progress): cb_key = f"func_{cb['virtual_address']}" code_region["children"].append(cb_key) @@ -155,7 +224,7 @@ def unpack( cb["decompilation"] = decompilation bb_model = BasicBlockModel(flat_api.getCurrentProgram()) basic_blocks, data_words = _unpack_complex_block( - func, flat_api, bb_model, BigInteger.ONE + func, flat_api, bb_model, BigInteger.ONE, region_end=region_end ) cb["children"] = [] for block, bb in basic_blocks: @@ -247,16 +316,13 @@ def _concat_contiguous_code_blocks(code_regions): def _unpack_code_region(code_region, flat_api): functions = [] + region_end = code_region["virtual_address"] + code_region["size"] start_address = ( flat_api.getAddressFactory() .getDefaultAddressSpace() .getAddress(hex(code_region["virtual_address"])) ) - end_address = ( - 
flat_api.getAddressFactory() - .getDefaultAddressSpace() - .getAddress(hex(code_region["virtual_address"] + code_region["size"])) - ) + end_address = flat_api.getAddressFactory().getDefaultAddressSpace().getAddress(hex(region_end)) func = flat_api.getFunctionAt(start_address) if func is None: func = flat_api.getFunctionAfter(start_address) @@ -268,6 +334,7 @@ def _unpack_code_region(code_region, flat_api): start = _parse_offset(func.getEntryPoint()) end, _ = _get_last_address(func, flat_api) if end is not None: + end = min(end, region_end) cb = { "virtual_address": virtual_address, "size": end - start, @@ -278,7 +345,7 @@ def _unpack_code_region(code_region, flat_api): return functions -def _unpack_complex_block(func, flat_api, bb_model, one): +def _unpack_complex_block(func, flat_api, bb_model, one, region_end): bbs = [] bb_iter = bb_model.getCodeBlocksContaining(func.getBody(), flat_api.monitor) for block in bb_iter: @@ -339,10 +406,11 @@ def _unpack_complex_block(func, flat_api, bb_model, one): bbs.append((ghidra_block, bb)) end_data_addr, end_code_addr = _get_last_address(func, flat_api) + end_data_addr = min(end_data_addr, region_end) dws = [] data = flat_api.getDataAt(end_code_addr) - while data is not None and _parse_offset(data.getAddress()) <= end_data_addr: + while data is not None and _parse_offset(data.getAddress()) < end_data_addr: num_words = 1 word_size = data.getLength() if word_size == 1: diff --git a/disassemblers/ofrak_pyghidra/tests/test_pyghidra_components.py b/disassemblers/ofrak_pyghidra/tests/test_pyghidra_components.py index 5a51b8836..7a425a8af 100644 --- a/disassemblers/ofrak_pyghidra/tests/test_pyghidra_components.py +++ b/disassemblers/ofrak_pyghidra/tests/test_pyghidra_components.py @@ -3,6 +3,7 @@ Requirements Mapping: - REQ1.2 +- REQ2.2 """ import os from typing import Dict, Tuple @@ -30,6 +31,7 @@ _arch_info_to_processor_id, PyGhidraDecompilationAnalyzer, PyGhidraCustomLoadAnalyzer, + PyGhidraCustomLoadProject, ) import 
ofrak_pyghidra from ofrak.core import ( @@ -41,6 +43,14 @@ Instruction, ProgramAttributes, ) +from pytest_ofrak.patterns.program_metadata import ( + custom_binary_resource, # noqa: F401 + setup_program_with_code_region, + add_rodata_region, + add_distant_rw_region, + assert_complex_block_at_vaddr, +) +from ofrak_type.memory_permissions import MemoryPermissions from ofrak_pyghidra.standalone.pyghidra_analysis import unpack, decompile_all_functions from ofrak import Resource, ResourceFilter, ResourceSort, ResourceAttributeValueFilter @@ -391,26 +401,6 @@ async def test_ihex_unpacking(ihex_resource): assert any(cb.name == "FUN_004003be" for cb in complex_blocks) -@pytest.fixture -async def custom_binary_resource(ofrak_context: OFRAKContext): - # This is a custom binary created from this aarch64 statically compiled binary: - # https://github.com/ryanwoodsmall/static-binaries/blob/master/aarch64/tini - # It was created like so: - # - `aarch64-linux-gnu-objcopy -O binary --only-section=.text tini tini.text.bin` - # - `aarch64-linux-gnu-objcopy -O binary --only-section=.rodata tini tini.rodata.bin` - # - `dd if=/dev/zero of=gap.bin bs=1 count=$((0x1234))` - # - `cat tini.text.bin > tini_custom_binary` - # - `cat gap.bin >> tini_custom_binary` - # - `cat tini.rodata.bin >> tini_custom_binary` - # So it is a binary that contains: - # - the tini .text section binary content - # - a gap of zero bytes of size 0x1234 - # - the tini .rodata binary content - return await ofrak_context.create_root_resource_from_file( - os.path.join(os.path.dirname(__file__), "assets/tini_custom_binary") - ) - - async def test_pyghidra_custom_loader(custom_binary_resource): """ Test that loading a binary with manually-defined MemoryRegions with the PyGhidraCustomLoadAnalyzer results in the right representation in OFRAK. 
@@ -486,4 +476,38 @@ async def test_pyghidra_custom_loader(custom_binary_resource): decomp_resource: DecompilationAnalysis = await cb.resource.view_as(DecompilationAnalysis) decomp_str = decomp_resource.decompilation print(decomp_str) - assert '"tini version 0.19.0"' in decomp_str + # Ghidra may inline the string literal or use a symbol reference, depending on + # type propagation depth. + assert "s_tini_version_0_19_0" in decomp_str or '"tini version 0.19.0"' in decomp_str + + +async def test_pyghidra_custom_loader_with_program_metadata(custom_binary_resource): + """Test PyGhidra custom loading with ProgramAttributes + MemoryRegions (REQ2.2).""" + text_vaddr = 0x400130 + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=0x100000, text_vaddr=text_vaddr + ) + await add_rodata_region( + custom_binary_resource, rodata_vaddr=0x40A0A0, permissions=MemoryPermissions.R + ) + assert custom_binary_resource.has_tag(PyGhidraCustomLoadProject) + + await custom_binary_resource.run(PyGhidraCustomLoadAnalyzer) + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) + + +async def test_pyghidra_custom_load_distant_rw_region(custom_binary_resource): + """Test that a distant RW region doesn't cause issues in PyGhidra (REQ2.2).""" + text_vaddr = 0x400130 + text_section = await setup_program_with_code_region( + custom_binary_resource, base_address=0x100000, text_vaddr=text_vaddr + ) + await add_distant_rw_region(custom_binary_resource, vaddr=0x80000000) + assert custom_binary_resource.has_tag(PyGhidraCustomLoadProject) + + await custom_binary_resource.run(PyGhidraCustomLoadAnalyzer) + + await text_section.unpack() + await assert_complex_block_at_vaddr(custom_binary_resource, text_vaddr) diff --git a/ofrak_core/CHANGELOG.md b/ofrak_core/CHANGELOG.md index 3a6562ac0..ca58faa13 100644 --- a/ofrak_core/CHANGELOG.md +++ b/ofrak_core/CHANGELOG.md @@ -3,7 +3,7 @@ All notable changes to `ofrak` will be 
documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased](https://github.com/redballoonsecurity/ofrak/tree/master) +## [Unreleased 3.4.0](https://github.com/redballoonsecurity/ofrak/tree/master) ### Added - Add Android sparse image unpacker and packer ([#662](https://github.com/redballoonsecurity/ofrak/pull/662)) @@ -11,6 +11,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Add `-V, --version` flag to ofrak cli ([#652](https://github.com/redballoonsecurity/ofrak/pull/652)) - Add LZ4 compression format unpackers and packers with support for all frame types (modern, legacy, skippable) ([#661](https://github.com/redballoonsecurity/ofrak/pull/661)) - Add missing component docstrings and improve existing docstrings ([#654](https://github.com/redballoonsecurity/ofrak/pull/654)) +- Add `entry_points` and `base_address` fields to `ProgramAttributes` for passing program metadata to disassembler backends ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) +- Add `MemoryRegionPermissions` attribute for fine-grained memory region permission control ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) +- Extend `ElfProgramAttributesAnalyzer` and `UImageProgramAttributesAnalyzer` to include entry points and base address ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) ### Changed - Remove test dependencies that are already in the global `requirements-dev.txt` ([#695](https://github.com/redballoonsecurity/ofrak/pull/695)) @@ -21,6 +24,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Remove `pkg_resources` usage from `build_image.py`, broken by setuptools 82.0.0 ([#708](https://github.com/redballoonsecurity/ofrak/pull/708)) - Fix GUI serialization of enum values and script creator generating invalid Python syntax for enum values - 
`build_image.py` uses `OFRAK_DIR` from `extra_build_args` to identify `pytest_ofrak` location for develop builds ([#657](https://github.com/redballoonsecurity/ofrak/pull/657/)) +- Fix deserialization of dataclass instances with new default fields from older serialized data ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) +- Fix `UImageProgramAttributesAnalyzer` not catching `KeyError` for unsupported architectures ([#701](https://github.com/redballoonsecurity/ofrak/pull/701)) ## [3.3.0](https://github.com/redballoonsecurity/ofrak/compare/ofrak-v3.2.0...ofrak-v3.3.0) - 2025-10-03 diff --git a/ofrak_core/src/ofrak/core/architecture.py b/ofrak_core/src/ofrak/core/architecture.py index ada79aa07..f959a7586 100644 --- a/ofrak_core/src/ofrak/core/architecture.py +++ b/ofrak_core/src/ofrak/core/architecture.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Optional, Tuple from ofrak.model.resource_model import ResourceAttributes @@ -10,4 +11,9 @@ class ProgramAttributes(ResourceAttributes, ArchInfo): """ Analyzer output containing architecture attributes of a program. + :ivar entry_points: program entry point virtual addresses (first is the main entry) + :ivar base_address: preferred load address / image base, or None if unknown """ + + entry_points: Tuple[int, ...] 
= () + base_address: Optional[int] = None diff --git a/ofrak_core/src/ofrak/core/elf/analyzer.py b/ofrak_core/src/ofrak/core/elf/analyzer.py index 0070ff03f..1655bb6ec 100644 --- a/ofrak_core/src/ofrak/core/elf/analyzer.py +++ b/ofrak_core/src/ofrak/core/elf/analyzer.py @@ -1,6 +1,6 @@ import io import logging -from typing import Optional, TypeVar +from typing import Optional, Tuple, TypeVar from ofrak.component.analyzer import Analyzer from ofrak.core import NamedProgramSection @@ -11,6 +11,8 @@ ElfHeader, ElfBasicHeader, ElfProgramHeader, + ElfProgramHeaderType, + ElfType, ElfSegmentStructure, ElfSegment, ElfSectionStructure, @@ -414,6 +416,7 @@ class ElfProgramAttributesAnalyzer(Analyzer[None, ProgramAttributes]): async def analyze( self, resource: Resource, config: Optional[ComponentConfig] = None ) -> ProgramAttributes: + elf = await resource.view_as(Elf) elf_header = await resource.get_only_descendant_as_view( ElfHeader, r_filter=ResourceFilter.with_tags(ElfHeader) ) @@ -421,12 +424,28 @@ async def analyze( ElfBasicHeader, r_filter=ResourceFilter.with_tags(ElfBasicHeader) ) + # e_entry is meaningless for relocatable objects (ET_REL); always 0 + if elf_header.e_type == ElfType.ET_REL.value: + entry_points: Tuple[int, ...] 
= () + else: + entry_points = (elf_header.e_entry,) + + # Base address from first PT_LOAD segment (None for relocatable objects) + base_address: Optional[int] = None + program_headers = await elf.get_program_headers() + for phdr in program_headers: + if phdr.p_type == ElfProgramHeaderType.LOAD.value: + base_address = phdr.p_vaddr + break + return ProgramAttributes( elf_header.get_isa(), None, elf_basic_header.get_bitwidth(), elf_basic_header.get_endianness(), None, + entry_points=entry_points, + base_address=base_address, ) diff --git a/ofrak_core/src/ofrak/core/memory_region.py b/ofrak_core/src/ofrak/core/memory_region.py index f2e68c2ac..0135e550a 100644 --- a/ofrak_core/src/ofrak/core/memory_region.py +++ b/ofrak_core/src/ofrak/core/memory_region.py @@ -1,17 +1,57 @@ import logging from dataclasses import dataclass -from typing import Iterable +from typing import Iterable, Optional from ofrak.core.addressable import Addressable from ofrak.model.resource_model import index, ResourceAttributes from ofrak.model.viewable_tag_model import AttributesType from ofrak.resource import Resource from ofrak_type.error import NotFoundError +from ofrak_type.memory_permissions import MemoryPermissions from ofrak_type.range import Range LOGGER = logging.getLogger(__file__) +@dataclass(**ResourceAttributes.DATACLASS_PARAMS) +class MemoryRegionPermissions(ResourceAttributes): + """ + Memory permissions (read/write/execute) for a MemoryRegion resource. + When absent, disassembler backends fall back to heuristics (CodeRegion tag → RX, + otherwise RW). Regions with `NONE` permissions are skipped entirely. + """ + + permissions: MemoryPermissions + + +def get_memory_region_permissions(resource: Resource) -> Optional[MemoryRegionPermissions]: + """ + Get the MemoryRegionPermissions attribute from a resource, or None if not set. 
+ """ + try: + return resource.get_attributes(MemoryRegionPermissions) + except NotFoundError: + return None + + +def get_effective_memory_permissions(resource: Resource) -> MemoryPermissions: + """ + Get effective permissions for a memory region resource. + + Returns explicit permissions if set via `MemoryRegionPermissions`, otherwise + falls back to RX for `CodeRegion` resources or RW for other regions. + """ + perms = get_memory_region_permissions(resource) + if perms is not None: + return perms.permissions + # Deferred import to avoid circular dependency (code_region imports memory_region) + from ofrak.core.code_region import CodeRegion + + if resource.has_tag(CodeRegion): + return MemoryPermissions.RX + return MemoryPermissions.RW + + @dataclass class MemoryRegion(Addressable): """ diff --git a/ofrak_core/src/ofrak/core/uimage.py b/ofrak_core/src/ofrak/core/uimage.py index 247655633..3f131306c 100644 --- a/ofrak_core/src/ofrak/core/uimage.py +++ b/ofrak_core/src/ofrak/core/uimage.py @@ -435,18 +435,23 @@ def from_deserialized_header( UImageArch.PPC: Endianness.BIG_ENDIAN, } - uimage_arch = UImageArch(header.ih_arch) - try: + uimage_arch = UImageArch(header.ih_arch) isa = UIMAGE_ARCH_TO_ISA[uimage_arch] bit_width = UIMAGE_ARCH_TO_BIT_WIDTH[uimage_arch] endianness = UIMAGE_ARCH_TO_ENDIANNESS[uimage_arch] - except ValueError: - raise NotImplementedError( - f"Unsupported/unknown uImage architecture: {uimage_arch.name}" - ) - - return ProgramAttributes(isa, None, bit_width, endianness, None) + except (ValueError, KeyError): + raise NotImplementedError(f"Unsupported/unknown uImage architecture: {header.ih_arch}") + + return ProgramAttributes( + isa, + None, + bit_width, + endianness, + None, + entry_points=(header.get_entry_point_vaddr(),), + base_address=header.get_load_vaddr(), + ) #################### diff --git a/ofrak_core/src/ofrak/service/serialization/serializers/class_instance_serializer.py 
b/ofrak_core/src/ofrak/service/serialization/serializers/class_instance_serializer.py index 15920527e..d4985a002 100644 --- a/ofrak_core/src/ofrak/service/serialization/serializers/class_instance_serializer.py +++ b/ofrak_core/src/ofrak/service/serialization/serializers/class_instance_serializer.py @@ -86,9 +86,18 @@ def _deserialize_instance(self, cls: Any, cls_fields_pjson: Dict[str, PJSONType] expected_fields_and_types = self._get_class_fields_and_types( cls, as_dataclass=is_dataclass(cls) ) + # Skip dataclass fields that have defaults and are missing from the JSON; + # the constructor will fill them in automatically. + dc_defaults = { + f.name + for f in (fields(cls) if is_dataclass(cls) else ()) + if f.default is not dataclasses.MISSING + or f.default_factory is not dataclasses.MISSING # type: ignore[misc] + } deserialized_fields = { field_name: self._service.from_pjson(cls_fields_pjson.get(field_name), field_type) for field_name, field_type in expected_fields_and_types.items() + if field_name in cls_fields_pjson or field_name not in dc_defaults } if is_dataclass(cls) and getattr(cls, dataclasses._PARAMS).init: # type: ignore return cls(**deserialized_fields) diff --git a/ofrak_core/tests/components/assets/entry_at_zero.elf b/ofrak_core/tests/components/assets/entry_at_zero.elf new file mode 100644 index 000000000..f2fd7e1e6 --- /dev/null +++ b/ofrak_core/tests/components/assets/entry_at_zero.elf @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:f0f5d6900b9130b0aa0e4871e02a4c511da1e22efff4fa41470f1ac51852c768 +size 4608 diff --git a/ofrak_core/tests/components/test_elf_analyzers.py b/ofrak_core/tests/components/test_elf_analyzers.py index fb751eaa2..0771190af 100644 --- a/ofrak_core/tests/components/test_elf_analyzers.py +++ b/ofrak_core/tests/components/test_elf_analyzers.py @@ -604,6 +604,7 @@ async def test_elf_program_attributes_analyzer(ofrak_context: OFRAKContext): BitWidth.BIT_32, Endianness.LITTLE_ENDIAN, None, + 
entry_points=(0,), ) elf_r = await _create_populated_elf( ofrak_context, diff --git a/ofrak_core/tests/components/test_memory_region.py b/ofrak_core/tests/components/test_memory_region.py index c8329890a..7bde38cd2 100644 --- a/ofrak_core/tests/components/test_memory_region.py +++ b/ofrak_core/tests/components/test_memory_region.py @@ -4,7 +4,16 @@ Requirements Mapping: - REQ1.2 """ -from ofrak.core import MemoryRegion +import pytest + +from ofrak import OFRAKContext +from ofrak.core import CodeRegion, MemoryRegion +from ofrak.core.memory_region import ( + MemoryRegionPermissions, + get_memory_region_permissions, + get_effective_memory_permissions, +) +from ofrak_type.memory_permissions import MemoryPermissions def test_memory_region_str(): @@ -34,3 +43,62 @@ def test_memory_region_hash(): assert region_a in memory_bank assert region_b in memory_bank assert region_c not in memory_bank + + +class TestMemoryRegionPermissions: + """Tests for MemoryRegionPermissions ResourceAttribute.""" + + def test_memory_region_permissions_creation(self): + """ + Test that MemoryRegionPermissions can be created with all permission types. + """ + for perm in MemoryPermissions: + perms_attr = MemoryRegionPermissions(permissions=perm) + assert perms_attr.permissions == perm + + def test_memory_region_permissions_frozen(self): + """ + Test that MemoryRegionPermissions is frozen (immutable). + """ + perms_attr = MemoryRegionPermissions(permissions=MemoryPermissions.RX) + with pytest.raises(AttributeError): + perms_attr.permissions = MemoryPermissions.RW + + def test_memory_region_permissions_equality(self): + """ + Test MemoryRegionPermissions equality comparison. 
+ """ + perms1 = MemoryRegionPermissions(permissions=MemoryPermissions.RX) + perms2 = MemoryRegionPermissions(permissions=MemoryPermissions.RX) + perms3 = MemoryRegionPermissions(permissions=MemoryPermissions.RW) + + assert perms1 == perms2 + assert perms1 != perms3 + + +async def test_get_effective_memory_permissions_explicit(ofrak_context: OFRAKContext): + """Explicit MemoryRegionPermissions override the CodeRegion/default heuristic.""" + resource = await ofrak_context.create_root_resource("test", b"\x00" * 8) + resource.add_tag(CodeRegion) + resource.add_attributes(MemoryRegionPermissions(MemoryPermissions.W)) + # Would be RX from the CodeRegion tag, but explicit attribute wins + assert get_effective_memory_permissions(resource) == MemoryPermissions.W + + +async def test_get_effective_memory_permissions_code_region_fallback(ofrak_context: OFRAKContext): + """Without explicit permissions, CodeRegion resources default to RX.""" + resource = await ofrak_context.create_root_resource("test", b"\x00" * 8) + resource.add_tag(CodeRegion) + assert get_effective_memory_permissions(resource) == MemoryPermissions.RX + + +async def test_get_effective_memory_permissions_default_rw(ofrak_context: OFRAKContext): + """Without explicit permissions or CodeRegion tag, default is RW.""" + resource = await ofrak_context.create_root_resource("test", b"\x00" * 8) + assert get_effective_memory_permissions(resource) == MemoryPermissions.RW + + +async def test_get_memory_region_permissions_absent(ofrak_context: OFRAKContext): + """get_memory_region_permissions returns None when no attribute is set.""" + resource = await ofrak_context.create_root_resource("test", b"\x00" * 8) + assert get_memory_region_permissions(resource) is None diff --git a/ofrak_core/tests/components/test_program_metadata.py b/ofrak_core/tests/components/test_program_metadata.py new file mode 100644 index 000000000..f933a5dde --- /dev/null +++ b/ofrak_core/tests/components/test_program_metadata.py @@ -0,0 +1,117 @@ 
+""" +Test the entry_points and base_address fields on ProgramAttributes, +and the format-specific analyzers that populate them. + +Requirements Mapping: +- REQ2.2 +""" +import os + + +from ofrak import OFRAKContext +from ofrak.core.architecture import ProgramAttributes +from ofrak_type.architecture import InstructionSet +from ofrak_type.bit_width import BitWidth +from ofrak_type.endianness import Endianness +from pytest_ofrak import ASSETS_DIR as PYTEST_OFRAK_ASSETS_DIR + +ASSETS_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "assets")) + + +def test_program_attributes_metadata_defaults(): + """New fields default to empty/None, preserving backwards compatibility.""" + attrs = ProgramAttributes( + InstructionSet.X86, None, BitWidth.BIT_64, Endianness.LITTLE_ENDIAN, None + ) + assert attrs.entry_points == () + assert attrs.base_address is None + + +def test_program_attributes_with_metadata(): + """entry_points and base_address can be set explicitly.""" + attrs = ProgramAttributes( + InstructionSet.X86, + None, + BitWidth.BIT_64, + Endianness.LITTLE_ENDIAN, + None, + entry_points=(0x1000, 0x2000), + base_address=0x400000, + ) + assert attrs.entry_points == (0x1000, 0x2000) + assert attrs.base_address == 0x400000 + + +class TestElfProgramAttributesAnalyzer: + """Tests for ElfProgramAttributesAnalyzer entry_points and base_address.""" + + async def test_elf_program_attributes_hello_out(self, ofrak_context: OFRAKContext): + """Test correct values from hello.out.""" + filepath = os.path.join(ASSETS_DIR, "hello.out") + resource = await ofrak_context.create_root_resource_from_file(filepath) + await resource.unpack_recursively() + attrs = await resource.analyze(ProgramAttributes) + assert attrs.entry_points == (0x4003E0,) + assert attrs.base_address == 0x400000 + + async def test_elf_program_attributes_arm(self, ofrak_context: OFRAKContext): + """Test correct values from ARM ELF.""" + filepath = os.path.join(ASSETS_DIR, "arm_reloc_relocated.elf") + resource = 
await ofrak_context.create_root_resource_from_file(filepath) + await resource.unpack_recursively() + attrs = await resource.analyze(ProgramAttributes) + assert attrs.entry_points == (0x8104,) + assert attrs.base_address == 0x0 + + async def test_elf_no_pt_load(self, ofrak_context: OFRAKContext): + """Relocatable .o (ET_REL) has no entry point and no PT_LOAD.""" + filepath = os.path.join(PYTEST_OFRAK_ASSETS_DIR, "..", "elf", "assets", "program.o") + resource = await ofrak_context.create_root_resource_from_file(filepath) + await resource.unpack_recursively() + attrs = await resource.analyze(ProgramAttributes) + assert attrs.entry_points == () + assert attrs.base_address is None + + async def test_elf_entry_point_zero(self, ofrak_context: OFRAKContext): + """ELF e_entry=0 is valid (unlike PE where entry_rva=0 means 'no entry').""" + filepath = os.path.join(ASSETS_DIR, "entry_at_zero.elf") + resource = await ofrak_context.create_root_resource_from_file(filepath) + await resource.unpack_recursively() + attrs = await resource.analyze(ProgramAttributes) + assert attrs.entry_points == (0x0,) + + +class TestUImageProgramAttributesAnalyzer: + async def test_uimage_program_attributes(self, ofrak_context: OFRAKContext): + """UImage header ih_ep and ih_load are extracted.""" + filepath = os.path.join(ASSETS_DIR, "uimage") + resource = await ofrak_context.create_root_resource_from_file(filepath) + await resource.unpack_recursively() + attrs = await resource.analyze(ProgramAttributes) + assert attrs.entry_points == (0x0,) + assert attrs.base_address == 0x0 + + +class TestIhexStartAddress: + """IHEX start_addr is available via the Ihex view (no separate analyzer).""" + + async def test_ihex_start_addr_present(self, ofrak_context: OFRAKContext): + from ofrak.core.ihex import Ihex + + filepath = os.path.join(ASSETS_DIR, "hello_world.ihex") + resource = await ofrak_context.create_root_resource_from_file(filepath) + await resource.unpack_recursively() + ihex = await 
resource.view_as(Ihex) + assert ihex.start_addr == 0x4003E0 + + async def test_ihex_no_start_address(self, ofrak_context: OFRAKContext): + import bincopy + from ofrak.core.ihex import Ihex + + bf = bincopy.BinFile() + bf.add_binary(b"\x00" * 16, address=0x1000) + ihex_data = bf.as_ihex().encode("ascii") + resource = await ofrak_context.create_root_resource("no_start.ihex", ihex_data) + await resource.unpack_recursively() + ihex = await resource.view_as(Ihex) + assert ihex.start_addr is None diff --git a/ofrak_core/tests/service/serialization_service/test_pjson.py b/ofrak_core/tests/service/serialization_service/test_pjson.py index d9930f1df..a5c6f6a98 100644 --- a/ofrak_core/tests/service/serialization_service/test_pjson.py +++ b/ofrak_core/tests/service/serialization_service/test_pjson.py @@ -389,3 +389,40 @@ def test_ofrak_classes(superclass_type, descendant_type, data, _test_serialize_d instance = data.draw(builds(descendant_type)) _test_serialize_deserialize(instance, descendant_type) _test_serialize_deserialize(instance, superclass_type) + + +def test_dataclass_backward_compat_missing_defaults( + serializer: PJSONSerializationService, +): + """ + Test that dataclass instances with missing fields that have defaults can be + deserialized from old JSON that predates those fields. + + This test verifies that the ClassInstanceSerializer backward-compat logic + correctly skips fields with defaults when they are absent from the JSON, + letting the dataclass constructor fill them in. 
+ """ + from ofrak.core.architecture import ProgramAttributes + from ofrak_type.architecture import InstructionSet + from ofrak_type.bit_width import BitWidth + from ofrak_type.endianness import Endianness + + # Simulate old serialized ProgramAttributes (before entry_points/base_address existed) + old_obj = ProgramAttributes( + InstructionSet.ARM, None, BitWidth.BIT_32, Endianness.LITTLE_ENDIAN, None + ) + pjson = serializer.to_pjson(old_obj, ProgramAttributes) + + # Remove the new fields from the serialized form, simulating old data + cls_ref, cls_fields = pjson + del cls_fields["entry_points"] + del cls_fields["base_address"] + + # Deserialize — should succeed with defaults filled in + restored = serializer.from_pjson((cls_ref, cls_fields), ProgramAttributes) + assert isinstance(restored, ProgramAttributes) + assert restored.isa == InstructionSet.ARM + assert restored.bit_width == BitWidth.BIT_32 + assert restored.endianness == Endianness.LITTLE_ENDIAN + assert restored.entry_points == () + assert restored.base_address is None diff --git a/ofrak_core/version.py b/ofrak_core/version.py index ba047d0bd..8c152bb2c 100644 --- a/ofrak_core/version.py +++ b/ofrak_core/version.py @@ -1 +1 @@ -VERSION = "3.4.0rc6" +VERSION = "3.4.0rc7" diff --git a/disassemblers/ofrak_pyghidra/tests/assets/tini_custom_binary b/pytest_ofrak/src/pytest_ofrak/assets/tini_custom_binary similarity index 100% rename from disassemblers/ofrak_pyghidra/tests/assets/tini_custom_binary rename to pytest_ofrak/src/pytest_ofrak/assets/tini_custom_binary diff --git a/pytest_ofrak/src/pytest_ofrak/patterns/program_metadata.py b/pytest_ofrak/src/pytest_ofrak/patterns/program_metadata.py new file mode 100644 index 000000000..c1a4a83cf --- /dev/null +++ b/pytest_ofrak/src/pytest_ofrak/patterns/program_metadata.py @@ -0,0 +1,194 @@ +""" +Shared helpers for testing ProgramAttributes entry_points/base_address with disassembler backends. 
+
+Requirements Mapping:
+- REQ2.2
+"""
+import os
+from typing import Optional
+
+import pytest
+
+from ofrak import OFRAKContext, ResourceFilter, ResourceAttributeValueFilter
+from ofrak.core import (
+    Program,
+    CodeRegion,
+    ComplexBlock,
+    BasicBlock,
+    Addressable,
+    ProgramAttributes,
+)
+from ofrak.core.memory_region import MemoryRegion, MemoryRegionPermissions
+from ofrak.resource import Resource
+from ofrak_type import InstructionSet, BitWidth, Endianness, SubInstructionSet, Range
+from ofrak_type.memory_permissions import MemoryPermissions
+
+from pytest_ofrak import ASSETS_DIR
+
+# Path of the shared test asset, resolved inside the pytest_ofrak assets directory.
+TINI_CUSTOM_BINARY = os.path.join(ASSETS_DIR, "tini_custom_binary")
+
+# Constants for the tini_custom_binary test asset.
+# This is a custom binary created from an aarch64 statically compiled binary:
+# https://github.com/ryanwoodsmall/static-binaries/blob/master/aarch64/tini
+# It was created like so:
+# - `aarch64-linux-gnu-objcopy -O binary --only-section=.text tini tini.text.bin`
+# - `aarch64-linux-gnu-objcopy -O binary --only-section=.rodata tini tini.rodata.bin`
+# - `dd if=/dev/zero of=gap.bin bs=1 count=$((0x1234))`
+# - `cat tini.text.bin gap.bin tini.rodata.bin > tini_custom_binary`
+# So it contains: .text section binary content, a zero gap of 0x1234 bytes, then .rodata content.
+#
+# All values below are file offsets/sizes within the asset (not virtual addresses);
+# the helpers in this module pass them to `Range.from_size`.
+TINI_TEXT_SIZE = 40792
+TINI_TEXT_OFFSET = 0
+TINI_GAP_SIZE = 0x1234
+# .rodata content starts right after the .text content and the zero gap.
+TINI_RODATA_OFFSET = TINI_TEXT_OFFSET + TINI_TEXT_SIZE + TINI_GAP_SIZE
+TINI_RODATA_SIZE = 7052
+
+
+@pytest.fixture
+async def custom_binary_resource(ofrak_context: OFRAKContext):
+    """
+    Load the tini_custom_binary test asset as a root resource.
+
+    :param ofrak_context: OFRAK context used to create the root resource
+
+    :return: the root resource created from the file at `TINI_CUSTOM_BINARY`
+    """
+    return await ofrak_context.create_root_resource_from_file(TINI_CUSTOM_BINARY)
+
+
+async def setup_program_flat(
+    resource: Resource,
+    *,
+    base_address: int,
+) -> None:
+    """
+    Tag resource as Program with ProgramAttributes (no MemoryRegion children).
+
+    The attributes are fixed to AArch64 / ARMv8-A, 64-bit, little-endian, with no
+    processor. `base_address` is used both as the program base address and as the
+    single entry point.
+
+    :param resource: resource to tag and annotate; it is saved after tagging and again
+        after the attributes are added
+    :param base_address: value stored as `base_address` and as the only element of
+        `entry_points`
+    """
+    # Persist the Program tag and run identification before attaching attributes.
+    resource.add_tag(Program)
+    await resource.save()
+    await resource.identify()
+
+    resource.add_attributes(
+        ProgramAttributes(
+            isa=InstructionSet.AARCH64,
+            sub_isa=SubInstructionSet.ARMv8A,
+            bit_width=BitWidth.BIT_64,
+            endianness=Endianness.LITTLE_ENDIAN,
+            processor=None,
+            entry_points=(base_address,),
+            base_address=base_address,
+        )
+    )
+    await resource.save()
+
+
+async def setup_program_with_code_region(
+    resource: Resource,
+    *,
+    base_address: int,
+    text_vaddr: int,
+    text_size: int = TINI_TEXT_SIZE,
+) -> Resource:
+    """
+    Tag resource as Program with ProgramAttributes and a CodeRegion child.
+
+    The attributes are fixed to AArch64 / ARMv8-A, 64-bit, little-endian, with no
+    processor. Unlike `setup_program_flat`, the single entry point is `text_vaddr`
+    rather than `base_address`.
+
+    :param resource: resource to tag and annotate; also the parent of the new child
+    :param base_address: value stored as the program `base_address`
+    :param text_vaddr: virtual address of the CodeRegion view and sole entry point
+    :param text_size: size of the CodeRegion; also the length of the child's data range,
+        which starts at `TINI_TEXT_OFFSET` in the parent
+
+    :return: the saved CodeRegion child resource
+    """
+    # Persist the Program tag and run identification before attaching attributes.
+    resource.add_tag(Program)
+    await resource.save()
+    await resource.identify()
+
+    resource.add_attributes(
+        ProgramAttributes(
+            isa=InstructionSet.AARCH64,
+            sub_isa=SubInstructionSet.ARMv8A,
+            bit_width=BitWidth.BIT_64,
+            endianness=Endianness.LITTLE_ENDIAN,
+            processor=None,
+            entry_points=(text_vaddr,),
+            base_address=base_address,
+        )
+    )
+    await resource.save()
+
+    # The child maps the leading .text bytes of the parent's data.
+    text_section = await resource.create_child(
+        tags=(CodeRegion,),
+        data_range=Range.from_size(TINI_TEXT_OFFSET, text_size),
+    )
+    text_section.add_view(
+        CodeRegion(
+            virtual_address=text_vaddr,
+            size=text_size,
+        )
+    )
+    await text_section.save()
+    return text_section
+
+
+async def add_rodata_region(
+    resource: Resource,
+    rodata_vaddr: int,
+    rodata_size: int = TINI_RODATA_SIZE,
+    permissions: Optional[MemoryPermissions] = None,
+) -> Resource:
+    """
+    Add a .rodata MemoryRegion child with optional permissions.
+
+    :param resource: parent resource; the child's data range starts at
+        `TINI_RODATA_OFFSET` in the parent
+    :param rodata_vaddr: virtual address of the MemoryRegion view
+    :param rodata_size: size of the MemoryRegion and length of the child's data range
+    :param permissions: if not None, attached to the child as `MemoryRegionPermissions`;
+        if None, no permissions attribute is added
+
+    :return: the saved MemoryRegion child resource
+    """
+    rodata_section = await resource.create_child(
+        tags=(MemoryRegion,),
+        data_range=Range.from_size(TINI_RODATA_OFFSET, rodata_size),
+    )
+    rodata_section.add_view(
+        MemoryRegion(
+            virtual_address=rodata_vaddr,
+            size=rodata_size,
+        )
+    )
+    # Leave the region without an explicit permissions attribute unless one was requested.
+    if permissions is not None:
+        rodata_section.add_attributes(MemoryRegionPermissions(permissions))
+    await rodata_section.save()
+    return rodata_section
+
+
+async def add_distant_rw_region(
+    resource: Resource,
+    vaddr: int,
+    size: int = 0x1000,
+) -> Resource:
+    """
+    Add a small MemoryRegion child at a distant virtual address with explicit RW permissions.
+
+    Uses inline data (not a range into the parent) since the distant region doesn't
+    correspond to parent file content.
+
+    :param resource: parent resource of the new child
+    :param vaddr: virtual address of the MemoryRegion view
+    :param size: size of the MemoryRegion; the child's data is `size` zero bytes
+
+    :return: the saved MemoryRegion child resource
+    """
+    distant_region = await resource.create_child(
+        tags=(MemoryRegion,),
+        data=b"\x00" * size,
+    )
+    distant_region.add_view(
+        MemoryRegion(
+            virtual_address=vaddr,
+            size=size,
+        )
+    )
+    distant_region.add_attributes(MemoryRegionPermissions(MemoryPermissions.RW))
+    await distant_region.save()
+    return distant_region
+
+
+async def assert_complex_block_at_vaddr(resource: Resource, vaddr: int) -> ComplexBlock:
+    """
+    Assert a ComplexBlock at vaddr has non-zero size and BasicBlock children.
+
+    :param resource: resource whose descendants are searched for the ComplexBlock
+    :param vaddr: virtual address the ComplexBlock must have
+
+    :raises AssertionError: if the block's address differs from `vaddr`, its size is not
+        positive, or it has no BasicBlock children after unpacking
+
+    :return: the ComplexBlock view that was found
+    """
+    # NOTE(review): presumably this lookup itself fails when zero or several descendants
+    # match the filter - confirm against the Resource API.
+    cb = await resource.get_only_descendant_as_view(
+        v_type=ComplexBlock,
+        r_filter=ResourceFilter(
+            tags=[ComplexBlock],
+            attribute_filters=(ResourceAttributeValueFilter(Addressable.VirtualAddress, vaddr),),
+        ),
+    )
+    assert cb.virtual_address == vaddr
+    assert cb.size > 0, f"ComplexBlock at 0x{vaddr:x} has zero size"
+
+    # Verify the disassembler actually produced basic blocks, not just a stub entry
+    await cb.resource.unpack()
+    basic_blocks = list(
+        await cb.resource.get_children_as_view(
+            BasicBlock, r_filter=ResourceFilter(tags=[BasicBlock])
+        )
+    )
+    assert (
+        len(basic_blocks) > 0
+    ), f"ComplexBlock at 0x{vaddr:x} has no BasicBlock children after unpacking"
+    return cb