Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ All versions prior to 1.0.0 are untracked.
## [Unreleased]

### Added
-Added the `digest` subcommand to compute and print a model's digest. This enables other tools to easily pair the attestations with a model directory.
- Added the `digest` subcommand to compute and print a model's digest. This enables other tools to easily pair the attestations with a model directory.
- Added support for providing elliptic curve keys and signatures from memory (as bytes) in the library API. Private and public keys can now be passed as bytes in addition to file paths, and public keys also support compressed format (33 bytes for secp256r1). Signatures can be provided as JSON strings or bytes in addition to file paths.

### Changed
- ...
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,14 @@ model_signing.signing.Config().use_elliptic_key_signer(
)
```

Keys can also be provided from memory as bytes instead of file paths:
```python
private_key_bytes = Path("key.priv").read_bytes()
model_signing.signing.Config().use_elliptic_key_signer(
private_key=private_key_bytes
).sign("finbert", "finbert.sig")
```

The same signing configuration can be used to sign multiple models:

```python
Expand Down Expand Up @@ -395,6 +403,16 @@ for model in all_models:
verifying_config.verify(model, f"{model}_sharded.sig")
```

Public keys can also be provided from memory as bytes (PEM format or compressed 33-byte format for secp256r1), and signatures can be provided as JSON strings or bytes:
```python
public_key_bytes = Path("key.pub").read_bytes()
signature_bytes = Path("model.sig").read_bytes()
verifying_config = model_signing.verifying.Config().use_elliptic_key_verifier(
public_key=public_key_bytes
)
verifying_config.verify("finbert", signature_bytes)
```

Consult the
[official documentation](https://sigstore.github.io/model-transparency/model_signing.html)
for more details.
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ markers = [
"integration: mark a test as an integration test.",
]
pythonpath = "src"
filterwarnings = [
"ignore:builtin type Swig.*has no __module__ attribute:DeprecationWarning",
]

[tool.ruff]
line-length = 80
Expand Down
6 changes: 5 additions & 1 deletion src/model_signing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@

This example generates a signature using a private key based on elliptic curve
cryptography. It also hashes the model by ignoring `README.md` and any git
related file present in the model directory.
related file present in the model directory. Keys can also be provided as bytes
from memory instead of file paths.

We also support signing with signing certificates, using a similar API as above.

Expand Down Expand Up @@ -98,6 +99,9 @@
).verify("finbert", "finbert.sig")
```

Public keys can also be provided as bytes (PEM format or compressed format),
and signatures can be provided as JSON strings or bytes instead of file paths.

A reminder that we still need to set the verification configuration. This sets
up the cryptographic primitives to verify the signature and is needed to know
how to parse the signature file.
Expand Down
134 changes: 105 additions & 29 deletions src/model_signing/_signing/sign_ec_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

import base64
import hashlib
import pathlib

from cryptography import exceptions
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import types as crypto_types
from google.protobuf import json_format
from sigstore_models import intoto as intoto_pb
from sigstore_models.bundle import v1 as bundle_pb
Expand All @@ -33,25 +31,32 @@
from model_signing._signing import signing


def _check_supported_ec_key(public_key: crypto_types.PublicKeyTypes):
"""Checks if the elliptic curve key is supported by our package.
_SUPPORTED_CURVES = [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]


def _compressed_key_size(curve: ec.EllipticCurve) -> int:
"""Compressed EC public keys: 1 byte prefix + key_size_bytes."""
return 1 + (curve.key_size + 7) // 8


_COMPRESSED_SIZE_TO_CURVE: dict[int, ec.EllipticCurve] = {
_compressed_key_size(curve): curve for curve in _SUPPORTED_CURVES
}

_SUPPORTED_CURVE_NAMES: frozenset[str] = frozenset(
c.name for c in _SUPPORTED_CURVES
)


def _check_supported_curve(curve_name: str):
"""Check if the curve is supported.

We only support a family of curves, trying to match those specified by
Sigstore's protobuf specs.
See https://github.com/sigstore/model-transparency/issues/385.

Args:
public_key: The public key to check. Can be obtained from a private key.

