diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f4ce652..49621e61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,8 @@ All versions prior to 1.0.0 are untracked. ## [Unreleased] ### Added --Added the `digest` subcommand to compute and print a model's digest. This enables other tools to easily pair the attestations with a model directory. +- Added the `digest` subcommand to compute and print a model's digest. This enables other tools to easily pair the attestations with a model directory. +- Added support for providing elliptic curve keys and signatures from memory (as bytes) in the library API. Private and public keys can now be passed as bytes in addition to file paths, and public keys also support compressed format (33 bytes for secp256r1). Signatures can be provided as JSON strings or bytes in addition to file paths. ### Changed - ... diff --git a/README.md b/README.md index cbfe4a2c..d97e5b64 100644 --- a/README.md +++ b/README.md @@ -359,6 +359,14 @@ model_signing.signing.Config().use_elliptic_key_signer( ) ``` +Keys can also be provided from memory as bytes instead of file paths: +```python +private_key_bytes = Path("key.priv").read_bytes() +model_signing.signing.Config().use_elliptic_key_signer( + private_key=private_key_bytes +).sign("finbert", "finbert.sig") +``` + The same signing configuration can be used to sign multiple models: ```python @@ -395,6 +403,16 @@ for model in all_models: verifying_config.verify(model, f"{model}_sharded.sig") ``` +Public keys can also be provided from memory as bytes (PEM format or compressed 33-byte format for secp256r1), and signatures can be provided as JSON strings or bytes: +```python +public_key_bytes = Path("key.pub").read_bytes() +signature_bytes = Path("model.sig").read_bytes() +verifying_config = model_signing.verifying.Config().use_elliptic_key_verifier( + public_key=public_key_bytes +) +verifying_config.verify("finbert", signature_bytes) +``` + Consult the [official documentation](https://sigstore.github.io/model-transparency/model_signing.html) for more details. diff --git a/pyproject.toml b/pyproject.toml index 51ae37fa..2f1fe0a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -158,6 +158,9 @@ markers = [ "integration: mark a test as an integration test.", ] pythonpath = "src" +filterwarnings = [ + "ignore:builtin type Swig.*has no __module__ attribute:DeprecationWarning", +] [tool.ruff] line-length = 80 diff --git a/src/model_signing/__init__.py b/src/model_signing/__init__.py index 6d45803e..5eaad5c8 100644 --- a/src/model_signing/__init__.py +++ b/src/model_signing/__init__.py @@ -60,7 +60,8 @@ This example generates a signature using a private key based on elliptic curve cryptography. It also hashes the model by ignoring `README.md` and any git -related file present in the model directory. +related file present in the model directory. Keys can also be provided as bytes +from memory instead of file paths. We also support signing with signing certificates, using a similar API as above. @@ -98,6 +99,9 @@ ).verify("finbert", "finbert.sig") ``` +Public keys can also be provided as bytes (PEM format or compressed format), +and signatures can be provided as JSON strings or bytes instead of file paths. + A reminder that we still need to set the verification configuration. This sets up the cryptographic primitives to verify the signature and is needed to know how to parse the signature file. diff --git a/src/model_signing/_signing/sign_ec_key.py b/src/model_signing/_signing/sign_ec_key.py index 08cde115..76a492be 100644 --- a/src/model_signing/_signing/sign_ec_key.py +++ b/src/model_signing/_signing/sign_ec_key.py @@ -16,13 +16,11 @@ import base64 import hashlib -import pathlib from cryptography import exceptions from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives.asymmetric import types as crypto_types from google.protobuf import json_format from sigstore_models import intoto as intoto_pb from sigstore_models.bundle import v1 as bundle_pb @@ -33,25 +31,32 @@ from model_signing._signing import signing -def _check_supported_ec_key(public_key: crypto_types.PublicKeyTypes): - """Checks if the elliptic curve key is supported by our package. +_SUPPORTED_CURVES = [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()] + + +def _compressed_key_size(curve: ec.EllipticCurve) -> int: + """Compressed EC public keys: 1 byte prefix + key_size_bytes.""" + return 1 + (curve.key_size + 7) // 8 + + +_COMPRESSED_SIZE_TO_CURVE: dict[int, ec.EllipticCurve] = { + _compressed_key_size(curve): curve for curve in _SUPPORTED_CURVES +} + +_SUPPORTED_CURVE_NAMES: frozenset[str] = frozenset( + c.name for c in _SUPPORTED_CURVES +) + + +def _check_supported_curve(curve_name: str): + """Check if the curve is supported. We only support a family of curves, trying to match those specified by Sigstore's protobuf specs. See https://github.com/sigstore/model-transparency/issues/385. - - Args: - public_key: The public key to check. Can be obtained from a private key. - - Raises: - ValueError: The key is not supported, or is not an elliptic curve one. """ - if not isinstance(public_key, ec.EllipticCurvePublicKey): - raise ValueError("Only elliptic curve keys are supported") - - curve = public_key.curve.name - if curve not in ["secp256r1", "secp384r1", "secp521r1"]: - raise ValueError(f"Unsupported key for curve '{curve}'") + if curve_name not in _SUPPORTED_CURVE_NAMES: + raise ValueError(f"Unsupported curve '{curve_name}'") def get_ec_key_hash( @@ -82,22 +87,91 @@ def get_ec_key_hash( raise ValueError(f"Unexpected key size {key_size}") +def _load_private_key( + private_key: signing.KeyInput, password: str | None = None +) -> ec.EllipticCurvePrivateKey: + """Load a private key from a path or bytes. + + Args: + private_key: Either a path to a PEM-encoded private key file, or bytes + containing the PEM-encoded private key. + password: Optional password for the private key. + + Returns: + The loaded private key. + + Raises: + ValueError: If the key format is invalid or unsupported. + """ + key_bytes = signing.read_bytes_input(private_key) + loaded_key = serialization.load_pem_private_key(key_bytes, password) + if not isinstance(loaded_key, ec.EllipticCurvePrivateKey): + raise ValueError("Only elliptic curve private keys are supported") + _check_supported_curve(loaded_key.curve.name) + return loaded_key + + +def _load_public_key(public_key: signing.KeyInput) -> ec.EllipticCurvePublicKey: + """Load a public key from a path, bytes (PEM/DER), or compressed format. + + Args: + public_key: + - A path to a PEM or DER-encoded public key file + - Bytes containing PEM or DER-encoded public key + - Compressed public key bytes (33 for secp256r1, 49 for secp384r1, + 67 for secp521r1) + + Returns: + The loaded public key. + + Raises: + ValueError: If the key format is invalid or unsupported. + TypeError: If the input type is not supported. + """ + key_bytes = signing.read_bytes_input(public_key) + + curve = _COMPRESSED_SIZE_TO_CURVE.get(len(key_bytes)) + if curve is not None: + try: + return ec.EllipticCurvePublicKey.from_encoded_point( + curve, key_bytes + ) + except (ValueError, exceptions.UnsupportedAlgorithm) as e: + raise ValueError( + f"Failed to load compressed public key for {curve.name}: {e}" + ) from e + + try: + loaded_key = serialization.load_pem_public_key(key_bytes) + except ValueError: + try: + loaded_key = serialization.load_der_public_key(key_bytes) + except ValueError as e: + raise ValueError( + "Failed to load public key. Expected PEM, DER, or compressed " + "EC point format." + ) from e + + if not isinstance(loaded_key, ec.EllipticCurvePublicKey): + raise ValueError("Only elliptic curve public keys are supported") + _check_supported_curve(loaded_key.curve.name) + return loaded_key + + class Signer(sigstore_pb.Signer): """Signer using an elliptic curve private key.""" def __init__( - self, private_key_path: pathlib.Path, password: str | None = None + self, private_key: signing.KeyInput, password: str | None = None ): """Initializes the signer with the private key and optional password. Args: - private_key_path: The path to the PEM encoded private key. + private_key: Either a path to a PEM-encoded private key file, + or bytes containing the PEM-encoded private key. password: Optional password for the private key. """ - self._private_key = serialization.load_pem_private_key( - private_key_path.read_bytes(), password - ) - _check_supported_ec_key(self._private_key.public_key()) + self._private_key = _load_private_key(private_key, password) @override def sign(self, payload: signing.Payload) -> signing.Signature: @@ -149,17 +223,19 @@ def _get_verification_material(self) -> bundle_pb.VerificationMaterial: class Verifier(sigstore_pb.Verifier): """Verifier for signatures generated with an elliptic curve private key.""" - def __init__(self, public_key_path: pathlib.Path): + def __init__(self, public_key: signing.KeyInput): """Initializes the verifier with the public key to use. Args: - public_key_path: The path to the public key to use. This must be - paired with the private key used to generate the signature. + public_key: + - A path to a PEM or DER-encoded public key file + - Bytes containing PEM or DER-encoded public key + - Compressed public key bytes (33 for secp256r1, 49 for + secp384r1, 67 for secp521r1) + This must be paired with the private key used to generate + the signature. """ - self._public_key = serialization.load_pem_public_key( - public_key_path.read_bytes() - ) - _check_supported_ec_key(self._public_key) + self._public_key = _load_public_key(public_key) @override def _verify_bundle(self, bundle: bundle_pb.Bundle) -> tuple[str, bytes]: diff --git a/src/model_signing/_signing/sign_sigstore.py b/src/model_signing/_signing/sign_sigstore.py index 080f98ab..9e153c45 100644 --- a/src/model_signing/_signing/sign_sigstore.py +++ b/src/model_signing/_signing/sign_sigstore.py @@ -55,8 +55,19 @@ def write(self, path: pathlib.Path) -> None: @classmethod @override - def read(cls, path: pathlib.Path) -> Self: - content = path.read_text(encoding="utf-8") + def read(cls, path_or_content: signing.SignatureInput) -> Self: + """Read a signature from a file path, JSON string, or bytes. + + Args: + path_or_content: + - A path to a JSON signature file + - A JSON string containing the signature + - Bytes containing the JSON signature (UTF-8 encoded) + + Returns: + The loaded signature. + """ + content = signing.read_text_input(path_or_content) return cls(sigstore_models.Bundle.from_json(content)) diff --git a/src/model_signing/_signing/sign_sigstore_pb.py b/src/model_signing/_signing/sign_sigstore_pb.py index 5a1fcc0d..4287eb42 100644 --- a/src/model_signing/_signing/sign_sigstore_pb.py +++ b/src/model_signing/_signing/sign_sigstore_pb.py @@ -109,8 +109,19 @@ def write(self, path: pathlib.Path) -> None: @classmethod @override - def read(cls, path: pathlib.Path) -> Self: - content = path.read_text(encoding="utf-8") + def read(cls, path_or_content: signing.SignatureInput) -> Self: + """Read a signature from a file path, JSON string, or bytes. + + Args: + path_or_content: + - A path to a JSON signature file + - A JSON string containing the signature + - Bytes containing the JSON signature (UTF-8 encoded) + + Returns: + The loaded signature. + """ + content = signing.read_text_input(path_or_content) parsed_dict = json.loads(content) # adjust parsed_dict due to previous usage of protobufs diff --git a/src/model_signing/_signing/signing.py b/src/model_signing/_signing/signing.py index 13681ac5..4a2fb365 100644 --- a/src/model_signing/_signing/signing.py +++ b/src/model_signing/_signing/signing.py @@ -40,7 +40,7 @@ import json import pathlib import sys -from typing import Any +from typing import Any, TypeAlias from in_toto_attestation.v1 import statement @@ -55,6 +55,39 @@ from typing_extensions import Self +KeyInput: TypeAlias = pathlib.Path | bytes +SignatureInput: TypeAlias = pathlib.Path | str | bytes + + +def read_text_input(path_or_content: SignatureInput) -> str: + """Read text content from a file path, string, or bytes.""" + match path_or_content: + case pathlib.Path(): + return path_or_content.read_text(encoding="utf-8") + case bytes(): + return path_or_content.decode("utf-8") + case str(): + return path_or_content + case _: + raise TypeError( + "Expected pathlib.Path, str, or bytes, " + f"got {type(path_or_content)}" + ) + + +def read_bytes_input(path_or_bytes: KeyInput) -> bytes: + """Read bytes from a file path or raw bytes.""" + match path_or_bytes: + case pathlib.Path(): + return path_or_bytes.read_bytes() + case bytes(): + return path_or_bytes + case _: + raise TypeError( + f"Expected pathlib.Path or bytes, got {type(path_or_bytes)}" + ) + + # The expected in-toto payload type for the signature. _IN_TOTO_JSON_PAYLOAD_TYPE: str = "application/vnd.in-toto+json" @@ -306,14 +339,17 @@ def write(self, path: pathlib.Path) -> None: @classmethod @abc.abstractmethod - def read(cls, path: pathlib.Path) -> Self: - """Reads the signature from disk. + def read(cls, path_or_content: SignatureInput) -> Self: + """Reads a signature from a file path, JSON string, or bytes. Does not perform any signature verification, except what is needed to - parse the signature file. + parse the signature. Args: - path: The path to read the signature from. + path_or_content: + - A path to a JSON signature file + - A JSON string containing the signature + - Bytes containing the JSON signature (UTF-8 encoded) Returns: An instance of the class which can be passed to a `Verifier` for diff --git a/src/model_signing/signing.py b/src/model_signing/signing.py index 4de79be2..9953d01f 100644 --- a/src/model_signing/signing.py +++ b/src/model_signing/signing.py @@ -187,7 +187,10 @@ def use_sigstore_signer( return self def use_elliptic_key_signer( - self, *, private_key: hashing.PathLike, password: str | None = None + self, + *, + private_key: hashing.PathLike | bytes, + password: str | None = None, ) -> Self: """Configures the signing to be performed using elliptic curve keys. @@ -195,13 +198,19 @@ def use_elliptic_key_signer( using a private key based on elliptic curve cryptography. Args: - private_key: The path to the private key to use for signing. + private_key: Either a path to a PEM-encoded private key file, + or bytes containing the PEM-encoded private key. password: An optional password for the key, if encrypted. Return: The new signing configuration. """ - self._signer = ec_key.Signer(pathlib.Path(private_key), password) + match private_key: + case bytes(): + self._signer = ec_key.Signer(private_key, password) + case _: + key_path = pathlib.Path(private_key) + self._signer = ec_key.Signer(key_path, password) return self def use_certificate_signer( diff --git a/src/model_signing/verifying.py b/src/model_signing/verifying.py index 9eaf8b39..caecd002 100644 --- a/src/model_signing/verifying.py +++ b/src/model_signing/verifying.py @@ -76,12 +76,18 @@ def __init__(self): self._ignore_unsigned_files = False def verify( - self, model_path: hashing.PathLike, signature_path: hashing.PathLike + self, + model_path: hashing.PathLike, + signature: hashing.PathLike | str | bytes, ): """Verifies that a model conforms to a signature. Args: model_path: The path to the model to verify. + signature: + - A path to a JSON signature file + - A JSON string containing the signature + - Bytes containing the JSON signature (UTF-8 encoded) Raises: ValueError: No verifier has been configured. @@ -90,11 +96,11 @@ def verify( raise ValueError("Attempting to verify with no configured verifier") if self._uses_sigstore: - signature = sigstore.Signature.read(pathlib.Path(signature_path)) + sig = sigstore.Signature.read(signature) else: - signature = sigstore_pb.Signature.read(pathlib.Path(signature_path)) + sig = sigstore_pb.Signature.read(signature) - expected_manifest = self._verifier.verify(signature) + expected_manifest = self._verifier.verify(sig) if self._hashing_config is None: self._guess_hashing_config(expected_manifest) @@ -248,23 +254,31 @@ def use_sigstore_verifier( return self def use_elliptic_key_verifier( - self, *, public_key: hashing.PathLike + self, *, public_key: hashing.PathLike | bytes ) -> Self: """Configures the verification of signatures generated by a private key. The verifier in this configuration is changed to one that performs - verification of sgistore bundles signed by an elliptic curve private + verification of sigstore bundles signed by an elliptic curve private key. The public key used in the configuration must match the private key used during signing. Args: - public_key: The path to the public key to verify with. + public_key: + - A path to a PEM or DER-encoded public key file + - Bytes containing PEM or DER-encoded public key + - Compressed public key bytes (33 for secp256r1, 49 for + secp384r1, 67 for secp521r1) Return: The new verification configuration. """ self._uses_sigstore = False - self._verifier = ec_key.Verifier(pathlib.Path(public_key)) + match public_key: + case bytes(): + self._verifier = ec_key.Verifier(public_key) + case _: + self._verifier = ec_key.Verifier(pathlib.Path(public_key)) return self def use_certificate_verifier( diff --git a/tests/api_test.py b/tests/api_test.py index 1195d9f1..e22774fb 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -287,6 +287,91 @@ def test_sign_and_verify(self, base_path, populate_tmpdir): ) assert get_model_name(signature) == os.path.basename(model_path) + def test_sign_and_verify_with_bytes_keys(self, base_path, populate_tmpdir): + os.chdir(base_path) + + model_path = populate_tmpdir + ignore_paths = [] + ignore_git_paths = False + signature = Path(model_path / "model.sig") + private_key_path = Path(TESTDATA / "keys/certificate/signing-key.pem") + public_key_path = Path( + TESTDATA / "keys/certificate/signing-key-pub.pem" + ) + + private_key_bytes = private_key_path.read_bytes() + public_key_bytes = public_key_path.read_bytes() + + signing.Config().use_elliptic_key_signer( + private_key=private_key_bytes, password=None + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).sign(model_path, signature) + + verifying.Config().use_elliptic_key_verifier( + public_key=public_key_bytes + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).verify(model_path, signature) + + signature_bytes = signature.read_bytes() + verifying.Config().use_elliptic_key_verifier( + public_key=public_key_bytes + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).verify(model_path, signature_bytes) + + def test_verify_with_compressed_public_key( + self, base_path, populate_tmpdir + ): + os.chdir(base_path) + + model_path = populate_tmpdir + ignore_paths = [] + ignore_git_paths = False + signature = Path(model_path / "model.sig") + private_key_path = Path(TESTDATA / "keys/certificate/signing-key.pem") + public_key_path = Path( + TESTDATA / "keys/certificate/signing-key-pub.pem" + ) + + signing.Config().use_elliptic_key_signer( + private_key=private_key_path, password=None + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).sign(model_path, signature) + + from cryptography.hazmat.primitives import serialization + + public_key_pem = public_key_path.read_bytes() + public_key = serialization.load_pem_public_key(public_key_pem) + + compressed_key = public_key.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.CompressedPoint, + ) + + verifying.Config().use_elliptic_key_verifier( + public_key=compressed_key + ).set_hashing_config( + hashing.Config().set_ignored_paths( + paths=list(ignore_paths) + [signature], + ignore_git_paths=ignore_git_paths, + ) + ).verify(model_path, signature) + class TestCertificateSigning: def test_sign_and_verify(self, base_path, populate_tmpdir):