diff --git a/README.md b/README.md index f04f8f3e..599fb387 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,9 @@ [![Linting](https://github.com/TourmalineCore/c2pie/actions/workflows/lint-on-pull-request.yml/badge.svg?branch=develop)](https://github.com/TourmalineCore/c2pie/actions/workflows/lint-on-pull-request.yml) [![c2pa](https://img.shields.io/badge/c2pa-v1.4-seagreen.svg)](https://c2pa.org/) -[![coverage](https://img.shields.io/badge/e2e_coverage-71.13%25-yellow)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml) -[![coverage](https://img.shields.io/badge/units_coverage-79.65%25-yellow)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml) -[![coverage](https://img.shields.io/badge/full_coverage-91.80%25-forestgreen)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml) +[![coverage](https://img.shields.io/badge/e2e_coverage-71.15%25-yellow)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml) +[![coverage](https://img.shields.io/badge/units_coverage-85.01%25-olivedrab)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml) +[![coverage](https://img.shields.io/badge/full_coverage-92.51%25-forestgreen)](https://github.com/TourmalineCore/c2pie/actions/workflows/calculate-tests-coverage-on-pull-request.yml) [![latest](https://img.shields.io/pypi/v/c2pie?label=latest&colorB=fc8021)](https://pypi.org/project/c2pie/)
diff --git a/c2pie/c2pa/assertion.py b/c2pie/c2pa/assertion.py index fc57807c..76bad2e4 100644 --- a/c2pie/c2pa/assertion.py +++ b/c2pie/c2pa/assertion.py @@ -37,7 +37,12 @@ def __init__( if not content_boxes: payload = self.get_payload_from_schema() box_type_hex = get_assertion_content_box_type(self.type) - content_boxes = [ContentBox(box_type=box_type_hex, payload=payload)] + content_boxes = [ + ContentBox( + box_type=box_type_hex, + payload=payload, + ) + ] super().__init__( content_type=get_assertion_content_type(self.type), @@ -64,56 +69,71 @@ class HashDataAssertion(Assertion): def __init__( self, - cai_offset: int, hashed_data: bytes, - additional_exclusions: list[dict[str, int]] | None = None, ): - exclusions: list[dict[str, int]] = [ - { - "start": cai_offset, - "length": 65535, - }, - ] - - if additional_exclusions: - exclusions.extend(additional_exclusions) + exclusions: list[dict[str, int]] = [] schema: dict[str, Any] = { - "name": "jumbf manifest", "exclusions": exclusions, "alg": "sha256", "hash": hashed_data, - "pad": [], + # The specification recommends setting the pad to at least 16 bytes. We use 64 bytes + # to allow for some extra space before the 23-byte limit is exceeded, since otherwise + # the CBOR header of the pad field would be reduced by 1 byte. 
+ "pad": b"\x00" * 64, } - super().__init__(C2PA_AssertionTypes.data_hash, schema) - def set_hash_data_length( + super().__init__( + C2PA_AssertionTypes.data_hash, + schema, + ) + + def add_full_c2pa_structure_exclusion( self, + offset: int, length: int, ) -> None: - if self.schema.get("name") != "jumbf manifest": - raise ValueError("c2pa.hash.data: jumbf manifest is missing") + exclusions = self.schema["exclusions"] + previous_exclusion_length = len(cbor_to_bytes(exclusions)) + + self.schema["exclusions"].extend( + [ + { + "start": offset, + "length": length, + }, + ] + ) + + # NOTE: If the number of exclusions exceeds 23, an additional length byte + # will be added to the CBOR header of serialized exclusions array. This byte + # is included in the recalculation of the serialized exclusions. + current_exclusion_length = len(cbor_to_bytes(exclusions)) + + difference = previous_exclusion_length - current_exclusion_length - exclusions = self.schema.get("exclusions", []) + if -difference > len(self.schema["pad"]): + raise ValueError("Difference in length exceeds the predefined pad") - if not exclusions: - raise ValueError("c2pa.hash.data: exclusions are missing") + # If the pad is less than 24 bytes the size of the cbor header + # will change during conversion to cbor and will occupy less than 2 bytes. + updated_pad_length = len(self.schema["pad"]) + difference - exclusions[0]["length"] = int(length) + # If a CBOR overflow is not handled, the extra length byte that + # would be added in this case will not be taken into account. 
+ if updated_pad_length < 24: + updated_pad_length -= 1 + + self.schema["pad"] = b"\x00" * updated_pad_length payload = self.get_payload_from_schema() - if self.content_boxes: - self.content_boxes[0] = ContentBox( + + self.content_boxes = [ + ContentBox( box_type=get_assertion_content_box_type(self.type), payload=payload, ) - else: - self.content_boxes = [ - ContentBox( - box_type=get_assertion_content_box_type(self.type), - payload=payload, - ) - ] + ] self.sync_payload() diff --git a/c2pie/c2pa/assertion_store.py b/c2pie/c2pa/assertion_store.py index 8108049c..d5b31894 100644 --- a/c2pie/c2pa/assertion_store.py +++ b/c2pie/c2pa/assertion_store.py @@ -20,11 +20,16 @@ def __init__( def get_assertions(self) -> list: return self.assertions - def set_hash_data_length( + def add_full_c2pa_structure_exclusion( self, + offset: int, length: int, ) -> None: for assertion in self.assertions: if assertion.type == C2PA_AssertionTypes.data_hash: - assertion.set_hash_data_length(length) + assertion.add_full_c2pa_structure_exclusion( + offset, + length, + ) + self.sync_payload() diff --git a/c2pie/c2pa/claim_signature.py b/c2pie/c2pa/claim_signature.py index 84923620..eb72b687 100644 --- a/c2pie/c2pa/claim_signature.py +++ b/c2pie/c2pa/claim_signature.py @@ -64,6 +64,8 @@ def __init__( self.require_tsa = require_tsa self.tsa_log_dir = tsa_log_dir + self.serialized_cose_sign1_length = 0 + content_boxes = self._generate_payload() super().__init__( @@ -129,10 +131,49 @@ def _generate_unprotected_header(self, serialized_sig_structure: bytes) -> bytes }, ], }, + # The specification recommends setting the pad to at least 16 bytes. NOTE: only 8 bytes + # are used here (not 64 as in the Data Hash Assertion); if the pad has to grow past the + # 23-byte limit, the CBOR header of the pad field becomes 1 byte longer. 
+ "pad": b"\x00" * 8, } return unprotected_header + def serialize_cose_sign1_tagged_with_alignment( + self, + cose_sign1: list, + ) -> bytes: + cose_sign1_tagged_cbor = cbor2.dumps( + cbor2.CBORTag(18, cose_sign1), + canonical=True, + ) + + # The length of a TSA token can be variable. To ensure that a new token does not exceed + # the exclusion boundary for the C2PA structure, we need to align the length of + # the Claim Signature using the pad field, similar to the Data Hash Assertion. + if self.serialized_cose_sign1_length == 0: + self.serialized_cose_sign1_length = len(cose_sign1_tagged_cbor) + elif self.serialized_cose_sign1_length != len(cose_sign1_tagged_cbor): + difference = self.serialized_cose_sign1_length - len(cose_sign1_tagged_cbor) + + if -difference > len(cose_sign1[1]["pad"]): + raise ValueError("Difference in length exceeds the predefined pad") + + updated_pad_length = len(cose_sign1[1]["pad"]) + difference + + # If a CBOR overflow is not handled, the extra length byte that + # would be added in this case will not be taken into account. 
+ if updated_pad_length > 23: + updated_pad_length += 1 + + cose_sign1[1]["pad"] = b"\x00" * updated_pad_length + cose_sign1_tagged_cbor = cbor2.dumps( + cbor2.CBORTag(18, cose_sign1), + canonical=True, + ) + + return cose_sign1_tagged_cbor + def _create_cose_sign1_tagged(self) -> bytes: """ COSE_Sign1 = [ @@ -179,4 +220,6 @@ def _create_cose_sign1_tagged(self) -> bytes: cose_sign1 = [serialized_protected_header, unprotected_header, None, signature] - return cbor2.dumps(cbor2.CBORTag(18, cose_sign1), canonical=True) + cose_sign1_tagged_cbor = self.serialize_cose_sign1_tagged_with_alignment(cose_sign1) + + return cose_sign1_tagged_cbor diff --git a/c2pie/c2pa/config.py b/c2pie/c2pa/config.py deleted file mode 100644 index f2b0abce..00000000 --- a/c2pie/c2pa/config.py +++ /dev/null @@ -1 +0,0 @@ -RETRY_SIGNATURE = 8 # max retries, might be terminated earliers diff --git a/c2pie/c2pa/manifest.py b/c2pie/c2pa/manifest.py index 224f68f7..cc7beb87 100644 --- a/c2pie/c2pa/manifest.py +++ b/c2pie/c2pa/manifest.py @@ -44,8 +44,9 @@ def get_assertions(self): return self.assertion_store.get_assertions() return - def set_hash_data_length( + def add_full_c2pa_structure_exclusion( self, + offset: int, length: int, ): """ @@ -53,7 +54,10 @@ def set_hash_data_length( and ClaimSignature (COSE Sign1 detached over Claim CBOR). 
""" if self.assertion_store and self.claim and self.claim_signature: - self.assertion_store.set_hash_data_length(length) + self.assertion_store.add_full_c2pa_structure_exclusion( + offset, + length, + ) self.claim.set_assertion_store(self.assertion_store) self.claim_signature.set_claim(self.claim) diff --git a/c2pie/c2pa/manifest_store.py b/c2pie/c2pa/manifest_store.py index 1377ca7e..bf0bf032 100644 --- a/c2pie/c2pa/manifest_store.py +++ b/c2pie/c2pa/manifest_store.py @@ -25,11 +25,15 @@ def __init__( def sync_payload(self): super().sync_payload() - def set_hash_data_length_for_all( + def add_full_c2pa_structure_exclusion( self, + offset: int, length: int, ) -> None: - self.manifests[-1].set_hash_data_length(length) + self.manifests[-1].add_full_c2pa_structure_exclusion( + offset, + length, + ) super().sync_payload() diff --git a/c2pie/c2pa_injection/jpg_injection.py b/c2pie/c2pa_injection/jpg_injection.py index 957ede1d..fc1f7550 100644 --- a/c2pie/c2pa_injection/jpg_injection.py +++ b/c2pie/c2pa_injection/jpg_injection.py @@ -1,3 +1,13 @@ +from c2pie.c2pa.manifest_store import ManifestStore + +# JPG_SEGMENT_MAX_PAYLOAD_LENGTH = +# 65535 (max segment length) +# - 2 (bytes of length) +# - 2 (bytes of CI) +# - 2 (bytes of EN) +# - 4 (bytes of Z) +# - 4 (bytes of LBox) +# - 4 (bytes of TBox) JPG_SEGMENT_MAX_PAYLOAD_LENGTH = 65517 @@ -98,3 +108,36 @@ def serialize(self): self.serialized_length = len(serialized_storage_data) return serialized_storage_data + + +def create_and_serialize_app11_storage( + manifest_store: ManifestStore, +) -> bytes: + serialized_manifest_store = manifest_store.serialize() + + app11_storage = JpgSegmentApp11Storage( + app11_segment_box_length=manifest_store.get_length(), + app11_segment_box_type=manifest_store.get_type(), + payload=serialized_manifest_store, + ) + + return app11_storage.serialize() + + +def emplace_manifest_into_jpeg( + content_bytes: bytes, + manifest_store: ManifestStore, + c2pa_offset: int, +) -> bytes: + 
serialized_app11_storage = create_and_serialize_app11_storage(manifest_store) + + serialized_app11_storage_length = len(serialized_app11_storage) + + manifest_store.add_full_c2pa_structure_exclusion( + c2pa_offset, + serialized_app11_storage_length, + ) + + tail = create_and_serialize_app11_storage(manifest_store) + + return content_bytes[:c2pa_offset] + tail + content_bytes[c2pa_offset:] diff --git a/c2pie/c2pa_injection/pdf_injection.py b/c2pie/c2pa_injection/pdf_injection.py index 6a9fc413..6af43a9a 100644 --- a/c2pie/c2pa_injection/pdf_injection.py +++ b/c2pie/c2pa_injection/pdf_injection.py @@ -4,7 +4,6 @@ from pypdf import PdfWriter -from c2pie.c2pa.config import RETRY_SIGNATURE from c2pie.c2pa.manifest_store import ManifestStore @@ -64,9 +63,26 @@ def _xref_entry(offset: int) -> bytes: return f"{offset:010d} 00000 n \n".encode("ascii") +def prepare_pdf_bytes(content: bytes) -> bytes: + """ + Returns the PDF bytes ready for signing: the input unchanged when it has a + parseable structure, otherwise a version repaired via pypdf. + + Must be called before hashing so that the hash and cai_offset are + computed against the same byte sequence that will be written to disk. + """ + try: + _scan_pdf_to_get_its_data(content) + return content + except ValueError: + repaired = _read_pdf_using_pypdf(content) + return repaired + + def emplace_manifest_into_pdf( initial_content: bytes, - manifests: ManifestStore, + manifest_store: ManifestStore, + c2pa_offset: int, *, author: str | None = None, ) -> bytes: @@ -75,11 +91,8 @@ def emplace_manifest_into_pdf( - Exception c2pa.hash.data: start == len(initial_content), length == length of the entire tail (see C2PA 2.2). - Sign the claim, build the jumbf store, place it as EmbeddedFile, write xref/trailer correctly. 
""" - try: - info = _scan_pdf_to_get_its_data(initial_content) - except ValueError: - initial_content = _read_pdf_using_pypdf(initial_content=initial_content) - info = _scan_pdf_to_get_its_data(initial_content) + info = _scan_pdf_to_get_its_data(initial_content) + initial_length_of_file = len(initial_content) pointer_on_previous_xref = info.startxref starting_value = info.max_obj + 1 @@ -89,114 +102,138 @@ def emplace_manifest_into_pdf( author_info_required = bool(author) - assumed_hash_data_len = 0 - last = -1 - for _ in range(RETRY_SIGNATURE): - manifests.set_hash_data_length_for_all(assumed_hash_data_len) - store = manifests.serialize() - length_of_c2pa_manifest = len(store) - - object_1 = ( - f"{starting_value} 0 obj\n".encode("ascii") - + f"<< /Type /EmbeddedFile /Subtype {subtype} /Length {length_of_c2pa_manifest} >>\n".encode("ascii") - + b"stream\n" - + store - + b"\nendstream\nendobj\n" - ) - object_2 = ( - f"{starting_value + 1} 0 obj\n".encode("ascii") - + ( - f"<< /Type /Filespec /AFRelationship /C2PA_Manifest " - f"/F ({fname}) /UF ({fname}) /Desc (C2PA Manifest Store) " - f"/Subtype {subtype} /EF << /F {starting_value} 0 R >> >>\n" - ).encode("ascii") - + b"endobj\n" - ) - object_3 = ( - f"{starting_value + 2} 0 obj\n".encode("ascii") - + f"<< /Type /Names /Names [ ({fname}) {starting_value + 1} 0 R ] >>\n".encode("ascii") - + b"endobj\n" - ) - object_4 = ( - f"{starting_value + 3} 0 obj\n".encode("ascii") - + f"<< /Type /Names /EmbeddedFiles {starting_value + 2} 0 R >>\n".encode("ascii") - + b"endobj\n" - ) - object_5 = ( - f"{starting_value + 4} 0 obj\n".encode("ascii") - + ( - f"<< /Type /Catalog /Pages {info.pages_ref} /Names " - f"{starting_value + 3} 0 R /AF [ {starting_value + 1} 0 R ] >>\n" - ).encode("ascii") + serialized_manifest_store = manifest_store.serialize() + + serialized_manifest_store_length = len(serialized_manifest_store) + + object_1 = ( + f"{starting_value} 0 obj\n".encode("ascii") + + f"<< /Type /EmbeddedFile /Subtype {subtype} 
/Length {serialized_manifest_store_length} >>\n".encode("ascii") + + b"stream\n" + + serialized_manifest_store + + b"\nendstream\nendobj\n" + ) + object_2 = ( + f"{starting_value + 1} 0 obj\n".encode("ascii") + + ( + f"<< /Type /Filespec /AFRelationship /C2PA_Manifest " + f"/F ({fname}) /UF ({fname}) /Desc (C2PA Manifest Store) " + f"/Subtype {subtype} /EF << /F {starting_value} 0 R >> >>\n" + ).encode("ascii") + + b"endobj\n" + ) + object_3 = ( + f"{starting_value + 2} 0 obj\n".encode("ascii") + + f"<< /Type /Names /Names [ ({fname}) {starting_value + 1} 0 R ] >>\n".encode("ascii") + + b"endobj\n" + ) + object_4 = ( + f"{starting_value + 3} 0 obj\n".encode("ascii") + + f"<< /Type /Names /EmbeddedFiles {starting_value + 2} 0 R >>\n".encode("ascii") + + b"endobj\n" + ) + object_5 = ( + f"{starting_value + 4} 0 obj\n".encode("ascii") + + ( + f"<< /Type /Catalog /Pages {info.pages_ref} /Names " + f"{starting_value + 3} 0 R /AF [ {starting_value + 1} 0 R ] >>\n" + ).encode("ascii") + + b"endobj\n" + ) + + if author_info_required: + author_s = author.replace(")", r"\)") if author else "" + object_6 = ( + f"{starting_value + 5} 0 obj\n".encode("ascii") + + f"<< /Author ({author_s}) >>\n".encode("ascii") + b"endobj\n" ) + else: + object_6 = b"" + + sep = b"\n" + offset_of_object_1 = initial_length_of_file + len(sep) + offset_of_object_2 = offset_of_object_1 + len(object_1) + offset_of_object_3 = offset_of_object_2 + len(object_2) + offset_of_object_4 = offset_of_object_3 + len(object_3) + offset_of_object_5 = offset_of_object_4 + len(object_4) + + if author_info_required: + offset_of_object_6 = offset_of_object_5 + len(object_5) + xref_pos = offset_of_object_6 + len(object_6) + else: + xref_pos = offset_of_object_5 + len(object_5) + + count = 5 + (1 if author_info_required else 0) + xref = b"xref\n" + f"{starting_value} {count}\n".encode("ascii") + xref += ( + _xref_entry(offset_of_object_1) + + _xref_entry(offset_of_object_2) + + _xref_entry(offset_of_object_3) + + 
_xref_entry(offset_of_object_4) + + _xref_entry(offset_of_object_5) + ) - if author_info_required: - author_s = author.replace(")", r"\)") if author else "" - object_6 = ( - f"{starting_value + 5} 0 obj\n".encode("ascii") - + f"<< /Author ({author_s}) >>\n".encode("ascii") - + b"endobj\n" - ) - else: - object_6 = b"" - - sep = b"\n" - offset_of_object_1 = initial_length_of_file + len(sep) - offset_of_object_2 = offset_of_object_1 + len(object_1) - offset_of_object_3 = offset_of_object_2 + len(object_2) - offset_of_object_4 = offset_of_object_3 + len(object_3) - offset_of_object_5 = offset_of_object_4 + len(object_4) - if author_info_required: - offset_of_object_6 = offset_of_object_5 + len(object_5) - xref_pos = offset_of_object_6 + len(object_6) - else: - xref_pos = offset_of_object_5 + len(object_5) - - count = 5 + (1 if author_info_required else 0) - xref = b"xref\n" + f"{starting_value} {count}\n".encode("ascii") - xref += ( - _xref_entry(offset_of_object_1) - + _xref_entry(offset_of_object_2) - + _xref_entry(offset_of_object_3) - + _xref_entry(offset_of_object_4) - + _xref_entry(offset_of_object_5) - ) - if author_info_required: - xref += _xref_entry(offset_of_object_6) - - size_val = starting_value + count - trailer = ( - b"trailer\n<< " - + f"/Size {size_val} ".encode("ascii") - + f"/Root {starting_value + 4} 0 R ".encode("ascii") - + f"/Prev {pointer_on_previous_xref} ".encode("ascii") - ) - if author_info_required: - trailer += f"/Info {starting_value + 5} 0 R ".encode("ascii") - trailer += b">>\n" - - tail = ( - sep - + object_1 - + object_2 - + object_3 - + object_4 - + object_5 - + object_6 - + xref - + trailer - + b"startxref\n" - + str(xref_pos).encode("ascii") - + b"\n%%EOF\n" - ) + if author_info_required: + xref += _xref_entry(offset_of_object_6) - total_len = len(tail) - if total_len == last: - return initial_content + tail - last = total_len - assumed_hash_data_len = total_len + size_val = starting_value + count + trailer = ( + b"trailer\n<< " + 
+ f"/Size {size_val} ".encode("ascii") + + f"/Root {starting_value + 4} 0 R ".encode("ascii") + + f"/Prev {pointer_on_previous_xref} ".encode("ascii") + ) + + if author_info_required: + trailer += f"/Info {starting_value + 5} 0 R ".encode("ascii") + + trailer += b">>\n" + + tail = ( + sep + + object_1 + + object_2 + + object_3 + + object_4 + + object_5 + + object_6 + + xref + + trailer + + b"startxref\n" + + str(xref_pos).encode("ascii") + + b"\n%%EOF\n" + ) + + manifest_store.add_full_c2pa_structure_exclusion( + c2pa_offset, + len(tail), + ) + + serialized_manifest_store = manifest_store.serialize() + serialized_manifest_store_length = len(serialized_manifest_store) + + object_1 = ( + f"{starting_value} 0 obj\n".encode("ascii") + + f"<< /Type /EmbeddedFile /Subtype {subtype} /Length {serialized_manifest_store_length} >>\n".encode("ascii") + + b"stream\n" + + serialized_manifest_store + + b"\nendstream\nendobj\n" + ) + + tail = ( + sep + + object_1 + + object_2 + + object_3 + + object_4 + + object_5 + + object_6 + + xref + + trailer + + b"startxref\n" + + str(xref_pos).encode("ascii") + + b"\n%%EOF\n" + ) - manifests.set_hash_data_length_for_all(assumed_hash_data_len) - store = manifests.serialize() return initial_content + tail diff --git a/c2pie/interface.py b/c2pie/interface.py index 58f2b65b..029e2576 100644 --- a/c2pie/interface.py +++ b/c2pie/interface.py @@ -10,10 +10,9 @@ from c2pie.c2pa.assertion_store import AssertionStore from c2pie.c2pa.claim import Claim from c2pie.c2pa.claim_signature import ClaimSignature -from c2pie.c2pa.config import RETRY_SIGNATURE from c2pie.c2pa.manifest import Manifest from c2pie.c2pa.manifest_store import ManifestStore -from c2pie.c2pa_injection.jpg_injection import JpgSegmentApp11Storage +from c2pie.c2pa_injection.jpg_injection import emplace_manifest_into_jpeg from c2pie.c2pa_injection.pdf_injection import emplace_manifest_into_pdf from c2pie.jumbf_boxes.box import Box from c2pie.utils.assertion_schemas import 
C2PA_AssertionTypes @@ -25,13 +24,9 @@ def c2pie_GenerateAssertion(assertion_type: C2PA_AssertionTypes, assertion_schem def c2pie_GenerateHashDataAssertion( - cai_offset: int, hashed_data: bytes, ) -> HashDataAssertion: - return HashDataAssertion( - cai_offset, - hashed_data, - ) + return HashDataAssertion(hashed_data) def c2pie_GenerateActionsAssertion( @@ -78,7 +73,7 @@ def c2pie_GenerateManifestStore( private_key: bytes, certificate_chain: bytes, file_name: str, - # TODO: #66 : move that variables to configfile + # TODO: #66: move that variables to configfile tsa_url: str | None, require_tsa: bool, tsa_log_dir: str | None, @@ -123,32 +118,16 @@ def c2pie_EmplaceManifest( manifest_store: ManifestStore, ) -> bytes: if format_type == C2PA_ContentTypes.jpg or format_type == C2PA_ContentTypes.jpeg: - assumed_hash_data_len = 0 - final_length = -1 - tail = b"" - - for _ in range(RETRY_SIGNATURE): - manifest_store.set_hash_data_length_for_all(assumed_hash_data_len) - - payload = manifest_store.serialize() - storage = JpgSegmentApp11Storage( - app11_segment_box_length=manifest_store.get_length(), - app11_segment_box_type=manifest_store.get_type(), - payload=payload, - ) - - tail = storage.serialize() - total_len = len(tail) - - if total_len == final_length: - break - - final_length = total_len - assumed_hash_data_len = total_len - - return content_bytes[:c2pa_offset] + tail + content_bytes[c2pa_offset:] - - if format_type == C2PA_ContentTypes.pdf: - return emplace_manifest_into_pdf(content_bytes, manifest_store) - - raise ValueError(f"Unsupported content type {format_type}!") + return emplace_manifest_into_jpeg( + content_bytes, + manifest_store, + c2pa_offset, + ) + elif format_type == C2PA_ContentTypes.pdf: + return emplace_manifest_into_pdf( + content_bytes, + manifest_store, + c2pa_offset, + ) + else: + raise ValueError(f"Unsupported content type {format_type}!") diff --git a/c2pie/jumbf_boxes/super_box.py b/c2pie/jumbf_boxes/super_box.py index 5a039bbd..e20e4bf9 
100644 --- a/c2pie/jumbf_boxes/super_box.py +++ b/c2pie/jumbf_boxes/super_box.py @@ -15,7 +15,11 @@ def __init__( label: str = "", content_boxes: list | None = None, ): - self.description_box = DescriptionBox(content_type=content_type, label=label) + self.description_box = DescriptionBox( + content_type=content_type, + label=label, + ) + self.content_boxes = [] if content_boxes is None else content_boxes payload = self.description_box.serialize() + self.serialize_content_boxes() diff --git a/c2pie/signing.py b/c2pie/signing.py index 25377727..1ed0df15 100644 --- a/c2pie/signing.py +++ b/c2pie/signing.py @@ -3,6 +3,7 @@ from pathlib import Path from typing import Literal +from c2pie.c2pa_injection.pdf_injection import prepare_pdf_bytes from c2pie.c2pa_parsing.jumbf_parsing import extract_manifest_boxes, get_active_manifest_uuid from c2pie.c2pa_parsing.manifest_extractor import extract_manifest_store_bytes from c2pie.interface import ( @@ -138,6 +139,7 @@ def sign_file( file_type: C2PA_ContentTypes = _get_content_type_by_filepath(input_path) if file_type.name == "pdf": + raw_bytes = prepare_pdf_bytes(raw_bytes) cai_offset = len(raw_bytes) else: cai_offset = 2 @@ -145,7 +147,6 @@ def sign_file( assertions = [] hash_data_assertion = c2pie_GenerateHashDataAssertion( - cai_offset=cai_offset, hashed_data=hashlib.sha256(raw_bytes).digest(), ) diff --git a/tests/c2pa/assertions/assertion_test.py b/tests/c2pa/assertions/assertion_test.py index b41122ad..66d27953 100644 --- a/tests/c2pa/assertions/assertion_test.py +++ b/tests/c2pa/assertions/assertion_test.py @@ -1,4 +1,4 @@ -from c2pie.c2pa.assertion import Assertion, HashDataAssertion +from c2pie.c2pa.assertion import Assertion from c2pie.utils.assertion_schemas import C2PA_AssertionTypes, cbor_to_bytes, json_to_bytes from c2pie.utils.content_types import jumbf_content_types @@ -76,11 +76,3 @@ def test_serialize_cbor_assertion(): def test_assertion_content_boxes_not_empty(): # noqa: F811 actions_assertion = 
Assertion(C2PA_AssertionTypes.actions, {}) assert len(actions_assertion.content_boxes) != 0 - - -def test_additional_extensions_adding_for_hash_data_assertions(): - additional_exclusion = {"some_extension": 343} - data_hash_assertion = HashDataAssertion( - cai_offset=124, hashed_data=b"", additional_exclusions=[additional_exclusion] - ) - assert additional_exclusion in data_hash_assertion.schema["exclusions"] diff --git a/tests/c2pa/assertions/data_hash_assertion_test.py b/tests/c2pa/assertions/data_hash_assertion_test.py index 20ef37d1..b9038192 100644 --- a/tests/c2pa/assertions/data_hash_assertion_test.py +++ b/tests/c2pa/assertions/data_hash_assertion_test.py @@ -1,3 +1,5 @@ +import pytest + from c2pie.c2pa.assertion import HashDataAssertion from c2pie.utils.assertion_schemas import C2PA_AssertionTypes, cbor_to_bytes from c2pie.utils.content_types import jumbf_content_types @@ -7,119 +9,121 @@ def test_hash_data_assertion_has_correct_type(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) assert data_hash_assertion.type == C2PA_AssertionTypes.data_hash def test_hash_data_assertion_content_type_is_cbor(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) assert data_hash_assertion.get_content_type() == jumbf_content_types["cbor"] def test_hash_data_assertion_label(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) assert data_hash_assertion.get_label() == "c2pa.hash.data" def test_hash_data_assertion_schema_alg_is_sha256(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) assert 
data_hash_assertion.schema["alg"] == "sha256" -def test_hash_data_assertion_schema_pad_is_empty(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) - assert data_hash_assertion.schema["pad"] == [] - - -def test_hash_data_assertion_has_correct_offset(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) - assert data_hash_assertion.schema["exclusions"][0]["start"] == CAI_OFFSET - - -def test_hash_data_assertion_default_exclusion_length(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) - assert data_hash_assertion.schema["exclusions"][0]["length"] == 65535 +def test_hash_data_assertion_schema_pad_is_64_bytes_length(): + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) + assert data_hash_assertion.schema["pad"] == b"\x00" * 64 def test_hash_data_assertion_has_correct_hash(): expected_hashed_data = b"\xab" * 32 - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=expected_hashed_data, - ) + data_hash_assertion = HashDataAssertion(hashed_data=expected_hashed_data) assert data_hash_assertion.schema["hash"] == expected_hashed_data def test_hash_data_assertion_serializes_as_cbor(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) expected_payload = cbor_to_bytes(data_hash_assertion.schema) assert len(data_hash_assertion.content_boxes) == 1 assert data_hash_assertion.content_boxes[0].payload == expected_payload -def test_hash_data_assertion_with_additional_exclusions(): - additional = [ - { - "start": 100, - "length": 200, - }, - ] - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - additional_exclusions=additional, - ) - exclusions = data_hash_assertion.schema["exclusions"] - assert len(exclusions) == 2 - assert 
exclusions[1] == {"start": 100, "length": 200} - - -def test_hash_data_assertion_without_additional_exclusions_has_one_exclusion(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, - ) - assert len(data_hash_assertion.schema["exclusions"]) == 1 +def test_hash_data_assertion_without_additional_exclusions_has_not_exclusions(): + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) + assert len(data_hash_assertion.schema["exclusions"]) == 0 def test_set_hash_data_length_updates_exclusion(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) + data_hash_assertion.add_full_c2pa_structure_exclusion( + CAI_OFFSET, + 200, ) - data_hash_assertion.set_hash_data_length(200) assert data_hash_assertion.schema["exclusions"][0]["length"] == 200 def test_set_hash_data_length_updates_content_box_payload(): - data_hash_assertion = HashDataAssertion( - cai_offset=CAI_OFFSET, - hashed_data=HASHED_DATA, + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) + data_hash_assertion.add_full_c2pa_structure_exclusion( + CAI_OFFSET, + 200, ) - data_hash_assertion.set_hash_data_length(200) expected_payload = cbor_to_bytes(data_hash_assertion.schema) assert data_hash_assertion.content_boxes[0].payload == expected_payload + + +def test_align_hash_data_with_large_difference_causes_error(): + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) + data_hash_assertion.schema["pad"] = b"\x00" + + with pytest.raises(ValueError, match="Difference in length exceeds the predefined pad"): + data_hash_assertion.add_full_c2pa_structure_exclusion( + CAI_OFFSET, + 200, + ) + + +def test_exceed_cbor_23_bytes_limit_add_1_byte_to_length(): + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) + + # Empty list (default value) of exclusions serialized + # in CBOR has 1 bytes in length. 
+ + # Following list of exclusions serialized in CBOR: + # [{ "start": 2, "length": "", }] + # + # is 17 bytes long + + # We need to set the exclusion size so that the difference + # is at least 41 bytes (64 - 23 = 41). + + fake_payload = b"\x00" * 24 # 41 - 17 = 24 + + data_hash_assertion.add_full_c2pa_structure_exclusion( + CAI_OFFSET, + fake_payload, + ) + + # Don't forget about the additional byte + assert len(data_hash_assertion.schema["pad"]) == 22 + + +def test_data_hash_assertion_exclusions_more_then_23(): + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) + data_hash_assertion.schema["exclusions"] = [{"start": 0, "length": 0}] * 23 + + data_hash_assertion.add_full_c2pa_structure_exclusion( + CAI_OFFSET, + 0, + ) + + assert len(data_hash_assertion.schema["pad"]) == 47 + + +def test_calculation_of_pad_inside_data_hash_assertion_was_performed_correctly(): + data_hash_assertion = HashDataAssertion(hashed_data=HASHED_DATA) + + data_hash_assertion.add_full_c2pa_structure_exclusion( + CAI_OFFSET, + 0, + ) + + assert len(data_hash_assertion.schema["pad"]) == 48 diff --git a/tests/c2pa/claim_signature_test.py b/tests/c2pa/claim_signature_test.py index 078bbfa4..955666fd 100644 --- a/tests/c2pa/claim_signature_test.py +++ b/tests/c2pa/claim_signature_test.py @@ -1,3 +1,6 @@ +import cbor2 +import pytest + from c2pie.c2pa.assertion import Assertion from c2pie.c2pa.assertion_store import AssertionStore from c2pie.c2pa.claim import Claim @@ -57,3 +60,108 @@ def test_create_claim_signature_with_non_empty_claim(): assert claim_signature.claim is not None # noqa: B015 assert claim_signature.content_boxes[0].get_type() == b"cbor".hex() # noqa: B015 + + +def test_serialization_cose_sign1_is_performed_with_alignment(): + claim_signature = ClaimSignature.__new__(ClaimSignature) + claim_signature.serialized_cose_sign1_length = 0 + + cose_sign1 = [ + "protected_header", + { + "pad": b"\x00\x00\x00\x00", + }, + "payload", + "signature", + ] + + 
serialized_cose_sign1_cbor_1 = claim_signature.serialize_cose_sign1_tagged_with_alignment(cose_sign1) + + assert claim_signature.serialized_cose_sign1_length != 0 + assert cbor2.loads(serialized_cose_sign1_cbor_1).value[1]["pad"] == cose_sign1[1]["pad"] + + cose_sign1 = [ + "protected_header", + { + "pad": b"\x00\x00\x00\x00", + }, + "payload", + "signature2", + ] + + serialized_cose_sign1_cbor_2 = claim_signature.serialize_cose_sign1_tagged_with_alignment(cose_sign1) + + assert len(serialized_cose_sign1_cbor_1) == len(serialized_cose_sign1_cbor_2) + + cose_sign1 = [ + "protected_header", + { + "pad": b"\x00\x00\x00\x00", + }, + "", + "signature", + ] + + serialized_cose_sign1_cbor_3 = claim_signature.serialize_cose_sign1_tagged_with_alignment(cose_sign1) + + assert len(serialized_cose_sign1_cbor_1) == len(serialized_cose_sign1_cbor_3) + + +def test_align_cose_sign1_with_large_difference_causes_error(): + claim_signature = ClaimSignature.__new__(ClaimSignature) + claim_signature.serialized_cose_sign1_length = 1 + + cose_sign1 = [ + "protected_header", + { + "pad": b"\x00\x00\x00\x00", + }, + "payload", + "signature", + ] + + with pytest.raises(ValueError, match="Difference in length exceeds the predefined pad"): + claim_signature.serialize_cose_sign1_tagged_with_alignment(cose_sign1) + + +def test_cose_sign1_tagged_tag_value_is_18(): + claim_signature = ClaimSignature.__new__(ClaimSignature) + claim_signature.serialized_cose_sign1_length = 0 + + cose_sign1 = [ + "protected_header", + { + "pad": b"\x00\x00\x00\x00", + }, + "payload", + "signature", + ] + + serialized_cose_sign1_cbor = claim_signature.serialize_cose_sign1_tagged_with_alignment(cose_sign1) + + assert cbor2.loads(serialized_cose_sign1_cbor).tag == 18 + + +def test_exceed_cbor_limit_add_1_bytes_to_length(): + claim_signature = ClaimSignature.__new__(ClaimSignature) + + # We must ensure that the difference is such + # that the pad size is greater than 23 bytes. 
+ + # Current length of cose_sign1 serialized in CBOR is 50 bytes. + cose_sign1 = [ + "protected_header", + { + "pad": b"\x00" * 8, + }, + "payload", + "signature", + ] + + # cose_sign1 CBOR encoded + CBOR limit - current pad + 1 (COSE tag) + # ~ 50 + 24 - 8 + 1 + claim_signature.serialized_cose_sign1_length = 67 + + serialized_cose_sign1_cbor = claim_signature.serialize_cose_sign1_tagged_with_alignment(cose_sign1) + + assert len(cbor2.loads(serialized_cose_sign1_cbor).value[1]["pad"]) == 25 diff --git a/tests/c2pa/e2e_test.py b/tests/c2pa/e2e_test.py index 1dfe9d56..f7d9eadb 100644 --- a/tests/c2pa/e2e_test.py +++ b/tests/c2pa/e2e_test.py @@ -1,5 +1,4 @@ import json -import os import shutil import subprocess from pathlib import Path @@ -9,12 +8,12 @@ from c2pie.signing import sign_file from c2pie.utils.content_types import C2PA_ContentTypes -FIXTURES_DIR = Path(__file__).parent.parent / "test_files" +TEST_FILES_DIR = Path(__file__).parent.parent / "test_files" test_files_by_extension = { "pdf": [ "test_doc.pdf", - "test_doc2.pdf", + "test_broken_doc.pdf", ], "jpg": [ "test_image.jpg", @@ -26,37 +25,58 @@ def get_test_file_full_path(filename: str) -> Path: - path = FIXTURES_DIR / filename + path = TEST_FILES_DIR / filename if not path.exists(): raise FileNotFoundError(f"Fixture not found: {path}") + return path -def copy_test_file(source_path: str, destination_path: Path) -> None: +def copy_test_file( + source_path: str, + destination_path: Path, +) -> None: source_full_path = get_test_file_full_path(source_path) - destination_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copyfile(source_full_path, destination_path) + + destination_path.parent.mkdir( + parents=True, + exist_ok=True, + ) + + shutil.copyfile( + source_full_path, + destination_path, + ) def has_c2patool() -> bool: return shutil.which("c2patool") is not None -def _c2pa_json_report(asset_path: str) -> dict: +def _validate_using_c2patool_and_return_json_report(asset_path: Path) -> dict: """ 
Return c2patool's JSON report. If parsing fails, raise with stdout/stderr for debugging. """ c2patool_launch_command = ["c2patool", asset_path, "-d"] - cp2atool_result = subprocess.run(c2patool_launch_command, capture_output=True, text=True) - evaluation_result = cp2atool_result + cp2atool_result = subprocess.run( + c2patool_launch_command, + # If set to False (by default), 'stdout' and 'stderr' outputs + # will not be available via '.stderr' and '.stdout', correspondingly. + capture_output=True, + # If set to False (by default), a byte stream will be + # returned instead of a string. + text=True, + ) + if cp2atool_result.returncode == 0: - return json.loads(cp2atool_result.stdout or "{}") + return json.loads(cp2atool_result.stdout) + pytest.fail( "c2patool failed or did not output JSON.\n" - f"args={evaluation_result.args if evaluation_result else None}\n" - f"stdout={evaluation_result.stdout if evaluation_result else None}\n" - f"stderr={evaluation_result.stderr if evaluation_result else None}" + f"args={cp2atool_result.args if cp2atool_result else None}\n" + f"stdout={cp2atool_result.stdout if cp2atool_result else None}\n" + f"stderr={cp2atool_result.stderr if cp2atool_result else None}" ) @@ -64,35 +84,66 @@ def _c2pa_json_report(asset_path: str) -> dict: def test_e2e_signing_with_c2patool_validation(tmp_path): if not has_c2patool(): pytest.skip("c2patool not available") + if not sign_file: pytest.skip("sign_file function not available yet") - os.environ["C2PA_BACKEND"] = "tool" - for content_type in C2PA_ContentTypes: input_file = tmp_path / f"in.{content_type.name}" output_file = tmp_path / f"out.{content_type.name}" for test_file in test_files_by_extension[content_type.name]: - copy_test_file(f"./{test_file}", input_file) + copy_test_file( + test_file, + input_file, + ) - try: - sign_file( - input_path=input_file, - output_path=output_file, - ) - except NotImplementedError: - pytest.xfail("sign_file function not implemented yet") + sign_file( + 
input_path=input_file, + output_path=output_file, + ) - data = _c2pa_json_report(str(output_file)) - assert "manifests" in data or "manifest" in data + report = _validate_using_c2patool_and_return_json_report(output_file) + assert "manifests" in report - manifests = data.get("manifests") + manifests = report.get("manifests") assert manifests, "no manifests in output" - if isinstance(manifests, dict): - manifests_list = list(manifests.values()) - else: - manifests_list = manifests - + manifests_list = list(manifests.values()) assert manifests_list, "empty manifests list after normalization" + + +@pytest.mark.e2e +@pytest.mark.parametrize( + "iteration", + range(30), +) +def test_e2e_signature_stability( + iteration, + tmp_path, +): + if not has_c2patool(): + pytest.skip("c2patool not available") + + if not sign_file: + pytest.skip("sign_file function not available yet") + + for content_type in C2PA_ContentTypes: + input_file = tmp_path / f"in.{content_type.name}" + output_file = tmp_path / f"out.{content_type.name}" + + for test_file in test_files_by_extension[content_type.name]: + copy_test_file( + test_file, + input_file, + ) + + sign_file( + input_path=input_file, + output_path=output_file, + ) + + report = _validate_using_c2patool_and_return_json_report(output_file) + validation_state = report.get("validation_state") + + assert validation_state == "Valid" diff --git a/tests/c2pa/interface_test.py b/tests/c2pa/interface_test.py index d06cb707..e124aba4 100644 --- a/tests/c2pa/interface_test.py +++ b/tests/c2pa/interface_test.py @@ -1,4 +1,7 @@ from pathlib import Path +from unittest.mock import patch + +import pytest from c2pie.c2pa.manifest_store import ManifestStore from c2pie.interface import ( @@ -40,7 +43,6 @@ def test_generate_hash_data_assertion_returns_hash_data_assertion_instance(): from c2pie.c2pa.assertion import HashDataAssertion hash_data_assertion = c2pie_GenerateHashDataAssertion( - cai_offset=2, hashed_data=b"\x00" * 32, ) assert 
isinstance(hash_data_assertion, HashDataAssertion) @@ -133,7 +135,7 @@ def test_emplace_manifest_returns_bytes_with_jpeg_signature(): jpeg_bytes = f.read() assertions = [ - c2pie_GenerateHashDataAssertion(cai_offset=2, hashed_data=b"\x00" * 32), + c2pie_GenerateHashDataAssertion(hashed_data=b"\x00" * 32), c2pie_GenerateActionsAssertion(action="c2pa.created"), ] @@ -156,3 +158,73 @@ def test_emplace_manifest_returns_bytes_with_jpeg_signature(): assert isinstance(result, bytes) assert result[:2] == b"\xff\xd8" + + +FIXTURES_FOLDER_PATH = Path(__file__).parent.parent / "test_files" + +test_cases = [ + Path(FIXTURES_FOLDER_PATH / "test_image.jpg"), + Path(FIXTURES_FOLDER_PATH / "test_doc.pdf"), +] + + +@pytest.mark.parametrize( + "file", + test_cases, + ids=lambda x: x.suffix[1:], +) +def test_calculated_exclusion_covers_the_full_storage(file): + with open(KEY_FILEPATH, "rb") as f: + key = f.read() + with open(CERT_FILEPATH, "rb") as f: + cert = f.read() + + with open(file, "rb") as f: + raw_bytes = f.read() + + assertions = [ + c2pie_GenerateHashDataAssertion( + hashed_data=b"\x00" * 32, + ), + ] + + manifest_store = c2pie_GenerateManifestStore( + assertions=assertions, + private_key=key, + certificate_chain=cert, + file_name=file.name, + tsa_url=None, + require_tsa=False, + tsa_log_dir=None, + ) + + file_extension = C2PA_ContentTypes(file.suffix) + + if file_extension == C2PA_ContentTypes.jpeg or file_extension == C2PA_ContentTypes.jpg: + """ + Expected length of serialized data in JPEG/JPG format consists + of APP11 segment header + payload (serialized ManifestStore). + + More info about APP11 segment you can see here: docs/JPG-structure-overview.md + """ + expected_serialized_length = 2 + 2 + 2 + 2 + 4 + len(manifest_store.serialize()) + elif file_extension == C2PA_ContentTypes.pdf: + """ + Expected length of serialized data in PDF format consists + of boby (serialized ManifestStore) + updated cross-ref table and trailer. 
+ + More info about PDF Incremental Update you can see here: docs/PDF-structure-overview.md + """ + expected_serialized_length = 7148 + + with patch("c2pie.c2pa.manifest_store.ManifestStore.add_full_c2pa_structure_exclusion") as mock_func: + c2pie_EmplaceManifest( + format_type=file_extension, + content_bytes=raw_bytes, + c2pa_offset=2, + manifest_store=manifest_store, + ) + + last_call = mock_func.call_args + + assert expected_serialized_length == last_call.args[1] diff --git a/tests/c2pa/manifest_store_test.py b/tests/c2pa/manifest_store_test.py index b25ea0ae..feeb7151 100644 --- a/tests/c2pa/manifest_store_test.py +++ b/tests/c2pa/manifest_store_test.py @@ -77,7 +77,6 @@ def test_manifest_store_with_previous_manifests_serializes_without_error(): def test_manifest_store_set_hash_data_length_only_affects_new_manifests(): data_hash_assertion = HashDataAssertion( - 0, b"\x00\x00\x00", ) @@ -115,7 +114,7 @@ def test_manifest_store_set_hash_data_length_only_affects_new_manifests(): ] ) - manifest_store.set_hash_data_length_for_all(1024) + manifest_store.add_full_c2pa_structure_exclusion(2, 1024) previous_box = manifest_store.content_boxes[0] assert previous_box == previous_manifest diff --git a/tests/test_files/test_doc2.pdf b/tests/test_files/test_broken_doc.pdf similarity index 100% rename from tests/test_files/test_doc2.pdf rename to tests/test_files/test_broken_doc.pdf