Raises:
ValueError: The key is not supported, or is not an elliptic curve one.
"""
if not isinstance(public_key, ec.EllipticCurvePublicKey):
raise ValueError("Only elliptic curve keys are supported")

curve = public_key.curve.name
if curve not in ["secp256r1", "secp384r1", "secp521r1"]:
raise ValueError(f"Unsupported key for curve '{curve}'")
if curve_name not in _SUPPORTED_CURVE_NAMES:
raise ValueError(f"Unsupported curve '{curve_name}'")


def get_ec_key_hash(
Expand Down Expand Up @@ -82,22 +87,91 @@ def get_ec_key_hash(
raise ValueError(f"Unexpected key size {key_size}")


def _load_private_key(
private_key: signing.KeyInput, password: str | None = None
) -> ec.EllipticCurvePrivateKey:
"""Load a private key from a path or bytes.

Args:
private_key: Either a path to a PEM-encoded private key file, or bytes
containing the PEM-encoded private key.
password: Optional password for the private key.

Returns:
The loaded private key.

Raises:
ValueError: If the key format is invalid or unsupported.
"""
key_bytes = signing.read_bytes_input(private_key)
loaded_key = serialization.load_pem_private_key(key_bytes, password)
if not isinstance(loaded_key, ec.EllipticCurvePrivateKey):
raise ValueError("Only elliptic curve private keys are supported")
_check_supported_curve(loaded_key.curve.name)
return loaded_key


def _load_public_key(public_key: signing.KeyInput) -> ec.EllipticCurvePublicKey:
"""Load a public key from a path, bytes (PEM/DER), or compressed format.

Args:
public_key:
- A path to a PEM or DER-encoded public key file
- Bytes containing PEM or DER-encoded public key
- Compressed public key bytes (33 for secp256r1, 49 for secp384r1,
67 for secp521r1)

Returns:
The loaded public key.

Raises:
ValueError: If the key format is invalid or unsupported.
TypeError: If the input type is not supported.
"""
key_bytes = signing.read_bytes_input(public_key)

curve = _COMPRESSED_SIZE_TO_CURVE.get(len(key_bytes))
if curve is not None:
try:
return ec.EllipticCurvePublicKey.from_encoded_point(
curve, key_bytes
)
except (ValueError, exceptions.UnsupportedAlgorithm) as e:
raise ValueError(
f"Failed to load compressed public key for {curve.name}: {e}"
) from e

try:
loaded_key = serialization.load_pem_public_key(key_bytes)
except ValueError:
try:
loaded_key = serialization.load_der_public_key(key_bytes)
except ValueError as e:
raise ValueError(
"Failed to load public key. Expected PEM, DER, or compressed "
"EC point format."
) from e

if not isinstance(loaded_key, ec.EllipticCurvePublicKey):
raise ValueError("Only elliptic curve public keys are supported")
_check_supported_curve(loaded_key.curve.name)
return loaded_key


class Signer(sigstore_pb.Signer):
"""Signer using an elliptic curve private key."""

def __init__(
self, private_key_path: pathlib.Path, password: str | None = None
self, private_key: signing.KeyInput, password: str | None = None
):
"""Initializes the signer with the private key and optional password.

Args:
private_key_path: The path to the PEM encoded private key.
private_key: Either a path to a PEM-encoded private key file,
or bytes containing the PEM-encoded private key.
password: Optional password for the private key.
"""
self._private_key = serialization.load_pem_private_key(
private_key_path.read_bytes(), password
)
_check_supported_ec_key(self._private_key.public_key())
self._private_key = _load_private_key(private_key, password)

@override
def sign(self, payload: signing.Payload) -> signing.Signature:
Expand Down Expand Up @@ -149,17 +223,19 @@ def _get_verification_material(self) -> bundle_pb.VerificationMaterial:
class Verifier(sigstore_pb.Verifier):
"""Verifier for signatures generated with an elliptic curve private key."""

def __init__(self, public_key_path: pathlib.Path):
def __init__(self, public_key: signing.KeyInput):
"""Initializes the verifier with the public key to use.

