diff --git a/bluetooth_mesh/network/crypto.py b/bluetooth_mesh/network/crypto.py index 62cb37b..5725c3c 100644 --- a/bluetooth_mesh/network/crypto.py +++ b/bluetooth_mesh/network/crypto.py @@ -20,8 +20,6 @@ # # -# pylint: disable=C0103 - from functools import lru_cache import bitstring @@ -99,7 +97,7 @@ def __init__(self, key): self.bytes = key def __str__(self): - return "<%s: %s>" % (type(self).__name__, self.bytes.hex()) + return f"<{type(self).__name__}: {self.bytes.hex()}>" def __eq__(self, other): return isinstance(other, self.__class__) and self.bytes == other.bytes diff --git a/bluetooth_mesh/network/mesh.py b/bluetooth_mesh/network/mesh.py index ad902d5..6a0affd 100644 --- a/bluetooth_mesh/network/mesh.py +++ b/bluetooth_mesh/network/mesh.py @@ -27,7 +27,7 @@ import bitstring from construct import ValidationError -from crc import Configuration, Calculator +from crc import Calculator, Configuration from bluetooth_mesh.network.crypto import ( ApplicationKey, @@ -37,10 +37,7 @@ aes_cmac, aes_ecb, ) -from bluetooth_mesh.network.provisioning import ( - GenericProvisioning, - GenericProvisioningPDUType, -) +from bluetooth_mesh.network.provisioning import GenericProvisioning, GenericProvisioningPDUType class BeaconType(enum.Enum): @@ -62,12 +59,7 @@ def __init__(self, uuid, oob, uri_hash=None): self.uri_hash = uri_hash def __str__(self): - return "<%s: uuid=%s, oob=%x, uri_hash=%s>" % ( - type(self).__name__, - self.uuid, - self.oob, - self.uri_hash, - ) + return f"<{type(self).__name__}: uuid={self.uuid}, oob={self.oob}, uri_hash={self.uri_hash}>" @classmethod def unpack(cls, beacon): @@ -79,9 +71,7 @@ def unpack(cls, beacon): return cls(UUID(bytes=uuid), oob, uri_hash or None) def pack(self): - beacon = bitstring.pack( - self.BEACON_FORMAT, self.uuid.bytes, self.oob, self.uri_hash or b"" - ).bytes + beacon = bitstring.pack(self.BEACON_FORMAT, self.uuid.bytes, self.oob, self.uri_hash or b"").bytes return beacon @@ -96,12 +86,10 @@ def __init__(self, key_refresh, iv_update, iv_index, network_id): self.network_id = network_id def __str__(self): - return "<%s: key_refresh=%s, iv_update=%s, ivindex=%d, network_id=%s>" % ( - type(self).__name__, - self.key_refresh, - self.iv_update, - self.iv_index, - self.network_id.hex(), + return ( + f"<{type(self).__name__}: key_refresh={self.key_refresh}, " + f"iv_update={self.iv_update}, ivindex={self.iv_index}, " + f"network_id={self.network_id.hex()}>" ) @classmethod @@ -111,9 +99,7 @@ def unpack(cls, message): message[-cls.BEACON_AUTH_SIZE :], ) - iv_update, key_refresh, network_id, iv_index = bitstring.BitStream( - beacon - ).unpack(cls.BEACON_FORMAT) + iv_update, key_refresh, network_id, iv_index = bitstring.BitStream(beacon).unpack(cls.BEACON_FORMAT) return cls(bool(key_refresh), bool(iv_update), iv_index, network_id), auth @@ -146,28 +132,24 @@ def __init__(self, private_beacon_key, random, key_refresh, iv_update, iv_index) self.random = random def __str__(self): - return "<%s: key_refresh=%s, iv_update=%s, ivindex=%s>" % ( - type(self).__name__, - self.key_refresh, - self.iv_update, - self.iv_index, + return ( + f"<{type(self).__name__}: key_refresh={self.key_refresh}, " + f"iv_update={self.iv_update}, ivindex={self.iv_index}>" ) @classmethod def unpack(cls, message, private_beacon_key): random, obfuscated_private_beacon_data, auth_tag = ( - message[:cls.RANDOM_SIZE], - message[cls.RANDOM_SIZE: -cls.BEACON_AUTH_SIZE], - message[-cls.BEACON_AUTH_SIZE:], + message[: cls.RANDOM_SIZE], + message[cls.RANDOM_SIZE : -cls.BEACON_AUTH_SIZE], + message[-cls.BEACON_AUTH_SIZE :], ) c1 = b"\x01" + random + b"\x00\x01" s = aes_ecb(private_beacon_key, c1) private_beacon_data = bytes(map(operator.xor, s[:5], obfuscated_private_beacon_data)) - iv_update, key_refresh, iv_index = bitstring.BitStream( - private_beacon_data - ).unpack(cls.BEACON_FORMAT) + iv_update, key_refresh, iv_index = bitstring.BitStream(private_beacon_data).unpack(cls.BEACON_FORMAT) return cls(private_beacon_key, random, bool(key_refresh), bool(iv_update), iv_index), auth_tag @@ -177,10 +159,8 @@ def pack(self, private_beacon_key): c1 = b"\x01" + self.random + b"\x00\x01" private_beacon_data = bitstring.pack( - self.BEACON_FORMAT, - self.iv_update, - self.key_refresh, - self.iv_index).bytes + self.BEACON_FORMAT, self.iv_update, self.key_refresh, self.iv_index + ).bytes s = aes_ecb(private_beacon_key, c1) p = private_beacon_data + (b"\x00" * 11) @@ -290,8 +270,7 @@ def _segments(self, application_key, seq, payload, szmic, seg): if seg: segments = list( - payload[i : i + self.SEGMENT_SIZE] - for i in range(0, len(payload), self.SEGMENT_SIZE) + payload[i : i + self.SEGMENT_SIZE] for i in range(0, len(payload), self.SEGMENT_SIZE) ) seg_n = len(segments) - 1 @@ -325,16 +304,13 @@ def segments(self, application_key, seq, iv_index, szmic=False, seg=False): long_mic_len = len(self.payload) + 8 # Use large MIC if it doesn't affect segmentation - if len(self.payload) >= self.SEGMENT_SIZE and len(self.payload) < 376: + if self.SEGMENT_SIZE <= len(self.payload) < 376: szmic = szmic or ( - math.ceil(short_mic_len / self.SEGMENT_SIZE) - == math.ceil(long_mic_len / self.SEGMENT_SIZE) + math.ceil(short_mic_len / self.SEGMENT_SIZE) == math.ceil(long_mic_len / self.SEGMENT_SIZE) ) akf = isinstance(application_key, ApplicationKey) - nonce = (self.nonce.application if akf else self.nonce.device)( - seq, iv_index, szmic - ) + nonce = (self.nonce.application if akf else self.nonce.device)(seq, iv_index, szmic) upper_transport_pdu = aes_ccm_encrypt( application_key.bytes, nonce, self.payload, b"", 8 if szmic else 4 @@ -349,10 +325,10 @@ def segments(self, application_key, seq, iv_index, szmic=False, seg=False): ) @classmethod - def decrypt(cls, app_key, iv_index, ctl, ttl, seq, src, dst, transport_pdu): - seg, akf, aid = bitstring.BitStream(transport_pdu).unpack( - "uint:1, uint:1, uint:6" - ) + def decrypt( # pylint: disable=too-many-arguments + cls, app_key, iv_index, ctl, ttl, seq, src, dst, transport_pdu + ): + seg, akf, aid = bitstring.BitStream(transport_pdu).unpack("uint:1, uint:1, uint:6") # works only for unsegmented messages! if seg: @@ -360,11 +336,9 @@ def decrypt(cls, app_key, iv_index, ctl, ttl, seq, src, dst, transport_pdu): if app_key.aid != aid: raise KeyError transport_nonce = Nonce(src, dst, ttl, ctl) - nonce = (transport_nonce.application if akf else transport_nonce.device)( - seq, iv_index - ) + nonce = (transport_nonce.application if akf else transport_nonce.device)(seq, iv_index) decrypted_access = aes_ccm_decrypt(app_key.bytes, nonce, transport_pdu[1:]) - return AccessMessage(src, dst, ttl, decrypted_access) + return AccessMessage(transport_nonce.src, transport_nonce.dst, transport_nonce.ttl, decrypted_access) class ControlMessage(Segment): @@ -373,19 +347,17 @@ def __init__(self, src, dst, ttl, opcode, payload): self.payload = payload self.opcode = opcode - def get_opcode(self, application_key): + def get_opcode(self, _application_key): return bitstring.pack("uint:7", self.opcode) - def segments(self, application_key, seq, iv_index, szmic=False, seg=False): + def segments(self, application_key, seq, _iv_index, szmic=False, seg=False): if szmic: raise NotImplementedError("Control messages do not support long MIC") if seg: raise NotImplementedError("Control messages do not support segmentation") - yield from super()._segments( - application_key, seq, payload=self.payload, szmic=False, seg=False - ) + yield from super()._segments(application_key, seq, payload=self.payload, szmic=False, seg=False) @classmethod def decrypt(cls, ttl, src, dst, transport_pdu): @@ -403,13 +375,11 @@ def __init__(self, src, opcode, payload): self.payload = payload self.opcode = opcode - def get_opcode(self, application_key): + def get_opcode(self, _application_key): return bitstring.pack("uint:7", self.opcode) - def segments(self, application_key, seq, iv_index, szmic=False, seg=False): - yield from super()._segments( - application_key, seq, payload=self.payload, szmic=False, seg=seg - ) + def segments(self, application_key, seq, _iv_index, _szmic=False, seg=False): + yield from super()._segments(application_key, seq, payload=self.payload, szmic=False, seg=seg) @classmethod def decrypt(cls, src, transport_pdu): @@ -423,10 +393,10 @@ def __init__(self, src, dst=0): self.payload = bytes() self.opcode = bytes() - def get_opcode(self, application_key): + def get_opcode(self, _application_key): return bitstring.BitStream() - def segments(self, application_key, seq, iv_index, szmic=False, seg=False): + def segments(self, _application_key, _seq, _iv_index, _szmic=False, _seg=False): yield bytes() @classmethod @@ -473,21 +443,19 @@ def pack( transport_seq = seq # remove segments that were ack-ed and encrypt with new network sequence - segments = list( - self.message.segments(application_key, transport_seq, iv_index, seg=seg) - ) + segments = list(self.message.segments(application_key, transport_seq, iv_index)) for index in sorted(skip_segments, reverse=True): segments.pop(index) - for seq, pdu in enumerate(segments, start=seq): + for network_seq, pdu in enumerate(segments, start=seq): if isinstance(self.message, ProxyConfigMessage): - nonce = self.message.nonce.proxy(seq, iv_index) + nonce = self.message.nonce.proxy(network_seq, iv_index) elif isinstance(self.message, SolicitationMessage): assert iv_index == 0x00000000 - nonce = self.message.nonce.solicitation(seq) + nonce = self.message.nonce.solicitation(network_seq) else: - nonce = self.message.nonce.network(seq, iv_index) + nonce = self.message.nonce.network(network_seq, iv_index) network_pdu = aes_ccm_encrypt( encryption_key, nonce, @@ -500,19 +468,17 @@ def pack( "uint:1, uint:7, uintbe:24, uintbe:16", self.message.ctl, self.message.ttl, - seq, + network_seq, self.message.src, ).bytes - privacy_random = bitstring.pack( - "pad:40, uintbe:32, bytes:7", iv_index, network_pdu[:7] - ).bytes + privacy_random = bitstring.pack("pad:40, uintbe:32, bytes:7", iv_index, network_pdu[:7]).bytes pecb = aes_ecb(privacy_key, privacy_random)[:6] obfuscated_header = bytes(map(operator.xor, network_header, pecb)) - yield seq, bitstring.pack( + yield network_seq, bitstring.pack( "uint:1, uint:7, bits, bytes", iv_index & 1, nid, @@ -529,46 +495,30 @@ def unpack( network_pdu: bytes, proxy=False, ): - # pylint: disable=R0914 _nid, encryption_key, privacy_key = net_key.encryption_keys - last_iv, nid, obfuscated_header, encoded_data_mic = bitstring.BitStream( - network_pdu - ).unpack("uint:1, uint:7, bytes:6, bytes") + last_iv, nid, obfuscated_header, encoded_data_mic = bitstring.BitStream(network_pdu).unpack( + "uint:1, uint:7, bytes:6, bytes" + ) if nid != _nid: raise KeyError - iv_index = ( - local_iv_index if (local_iv_index & 0x01) == last_iv else local_iv_index - 1 - ) - privacy_random = bitstring.pack( - "pad:40, uintbe:32, bytes:7", iv_index, encoded_data_mic[:7] - ).bytes + iv_index = local_iv_index if (local_iv_index & 0x01) == last_iv else local_iv_index - 1 + privacy_random = bitstring.pack("pad:40, uintbe:32, bytes:7", iv_index, encoded_data_mic[:7]).bytes pecb = aes_ecb(privacy_key, privacy_random)[:6] deobfuscated = bytes(map(operator.xor, obfuscated_header, pecb)) - ctl, ttl, seq, src = bitstring.BitStream(deobfuscated).unpack( - "uint:1, uint:7, uintbe:24, uintbe:16" - ) + ctl, ttl, seq, src = bitstring.BitStream(deobfuscated).unpack("uint:1, uint:7, uintbe:24, uintbe:16") net_mic_len = 8 if ctl else 4 - nonce = ( - Nonce(src, 0, ttl, ctl).proxy if proxy else Nonce(src, 0, ttl, ctl).network - )(seq, iv_index) - decrypted_net = aes_ccm_decrypt( - encryption_key, nonce, encoded_data_mic, tag_length=net_mic_len - ) - - dst, transport_pdu = bitstring.BitStream(decrypted_net).unpack( - "uintbe:16, bytes" - ) + nonce = (Nonce(src, 0, ttl, ctl).proxy if proxy else Nonce(src, 0, ttl, ctl).network)(seq, iv_index) + decrypted_net = aes_ccm_decrypt(encryption_key, nonce, encoded_data_mic, tag_length=net_mic_len) + dst, transport_pdu = bitstring.BitStream(decrypted_net).unpack("uintbe:16, bytes") if proxy: transport_msg = ProxyConfigMessage.decrypt(src, transport_pdu) elif ctl: transport_msg = ControlMessage.decrypt(ttl, src, dst, transport_pdu) else: - transport_msg = AccessMessage.decrypt( - app_key, iv_index, ctl, ttl, seq, src, dst, transport_pdu - ) + transport_msg = AccessMessage.decrypt(app_key, iv_index, ctl, ttl, seq, src, dst, transport_pdu) net_message = NetworkMessage(transport_msg) return iv_index, seq, net_message @@ -601,22 +551,22 @@ def pack(pdu): fcs = CRC_CALCULATOR.checksum(pdu) yield GenericProvisioning.build( - dict( - last_segment_number=len(segments) - 1, - gpcf=GenericProvisioningPDUType.START, - total_length=total_len, - frame_check=fcs, - data=segments.pop(0), - ) + { + "last_segment_number": len(segments) - 1, + "gpcf": GenericProvisioningPDUType.START, + "total_length": total_len, + "frame_check": fcs, + "data": segments.pop(0), + } ) for index, segment in enumerate(segments, start=1): yield GenericProvisioning.build( - dict( - segment_index=index, - gpcf=GenericProvisioningPDUType.CONTINUATION, - data=segment, - ) + { + "segment_index": index, + "gpcf": GenericProvisioningPDUType.CONTINUATION, + "data": segment, + } ) @staticmethod @@ -644,8 +594,7 @@ def key(segment): return getattr(segment, "segment_index", 0) start, *continuations = ( - next(group) - for _, group in itertools.groupby(sorted(filtered, key=key), key=key) + next(group) for _, group in itertools.groupby(sorted(filtered, key=key), key=key) ) pdu = start.data + b"".join(continuation.data for continuation in continuations) diff --git a/bluetooth_mesh/network/provisioning.py b/bluetooth_mesh/network/provisioning.py index 220a635..77656d7 100644 --- a/bluetooth_mesh/network/provisioning.py +++ b/bluetooth_mesh/network/provisioning.py @@ -19,7 +19,6 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # # -# pylint: disable=W0223 import enum import struct @@ -42,8 +41,8 @@ this, ) -from bluetooth_mesh.network.crypto import aes_ccm_decrypt, aes_ccm_encrypt, aes_cmac, k1, s1 from bluetooth_mesh.messages.util import BitList, EmbeddedBitStruct, EnumAdapter +from bluetooth_mesh.network.crypto import aes_ccm_decrypt, aes_ccm_encrypt, aes_cmac, k1, s1 class BearerOpcode(enum.IntEnum): @@ -132,10 +131,12 @@ def _decode(self, obj, content, path): def _encode(self, obj, content, path): value = obj.to_string() - return dict(x=value[:32], y=value[32:]) + return { + "x": value[:32], + "y": value[32:], + } -# fmt: off ProvisioningInvite = Struct( "attention" / Int8ub, ) @@ -153,11 +154,15 @@ def _encode(self, obj, content, path): ProvisioningStart = Struct( "algorithm" / EnumAdapter(Int8ub, ProvisioningAlgorithm), - "public_key" / ExprAdapter(Int8ub, - lambda obj, ctx: bool(obj), - lambda obj, ctx: 1 if obj else 0), + "public_key" + / ExprAdapter( + Int8ub, + lambda obj, ctx: bool(obj), + lambda obj, ctx: 1 if obj else 0, + ), "authentication_method" / EnumAdapter(Int8ub, ProvisioningAuthenticationMethod), - "authentication_action" / Switch( + "authentication_action" + / Switch( this.authentication_method, { ProvisioningAuthenticationMethod.NONE: Const(0, Int8ub), @@ -166,7 +171,8 @@ def _encode(self, obj, content, path): ProvisioningAuthenticationMethod.INPUT: EnumAdapter(Int8ub, ProvisioningInputOOBAction), }, ), - "authentication_size" / Switch( + "authentication_size" + / Switch( this.authentication_method, { ProvisioningAuthenticationMethod.NONE: Const(0, Int8ub), @@ -178,7 +184,8 @@ def _encode(self, obj, content, path): ) ProvisioningPublicKey = Struct( - "key" / ProvisioningPublicKeyAdapter( + "key" + / ProvisioningPublicKeyAdapter( Struct( "x" / Bytes(32), "y" / Bytes(32), @@ -209,11 +216,10 @@ def _encode(self, obj, content, path): "unicast_address" / Int16ub, ) -ProvisioningComplete = Struct( -) +ProvisioningComplete = Struct() ProvisioningFailed = Struct( - "error_code" / EnumAdapter(Int8ub, ProvisioningErrorCode) + "error_code" / EnumAdapter(Int8ub, ProvisioningErrorCode), ) ProvisioningPDU = Struct( @@ -222,7 +228,8 @@ def _encode(self, obj, content, path): Padding(2), "type" / EnumAdapter(BitsInteger(6), ProvisioningPDUType), ), - "parameters" / Switch( + "parameters" + / Switch( this.type, { ProvisioningPDUType.INVITE: ProvisioningInvite, @@ -241,11 +248,11 @@ def _encode(self, obj, content, path): ) LinkOpen = Struct( - "device_uuid" / Bytes(16) + "device_uuid" / Bytes(16), ) LinkClose = Struct( - "reason" / EnumAdapter(Int8ub, LinkCloseReason) + "reason" / EnumAdapter(Int8ub, LinkCloseReason), ) ProvisioningBearerControl = Struct( @@ -254,12 +261,13 @@ def _encode(self, obj, content, path): "opcode" / EnumAdapter(BitsInteger(6), BearerOpcode), "gpcf" / Const(GenericProvisioningPDUType.CONTROL, BitsInteger(2)), ), - "parameters" / Switch( + "parameters" + / Switch( this.opcode, { BearerOpcode.LINK_OPEN: Struct("device_uuid" / Bytes(16)), BearerOpcode.LINK_ACK: Struct(), - BearerOpcode.LINK_CLOSE: Struct("reason" / EnumAdapter(Int8ub, LinkCloseReason)) + BearerOpcode.LINK_CLOSE: Struct("reason" / EnumAdapter(Int8ub, LinkCloseReason)), }, default=GreedyBytes, ), @@ -273,7 +281,7 @@ def _encode(self, obj, content, path): ), "total_length" / Int16ub, "frame_check" / Int8ub, - "data" / GreedyBytes + "data" / GreedyBytes, ) TransactionAck = Struct( @@ -290,7 +298,7 @@ def _encode(self, obj, content, path): "segment_index" / BitsInteger(6), "gpcf" / Const(GenericProvisioningPDUType.CONTINUATION, BitsInteger(2)), ), - "data" / GreedyBytes + "data" / GreedyBytes, ) GenericProvisioning = Select( @@ -303,9 +311,8 @@ def _encode(self, obj, content, path): PBADVPDU = Struct( "link_id" / Bytes(4), "transaction_id" / Int8ub, - "data" / GreedyBytes + "data" / GreedyBytes, ) -# fmt: on class ProvisioningEncryption: @@ -327,9 +334,7 @@ def data_decrypt(secret, inputs, data, mic=b""): return ( provisioning_salt, - aes_ccm_decrypt( - provisioning_key, provisioning_nonce, data + mic, tag_length=8 - ), + aes_ccm_decrypt(provisioning_key, provisioning_nonce, data + mic, tag_length=8), ) @staticmethod @@ -338,7 +343,13 @@ def provisioning_device_key(secret, provisioning_salt): @staticmethod def confirmation_encrypt(secret, inputs, random, auth=None): - """inputs = invite(attention) + capabilities(without opcode) + start(msg) + provisioner_key + device_key""" + """ + inputs = invite(attention) + + capabilities(without opcode) + + start(msg) + + provisioner_key + + device_key + """ confirmation_salt = s1(inputs) confirmation_key = k1(secret, confirmation_salt, b"prck") @@ -350,6 +361,4 @@ def confirmation_encrypt(secret, inputs, random, auth=None): @staticmethod def confirmation_validate(confirmation_key, confirmation, random, auth=None): - return confirmation == aes_cmac( - confirmation_key, random + struct.pack("16s", auth or b"") - ) + return confirmation == aes_cmac(confirmation_key, random + struct.pack("16s", auth or b"")) diff --git a/pyproject.toml b/pyproject.toml index 8b4ae51..c6905e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ multi_line_output = 3 include_trailing_comma = true use_parentheses = true combine_as_imports = true +known_first_party = ["bluetooth_mesh"] [tool.pylint] disable = [ @@ -78,10 +79,17 @@ disable = [ "missing-class-docstring", "missing-function-docstring", "invalid-name", + "unbalanced-tuple-unpacking", + "too-few-public-methods", + "too-many-locals", + "unused-argument", ] enable = [ "useless-suppression", ] +max-args = 8 +max-positional-arguments = 9 + [tool.pylint.main] py-version = "3.14" diff --git a/tests/test_key_derivation.py b/tests/test_key_derivation.py index 447798a..dfc4c66 100644 --- a/tests/test_key_derivation.py +++ b/tests/test_key_derivation.py @@ -24,18 +24,18 @@ from bluetooth_mesh.network.crypto import ApplicationKey, DeviceKey, NetworkKey -@fixture -def app_key(): +@fixture(name="app_key") +def fixture_app_key(): return ApplicationKey(bytes.fromhex("63964771734fbd76e3b40519d1d94a48")) -@fixture -def net_key(): +@fixture(name="net_key") +def fixture_net_key(): return NetworkKey(bytes.fromhex("7dd7364cd842ad18c17c2b820c84c3d6")) -@fixture -def dev_key(): +@fixture(name="dev_key") +def fixture_dev_key(): return DeviceKey(bytes.fromhex("9d6dd0e96eb25dc19a40ed9914f8f03f")) diff --git a/tests/test_mesh.py b/tests/test_mesh.py index e6ba155..8e4505d 100644 --- a/tests/test_mesh.py +++ b/tests/test_mesh.py @@ -27,6 +27,7 @@ from bluetooth_mesh.network.mesh import ( AccessMessage, ControlMessage, + MeshPrivateBeacon, NetworkMessage, Nonce, ProxyConfigMessage, @@ -34,79 +35,72 @@ SegmentAckMessage, SolicitationMessage, UnprovisionedDeviceBeacon, - MeshPrivateBeacon ) -@fixture -def app_key(): +@fixture(name="app_key") +def fixture_app_key(): return ApplicationKey(bytes.fromhex("63964771734fbd76e3b40519d1d94a48")) -@fixture -def net_key(): +@fixture(name="net_key") +def fixture_net_key(): return NetworkKey(bytes.fromhex("7dd7364cd842ad18c17c2b820c84c3d6")) -@fixture -def dev_key(): +@fixture(name="dev_key") +def fixture_dev_key(): return DeviceKey(bytes.fromhex("9d6dd0e96eb25dc19a40ed9914f8f03f")) -@fixture -def health_current_status_message(): +@fixture(name="health_current_status_message") +def fixture_health_current_status_message(): payload = bytes.fromhex("0400000000") return AccessMessage(src=0x1201, dst=0xFFFF, ttl=0x03, payload=payload) -@fixture -def config_appkey_status_message(): +@fixture(name="config_appkey_status_message") +def fixture_config_appkey_status_message(): payload = bytes.fromhex("800300563412") return AccessMessage(src=0x1201, dst=0x0003, ttl=0x0B, payload=payload) -@fixture -def config_appkey_add_message(): +@fixture(name="config_appkey_add_message") +def fixture_config_appkey_add_message(): payload = bytes.fromhex("0056341263964771734fbd76e3b40519d1d94a48") return AccessMessage(src=0x0003, dst=0x1201, ttl=0x04, payload=payload) -@fixture -def control_appkey_add_ack_message(): - return SegmentAckMessage( - src=0x2345, dst=0x0003, ttl=0x0B, seq_zero=0x09AB, ack_segments=[1], obo=True - ) +@fixture(name="control_appkey_add_ack_message") +def fixture_control_appkey_add_ack_message(): + return SegmentAckMessage(src=0x2345, dst=0x0003, ttl=0x0B, seq_zero=0x09AB, ack_segments=[1], obo=True) -@fixture -def control_friend_offer_message(): +@fixture(name="control_friend_offer_message") +def fixture_control_friend_offer_message(): parameters = bytes.fromhex("320308ba072f") - return ControlMessage( - src=0x2345, dst=0x1201, ttl=0x00, opcode=0x04, payload=parameters - ) + return ControlMessage(src=0x2345, dst=0x1201, ttl=0x00, opcode=0x04, payload=parameters) -@fixture -def proxy_use_whitelist_message(): +@fixture(name="proxy_use_whitelist_message") +def fixture_proxy_use_whitelist_message(): parameters = bytes.fromhex("00") return ProxyConfigMessage(src=0x0001, opcode=0x00, payload=parameters) -@fixture -def app_sar_message(): +@fixture(name="app_sar_message") +def fixture_app_sar_message(): payload = bytes.fromhex("510c00000000020b0c1f00efcdab071b1c") return AccessMessage(src=1234, dst=4321, ttl=1, payload=payload) -@fixture -def proxy_solicitation_message(): +@fixture(name="proxy_solicitation_message") +def fixture_proxy_solicitation_message(): return SolicitationMessage(src=0x0031, dst=0x0100) def test_network_beacon_received(net_key): - beacon, auth = SecureNetworkBeacon.unpack( - bytes.fromhex("003ecaff672f673370123456788ea261582f364f6f") - ) + beacon, auth = SecureNetworkBeacon.unpack(bytes.fromhex("003ecaff672f673370123456788ea261582f364f6f")) assert beacon.verify(auth, net_key) @@ -117,9 +111,7 @@ def test_network_beacon_received(net_key): def test_network_beacon_received_iv_update(net_key): - beacon, auth = SecureNetworkBeacon.unpack( - bytes.fromhex("023ecaff672f67337012345679c2af80ad072a135c") - ) + beacon, auth = SecureNetworkBeacon.unpack(bytes.fromhex("023ecaff672f67337012345679c2af80ad072a135c")) assert beacon.verify(auth, net_key) @@ -130,9 +122,7 @@ def test_network_beacon_received_iv_update(net_key): def test_network_beacon_created(net_key): - beacon = SecureNetworkBeacon( - False, False, 0x12345678, network_id=net_key.network_id - ) + beacon = SecureNetworkBeacon(False, False, 0x12345678, network_id=net_key.network_id) assert beacon.pack(net_key) == ( bytes.fromhex("003ecaff672f67337012345678"), @@ -143,22 +133,20 @@ def test_network_beacon_created(net_key): def test_private_beacon_received_iv_update(net_key): net_key = NetworkKey(bytes.fromhex("f7a2a44f8e8a8029064f173ddc1e2b00")) private_beacon_key = net_key.private_beacon_key - beacon, auth = MeshPrivateBeacon.unpack( - bytes.fromhex("435f18f85cf78a3121f58478a561e488e7cbf3174f022a514741"), - private_beacon_key + beacon, _auth = MeshPrivateBeacon.unpack( + bytes.fromhex("435f18f85cf78a3121f58478a561e488e7cbf3174f022a514741"), private_beacon_key ) assert not beacon.key_refresh assert beacon.iv_update - assert beacon.iv_index == 0x1010abcd + assert beacon.iv_index == 0x1010ABCD def test_private_beacon_received_iv_update_complete(net_key): net_key = NetworkKey(bytes.fromhex("3bbb6f1fbd53e157417f308ce7aec58f")) private_beacon_key = net_key.private_beacon_key - beacon, auth = MeshPrivateBeacon.unpack( - bytes.fromhex("1b998f82927535ea6f3076f422ce827408ab2f0ffb94cf97f881"), - private_beacon_key + beacon, _auth = MeshPrivateBeacon.unpack( + bytes.fromhex("1b998f82927535ea6f3076f422ce827408ab2f0ffb94cf97f881"), private_beacon_key ) assert not beacon.key_refresh @@ -171,7 +159,11 @@ def test_mesh_private_beacon_created_iv_update_in_progress(): private_beacon_key = net_key.private_beacon_key random = bytes.fromhex("435f18f85cf78a3121f58478a5") beacon = MeshPrivateBeacon( - private_beacon_key, random, False, True, 0x1010abcd, + private_beacon_key, + random, + False, + True, + 0x1010ABCD, ) assert beacon.pack(private_beacon_key) == ( bytes.fromhex("61e488e7cb"), @@ -184,7 +176,11 @@ def test_mesh_private_beacon_created_iv_update_complete(): private_beacon_key = net_key.private_beacon_key random = bytes.fromhex("1b998f82927535ea6f3076f422") beacon = MeshPrivateBeacon( - private_beacon_key, random, False, False, 00000000, + private_beacon_key, + random, + False, + False, + 00000000, ) assert beacon.pack(private_beacon_key) == ( bytes.fromhex("ce827408ab"), @@ -193,18 +189,14 @@ def test_mesh_private_beacon_created_iv_update_complete(): def test_unprovisioned_beacon_received(): - beacon = UnprovisionedDeviceBeacon.unpack( - bytes.fromhex("25bdf2eb03cc4383a65add3e8007fb554243") - ) + beacon = UnprovisionedDeviceBeacon.unpack(bytes.fromhex("25bdf2eb03cc4383a65add3e8007fb554243")) assert beacon.uuid == uuid.UUID("25bdf2eb-03cc-4383-a65a-dd3e8007fb55") assert beacon.oob == 0x4243 def test_unprovisioned_beacon_received_uri_hash(): - beacon = UnprovisionedDeviceBeacon.unpack( - bytes.fromhex("25bdf2eb03cc4383a65add3e8007fb55424301020304") - ) + beacon = UnprovisionedDeviceBeacon.unpack(bytes.fromhex("25bdf2eb03cc4383a65add3e8007fb55424301020304")) assert beacon.uuid == uuid.UUID("25bdf2eb-03cc-4383-a65a-dd3e8007fb55") assert beacon.oob == 0x4243 @@ -213,15 +205,11 @@ def test_unprovisioned_beacon_received_uri_hash(): def test_unprovisioned_beacon_received_uri_hash_too_short(): with raises(ValueError, match="expected 4 bytes"): - UnprovisionedDeviceBeacon.unpack( - bytes.fromhex("25bdf2eb03cc4383a65add3e8007fb554243010203") - ) + UnprovisionedDeviceBeacon.unpack(bytes.fromhex("25bdf2eb03cc4383a65add3e8007fb554243010203")) def test_unprovisioned_beacon_created(): - beacon = UnprovisionedDeviceBeacon( - uuid.UUID("25bdf2eb-03cc-4383-a65a-dd3e8007fb55"), 0x4243 - ) + beacon = UnprovisionedDeviceBeacon(uuid.UUID("25bdf2eb-03cc-4383-a65a-dd3e8007fb55"), 0x4243) assert beacon.pack() == bytes.fromhex("25bdf2eb03cc4383a65add3e8007fb554243") @@ -233,14 +221,12 @@ def test_unprovisioned_beacon_created_uri_hash(): bytes.fromhex("04030201"), ) - assert beacon.pack() == bytes.fromhex( - "25bdf2eb03cc4383a65add3e8007fb55424304030201" - ) + assert beacon.pack() == bytes.fromhex("25bdf2eb03cc4383a65add3e8007fb55424304030201") def test_unprovisioned_beacon_created_uri_hash_too_short(): with raises(ValueError, match="expected 4 bytes"): - beacon = UnprovisionedDeviceBeacon( + UnprovisionedDeviceBeacon( uuid.UUID("25bdf2eb-03cc-4383-a65a-dd3e8007fb55"), 0x4243, bytes.fromhex("040302"), @@ -253,9 +239,7 @@ def test_network_nonce(config_appkey_status_message): config_appkey_status_message.dst, config_appkey_status_message.ttl, False, - ).network(seq=0x000006, iv_index=0x12345678) == bytes.fromhex( - "000b0000061201000012345678" - ) + ).network(seq=0x000006, iv_index=0x12345678) == bytes.fromhex("000b0000061201000012345678") def test_device_nonce(config_appkey_status_message): @@ -264,9 +248,7 @@ def test_device_nonce(config_appkey_status_message): config_appkey_status_message.dst, config_appkey_status_message.ttl, False, - ).device(seq=0x000006, iv_index=0x12345678) == bytes.fromhex( - "02000000061201000312345678" - ) + ).device(seq=0x000006, iv_index=0x12345678) == bytes.fromhex("02000000061201000312345678") def test_application_nonce(health_current_status_message): @@ -275,44 +257,32 @@ def test_application_nonce(health_current_status_message): health_current_status_message.dst, health_current_status_message.ttl, False, - ).application(seq=0x000007, iv_index=0x12345678) == bytes.fromhex( - "01000000071201ffff12345678" - ) + ).application(seq=0x000007, iv_index=0x12345678) == bytes.fromhex("01000000071201ffff12345678") def test_device_pdu(config_appkey_status_message, dev_key): - (segment,) = config_appkey_status_message.segments( - dev_key, seq=0x000006, iv_index=0x12345678 - ) + (segment,) = config_appkey_status_message.segments(dev_key, seq=0x000006, iv_index=0x12345678) assert segment == bytes.fromhex("0089511bf1d1a81c11dcef") def test_application_pdu(health_current_status_message, app_key): - (segment,) = health_current_status_message.segments( - app_key, seq=0x000007, iv_index=0x12345678 - ) + (segment,) = health_current_status_message.segments(app_key, seq=0x000007, iv_index=0x12345678) assert segment == bytes.fromhex("665a8bde6d9106ea078a") def test_application_pdu_segmented(config_appkey_add_message, dev_key): - segments = list( - config_appkey_add_message.segments(dev_key, seq=0x3129AB, iv_index=0x12345678) - ) + segments = list(config_appkey_add_message.segments(dev_key, seq=0x3129AB, iv_index=0x12345678)) assert segments[0] == bytes.fromhex("8026ac01ee9dddfd2169326d23f3afdf") assert segments[1] == bytes.fromhex("8026ac21cfdc18c52fdef772e0e17308") def test_application_pdu_segmented_long_mic(dev_key): - bogus_long_message = AccessMessage( - src=0x0003, dst=0x1201, ttl=0x04, payload=bytes(range(21)) - ) + bogus_long_message = AccessMessage(src=0x0003, dst=0x1201, ttl=0x04, payload=bytes(range(21))) - segments = list( - bogus_long_message.segments(dev_key, seq=0x3129AB, iv_index=0x12345678) - ) + segments = list(bogus_long_message.segments(dev_key, seq=0x3129AB, iv_index=0x12345678)) assert len(segments) == 3 assert segments[0] == bytes.fromhex("80a6ac023008177a35ada597ba2ad9a4") @@ -326,9 +296,7 @@ def test_application_pack_to_network_pdu( net_key: NetworkKey, ): network_message = NetworkMessage(health_current_status_message) - ((seq, network_pdu),) = network_message.pack( - app_key, net_key, seq=0x000007, iv_index=0x12345678 - ) + _seq, network_pdu = next(network_message.pack(app_key, net_key, seq=0x000007, iv_index=0x12345678)) assert network_pdu.hex() == "6848cba437860e5673728a627fb938535508e21a6baf57" @@ -339,13 +307,15 @@ def test_application_pack_to_network_pdu_skip_segments( net_key: NetworkKey, ): network_message = NetworkMessage(app_sar_message) - ((seq, network_pdu),) = network_message.pack( - app_key, - net_key, - seq=0x000007, - iv_index=0x12345678, - transport_seq=0x000008, - skip_segments=[0], + _seq, network_pdu = next( + network_message.pack( + app_key, + net_key, + seq=0x000007, + iv_index=0x12345678, + transport_seq=0x000008, + skip_segments=[0], + ) ) assert network_pdu.hex() == "689353e01b0a211f82734ad0c7c92081616414e0f8b7e7048c7e" @@ -366,25 +336,21 @@ def test_application_unpack_from_network_pdu( assert unpacked_network_message.message == health_current_status_message -def test_application_network_pdu_segmented_retry( - health_current_status_message, app_key, net_key -): +def test_application_network_pdu_segmented_retry(health_current_status_message, app_key, net_key): network_message = NetworkMessage(health_current_status_message) - ((seq, network_pdu),) = network_message.pack( - app_key, net_key, seq=0x000017, iv_index=0x12345678, transport_seq=0x000007 + _seq, network_pdu = next( + network_message.pack(app_key, net_key, seq=0x000017, iv_index=0x12345678, transport_seq=0x000007) ) - assert seq == 0x000017 + assert _seq == 0x000017 assert network_pdu.hex() == "6860f30170e2192e84fb4385254e9e71657aa1bcf2ca90" def test_control_segment_ack_message(control_appkey_add_ack_message, app_key, net_key): network_message = NetworkMessage(control_appkey_add_ack_message) - ((seq, network_pdu),) = network_message.pack( - app_key, net_key, seq=0x014835, iv_index=0x12345678 - ) + _seq, network_pdu = next(network_message.pack(app_key, net_key, seq=0x014835, iv_index=0x12345678)) assert network_pdu.hex() == "68e476b5579c980d0d730f94d7f3509df987bb417eb7c05f" @@ -392,15 +358,13 @@ def test_control_segment_ack_message(control_appkey_add_ack_message, app_key, ne def test_control_pack_to_network_pdu(control_friend_offer_message, app_key, net_key): network_message = NetworkMessage(control_friend_offer_message) - ((seq, network_pdu),) = network_message.pack( - app_key, net_key, seq=0x014820, iv_index=0x12345678 - ) + _seq, network_pdu = next(network_message.pack(app_key, net_key, seq=0x014820, iv_index=0x12345678)) assert network_pdu.hex() == "68d4c826296d7979d7dbc0c9b4d43eebec129d20a620d01e" def test_control_unpack_from_network_pdu( - control_friend_offer_message: ControlMessage, net_key: NetworkKey + control_friend_offer_message: ControlMessage, app_key: ApplicationKey, net_key: NetworkKey ): _, _, unpacked_network_message = NetworkMessage.unpack( app_key, @@ -414,18 +378,18 @@ def test_control_unpack_from_network_pdu( def test_proxy_config_pack_to_network_pdu( proxy_use_whitelist_message: ProxyConfigMessage, + app_key: ApplicationKey, ): network_message = NetworkMessage(proxy_use_whitelist_message) net_key_local = NetworkKey(bytes.fromhex("d1aafb2a1a3c281cbdb0e960edfad852")) - ((seq, network_pdu),) = network_message.pack( - app_key, net_key_local, seq=0x000001, iv_index=0x12345678 - ) + _seq, network_pdu = next(network_message.pack(app_key, net_key_local, seq=0x000001, iv_index=0x12345678)) assert network_pdu.hex() == "10386bd60efbbb8b8c28512e792d3711f4b526" def test_proxy_config_unpack_from_network_pdu( proxy_use_whitelist_message: ProxyConfigMessage, + app_key: ApplicationKey, ): net_key_local = NetworkKey(bytes.fromhex("d1aafb2a1a3c281cbdb0e960edfad852")) _, _, unpacked_network_message = NetworkMessage.unpack( @@ -441,11 +405,10 @@ def test_proxy_config_unpack_from_network_pdu( def test_proxy_solicitation_pack_to_network_pdu( proxy_solicitation_message: SolicitationMessage, + app_key: ApplicationKey, ): network_message = NetworkMessage(proxy_solicitation_message) net_key_local = NetworkKey(bytes.fromhex("18eed9c2a56add85049ffc3c59ad0e12")) - ((seq, network_pdu),) = network_message.pack( - app_key, net_key_local, seq=0x000001, iv_index=0x00000000 - ) + _seq, network_pdu = next(network_message.pack(app_key, net_key_local, seq=0x000001, iv_index=0x00000000)) assert network_pdu.hex() == "7415fd26d31ba53425f13b423508c0019a" diff --git a/tests/test_provisioning.py b/tests/test_provisioning.py index c47591c..08ed6e6 100644 --- a/tests/test_provisioning.py +++ b/tests/test_provisioning.py @@ -40,97 +40,100 @@ ) valid = [ - # fmt: off pytest.param( - bytes.fromhex('0010'), - dict( - type=ProvisioningPDUType.INVITE, - parameters=dict( - attention=16 - ) - ), - id="invite" + bytes.fromhex("0010"), + { + "type": ProvisioningPDUType.INVITE, + "parameters": { + "attention": 16, + }, + }, + id="invite", ), pytest.param( - bytes.fromhex('010100030101080009050002'), - dict( - type=ProvisioningPDUType.CAPABILITIES, - parameters=dict( - num_elements=1, - algorithms={ProvisioningAlgorithm.P256_CMAC_AES128, ProvisioningAlgorithm.P256_HMAC_SHA256}, - public_key_type={ProvisioningPublicKeyType.OOB}, - static_oob_type={ProvisioningStaticOOBType.AVAILABLE}, - output_oob_size=8, - output_oob_action={ProvisioningOutputOOBAction.BLINK, - ProvisioningOutputOOBAction.OUTPUT_NUMERIC}, - input_oob_size=5, - input_oob_action={ProvisioningInputOOBAction.TWIST}, - ) - ), + bytes.fromhex("010100030101080009050002"), + { + "type": ProvisioningPDUType.CAPABILITIES, + "parameters": { + "num_elements": 1, + "algorithms": { + ProvisioningAlgorithm.P256_CMAC_AES128, + ProvisioningAlgorithm.P256_HMAC_SHA256, + }, + "public_key_type": {ProvisioningPublicKeyType.OOB}, + "static_oob_type": {ProvisioningStaticOOBType.AVAILABLE}, + "output_oob_size": 8, + "output_oob_action": { + ProvisioningOutputOOBAction.BLINK, + ProvisioningOutputOOBAction.OUTPUT_NUMERIC, + }, + "input_oob_size": 5, + "input_oob_action": {ProvisioningInputOOBAction.TWIST}, + }, + }, id="capabilities", ), pytest.param( - bytes.fromhex('020100000000'), - dict( - type=ProvisioningPDUType.START, - parameters=dict( - algorithm=ProvisioningAlgorithm.P256_HMAC_SHA256, - public_key=False, - authentication_method=ProvisioningAuthenticationMethod.NONE, - authentication_action=0, - authentication_size=0, - ) - ), + bytes.fromhex("020100000000"), + { + "type": ProvisioningPDUType.START, + "parameters": { + "algorithm": ProvisioningAlgorithm.P256_HMAC_SHA256, + "public_key": False, + "authentication_method": ProvisioningAuthenticationMethod.NONE, + "authentication_action": 0, + "authentication_size": 0, + }, + }, id="start, no authentication", ), pytest.param( - bytes.fromhex('020101020304'), - dict( - type=ProvisioningPDUType.START, - parameters=dict( - algorithm=ProvisioningAlgorithm.P256_HMAC_SHA256, - public_key=True, - authentication_method=ProvisioningAuthenticationMethod.OUTPUT, - authentication_action=ProvisioningOutputOOBAction.OUTPUT_NUMERIC, - authentication_size=4, - ) - ), + bytes.fromhex("020101020304"), + { + "type": ProvisioningPDUType.START, + "parameters": { + "algorithm": ProvisioningAlgorithm.P256_HMAC_SHA256, + "public_key": True, + "authentication_method": ProvisioningAuthenticationMethod.OUTPUT, + "authentication_action": ProvisioningOutputOOBAction.OUTPUT_NUMERIC, + "authentication_size": 4, + }, + }, id="start, output numeric", ), pytest.param( - bytes.fromhex('05b38a114dfdca1fe153bd2c1e0dc46ac2'), - dict( - type=ProvisioningPDUType.CONFIRMATION, - parameters=dict( - confirmation=bytes.fromhex('b38a114dfdca1fe153bd2c1e0dc46ac2'), - ) - ), + bytes.fromhex("05b38a114dfdca1fe153bd2c1e0dc46ac2"), + { + "type": ProvisioningPDUType.CONFIRMATION, + "parameters": { + "confirmation": bytes.fromhex("b38a114dfdca1fe153bd2c1e0dc46ac2"), + }, + }, id="confirmation", ), pytest.param( - bytes.fromhex('068b19ac31d58b124c946209b5db1021b9'), - dict( - type=ProvisioningPDUType.RANDOM, - parameters=dict( - random=bytes.fromhex('8b19ac31d58b124c946209b5db1021b9'), - ) - ), + bytes.fromhex("068b19ac31d58b124c946209b5db1021b9"), + { + "type": ProvisioningPDUType.RANDOM, + "parameters": { + "random": bytes.fromhex("8b19ac31d58b124c946209b5db1021b9"), + }, + }, id="random", ), pytest.param( - bytes.fromhex('07' - 'd0bd7f4a89a2ff6222af59a90a60ad58acfe3123356f5cec29' - '73e0ec50783b10c7'), - dict( - type=ProvisioningPDUType.DATA, - parameters=dict( - encrypted_provisioning_data=bytes.fromhex('d0bd7f4a89a2ff6222af59a90a60ad58acfe3123356f5cec29'), - provisioning_data_mic=bytes.fromhex('73e0ec50783b10c7'), - ) - ), + bytes.fromhex("07d0bd7f4a89a2ff6222af59a90a60ad58acfe3123356f5cec2973e0ec50783b10c7"), + { + "type": ProvisioningPDUType.DATA, + "parameters": { + "encrypted_provisioning_data": bytes.fromhex( + "d0bd7f4a89a2ff6222af59a90a60ad58acfe3123356f5cec29" + ), + "provisioning_data_mic": bytes.fromhex("73e0ec50783b10c7"), + }, + }, id="data", ), - # fmt: on ] @@ -148,31 +151,29 @@ def test_parse(encoded, decoded): # NOTE: ecdsa.VerifyingKey does not overload __eq__ @pytest.mark.parametrize( - # fmt: off "encoded,decoded", [ pytest.param( bytes.fromhex( - '03' - '2c31a47b5779809ef44cb5eaaf5c3e43d5f8faad4a8794cb987e9b03745c78dd' - '919512183898dfbecd52e2408e43871fd021109117bd3ed4eaf8437743715d4f' + "03" + "2c31a47b5779809ef44cb5eaaf5c3e43d5f8faad4a8794cb987e9b03745c78dd" + "919512183898dfbecd52e2408e43871fd021109117bd3ed4eaf8437743715d4f" ), - dict( - type=ProvisioningPDUType.PUBLIC_KEY, - parameters=dict( - key=ecdsa.VerifyingKey.from_string( + { + "type": ProvisioningPDUType.PUBLIC_KEY, + "parameters": { + "key": ecdsa.VerifyingKey.from_string( bytes.fromhex( - '2c31a47b5779809ef44cb5eaaf5c3e43d5f8faad4a8794cb987e9b03745c78dd' - '919512183898dfbecd52e2408e43871fd021109117bd3ed4eaf8437743715d4f' + "2c31a47b5779809ef44cb5eaaf5c3e43d5f8faad4a8794cb987e9b03745c78dd" + "919512183898dfbecd52e2408e43871fd021109117bd3ed4eaf8437743715d4f" ), curve=ecdsa.NIST256p, ) - ) - ), + }, + }, id="public key", ), - ] - # fmt: on + ], ) def test_parse_key(encoded, decoded): result = ProvisioningPDU.parse(data=encoded) @@ -192,14 +193,12 @@ def test_confirmation(): "f465e43ff23d3f1b9dc7dfc04da8758184dbc966204796eccf0d6cf5e16500cc" "0201d048bcbbd899eeefc424164e33c201c2b010ca6b4d43a8a155cad8ecb279" ) - ecdh_secret = bytes.fromhex( - "ab85843a2f6d883f62e5684b38e307335fe6e1945ecd19604105c6f23221eb69" - ) + ecdh_secret = bytes.fromhex("ab85843a2f6d883f62e5684b38e307335fe6e1945ecd19604105c6f23221eb69") random = bytes.fromhex("8b19ac31d58b124c946209b5db1021b9") auth = None - salt, key, confirmation = ProvisioningEncryption.confirmation_encrypt( + _salt, _key, confirmation = ProvisioningEncryption.confirmation_encrypt( ecdh_secret, invite + capabilities + start + provisioner_key + device_key, random, @@ -210,39 +209,33 @@ def test_confirmation(): bearer_ctrl = [ - # fmt: off pytest.param( - bytes.fromhex('03 70cf7c9732a345b691494810d2e9cbf4'), - dict( - opcode=BearerOpcode.LINK_OPEN, - parameters=dict( - device_uuid=bytes.fromhex("70cf7c9732a345b691494810d2e9cbf4") - ), - gpcf=GenericProvisioningPDUType.CONTROL - ), - id="link open" + bytes.fromhex("03 70cf7c9732a345b691494810d2e9cbf4"), + { + "opcode": BearerOpcode.LINK_OPEN, + "parameters": {"device_uuid": bytes.fromhex("70cf7c9732a345b691494810d2e9cbf4")}, + "gpcf": GenericProvisioningPDUType.CONTROL, + }, + id="link open", ), pytest.param( - bytes.fromhex('07'), - dict( - opcode=BearerOpcode.LINK_ACK, - parameters=dict(), - gpcf=GenericProvisioningPDUType.CONTROL - ), - id="link ack" + bytes.fromhex("07"), + { + "opcode": BearerOpcode.LINK_ACK, + "parameters": {}, + "gpcf": GenericProvisioningPDUType.CONTROL, + }, + id="link ack", ), pytest.param( - bytes.fromhex('0b00'), - dict( - opcode=BearerOpcode.LINK_CLOSE, - parameters=dict( - reason=LinkCloseReason.SUCCESS - ), - gpcf=GenericProvisioningPDUType.CONTROL - ), - id="link close" + bytes.fromhex("0b00"), + { + "opcode": BearerOpcode.LINK_CLOSE, + "parameters": {"reason": LinkCloseReason.SUCCESS}, + "gpcf": GenericProvisioningPDUType.CONTROL, + }, + id="link close", ), - # fmt: on ] @@ -259,70 +252,78 @@ def test_parse_provisioning(encoded, decoded): valid = [ - # fmt: off pytest.param( - [bytes.fromhex('00 0002 14 0000')], - dict( - type=ProvisioningPDUType.INVITE, - parameters=dict( - attention=0 - ) - ), - id="invite" + [bytes.fromhex("00 0002 14 0000")], + { + "type": ProvisioningPDUType.INVITE, + "parameters": { + "attention": 0, + }, + }, + id="invite", ), pytest.param( - [bytes.fromhex('00 0006 64 020000000000')], - dict( - type=ProvisioningPDUType.START, - parameters=dict( - algorithm=ProvisioningAlgorithm.P256_CMAC_AES128, - public_key=False, - authentication_method=ProvisioningAuthenticationMethod.NONE, - authentication_action=0, - authentication_size=0, - ) - ), - id="start" + [bytes.fromhex("00 0006 64 020000000000")], + { + "type": ProvisioningPDUType.START, + "parameters": { + "algorithm": ProvisioningAlgorithm.P256_CMAC_AES128, + "public_key": False, + "authentication_method": ProvisioningAuthenticationMethod.NONE, + "authentication_action": 0, + "authentication_size": 0, + }, + }, + id="start", ), pytest.param( - [bytes.fromhex('04 0022 8b 07d0bd7f4a89a2ff6222af59a90a60ad58acfe31'), - bytes.fromhex('06 23356f5cec2973e0ec50783b10c7')], - dict( - type=ProvisioningPDUType.DATA, - parameters=dict( - encrypted_provisioning_data=bytes.fromhex('d0bd7f4a89a2ff6222af59a90a60ad58acfe3123356f5cec29'), - provisioning_data_mic=bytes.fromhex('73e0ec50783b10c7'), - ) - ), + [ + bytes.fromhex("04 0022 8b 07d0bd7f4a89a2ff6222af59a90a60ad58acfe31"), + bytes.fromhex("06 23356f5cec2973e0ec50783b10c7"), + ], + { + "type": ProvisioningPDUType.DATA, + "parameters": { + "encrypted_provisioning_data": bytes.fromhex( + "d0bd7f4a89a2ff6222af59a90a60ad58acfe3123356f5cec29" + ), + "provisioning_data_mic": bytes.fromhex("73e0ec50783b10c7"), + }, + }, id="data", ), pytest.param( - [bytes.fromhex('08 0041 10 03f465e43ff23d3f1b9dc7dfc04da8758184dbc9'), - bytes.fromhex('06 66204796eccf0d6cf5e16500cc0201d048bcbbd899eeef'), - bytes.fromhex('0a c424164e33c201c2b010ca6b4d43a8a155cad8ecb279')], - dict( - type=ProvisioningPDUType.PUBLIC_KEY, - parameters=dict( - key=ecdsa.VerifyingKey.from_string( - bytes.fromhex('f465e43ff23d3f1b9dc7dfc04da8758184dbc966204796eccf0d6cf5e16500cc' - '0201d048bcbbd899eeefc424164e33c201c2b010ca6b4d43a8a155cad8ecb279'), + [ + bytes.fromhex("08 0041 10 03f465e43ff23d3f1b9dc7dfc04da8758184dbc9"), + bytes.fromhex("06 66204796eccf0d6cf5e16500cc0201d048bcbbd899eeef"), + bytes.fromhex("0a c424164e33c201c2b010ca6b4d43a8a155cad8ecb279"), + ], + { + "type": ProvisioningPDUType.PUBLIC_KEY, + "parameters": { + "key": ecdsa.VerifyingKey.from_string( + bytes.fromhex( + "f465e43ff23d3f1b9dc7dfc04da8758184dbc966204796eccf0d6cf5e16500cc" + "0201d048bcbbd899eeefc424164e33c201c2b010ca6b4d43a8a155cad8ecb279" + ), curve=ecdsa.NIST256p, ) - ) - ), + }, + }, id="public key", ), pytest.param( - [bytes.fromhex('000011d3068b19ac31d58b124c946209b5db1021b9')], - dict( - type=ProvisioningPDUType.RANDOM, - parameters=dict( - random=bytes.fromhex('8b19ac31d58b124c946209b5db1021b9') - ) - ), + [bytes.fromhex("000011d3068b19ac31d58b124c946209b5db1021b9")], + { + "type": ProvisioningPDUType.RANDOM, + "parameters": { + "random": bytes.fromhex( + "8b19ac31d58b124c946209b5db1021b9", + ) + }, + }, id="random", ), - # fmt: on ] @@ -339,50 +340,38 @@ def test_unpack_generic(encoded, decoded): prov_params = [ - # fmt: off pytest.param( - dict( - secret=bytes.fromhex('ab85843a2f6d883f62e5684b38e307335fe6e1945ecd19604105c6f23221eb69'), - - confirmation_salt=bytes.fromhex('5faabe187337c71cc6c973369dcaa79a'), - random_provisioner=bytes.fromhex('8b19ac31d58b124c946209b5db1021b9'), - random_device=bytes.fromhex('55a2a2bca04cd32ff6f346bd0a0c1a3a'), - - net_key=bytes.fromhex('efb2255e6422d330088e09bb015ed707'), - net_key_index=b"\x05\x67", - flags=b"\x00", - ivindex=b"\x01\x02\x03\x04", - address=b"\x0b\x0c", - - prov_data=bytes.fromhex('efb2255e6422d330088e09bb015ed707056700010203040b0c'), - enc_provisioning_data=bytes.fromhex('d0bd7f4a89a2ff6222af59a90a60ad58acfe3123356f5cec29'), - mic=bytes.fromhex('73e0ec50783b10c7'), - salt=bytes.fromhex('a21c7d45f201cf9489a2fb57145015b4') - ), - id="" + { + "secret": bytes.fromhex("ab85843a2f6d883f62e5684b38e307335fe6e1945ecd19604105c6f23221eb69"), + "confirmation_salt": bytes.fromhex("5faabe187337c71cc6c973369dcaa79a"), + "random_provisioner": bytes.fromhex("8b19ac31d58b124c946209b5db1021b9"), + "random_device": bytes.fromhex("55a2a2bca04cd32ff6f346bd0a0c1a3a"), + "net_key": bytes.fromhex("efb2255e6422d330088e09bb015ed707"), + "net_key_index": b"\x05\x67", + "flags": b"\x00", + "ivindex": b"\x01\x02\x03\x04", + "address": b"\x0b\x0c", + "prov_data": bytes.fromhex("efb2255e6422d330088e09bb015ed707056700010203040b0c"), + "enc_provisioning_data": bytes.fromhex("d0bd7f4a89a2ff6222af59a90a60ad58acfe3123356f5cec29"), + "mic": bytes.fromhex("73e0ec50783b10c7"), + "salt": bytes.fromhex("a21c7d45f201cf9489a2fb57145015b4"), + }, + id="", ) - # fmt: on ] @pytest.mark.parametrize("params", prov_params) def test_encrypt_provisioning_data(params): - prov_data = ( - params["net_key"] - + params["net_key_index"] - + params["flags"] - + params["ivindex"] - + params["address"] + params["net_key"] + params["net_key_index"] + params["flags"] + params["ivindex"] + params["address"] ) assert prov_data == params["prov_data"] enc_provisioning_data = ProvisioningEncryption.data_encrypt( secret=params["secret"], - inputs=params["confirmation_salt"] - + params["random_provisioner"] - + params["random_device"], + inputs=params["confirmation_salt"] + params["random_provisioner"] + params["random_device"], data=prov_data, ) @@ -395,9 +384,7 @@ def test_decrypt_provisioning_data(params): salt, provisioning_data = ProvisioningEncryption.data_decrypt( secret=params["secret"], - inputs=params["confirmation_salt"] - + params["random_provisioner"] - + params["random_device"], + inputs=params["confirmation_salt"] + params["random_provisioner"] + params["random_device"], data=enc_provisioning_data, ) @@ -406,7 +393,6 @@ def test_decrypt_provisioning_data(params): @pytest.mark.parametrize( - # fmt :off "confirmation_key, confirmation, random, auth", [ pytest.param( @@ -423,30 +409,22 @@ def test_decrypt_provisioning_data(params): None, id="", ), - ] - # fmt: on + ], ) def test_confirmation_validate(confirmation_key, confirmation, random, auth): - assert ProvisioningEncryption.confirmation_validate( - confirmation_key, confirmation, random, auth - ) + assert ProvisioningEncryption.confirmation_validate(confirmation_key, confirmation, random, auth) @pytest.mark.parametrize( - # fmt: off "secret, provisioning_salt, device_key", [ pytest.param( bytes.fromhex("ab85843a2f6d883f62e5684b38e307335fe6e1945ecd19604105c6f23221eb69"), bytes.fromhex("a21c7d45f201cf9489a2fb57145015b4"), bytes.fromhex("0520adad5e0142aa3e325087b4ec16d8"), - id="" + id="", ), - ] - # fmt: on + ], ) def test_provisioning_device_key(secret, provisioning_salt, device_key): - assert ( - ProvisioningEncryption.provisioning_device_key(secret, provisioning_salt) - == device_key - ) + assert ProvisioningEncryption.provisioning_device_key(secret, provisioning_salt) == device_key diff --git a/tests/test_security.py b/tests/test_security.py index 10a22fc..04a6be8 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -24,18 +24,18 @@ from bluetooth_mesh.network.crypto import k1, k2, k3, k4, s1 -@fixture -def app_key(): +@fixture(name="app_key") +def fixture_app_key(): return bytes.fromhex("3216d1509884b533248541792b877f98") -@fixture -def net_key(): +@fixture(name="net_key") +def fixture_net_key(): return bytes.fromhex("f7a2a44f8e8a8029064f173ddc1e2b00") -@fixture -def dev_key(): +@fixture(name="dev_key") +def fixture_dev_key(): return bytes.fromhex("37c612c4a2d337cb7b98355531b3617f")