diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 59e9f82..54f5a0d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,5 +18,7 @@ jobs: run: make .venv - name: Run lint run: make lint + - name: Run typecheck + run: make typecheck - name: Run tests run: make test diff --git a/pyproject.toml b/pyproject.toml index bdf0cee..24c5959 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,16 +21,16 @@ dependencies = [ "bluetooth-adapters ~= 0.16", "cachetools ~= 5.3", "dbus-fast ~= 2.15", - "pybricks ~= 3.5.0", + "pybricks ~= 3.6.0", ] [dependency-groups] debug = ["bluetooth-data-tools ~= 1.15"] dev = [ "async-timer ~= 1.1.6", - "mypy ~= 1.15.0", + "mypy ~= 1.16.0", "pytest ~= 8.3", - "pytest-asyncio ~= 0.26.0", + "pytest-asyncio ~= 1.0.0", "python-dbusmock ~= 0.34.3", "ruff ~= 0.11.8", "types-cachetools ~= 5.3", diff --git a/src/pb_ble/bluezdbus/broadcaster.py b/src/pb_ble/bluezdbus/broadcaster.py index 2457364..3a29a71 100644 --- a/src/pb_ble/bluezdbus/broadcaster.py +++ b/src/pb_ble/bluezdbus/broadcaster.py @@ -83,7 +83,8 @@ async def stop_broadcast(self, adv: str | BroadcastAdvertisement): pass finally: self.bus.unexport(path) - del self.advertisements[path] + if path in self.advertisements: + del self.advertisements[path] async def stop(self): """ @@ -114,7 +115,8 @@ async def broadcast(self, adv: BroadcastAdvertisement): def release_advertisement(path): try: self.bus.unexport(path) - del self.advertisements[path] + if path in self.advertisements: + del self.advertisements[path] finally: on_release(path) diff --git a/src/pb_ble/constants.py b/src/pb_ble/constants.py index f315d47..81d5bdd 100644 --- a/src/pb_ble/constants.py +++ b/src/pb_ble/constants.py @@ -18,7 +18,7 @@ """Type of a value that can be broadcast.""" PybricksBroadcastData: TypeAlias = ( - PybricksBroadcastValue | tuple[PybricksBroadcastValue] + PybricksBroadcastValue | tuple[PybricksBroadcastValue, ...] ) """Type of the broadcast data.""" diff --git a/src/pb_ble/messages.py b/src/pb_ble/messages.py index 7b02544..c9d9cc8 100644 --- a/src/pb_ble/messages.py +++ b/src/pb_ble/messages.py @@ -10,11 +10,14 @@ """ +import logging from enum import IntEnum from struct import pack, unpack, unpack_from -from typing import Literal, Tuple +from typing import Literal, Tuple, cast -from .constants import PybricksBroadcast, PybricksBroadcastValue +from .constants import PybricksBroadcast, PybricksBroadcastData, PybricksBroadcastValue + +logger = logging.getLogger(__name__) def decode_message( @@ -30,24 +33,32 @@ def decode_message( or a tuple. """ + logger.debug(f"decoding[{len(data)}]: {data!r}") + # idx 0 is the channel channel: int = unpack_from(" bytes: @@ -63,6 +74,7 @@ def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes: # idx 0 is the channel encoded_channel = pack(" {encoded_channel!r}") # idx 1 is the message start encoded_data = bytearray(encoded_channel) @@ -70,10 +82,14 @@ def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes: if len(values) == 1: # set SINGLE_OBJECT marker header = PybricksBleBroadcastDataType.SINGLE_OBJECT << 5 + logger.debug(f"data[{len(encoded_data)}]: SINGLE_OBJECT marker") encoded_data.append(header) for val in values: header, encoded_val = _encode_value(val) + logger.debug( + f"data[{len(encoded_data)}] of {type(val)!s}: {val!r} -> ({header!r}, {encoded_val!r})" + ) encoded_data.append(header) if encoded_val is not None: @@ -85,7 +101,9 @@ def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes: f"Payload too large: {len(encoded_data)} bytes (maximum is {OBSERVED_DATA_MAX_SIZE} bytes)" ) - return bytes(encoded_data) + message = bytes(encoded_data) + logger.debug(f"encoded[{len(message)}]: {message!r}") + return message def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]: @@ -99,9 +117,14 @@ def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]: :return: Tuple containing the vendor ID type (`BT` or `USB`), the vendor ID, the product ID and the product revision. """ + vid_type: int + vid: int + pid: int + rev: int + vid_type, vid, pid, rev = unpack(" None: - if data is None: + async def broadcast(self, *data: PybricksBroadcastValue | None) -> None: # type: ignore [override] + if len(data) == 0: + raise ValueError("Broadcast must be a value or tuple.") + if None in data: await self._broadcaster.stop_broadcast(self._adv) else: if not self._broadcaster.is_broadcasting(self._adv): await self._broadcaster.broadcast(self._adv) - self._adv.message = data + self._adv.message = cast(PybricksBroadcastData, tuple(data)) def observe( self, channel: int ) -> Optional[Tuple[Union[bool, int, float, str, bytes], ...]]: advertisement = self._observer.observe(channel) - return advertisement.data if advertisement is not None else None + + if advertisement is not None: + if isinstance(advertisement.data, tuple): + return advertisement.data + else: + # TODO: Pybricks does expose single-value broadcasts + # in a single-object tuple. However, that doesn't match + # the type signature of the observe() method. To adhere + # to the type signature, we only return wrapped values. + return (advertisement.data,) + else: + return None def signal_strength(self, channel: int) -> int: advertisement = self._observer.observe(channel) diff --git a/tests/fixtures/bluez5_mock.py b/tests/fixtures/bluez5_mock.py index 230d141..afc7cd2 100644 --- a/tests/fixtures/bluez5_mock.py +++ b/tests/fixtures/bluez5_mock.py @@ -1,8 +1,8 @@ import sys import unittest.mock -import dbus import pytest +from dbus.proxies import ProxyObject from dbusmock import SpawnedMock from dbusmock.testcase import PrivateDBus @@ -19,7 +19,7 @@ def pytest_runtest_setup(item) -> None: @pytest.fixture -def bluez_mock(dbusmock_system: PrivateDBus) -> YieldFixture[dbus.proxies.ProxyObject]: +def bluez_mock(dbusmock_system: PrivateDBus) -> YieldFixture[ProxyObject]: template = "bluez5" parameters = { "advertise": True, @@ -34,9 +34,7 @@ def bluez_mock(dbusmock_system: PrivateDBus) -> YieldFixture[dbus.proxies.ProxyO @pytest.fixture(autouse=True) -def adapter_mock( - bluez_mock: dbus.proxies.ProxyObject, adapter_name: str -) -> YieldFixture[str]: +def adapter_mock(bluez_mock: ProxyObject, adapter_name: str) -> YieldFixture[str]: device_name = adapter_name # Mock out the DBus adapter diff --git a/tests/test_messages.py b/tests/test_messages.py index 4940b1a..2e29f26 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -5,6 +5,7 @@ class TestPybricksBleDecodeMessage: + # h.ble.broadcast(5) def test_decode_message_single_object(self): # channel 200 # single object marker @@ -13,11 +14,11 @@ def test_decode_message_single_object(self): channel, data = decode_message(message) assert channel == 200 - assert not isinstance(data, tuple) + assert isinstance(data, int), type(data) assert data == 5 - # TODO: Check behaviour against reference implementation + # h.ble.broadcast((5,)) def test_decode_message_single_object_tuple(self): # channel 200 # int8: 5 @@ -25,10 +26,23 @@ def test_decode_message_single_object_tuple(self): channel, data = decode_message(message) assert channel == 200 + assert isinstance(data, tuple), type(data) + assert len(data) == 1 + + assert data == (5,) + + # h.ble.broadcast((5,)) + def test_decode_message_single_object_tuple_0(self): + # channel 0 + # int8: 5 + message = b"\x00a\x05" + channel, data = decode_message(message) + + assert channel == 0 assert isinstance(data, tuple) assert len(data) == 1 - assert data[0] == 5 + assert data == (5,) def test_decode_message_int8_int16_int32(self): # channel: 200 @@ -127,9 +141,9 @@ def test_encode_message_single_object(self): data = encode_message(200, 5) assert data == b"\xc8\x00\x61\x05" - @pytest.mark.skip("Check behaviour against reference implementation") + @pytest.mark.skip("Encoding single-object tuples is not supported") def test_encode_message_single_object_tuple(self): - data = encode_message(200, (1)) + data = encode_message(200, (1,)) # type: ignore[arg-type] assert data == b"\xc8\x61\x01" def test_encode_message_int8_int16_int32(self): diff --git a/tests/test_vhub.py b/tests/test_vhub.py index 8b786df..cb29364 100644 --- a/tests/test_vhub.py +++ b/tests/test_vhub.py @@ -16,15 +16,37 @@ class TestVirtualBLE: async def test_create_vble(self, adapter): ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2]) assert ble is not None + assert not ble._broadcaster.is_broadcasting() - async def test_observe(self): + async def test_observe_none(self): ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2]) data = ble.observe(2) assert data is None + assert not ble._broadcaster.is_broadcasting() - async def test_broadcast(self): + async def test_broadcast_single(self): ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2]) await ble.broadcast(42) + assert ble._broadcaster.is_broadcasting() + assert ble._adv.message == 42 + + async def test_broadcast_multiple(self): + ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2]) + await ble.broadcast(42, 24) + assert ble._broadcaster.is_broadcasting() + assert ble._adv.message == (42, 24) + + async def test_broadcast_none(self): + ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2]) + await ble.broadcast(None) + assert not ble._broadcaster.is_broadcasting() + + async def test_broadcast_start_stop(self): + ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2]) + await ble.broadcast(42) + assert ble._broadcaster.is_broadcasting() + await ble.broadcast(None) + assert not ble._broadcaster.is_broadcasting() async def test_context(self): async with await get_virtual_ble(