diff --git a/xtest/fixtures/keys.py b/xtest/fixtures/keys.py index 13514086..4830bbbd 100644 --- a/xtest/fixtures/keys.py +++ b/xtest/fixtures/keys.py @@ -66,13 +66,14 @@ def pick_extra_key(extra_keys: dict[str, ExtraKey], kid: str) -> abac.KasPublicK ) -@pytest.fixture(scope="module") -def managed_key_km1_rsa( +def _get_or_create_managed_key( otdfctl: OpentdfCommandLineTool, - kas_entry_km1: abac.KasEntry, + kas_entry: abac.KasEntry, root_key: str, + key_id_prefix: str, + algorithm: str, ) -> abac.KasKey: - """Get or create RSA managed key on km1. + """Get or create a managed key on the given KAS. Key ID includes a hash of the root key to ensure that if the root key changes, a new key will be created instead of reusing an incompatible one. @@ -81,49 +82,75 @@ def managed_key_km1_rsa( if "key_management" not in pfs.features: pytest.skip("Key management feature is not enabled") - key_id = f"km1-rsa-{_key_id_suffix(root_key)}" - existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) + key_id = f"{key_id_prefix}-{_key_id_suffix(root_key)}" + existing_keys = otdfctl.kas_registry_keys_list(kas_entry) key = next((k for k in existing_keys if k.key.key_id == key_id), None) if key is None: key = otdfctl.kas_registry_create_key( - kas_entry_km1, + kas_entry, key_id=key_id, mode="local", - algorithm="rsa:2048", + algorithm=algorithm, wrapping_key=root_key, wrapping_key_id="root", ) return key +@pytest.fixture(scope="module") +def managed_key_km1_rsa( + otdfctl: OpentdfCommandLineTool, + kas_entry_km1: abac.KasEntry, + root_key: str, +) -> abac.KasKey: + """Get or create RSA managed key on km1.""" + return _get_or_create_managed_key( + otdfctl, kas_entry_km1, root_key, "km1-rsa", "rsa:2048" + ) + + @pytest.fixture(scope="module") def managed_key_km2_ec( otdfctl: OpentdfCommandLineTool, kas_entry_km2: abac.KasEntry, root_key: str, ) -> abac.KasKey: - """Get or create EC managed key on km2. + """Get or create EC P-256 managed key on km2.""" + return _get_or_create_managed_key( + otdfctl, kas_entry_km2, root_key, "km2-ec", "ec:secp256r1" + ) - Key ID includes a hash of the root key to ensure that if the root key changes, - a new key will be created instead of reusing an incompatible one. + +@pytest.fixture(scope="module") +def managed_key_km1_ec384( + otdfctl: OpentdfCommandLineTool, + kas_entry_km1: abac.KasEntry, + root_key: str, +) -> abac.KasKey: + """Get or create EC P-384 managed key on km1. + + Reproduces issue #3070: EC P-384/P-521 keys fail during ECDH rewrap + because UncompressECPubKey hardcodes P-256. """ - pfs = tdfs.PlatformFeatureSet() - if "key_management" not in pfs.features: - pytest.skip("Key management feature is not enabled") + return _get_or_create_managed_key( + otdfctl, kas_entry_km1, root_key, "km1-ec384", "ec:secp384r1" + ) - key_id = f"km2-ec-{_key_id_suffix(root_key)}" - existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2) - key = next((k for k in existing_keys if k.key.key_id == key_id), None) - if key is None: - key = otdfctl.kas_registry_create_key( - kas_entry_km2, - key_id=key_id, - mode="local", - algorithm="ec:secp256r1", - wrapping_key=root_key, - wrapping_key_id="root", - ) - return key + +@pytest.fixture(scope="module") +def managed_key_km2_ec521( + otdfctl: OpentdfCommandLineTool, + kas_entry_km2: abac.KasEntry, + root_key: str, +) -> abac.KasKey: + """Get or create EC P-521 managed key on km2. + + Reproduces issue #3070: EC P-384/P-521 keys fail during ECDH rewrap + because UncompressECPubKey hardcodes P-256. + """ + return _get_or_create_managed_key( + otdfctl, kas_entry_km2, root_key, "km2-ec521", "ec:secp521r1" + ) @pytest.fixture(scope="module") @@ -169,6 +196,51 @@ def attribute_allof_with_two_managed_keys( return (attr, [managed_key_km1_rsa.key.key_id, managed_key_km2_ec.key.key_id]) +@pytest.fixture(scope="module") +def attribute_allof_with_ec384_and_ec521_keys( + otdfctl: OpentdfCommandLineTool, + managed_key_km1_ec384: abac.KasKey, + managed_key_km2_ec521: abac.KasKey, + otdf_client_scs: abac.SubjectConditionSet, + temporary_namespace: abac.Namespace, +) -> tuple[abac.Attribute, list[str]]: + """Create an ALL_OF attribute with EC P-384 (km1) and EC P-521 (km2) keys. + + Reproduces issue #3070 in a multi-KAS scenario: both non-P-256 curves + must succeed during ECDH rewrap. Exercises UncompressECPubKey for each + curve in a single decrypt operation. + """ + pfs = tdfs.PlatformFeatureSet() + if "key_management" not in pfs.features: + pytest.skip( + "Key management feature is not enabled; skipping key assignment fixture" + ) + + attr = otdfctl.attribute_create( + temporary_namespace, + "km3070", + abac.AttributeRule.ALL_OF, + ["p384", "p521"], + ) + assert attr.values and len(attr.values) == 2 + v384, v521 = attr.values + assert v384.value == "p384" + assert v521.value == "p521" + + sm1 = otdfctl.scs_map(otdf_client_scs, v384) + assert sm1.attribute_value.value == v384.value + sm2 = otdfctl.scs_map(otdf_client_scs, v521) + assert sm2.attribute_value.value == v521.value + + otdfctl.key_assign_attr(managed_key_km1_ec384, attr) + otdfctl.key_assign_attr(managed_key_km2_ec521, attr) + + return ( + attr, + [managed_key_km1_ec384.key.key_id, managed_key_km2_ec521.key.key_id], + ) + + @pytest.fixture(scope="module") def public_key_kas_default_kid_r1( otdfctl: OpentdfCommandLineTool, @@ -267,31 +339,68 @@ def legacy_imported_golden_r1_key( ) -@pytest.fixture(scope="module") -def base_key_e1( +def _ensure_base_key( otdfctl: OpentdfCommandLineTool, - kas_entry_km1: abac.KasEntry, + kas_entry: abac.KasEntry, root_key: str, + key_id: str, + algorithm: str, ) -> None: - """ - Ensure a managed key with key_id 'e1' exists on the default KAS - and is configured as the base key. - """ + """Get or create a managed key and set it as the base key on the given KAS.""" pfs = tdfs.PlatformFeatureSet() if "key_management" not in pfs.features: pytest.skip("Key management feature is not enabled; skipping base key fixture") - existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) - key_id = "e1" + existing_keys = otdfctl.kas_registry_keys_list(kas_entry) key = next((k for k in existing_keys if k.key.key_id == key_id), None) if key is None: key = otdfctl.kas_registry_create_key( - kas_entry_km1, + kas_entry, key_id=key_id, mode="local", - algorithm="ec:secp256r1", + algorithm=algorithm, wrapping_key=root_key, wrapping_key_id="root", ) - return otdfctl.set_base_key(key, kas_entry_km1) + return otdfctl.set_base_key(key, kas_entry) + + +@pytest.fixture(scope="module") +def base_key_e1( + otdfctl: OpentdfCommandLineTool, + kas_entry_km1: abac.KasEntry, + root_key: str, +) -> None: + """Ensure a managed EC P-256 key exists and is set as the base key.""" + return _ensure_base_key(otdfctl, kas_entry_km1, root_key, "e1", "ec:secp256r1") + + +@pytest.fixture(scope="module") +def base_key_ec384( + otdfctl: OpentdfCommandLineTool, + kas_entry_km1: abac.KasEntry, + root_key: str, +) -> None: + """Ensure a managed EC P-384 key exists and is set as the base key. + + Reproduces issue #3070: EC P-384/P-521 keys fail during ECDH rewrap + because UncompressECPubKey hardcodes P-256. + """ + key_id = f"ec384-base-{_key_id_suffix(root_key)}" + return _ensure_base_key(otdfctl, kas_entry_km1, root_key, key_id, "ec:secp384r1") + + +@pytest.fixture(scope="module") +def base_key_ec521( + otdfctl: OpentdfCommandLineTool, + kas_entry_km1: abac.KasEntry, + root_key: str, +) -> None: + """Ensure a managed EC P-521 key exists and is set as the base key. + + Reproduces issue #3070: EC P-384/P-521 keys fail during ECDH rewrap + because UncompressECPubKey hardcodes P-256. + """ + key_id = f"ec521-base-{_key_id_suffix(root_key)}" + return _ensure_base_key(otdfctl, kas_entry_km1, root_key, key_id, "ec:secp521r1") diff --git a/xtest/test_abac.py b/xtest/test_abac.py index eae3b377..d71e206b 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -905,3 +905,109 @@ def test_encrypt_decrypt_all_containers_with_base_key_e1( rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container=container) assert filecmp.cmp(pt_file, rt_file) + + +@pytest.mark.parametrize( + "curve", + ["ec384", "ec521"], + ids=["P-384", "P-521"], +) +def test_encrypt_decrypt_all_containers_with_base_key_ec_curve( + curve: str, + request: pytest.FixtureRequest, + encrypt_sdk: tdfs.SDK, + decrypt_sdk: tdfs.SDK, + tmp_dir: Path, + pt_file: Path, + in_focus: set[tdfs.SDK], + container: tdfs.container_type, +): + """Reproduces issue #3070: EC P-384/P-521 decrypt fails because + UncompressECPubKey hardcodes elliptic.P256() instead of using the + actual curve parameter. + + Each parametrized variant creates a base key for the given curve, + encrypts across all container types, and attempts to decrypt. + With the bug present, ztdf-ecwrap decrypt fails with: + "ecdh failure: ecdsa: invalid public key" + """ + request.getfixturevalue(f"base_key_{curve}") + + if not in_focus & {encrypt_sdk, decrypt_sdk}: + pytest.skip("Not in focus") + tdfs.skip_if_unsupported(encrypt_sdk, "key_management") + tdfs.skip_if_unsupported(decrypt_sdk, "key_management") + pfs = tdfs.PlatformFeatureSet() + tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) + tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) + + sample_name = f"base-{curve}-{container}-{encrypt_sdk}" + ct_file = tmp_dir / f"{sample_name}.tdf" + encrypt_sdk.encrypt( + pt_file, + ct_file, + container=container, + ) + + rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf" + decrypt_sdk.decrypt(ct_file, rt_file, container=container) + assert filecmp.cmp(pt_file, rt_file) + + +def test_autoconfigure_key_management_ec384_ec521( + attribute_allof_with_ec384_and_ec521_keys: tuple[Attribute, list[str]], + encrypt_sdk: tdfs.SDK, + decrypt_sdk: tdfs.SDK, + tmp_dir: Path, + pt_file: Path, + kas_url_km1: str, + kas_url_km2: str, + in_focus: set[tdfs.SDK], +): + """Reproduces issue #3070 in multi-KAS scenario: ALL_OF attribute with + EC P-384 (km1) and EC P-521 (km2) keys. + + Encrypts with autoconfigure, verifies the manifest has two keyAccess + entries with the correct KIDs and KAS URLs, then decrypts. With the + bug present, decrypt fails because UncompressECPubKey hardcodes P-256 + for both curves. + """ + if not in_focus & {encrypt_sdk, decrypt_sdk}: + pytest.skip("Not in focus") + tdfs.skip_if_unsupported(encrypt_sdk, "key_management") + tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") + pfs = tdfs.PlatformFeatureSet() + tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) + tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) + + attr, key_ids = attribute_allof_with_ec384_and_ec521_keys + sample_name = f"km-ec384-ec521-{encrypt_sdk}" + if sample_name in cipherTexts: + ct_file = cipherTexts[sample_name] + else: + ct_file = tmp_dir / f"{sample_name}.tdf" + encrypt_sdk.encrypt( + pt_file, + ct_file, + mime_type="text/plain", + container="ztdf", + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + cipherTexts[sample_name] = ct_file + + manifest = tdfs.manifest(ct_file) + assert len(manifest.encryptionInformation.keyAccess) == 2 + assert {kao.kid for kao in manifest.encryptionInformation.keyAccess} == set(key_ids) + assert {kao.url for kao in manifest.encryptionInformation.keyAccess} == { + kas_url_km1, + kas_url_km2, + } + + if any( + kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess + ): + tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") + rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf" + decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") + assert filecmp.cmp(pt_file, rt_file)