Args:
public_key_path: The path to the public key to use. This must be
paired with the private key used to generate the signature.
public_key:
- A path to a PEM or DER-encoded public key file
- Bytes containing PEM or DER-encoded public key
- Compressed public key bytes (33 for secp256r1, 49 for
secp384r1, 67 for secp521r1)
This must be paired with the private key used to generate
the signature.
"""
self._public_key = serialization.load_pem_public_key(
public_key_path.read_bytes()
)
_check_supported_ec_key(self._public_key)
self._public_key = _load_public_key(public_key)

@override
def _verify_bundle(self, bundle: bundle_pb.Bundle) -> tuple[str, bytes]:
Expand Down
15 changes: 13 additions & 2 deletions src/model_signing/_signing/sign_sigstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,19 @@ def write(self, path: pathlib.Path) -> None:

@classmethod
@override
def read(cls, path: pathlib.Path) -> Self:
content = path.read_text(encoding="utf-8")
def read(cls, path_or_content: signing.SignatureInput) -> Self:
"""Read a signature from a file path, JSON string, or bytes.

Args:
path_or_content:
- A path to a JSON signature file
- A JSON string containing the signature
- Bytes containing the JSON signature (UTF-8 encoded)

Returns:
The loaded signature.
"""
content = signing.read_text_input(path_or_content)
return cls(sigstore_models.Bundle.from_json(content))


Expand Down
15 changes: 13 additions & 2 deletions src/model_signing/_signing/sign_sigstore_pb.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,19 @@ def write(self, path: pathlib.Path) -> None:

@classmethod
@override
def read(cls, path: pathlib.Path) -> Self:
content = path.read_text(encoding="utf-8")
def read(cls, path_or_content: signing.SignatureInput) -> Self:
"""Read a signature from a file path, JSON string, or bytes.

Args:
path_or_content:
- A path to a JSON signature file
- A JSON string containing the signature
- Bytes containing the JSON signature (UTF-8 encoded)

Returns:
The loaded signature.
"""
content = signing.read_text_input(path_or_content)
parsed_dict = json.loads(content)

# adjust parsed_dict due to previous usage of protobufs
Expand Down
46 changes: 41 additions & 5 deletions src/model_signing/_signing/signing.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import json
import pathlib
import sys
from typing import Any
from typing import Any, TypeAlias

from in_toto_attestation.v1 import statement

Expand All @@ -55,6 +55,39 @@
from typing_extensions import Self


KeyInput: TypeAlias = pathlib.Path | bytes
SignatureInput: TypeAlias = pathlib.Path | str | bytes


def read_text_input(path_or_content: SignatureInput) -> str:
"""Read text content from a file path, string, or bytes."""
match path_or_content:
case pathlib.Path():
return path_or_content.read_text(encoding="utf-8")
case bytes():
return path_or_content.decode("utf-8")
case str():
return path_or_content
case _:
raise TypeError(
"Expected pathlib.Path, str, or bytes, "
f"got {type(path_or_content)}"
)


def read_bytes_input(path_or_bytes: KeyInput) -> bytes:
"""Read bytes from a file path or raw bytes."""
match path_or_bytes:
case pathlib.Path():
return path_or_bytes.read_bytes()
case bytes():
return path_or_bytes
case _:
raise TypeError(
f"Expected pathlib.Path or bytes, got {type(path_or_bytes)}"
)


# The expected in-toto payload type for the signature.
_IN_TOTO_JSON_PAYLOAD_TYPE: str = "application/vnd.in-toto+json"

Expand Down Expand Up @@ -306,14 +339,17 @@ def write(self, path: pathlib.Path) -> None:

@classmethod
@abc.abstractmethod
def read(cls, path: pathlib.Path) -> Self:
"""Reads the signature from disk.
def read(cls, path_or_content: SignatureInput) -> Self:
"""Reads a signature from a file path, JSON string, or bytes.

Does not perform any signature verification, except what is needed to
parse the signature file.
parse the signature.

Args:
path: The path to read the signature from.
path_or_content:
- A path to a JSON signature file
- A JSON string containing the signature
- Bytes containing the JSON signature (UTF-8 encoded)

Returns:
An instance of the class which can be passed to a `Verifier` for
Expand Down
Loading
Loading