From 712bb2c3a5b88c973fec53b053f930f842eb401a Mon Sep 17 00:00:00 2001 From: SequeI Date: Wed, 3 Dec 2025 19:07:15 +0000 Subject: [PATCH 1/4] feat: Support keys and signatures from memory Add support for providing elliptic curve keys and signatures as bytes in the library API. Private/public keys can now be passed as bytes (PEM or compressed format), and signatures can be provided as JSON strings or bytes. All changes are backward compatible. - Signer/Verifier accept keys from bytes or file paths - Public keys support compressed format (33/49/67 bytes) - Signature.read() and verify() accept bytes/strings - Updated documentation and changelog Signed-off-by: SequeI --- CHANGELOG.md | 3 +- README.md | 18 +++ src/model_signing/__init__.py | 6 +- src/model_signing/_signing/sign_ec_key.py | 120 ++++++++++++++++-- src/model_signing/_signing/sign_sigstore.py | 26 +++- .../_signing/sign_sigstore_pb.py | 26 +++- src/model_signing/signing.py | 12 +- src/model_signing/verifying.py | 25 +++- tests/api_test.py | 85 +++++++++++++ 9 files changed, 292 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f4ce652..49621e61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,8 @@ All versions prior to 1.0.0 are untracked. ## [Unreleased] ### Added --Added the `digest` subcommand to compute and print a model's digest. This enables other tools to easily pair the attestations with a model directory. +- Added the `digest` subcommand to compute and print a model's digest. This enables other tools to easily pair the attestations with a model directory. +- Added support for providing elliptic curve keys and signatures from memory (as bytes) in the library API. Private and public keys can now be passed as bytes in addition to file paths, and public keys also support compressed format (33 bytes for secp256r1). Signatures can be provided as JSON strings or bytes in addition to file paths. ### Changed - ... diff --git a/README.md b/README.md index cbfe4a2c..d97e5b64 100644 --- a/README.md +++ b/README.md @@ -359,6 +359,14 @@ model_signing.signing.Config().use_elliptic_key_signer( ) ``` +Keys can also be provided from memory as bytes instead of file paths: +```python +private_key_bytes = Path("key.priv").read_bytes() +model_signing.signing.Config().use_elliptic_key_signer( + private_key=private_key_bytes +).sign("finbert", "finbert.sig") +``` + The same signing configuration can be used to sign multiple models: ```python @@ -395,6 +403,16 @@ for model in all_models: verifying_config.verify(model, f"{model}_sharded.sig") ``` +Public keys can also be provided from memory as bytes (PEM format or compressed 33-byte format for secp256r1), and signatures can be provided as JSON strings or bytes: +```python +public_key_bytes = Path("key.pub").read_bytes() +signature_bytes = Path("model.sig").read_bytes() +verifying_config = model_signing.verifying.Config().use_elliptic_key_verifier( + public_key=public_key_bytes +) +verifying_config.verify("finbert", signature_bytes) +``` + Consult the [official documentation](https://sigstore.github.io/model-transparency/model_signing.html) for more details. diff --git a/src/model_signing/__init__.py b/src/model_signing/__init__.py index 6d45803e..5eaad5c8 100644 --- a/src/model_signing/__init__.py +++ b/src/model_signing/__init__.py @@ -60,7 +60,8 @@ This example generates a signature using a private key based on elliptic curve cryptography. It also hashes the model by ignoring `README.md` and any git -related file present in the model directory. +related file present in the model directory. Keys can also be provided as bytes +from memory instead of file paths. We also support signing with signing certificates, using a similar API as above. @@ -98,6 +99,9 @@ ).verify("finbert", "finbert.sig") ``` +Public keys can also be provided as bytes (PEM format or compressed format), +and signatures can be provided as JSON strings or bytes instead of file paths. + A reminder that we still need to set the verification configuration. This sets up the cryptographic primitives to verify the signature and is needed to know how to parse the signature file. diff --git a/src/model_signing/_signing/sign_ec_key.py b/src/model_signing/_signing/sign_ec_key.py index 08cde115..764f659f 100644 --- a/src/model_signing/_signing/sign_ec_key.py +++ b/src/model_signing/_signing/sign_ec_key.py @@ -82,22 +82,115 @@ def get_ec_key_hash( raise ValueError(f"Unexpected key size {key_size}") +def _load_private_key( + private_key: pathlib.Path | bytes, password: str | None = None +) -> ec.EllipticCurvePrivateKey: + """Load a private key from a path or bytes. + + Args: + private_key: Either a path to a PEM-encoded private key file, or bytes + containing the PEM-encoded private key. + password: Optional password for the private key. + + Returns: + The loaded private key. + + Raises: + ValueError: If the key format is invalid or unsupported. + """ + match private_key: + case pathlib.Path(): + key_bytes = private_key.read_bytes() + case bytes(): + key_bytes = private_key + case _: + raise TypeError( + f"Expected pathlib.Path or bytes, got {type(private_key)}" + ) + + loaded_key = serialization.load_pem_private_key(key_bytes, password) + if not isinstance(loaded_key, ec.EllipticCurvePrivateKey): + raise ValueError("Only elliptic curve private keys are supported") + _check_supported_ec_key(loaded_key.public_key()) + return loaded_key + + +def _load_public_key( + public_key: pathlib.Path | bytes, +) -> ec.EllipticCurvePublicKey: + """Load a public key from a path, bytes (PEM), or compressed format. + + Args: + public_key: + - A path to a PEM-encoded public key file + - Bytes containing PEM-encoded public key + - 33 bytes of compressed public key (for secp256r1) + + Returns: + The loaded public key. + + Raises: + ValueError: If the key format is invalid or unsupported. + """ + match public_key: + case pathlib.Path(): + key_bytes = public_key.read_bytes() + case bytes(): + key_bytes = public_key + case _: + raise TypeError( + f"Expected pathlib.Path or bytes, got {type(public_key)}" + ) + + # 33 bytes for secp256r1, 49 for secp384r1, 67 for secp521r1 + if len(key_bytes) in [33, 49, 67]: + curve_map = {33: ec.SECP256R1(), 49: ec.SECP384R1(), 67: ec.SECP521R1()} + curve = curve_map.get(len(key_bytes)) + if curve is None: + raise ValueError( + f"Unsupported compressed key length: {len(key_bytes)} bytes. " + "Expected 33 (secp256r1), 49 (secp384r1), " + "or 67 (secp521r1) bytes." + ) + try: + loaded_key = ec.EllipticCurvePublicKey.from_encoded_point( + curve, key_bytes + ) + return loaded_key + except (ValueError, exceptions.UnsupportedAlgorithm) as e: + raise ValueError( + f"Failed to load compressed public key: {e}. " + f"Expected {len(key_bytes)} bytes compressed format " + f"for {curve.name}." + ) from e + else: + try: + loaded_key = serialization.load_pem_public_key(key_bytes) + except ValueError: + loaded_key = serialization.load_der_public_key(key_bytes) + + if not isinstance(loaded_key, ec.EllipticCurvePublicKey): + raise ValueError("Only elliptic curve public keys are supported") + curve = loaded_key.curve.name + if curve not in ["secp256r1", "secp384r1", "secp521r1"]: + raise ValueError(f"Unsupported key for curve '{curve}'") + return loaded_key + + class Signer(sigstore_pb.Signer): """Signer using an elliptic curve private key.""" def __init__( - self, private_key_path: pathlib.Path, password: str | None = None + self, private_key: pathlib.Path | bytes, password: str | None = None ): """Initializes the signer with the private key and optional password. Args: - private_key_path: The path to the PEM encoded private key. + private_key: Either a path to a PEM-encoded private key file, + or bytes containing the PEM-encoded private key. password: Optional password for the private key. """ - self._private_key = serialization.load_pem_private_key( - private_key_path.read_bytes(), password - ) - _check_supported_ec_key(self._private_key.public_key()) + self._private_key = _load_private_key(private_key, password) @override def sign(self, payload: signing.Payload) -> signing.Signature: @@ -149,17 +242,18 @@ def _get_verification_material(self) -> bundle_pb.VerificationMaterial: class Verifier(sigstore_pb.Verifier): """Verifier for signatures generated with an elliptic curve private key.""" - def __init__(self, public_key_path: pathlib.Path): + def __init__(self, public_key: pathlib.Path | bytes): """Initializes the verifier with the public key to use. Args: - public_key_path: The path to the public key to use. This must be - paired with the private key used to generate the signature. + public_key: + - A path to a PEM-encoded public key file + - Bytes containing PEM-encoded public key + - 33 bytes of compressed public key (for secp256r1) + This must be paired with the private key used to generate + the signature. """ - self._public_key = serialization.load_pem_public_key( - public_key_path.read_bytes() - ) - _check_supported_ec_key(self._public_key) + self._public_key = _load_public_key(public_key) @override def _verify_bundle(self, bundle: bundle_pb.Bundle) -> tuple[str, bytes]: diff --git a/src/model_signing/_signing/sign_sigstore.py b/src/model_signing/_signing/sign_sigstore.py index 080f98ab..2828e71f 100644 --- a/src/model_signing/_signing/sign_sigstore.py +++ b/src/model_signing/_signing/sign_sigstore.py @@ -55,8 +55,30 @@ def write(self, path: pathlib.Path) -> None: @classmethod @override - def read(cls, path: pathlib.Path) -> Self: - content = path.read_text(encoding="utf-8") + def read(cls, path_or_content: pathlib.Path | str | bytes) -> Self: + """Read a signature from a file path, JSON string, or bytes. + + Args: + path_or_content: + - A path to a JSON signature file + - A JSON string containing the signature + - Bytes containing the JSON signature (UTF-8 encoded) + + Returns: + The loaded signature. + """ + match path_or_content: + case pathlib.Path(): + content = path_or_content.read_text(encoding="utf-8") + case bytes(): + content = path_or_content.decode("utf-8") + case str(): + content = path_or_content + case _: + raise TypeError( + f"Expected pathlib.Path, str, or bytes, " + f"got {type(path_or_content)}" + ) return cls(sigstore_models.Bundle.from_json(content)) diff --git a/src/model_signing/_signing/sign_sigstore_pb.py b/src/model_signing/_signing/sign_sigstore_pb.py index 5a1fcc0d..577166c8 100644 --- a/src/model_signing/_signing/sign_sigstore_pb.py +++ b/src/model_signing/_signing/sign_sigstore_pb.py @@ -109,8 +109,30 @@ def write(self, path: pathlib.Path) -> None: @classmethod @override - def read(cls, path: pathlib.Path) -> Self: - content = path.read_text(encoding="utf-8") + def read(cls, path_or_content: pathlib.Path | str | bytes) -> Self: + """Read a signature from a file path, JSON string, or bytes. + + Args: + path_or_content: + - A path to a JSON signature file + - A JSON string containing the signature + - Bytes containing the JSON signature (UTF-8 encoded) + + Returns: + The loaded signature. + """ + match path_or_content: + case pathlib.Path(): + content = path_or_content.read_text(encoding="utf-8") + case bytes(): + content = path_or_content.decode("utf-8") + case str(): + content = path_or_content + case _: + raise TypeError( + f"Expected pathlib.Path, str, or bytes, " + f"got {type(path_or_content)}" + ) parsed_dict = json.loads(content) # adjust parsed_dict due to previous usage of protobufs diff --git a/src/model_signing/signing.py b/src/model_signing/signing.py index 4de79be2..a2b2a8ea 100644 --- a/src/model_signing/signing.py +++ b/src/model_signing/signing.py @@ -187,7 +187,10 @@ def use_sigstore_signer( return self def use_elliptic_key_signer( - self, *, private_key: hashing.PathLike, password: str | None = None + self, + *, + private_key: hashing.PathLike | bytes, + password: str | None = None, ) -> Self: """Configures the signing to be performed using elliptic curve keys. @@ -195,13 +198,16 @@ def use_elliptic_key_signer( using a private key based on elliptic curve cryptography. Args: - private_key: The path to the private key to use for signing. + private_key: Either a path to a PEM-encoded private key file, + or bytes containing the PEM-encoded private key. password: An optional password for the key, if encrypted. Return: The new signing configuration. """ - self._signer = ec_key.Signer(pathlib.Path(private_key), password) + if not isinstance(private_key, bytes): + private_key = pathlib.Path(private_key) + self._signer = ec_key.Signer(private_key, password) return self def use_certificate_signer( diff --git a/src/model_signing/verifying.py b/src/model_signing/verifying.py index 9eaf8b39..6bd79b1a 100644 --- a/src/model_signing/verifying.py +++ b/src/model_signing/verifying.py @@ -76,12 +76,18 @@ def __init__(self): self._ignore_unsigned_files = False def verify( - self, model_path: hashing.PathLike, signature_path: hashing.PathLike + self, + model_path: hashing.PathLike, + signature: hashing.PathLike | str | bytes, ): """Verifies that a model conforms to a signature. Args: model_path: The path to the model to verify. + signature: + - A path to a JSON signature file + - A JSON string containing the signature + - Bytes containing the JSON signature (UTF-8 encoded) Raises: ValueError: No verifier has been configured. @@ -90,11 +96,11 @@ def verify( raise ValueError("Attempting to verify with no configured verifier") if self._uses_sigstore: - signature = sigstore.Signature.read(pathlib.Path(signature_path)) + sig = sigstore.Signature.read(signature) else: - signature = sigstore_pb.Signature.read(pathlib.Path(signature_path)) + sig = sigstore_pb.Signature.read(signature) - expected_manifest = self._verifier.verify(signature) + expected_manifest = self._verifier.verify(sig) if self._hashing_config is None: self._guess_hashing_config(expected_manifest) @@ -248,7 +254,7 @@ def use_sigstore_verifier( return self def use_elliptic_key_verifier( - self, *, public_key: hashing.PathLike + self, *, public_key: hashing.PathLike | bytes ) -> Self: """Configures the verification of signatures generated by a private key. @@ -258,13 +264,18 @@ def use_elliptic_key_verifier( used during signing. Args: - public_key: The path to the public key to verify with. + public_key: + - A path to a PEM-encoded public key file + - Bytes containing PEM-encoded public key + - 33 bytes of compressed public key (for secp256r1) Return: The new verification configuration. """ self._uses_sigstore = False - self._verifier = ec_key.Verifier(pathlib.Path(public_key)) + if not isinstance(public_key, bytes): + public_key = pathlib.Path(public_key) + self._verifier = ec_key.Verifier(public_key) return self def use_certificate_verifier( diff --git a/tests/api_test.py b/tests/api_test.py index 1195d9f1..e22774fb 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -287,6 +287,91 @@ def test_sign_and_verify(self, base_path, populate_tmpdir): ) assert get_model_name(signature) == os.path.basename(model_path) + def test_sign_and_verify_with_bytes_keys(self, base_path, populate_tmpdir): + os.chdir(base_path) + + model_path = populate_tmpdir + ignore_paths = [] + ignore_git_paths = False + signature = Path(model_path / "model.sig") + private_key_path = Path(TESTDATA / "keys/certificate/signing-key.pem") + public_key_path = Path( + TESTDATA / "keys/certificate/signing-key-pub.pem" + ) + + private_key_bytes = private_key_path.read_bytes() + public_key_bytes = public_key_path.read_bytes() + + signing.Config().use_elliptic_key_signer( + private_key=private_key_bytes, password=None + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).sign(model_path, signature) + + verifying.Config().use_elliptic_key_verifier( + public_key=public_key_bytes + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).verify(model_path, signature) + + signature_bytes = signature.read_bytes() + verifying.Config().use_elliptic_key_verifier( + public_key=public_key_bytes + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).verify(model_path, signature_bytes) + + def test_verify_with_compressed_public_key( + self, base_path, populate_tmpdir + ): + os.chdir(base_path) + + model_path = populate_tmpdir + ignore_paths = [] + ignore_git_paths = False + signature = Path(model_path / "model.sig") + private_key_path = Path(TESTDATA / "keys/certificate/signing-key.pem") + public_key_path = Path( + TESTDATA / "keys/certificate/signing-key-pub.pem" + ) + + signing.Config().use_elliptic_key_signer( + private_key=private_key_path, password=None + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).sign(model_path, signature) + + from cryptography.hazmat.primitives import serialization + + public_key_pem = public_key_path.read_bytes() + public_key = serialization.load_pem_public_key(public_key_pem) + + compressed_key = public_key.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.CompressedPoint, + ) + + verifying.Config().use_elliptic_key_verifier( + public_key=compressed_key + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).verify(model_path, signature) + class TestCertificateSigning: def test_sign_and_verify(self, base_path, populate_tmpdir): From 059d6e3be0e3282497f9b61c647aca41e5f17dfd Mon Sep 17 00:00:00 2001 From: SequeI Date: Fri, 16 Jan 2026 12:56:32 +0000 Subject: [PATCH 2/4] code review, slight refactoring Signed-off-by: SequeI --- pyproject.toml | 3 + src/model_signing/_signing/sign_ec_key.py | 102 ++++++++---------- src/model_signing/_signing/sign_sigstore.py | 15 +-- .../_signing/sign_sigstore_pb.py | 15 +-- src/model_signing/_signing/signing.py | 45 +++++++- src/model_signing/signing.py | 8 +- src/model_signing/verifying.py | 17 +-- 7 files changed, 108 insertions(+), 97 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 51ae37fa..2f1fe0a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -158,6 +158,9 @@ markers = [ "integration: mark a test as an integration test.", ] pythonpath = "src" +filterwarnings = [ + "ignore:builtin type Swig.*has no __module__ attribute:DeprecationWarning", +] [tool.ruff] line-length = 80 diff --git a/src/model_signing/_signing/sign_ec_key.py b/src/model_signing/_signing/sign_ec_key.py index 764f659f..121e95e7 100644 --- a/src/model_signing/_signing/sign_ec_key.py +++ b/src/model_signing/_signing/sign_ec_key.py @@ -33,6 +33,19 @@ from model_signing._signing import signing +_SUPPORTED_CURVES = [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()] + + +def _compressed_key_size(curve: ec.EllipticCurve) -> int: + """Compressed EC public keys: 1 byte prefix + key_size_bytes.""" + return 1 + (curve.key_size + 7) // 8 + + +_COMPRESSED_SIZE_TO_CURVE: dict[int, ec.EllipticCurve] = { + _compressed_key_size(curve): curve for curve in _SUPPORTED_CURVES +} + + def _check_supported_ec_key(public_key: crypto_types.PublicKeyTypes): """Checks if the elliptic curve key is supported by our package. @@ -49,9 +62,9 @@ def _check_supported_ec_key(public_key: crypto_types.PublicKeyTypes): if not isinstance(public_key, ec.EllipticCurvePublicKey): raise ValueError("Only elliptic curve keys are supported") - curve = public_key.curve.name - if curve not in ["secp256r1", "secp384r1", "secp521r1"]: - raise ValueError(f"Unsupported key for curve '{curve}'") + supported_names = {c.name for c in _SUPPORTED_CURVES} + if public_key.curve.name not in supported_names: + raise ValueError(f"Unsupported key for curve '{public_key.curve.name}'") def get_ec_key_hash( @@ -83,7 +96,7 @@ def get_ec_key_hash( def _load_private_key( - private_key: pathlib.Path | bytes, password: str | None = None + private_key: signing.KeyInput, password: str | None = None ) -> ec.EllipticCurvePrivateKey: """Load a private key from a path or bytes. @@ -98,16 +111,7 @@ def _load_private_key( Raises: ValueError: If the key format is invalid or unsupported. """ - match private_key: - case pathlib.Path(): - key_bytes = private_key.read_bytes() - case bytes(): - key_bytes = private_key - case _: - raise TypeError( - f"Expected pathlib.Path or bytes, got {type(private_key)}" - ) - + key_bytes = signing.read_bytes_input(private_key) loaded_key = serialization.load_pem_private_key(key_bytes, password) if not isinstance(loaded_key, ec.EllipticCurvePrivateKey): raise ValueError("Only elliptic curve private keys are supported") @@ -116,64 +120,49 @@ def _load_private_key( def _load_public_key( - public_key: pathlib.Path | bytes, + public_key: signing.KeyInput, ) -> ec.EllipticCurvePublicKey: - """Load a public key from a path, bytes (PEM), or compressed format. + """Load a public key from a path, bytes (PEM/DER), or compressed format. Args: public_key: - - A path to a PEM-encoded public key file - - Bytes containing PEM-encoded public key - - 33 bytes of compressed public key (for secp256r1) + - A path to a PEM or DER-encoded public key file + - Bytes containing PEM or DER-encoded public key + - Compressed public key bytes (33 for secp256r1, 49 for secp384r1, + 67 for secp521r1) Returns: The loaded public key. Raises: ValueError: If the key format is invalid or unsupported. + TypeError: If the input type is not supported. """ - match public_key: - case pathlib.Path(): - key_bytes = public_key.read_bytes() - case bytes(): - key_bytes = public_key - case _: - raise TypeError( - f"Expected pathlib.Path or bytes, got {type(public_key)}" - ) + key_bytes = signing.read_bytes_input(public_key) - # 33 bytes for secp256r1, 49 for secp384r1, 67 for secp521r1 - if len(key_bytes) in [33, 49, 67]: - curve_map = {33: ec.SECP256R1(), 49: ec.SECP384R1(), 67: ec.SECP521R1()} - curve = curve_map.get(len(key_bytes)) - if curve is None: - raise ValueError( - f"Unsupported compressed key length: {len(key_bytes)} bytes. " - "Expected 33 (secp256r1), 49 (secp384r1), " - "or 67 (secp521r1) bytes." - ) + curve = _COMPRESSED_SIZE_TO_CURVE.get(len(key_bytes)) + if curve is not None: try: - loaded_key = ec.EllipticCurvePublicKey.from_encoded_point( - curve, key_bytes - ) - return loaded_key + return ec.EllipticCurvePublicKey.from_encoded_point(curve, key_bytes) except (ValueError, exceptions.UnsupportedAlgorithm) as e: raise ValueError( - f"Failed to load compressed public key: {e}. " - f"Expected {len(key_bytes)} bytes compressed format " - f"for {curve.name}." + f"Failed to load compressed public key for {curve.name}: {e}" ) from e - else: + + try: + loaded_key = serialization.load_pem_public_key(key_bytes) + except ValueError: try: - loaded_key = serialization.load_pem_public_key(key_bytes) - except ValueError: loaded_key = serialization.load_der_public_key(key_bytes) + except ValueError as e: + raise ValueError( + "Failed to load public key. Expected PEM, DER, or compressed " + "EC point format." + ) from e if not isinstance(loaded_key, ec.EllipticCurvePublicKey): raise ValueError("Only elliptic curve public keys are supported") - curve = loaded_key.curve.name - if curve not in ["secp256r1", "secp384r1", "secp521r1"]: - raise ValueError(f"Unsupported key for curve '{curve}'") + _check_supported_ec_key(loaded_key) return loaded_key @@ -181,7 +170,7 @@ class Signer(sigstore_pb.Signer): """Signer using an elliptic curve private key.""" def __init__( - self, private_key: pathlib.Path | bytes, password: str | None = None + self, private_key: signing.KeyInput, password: str | None = None ): """Initializes the signer with the private key and optional password. @@ -242,14 +231,15 @@ def _get_verification_material(self) -> bundle_pb.VerificationMaterial: class Verifier(sigstore_pb.Verifier): """Verifier for signatures generated with an elliptic curve private key.""" - def __init__(self, public_key: pathlib.Path | bytes): + def __init__(self, public_key: signing.KeyInput): """Initializes the verifier with the public key to use. Args: public_key: - - A path to a PEM-encoded public key file - - Bytes containing PEM-encoded public key - - 33 bytes of compressed public key (for secp256r1) + - A path to a PEM or DER-encoded public key file + - Bytes containing PEM or DER-encoded public key + - Compressed public key bytes (33 for secp256r1, 49 for + secp384r1, 67 for secp521r1) This must be paired with the private key used to generate the signature. """ diff --git a/src/model_signing/_signing/sign_sigstore.py b/src/model_signing/_signing/sign_sigstore.py index 2828e71f..9e153c45 100644 --- a/src/model_signing/_signing/sign_sigstore.py +++ b/src/model_signing/_signing/sign_sigstore.py @@ -55,7 +55,7 @@ def write(self, path: pathlib.Path) -> None: @classmethod @override - def read(cls, path_or_content: pathlib.Path | str | bytes) -> Self: + def read(cls, path_or_content: signing.SignatureInput) -> Self: """Read a signature from a file path, JSON string, or bytes. Args: @@ -67,18 +67,7 @@ def read(cls, path_or_content: pathlib.Path | str | bytes) -> Self: Returns: The loaded signature. """ - match path_or_content: - case pathlib.Path(): - content = path_or_content.read_text(encoding="utf-8") - case bytes(): - content = path_or_content.decode("utf-8") - case str(): - content = path_or_content - case _: - raise TypeError( - f"Expected pathlib.Path, str, or bytes, " - f"got {type(path_or_content)}" - ) + content = signing.read_text_input(path_or_content) return cls(sigstore_models.Bundle.from_json(content)) diff --git a/src/model_signing/_signing/sign_sigstore_pb.py b/src/model_signing/_signing/sign_sigstore_pb.py index 577166c8..4287eb42 100644 --- a/src/model_signing/_signing/sign_sigstore_pb.py +++ b/src/model_signing/_signing/sign_sigstore_pb.py @@ -109,7 +109,7 @@ def write(self, path: pathlib.Path) -> None: @classmethod @override - def read(cls, path_or_content: pathlib.Path | str | bytes) -> Self: + def read(cls, path_or_content: signing.SignatureInput) -> Self: """Read a signature from a file path, JSON string, or bytes. Args: @@ -121,18 +121,7 @@ def read(cls, path_or_content: pathlib.Path | str | bytes) -> Self: Returns: The loaded signature. """ - match path_or_content: - case pathlib.Path(): - content = path_or_content.read_text(encoding="utf-8") - case bytes(): - content = path_or_content.decode("utf-8") - case str(): - content = path_or_content - case _: - raise TypeError( - f"Expected pathlib.Path, str, or bytes, " - f"got {type(path_or_content)}" - ) + content = signing.read_text_input(path_or_content) parsed_dict = json.loads(content) # adjust parsed_dict due to previous usage of protobufs diff --git a/src/model_signing/_signing/signing.py b/src/model_signing/_signing/signing.py index 13681ac5..04022c7e 100644 --- a/src/model_signing/_signing/signing.py +++ b/src/model_signing/_signing/signing.py @@ -40,7 +40,7 @@ import json import pathlib import sys -from typing import Any +from typing import Any, TypeAlias from in_toto_attestation.v1 import statement @@ -55,6 +55,38 @@ from typing_extensions import Self +KeyInput: TypeAlias = pathlib.Path | bytes +SignatureInput: TypeAlias = pathlib.Path | str | bytes + + +def read_text_input(path_or_content: SignatureInput) -> str: + """Read text content from a file path, string, or bytes.""" + match path_or_content: + case pathlib.Path(): + return path_or_content.read_text(encoding="utf-8") + case bytes(): + return path_or_content.decode("utf-8") + case str(): + return path_or_content + case _: + raise TypeError( + f"Expected pathlib.Path, str, or bytes, got {type(path_or_content)}" + ) + + +def read_bytes_input(path_or_bytes: KeyInput) -> bytes: + """Read bytes from a file path or raw bytes.""" + match path_or_bytes: + case pathlib.Path(): + return path_or_bytes.read_bytes() + case bytes(): + return path_or_bytes + case _: + raise TypeError( + f"Expected pathlib.Path or bytes, got {type(path_or_bytes)}" + ) + + # The expected in-toto payload type for the signature. _IN_TOTO_JSON_PAYLOAD_TYPE: str = "application/vnd.in-toto+json" @@ -306,14 +338,17 @@ def write(self, path: pathlib.Path) -> None: @classmethod @abc.abstractmethod - def read(cls, path: pathlib.Path) -> Self: - """Reads the signature from disk. + def read(cls, path_or_content: SignatureInput) -> Self: + """Reads a signature from a file path, JSON string, or bytes. Does not perform any signature verification, except what is needed to - parse the signature file. + parse the signature. Args: - path: The path to read the signature from. + path_or_content: + - A path to a JSON signature file + - A JSON string containing the signature + - Bytes containing the JSON signature (UTF-8 encoded) Returns: An instance of the class which can be passed to a `Verifier` for diff --git a/src/model_signing/signing.py b/src/model_signing/signing.py index a2b2a8ea..b632a002 100644 --- a/src/model_signing/signing.py +++ b/src/model_signing/signing.py @@ -205,9 +205,11 @@ def use_elliptic_key_signer( Return: The new signing configuration. """ - if not isinstance(private_key, bytes): - private_key = pathlib.Path(private_key) - self._signer = ec_key.Signer(private_key, password) + match private_key: + case bytes(): + self._signer = ec_key.Signer(private_key, password) + case _: + self._signer = ec_key.Signer(pathlib.Path(private_key), password) return self def use_certificate_signer( diff --git a/src/model_signing/verifying.py b/src/model_signing/verifying.py index 6bd79b1a..caecd002 100644 --- a/src/model_signing/verifying.py +++ b/src/model_signing/verifying.py @@ -259,23 +259,26 @@ def use_elliptic_key_verifier( """Configures the verification of signatures generated by a private key. The verifier in this configuration is changed to one that performs - verification of sgistore bundles signed by an elliptic curve private + verification of sigstore bundles signed by an elliptic curve private key. The public key used in the configuration must match the private key used during signing. Args: public_key: - - A path to a PEM-encoded public key file - - Bytes containing PEM-encoded public key - - 33 bytes of compressed public key (for secp256r1) + - A path to a PEM or DER-encoded public key file + - Bytes containing PEM or DER-encoded public key + - Compressed public key bytes (33 for secp256r1, 49 for + secp384r1, 67 for secp521r1) Return: The new verification configuration. """ self._uses_sigstore = False - if not isinstance(public_key, bytes): - public_key = pathlib.Path(public_key) - self._verifier = ec_key.Verifier(public_key) + match public_key: + case bytes(): + self._verifier = ec_key.Verifier(public_key) + case _: + self._verifier = ec_key.Verifier(pathlib.Path(public_key)) return self def use_certificate_verifier( From 8fc955dbeca6482d302cefce0d8fdbcbc967b8c1 Mon Sep 17 00:00:00 2001 From: SequeI Date: Fri, 16 Jan 2026 13:00:13 +0000 Subject: [PATCH 3/4] lint fix Signed-off-by: SequeI --- src/model_signing/_signing/sign_ec_key.py | 9 ++++----- src/model_signing/_signing/signing.py | 3 ++- src/model_signing/signing.py | 3 ++- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/model_signing/_signing/sign_ec_key.py b/src/model_signing/_signing/sign_ec_key.py index 121e95e7..48428556 100644 --- a/src/model_signing/_signing/sign_ec_key.py +++ b/src/model_signing/_signing/sign_ec_key.py @@ -16,7 +16,6 @@ import base64 import hashlib -import pathlib from cryptography import exceptions from cryptography.hazmat.primitives import hashes @@ -119,9 +118,7 @@ def _load_private_key( return loaded_key -def _load_public_key( - public_key: signing.KeyInput, -) -> ec.EllipticCurvePublicKey: +def _load_public_key(public_key: signing.KeyInput) -> ec.EllipticCurvePublicKey: """Load a public key from a path, bytes (PEM/DER), or compressed format. Args: @@ -143,7 +140,9 @@ def _load_public_key( curve = _COMPRESSED_SIZE_TO_CURVE.get(len(key_bytes)) if curve is not None: try: - return ec.EllipticCurvePublicKey.from_encoded_point(curve, key_bytes) + return ec.EllipticCurvePublicKey.from_encoded_point( + curve, key_bytes + ) except (ValueError, exceptions.UnsupportedAlgorithm) as e: raise ValueError( f"Failed to load compressed public key for {curve.name}: {e}" diff --git a/src/model_signing/_signing/signing.py b/src/model_signing/_signing/signing.py index 04022c7e..4a2fb365 100644 --- a/src/model_signing/_signing/signing.py +++ b/src/model_signing/_signing/signing.py @@ -70,7 +70,8 @@ def read_text_input(path_or_content: SignatureInput) -> str: return path_or_content case _: raise TypeError( - f"Expected pathlib.Path, str, or bytes, got {type(path_or_content)}" + "Expected pathlib.Path, str, or bytes, " + f"got {type(path_or_content)}" ) diff --git a/src/model_signing/signing.py b/src/model_signing/signing.py index b632a002..9953d01f 100644 --- a/src/model_signing/signing.py +++ b/src/model_signing/signing.py @@ -209,7 +209,8 @@ def use_elliptic_key_signer( case bytes(): self._signer = ec_key.Signer(private_key, password) case _: - self._signer = ec_key.Signer(pathlib.Path(private_key), password) + key_path = pathlib.Path(private_key) + self._signer = ec_key.Signer(key_path, password) return self def use_certificate_signer( From 2273d0a24b8e5e516a82c8fe5b4f31fa7fdc8d2b Mon Sep 17 00:00:00 2001 From: SequeI Date: Fri, 16 Jan 2026 13:15:05 +0000 Subject: [PATCH 4/4] fix:check curve name Signed-off-by: SequeI --- src/model_signing/_signing/sign_ec_key.py | 27 +++++++++-------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/src/model_signing/_signing/sign_ec_key.py b/src/model_signing/_signing/sign_ec_key.py index 48428556..76a492be 100644 --- a/src/model_signing/_signing/sign_ec_key.py +++ b/src/model_signing/_signing/sign_ec_key.py @@ -21,7 +21,6 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives.asymmetric import types as crypto_types from google.protobuf import json_format from sigstore_models import intoto as intoto_pb from sigstore_models.bundle import v1 as bundle_pb @@ -44,26 +43,20 @@ def _compressed_key_size(curve: ec.EllipticCurve) -> int: _compressed_key_size(curve): curve for curve in _SUPPORTED_CURVES } +_SUPPORTED_CURVE_NAMES: frozenset[str] = frozenset( + c.name for c in _SUPPORTED_CURVES +) -def _check_supported_ec_key(public_key: crypto_types.PublicKeyTypes): - """Checks if the elliptic curve key is supported by our package. + +def _check_supported_curve(curve_name: str): + """Check if the curve is supported. We only support a family of curves, trying to match those specified by Sigstore's protobuf specs. See https://github.com/sigstore/model-transparency/issues/385. - - Args: - public_key: The public key to check. Can be obtained from a private key. - - Raises: - ValueError: The key is not supported, or is not an elliptic curve one. """ - if not isinstance(public_key, ec.EllipticCurvePublicKey): - raise ValueError("Only elliptic curve keys are supported") - - supported_names = {c.name for c in _SUPPORTED_CURVES} - if public_key.curve.name not in supported_names: - raise ValueError(f"Unsupported key for curve '{public_key.curve.name}'") + if curve_name not in _SUPPORTED_CURVE_NAMES: + raise ValueError(f"Unsupported curve '{curve_name}'") def get_ec_key_hash( @@ -114,7 +107,7 @@ def _load_private_key( loaded_key = serialization.load_pem_private_key(key_bytes, password) if not isinstance(loaded_key, ec.EllipticCurvePrivateKey): raise ValueError("Only elliptic curve private keys are supported") - _check_supported_ec_key(loaded_key.public_key()) + _check_supported_curve(loaded_key.curve.name) return loaded_key @@ -161,7 +154,7 @@ def _load_public_key(public_key: signing.KeyInput) -> ec.EllipticCurvePublicKey: if not isinstance(loaded_key, ec.EllipticCurvePublicKey): raise ValueError("Only elliptic curve public keys are supported") - _check_supported_ec_key(loaded_key) + _check_supported_curve(loaded_key.curve.name) return loaded_key