diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 39754b4f..e00cf8e3 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -22,7 +22,14 @@ import functools import logging import struct -from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence +from collections.abc import ( + AsyncIterator, + Awaitable, + Callable, + Iterable, + Sequence, + Mapping, +) from dataclasses import dataclass, field from typing import ClassVar, SupportsBytes, TypeVar @@ -1049,11 +1056,9 @@ class GetItemAttributesCommand(Command): scope: Scope = field(metadata=Scope.type_metadata(1)) uid: int = field(metadata=_UINT64_BE_METADATA) uid_counter: int = field(metadata=hci.metadata('>2')) - start_item: int = field(metadata=hci.metadata('>4')) - end_item: int = field(metadata=hci.metadata('>4')) # When attributes is empty, all attributes will be requested. attributes: Sequence[MediaAttributeId] = field( - metadata=MediaAttributeId.type_metadata(1, list_begin=True, list_end=True) + metadata=MediaAttributeId.type_metadata(4, list_begin=True, list_end=True) ) @@ -1512,7 +1517,9 @@ class PlaybackPositionChangedEvent(Event): @dataclass class TrackChangedEvent(Event): event_id = EventId.TRACK_CHANGED - identifier: bytes = field(metadata=hci.metadata('*')) + NO_TRACK = 0xFFFFFFFFFFFFFFFF + + uid: int = field(metadata=_UINT64_BE_METADATA) # ----------------------------------------------------------------------------- @@ -1536,16 +1543,19 @@ class Setting(hci.HCI_Dataclass_Object): def __post_init__(self) -> None: super().__post_init__() - if self.attribute_id == ApplicationSetting.AttributeId.EQUALIZER_ON_OFF: - self.value_id = ApplicationSetting.EqualizerOnOffStatus(self.value_id) - elif self.attribute_id == ApplicationSetting.AttributeId.REPEAT_MODE: - self.value_id = ApplicationSetting.RepeatModeStatus(self.value_id) - elif self.attribute_id == ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: - self.value_id = ApplicationSetting.ShuffleOnOffStatus(self.value_id) - elif self.attribute_id == ApplicationSetting.AttributeId.SCAN_ON_OFF: - self.value_id = ApplicationSetting.ScanOnOffStatus(self.value_id) - else: - self.value_id = ApplicationSetting.GenericValue(self.value_id) + match self.attribute_id: + case ApplicationSetting.AttributeId.EQUALIZER_ON_OFF: + self.value_id = ApplicationSetting.EqualizerOnOffStatus( + self.value_id + ) + case ApplicationSetting.AttributeId.REPEAT_MODE: + self.value_id = ApplicationSetting.RepeatModeStatus(self.value_id) + case ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: + self.value_id = ApplicationSetting.ShuffleOnOffStatus(self.value_id) + case ApplicationSetting.AttributeId.SCAN_ON_OFF: + self.value_id = ApplicationSetting.ScanOnOffStatus(self.value_id) + case _: + self.value_id = ApplicationSetting.GenericValue(self.value_id) player_application_settings: Sequence[Setting] = field( metadata=hci.metadata(Setting.parse_from_bytes, list_begin=True, list_end=True) @@ -1619,6 +1629,8 @@ def __init__(self, status_code: avc.ResponseFrame.ResponseCode) -> None: supported_events: list[EventId] supported_company_ids: list[int] + supported_player_app_settings: dict[ApplicationSetting.AttributeId, list[int]] + player_app_settings: dict[ApplicationSetting.AttributeId, int] volume: int playback_status: PlayStatus @@ -1626,11 +1638,23 @@ def __init__( self, supported_events: Iterable[EventId] = (), supported_company_ids: Iterable[int] = (AVRCP_BLUETOOTH_SIG_COMPANY_ID,), + supported_player_app_settings: ( + Mapping[ApplicationSetting.AttributeId, Sequence[int]] | None + ) = None, ) -> None: self.supported_company_ids = list(supported_company_ids) self.supported_events = list(supported_events) self.volume = 0 self.playback_status = PlayStatus.STOPPED + self.supported_player_app_settings = ( + {key: list(value) for key, value in supported_player_app_settings.items()} + if supported_player_app_settings + else {} + ) + self.player_app_settings = {} + self.uid_counter = 0 + self.addressed_player_id = 0 + self.current_track_uid = TrackChangedEvent.NO_TRACK async def get_supported_events(self) -> list[EventId]: return self.supported_events @@ -1663,6 +1687,38 @@ async def on_key_event( async def get_playback_status(self) -> PlayStatus: return self.playback_status + async def get_supported_player_app_settings( + self, + ) -> dict[ApplicationSetting.AttributeId, list[int]]: + return self.supported_player_app_settings + + async def get_current_player_app_settings( + self, + ) -> dict[ApplicationSetting.AttributeId, int]: + return self.player_app_settings + + async def set_player_app_settings( + self, attribute: ApplicationSetting.AttributeId, value: int + ) -> None: + self.player_app_settings[attribute] = value + + async def play_item(self, scope: Scope, uid: int, uid_counter: int) -> None: + logger.debug( + "@@@ play_item: scope=%s, uid=%s, uid_counter=%s", + scope, + uid, + uid_counter, + ) + + async def get_uid_counter(self) -> int: + return self.uid_counter + + async def get_addressed_player_id(self) -> int: + return self.addressed_player_id + + async def get_current_track_uid(self) -> int: + return self.current_track_uid + # TODO add other delegate methods @@ -1910,6 +1966,51 @@ async def get_element_attributes( response = self._check_response(response_context, GetElementAttributesResponse) return list(response.attributes) + async def list_supported_player_app_settings( + self, attribute_ids: Sequence[ApplicationSetting.AttributeId] = () + ) -> dict[ApplicationSetting.AttributeId, list[int]]: + """Get element attributes from the connected peer.""" + response_context = await self.send_avrcp_command( + avc.CommandFrame.CommandType.STATUS, + ListPlayerApplicationSettingAttributesCommand(), + ) + if not attribute_ids: + list_attribute_response = self._check_response( + response_context, ListPlayerApplicationSettingAttributesResponse + ) + attribute_ids = list_attribute_response.attribute + + supported_settings: dict[ApplicationSetting.AttributeId, list[int]] = {} + for attribute_id in attribute_ids: + response_context = await self.send_avrcp_command( + avc.CommandFrame.CommandType.STATUS, + ListPlayerApplicationSettingValuesCommand(attribute_id), + ) + list_value_response = self._check_response( + response_context, ListPlayerApplicationSettingValuesResponse + ) + supported_settings[attribute_id] = list(list_value_response.value) + + return supported_settings + + async def get_player_app_settings( + self, attribute_ids: Sequence[ApplicationSetting.AttributeId] + ) -> dict[ApplicationSetting.AttributeId, int]: + """Get element attributes from the connected peer.""" + response_context = await self.send_avrcp_command( + avc.CommandFrame.CommandType.STATUS, + GetCurrentPlayerApplicationSettingValueCommand(attribute_ids), + ) + response: GetCurrentPlayerApplicationSettingValueResponse = ( + self._check_response( + response_context, GetCurrentPlayerApplicationSettingValueResponse + ) + ) + return { + attribute_id: value + for attribute_id, value in zip(response.attribute, response.value) + } + async def monitor_events( self, event_id: EventId, playback_interval: int = 0 ) -> AsyncIterator[Event]: @@ -1961,13 +2062,13 @@ async def monitor_playback_status( async def monitor_track_changed( self, - ) -> AsyncIterator[bytes]: + ) -> AsyncIterator[int]: """Monitor Track changes from the connected peer.""" async for event in self.monitor_events(EventId.TRACK_CHANGED, 0): if not isinstance(event, TrackChangedEvent): logger.warning("unexpected event class") continue - yield event.identifier + yield event.uid async def monitor_playback_position( self, playback_interval: int @@ -2060,11 +2161,9 @@ def notify_playback_status_changed(self, status: PlayStatus) -> None: """Notify the connected peer of a Playback Status change.""" self.notify_event(PlaybackStatusChangedEvent(status)) - def notify_track_changed(self, identifier: bytes) -> None: + def notify_track_changed(self, uid: int) -> None: """Notify the connected peer of a Track change.""" - if len(identifier) != 8: - raise core.InvalidArgumentError("identifier must be 8 bytes") - self.notify_event(TrackChangedEvent(identifier)) + self.notify_event(TrackChangedEvent(uid)) def notify_playback_position_changed(self, position: int) -> None: """Notify the connected peer of a Position change.""" @@ -2280,21 +2379,40 @@ def _on_command_pdu(self, pdu_id: PduId, pdu: bytes) -> None: ): # TODO: catch exceptions from delegates command = Command.from_bytes(pdu_id, pdu) - if isinstance(command, GetCapabilitiesCommand): - self._on_get_capabilities_command(transaction_label, command) - elif isinstance(command, SetAbsoluteVolumeCommand): - self._on_set_absolute_volume_command(transaction_label, command) - elif isinstance(command, RegisterNotificationCommand): - self._on_register_notification_command(transaction_label, command) - elif isinstance(command, GetPlayStatusCommand): - self._on_get_play_status_command(transaction_label, command) - else: - # Not supported. - # TODO: check that this is the right way to respond in this case. - logger.debug("unsupported PDU ID") - self.send_rejected_avrcp_response( - transaction_label, pdu_id, StatusCode.INVALID_PARAMETER - ) + match command: + case GetCapabilitiesCommand(): + self._on_get_capabilities_command(transaction_label, command) + case SetAbsoluteVolumeCommand(): + self._on_set_absolute_volume_command(transaction_label, command) + case RegisterNotificationCommand(): + self._on_register_notification_command(transaction_label, command) + case GetPlayStatusCommand(): + self._on_get_play_status_command(transaction_label, command) + case ListPlayerApplicationSettingAttributesCommand(): + self._on_list_player_application_setting_attributes_command( + transaction_label, command + ) + case ListPlayerApplicationSettingValuesCommand(): + self._on_list_player_application_setting_values_command( + transaction_label, command + ) + case SetPlayerApplicationSettingValueCommand(): + self._on_set_player_application_setting_value_command( + transaction_label, command + ) + case GetCurrentPlayerApplicationSettingValueCommand(): + self._on_get_current_player_application_setting_value_command( + transaction_label, command + ) + case PlayItemCommand(): + self._on_play_item_command(transaction_label, command) + case _: + # Not supported. + # TODO: check that this is the right way to respond in this case. + logger.debug("unsupported PDU ID") + self.send_rejected_avrcp_response( + transaction_label, pdu_id, StatusCode.INVALID_PARAMETER + ) else: logger.debug("unsupported command type") self.send_rejected_avrcp_response( @@ -2322,26 +2440,29 @@ def _on_response_pdu(self, pdu_id: PduId, pdu: bytes) -> None: # is Ok, but if/when more responses are supported, a lookup mechanism would be # more appropriate. response: Response | None = None - if response_code == avc.ResponseFrame.ResponseCode.REJECTED: - response = RejectedResponse(pdu_id=pdu_id, status_code=StatusCode(pdu[0])) - elif response_code == avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED: - response = NotImplementedResponse(pdu_id=pdu_id, parameters=pdu) - elif response_code in ( - avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, - avc.ResponseFrame.ResponseCode.INTERIM, - avc.ResponseFrame.ResponseCode.CHANGED, - avc.ResponseFrame.ResponseCode.ACCEPTED, - ): - response = Response.from_bytes(pdu=pdu, pdu_id=PduId(pdu_id)) - else: - logger.debug("unexpected response code") - pending_command.response.set_exception( - core.ProtocolError( - error_code=None, - error_namespace="avrcp", - details="unexpected response code", + match response_code: + case avc.ResponseFrame.ResponseCode.REJECTED: + response = RejectedResponse( + pdu_id=pdu_id, status_code=StatusCode(pdu[0]) + ) + case avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED: + response = NotImplementedResponse(pdu_id=pdu_id, parameters=pdu) + case ( + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE + | avc.ResponseFrame.ResponseCode.INTERIM + | avc.ResponseFrame.ResponseCode.CHANGED + | avc.ResponseFrame.ResponseCode.ACCEPTED + ): + response = Response.from_bytes(pdu=pdu, pdu_id=PduId(pdu_id)) + case _: + logger.debug("unexpected response code") + pending_command.response.set_exception( + core.ProtocolError( + error_code=None, + error_namespace="avrcp", + details="unexpected response code", + ) ) - ) if response is None: self.recycle_pending_command(pending_command) @@ -2512,22 +2633,18 @@ def _on_get_capabilities_command( async def get_supported_events() -> None: capabilities: Sequence[bytes | SupportsBytes] - if ( - command.capability_id - == GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED - ): - capabilities = await self.delegate.get_supported_events() - elif ( - command.capability_id == GetCapabilitiesCommand.CapabilityId.COMPANY_ID - ): - company_ids = await self.delegate.get_supported_company_ids() - capabilities = [ - company_id.to_bytes(3, 'big') for company_id in company_ids - ] - else: - raise core.InvalidArgumentError( - f"Unsupported capability: {command.capability_id}" - ) + match command.capability_id: + case GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED: + capabilities = await self.delegate.get_supported_events() + case GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED.COMPANY_ID: + company_ids = await self.delegate.get_supported_company_ids() + capabilities = [ + company_id.to_bytes(3, 'big') for company_id in company_ids + ] + case _: + raise core.InvalidArgumentError( + f"Unsupported capability: {command.capability_id}" + ) self.send_avrcp_response( transaction_label, avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, @@ -2572,6 +2689,121 @@ async def get_playback_status() -> None: self._delegate_command(transaction_label, command, get_playback_status()) + def _on_list_player_application_setting_attributes_command( + self, + transaction_label: int, + command: ListPlayerApplicationSettingAttributesCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def get_supported_player_app_settings() -> None: + supported_settings = await self.delegate.get_supported_player_app_settings() + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + ListPlayerApplicationSettingAttributesResponse( + list(supported_settings.keys()) + ), + ) + + self._delegate_command( + transaction_label, command, get_supported_player_app_settings() + ) + + def _on_list_player_application_setting_values_command( + self, + transaction_label: int, + command: ListPlayerApplicationSettingValuesCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def get_supported_player_app_settings() -> None: + supported_settings = await self.delegate.get_supported_player_app_settings() + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + ListPlayerApplicationSettingValuesResponse( + supported_settings.get(command.attribute, []) + ), + ) + + self._delegate_command( + transaction_label, command, get_supported_player_app_settings() + ) + + def _on_get_current_player_application_setting_value_command( + self, + transaction_label: int, + command: GetCurrentPlayerApplicationSettingValueCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def get_supported_player_app_settings() -> None: + current_settings = await self.delegate.get_current_player_app_settings() + + if not all( + attribute in current_settings for attribute in command.attribute + ): + self.send_not_implemented_avrcp_response( + transaction_label, + PduId.GET_CURRENT_PLAYER_APPLICATION_SETTING_VALUE, + ) + return + + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + GetCurrentPlayerApplicationSettingValueResponse( + attribute=command.attribute, + value=[ + current_settings[attribute] for attribute in command.attribute + ], + ), + ) + + self._delegate_command( + transaction_label, command, get_supported_player_app_settings() + ) + + def _on_set_player_application_setting_value_command( + self, + transaction_label: int, + command: SetPlayerApplicationSettingValueCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def set_player_app_settings() -> None: + for attribute, value in zip(command.attribute, command.value): + await self.delegate.set_player_app_settings(attribute, value) + + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + SetPlayerApplicationSettingValueResponse(), + ) + + self._delegate_command(transaction_label, command, set_player_app_settings()) + + def _on_play_item_command( + self, + transaction_label: int, + command: PlayItemCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def play_item() -> None: + await self.delegate.play_item( + scope=command.scope, uid=command.uid, uid_counter=command.uid_counter + ) + + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + PlayItemResponse(status=StatusCode.OPERATION_COMPLETED), + ) + + self._delegate_command(transaction_label, command, play_item()) + def _on_register_notification_command( self, transaction_label: int, command: RegisterNotificationCommand ) -> None: @@ -2587,26 +2819,51 @@ async def register_notification() -> None: ) return - response: Response - if command.event_id == EventId.VOLUME_CHANGED: - volume = await self.delegate.get_absolute_volume() - response = RegisterNotificationResponse(VolumeChangedEvent(volume)) - elif command.event_id == EventId.PLAYBACK_STATUS_CHANGED: - playback_status = await self.delegate.get_playback_status() - response = RegisterNotificationResponse( - PlaybackStatusChangedEvent(play_status=playback_status) - ) - elif command.event_id == EventId.NOW_PLAYING_CONTENT_CHANGED: - playback_status = await self.delegate.get_playback_status() - response = RegisterNotificationResponse(NowPlayingContentChangedEvent()) - else: - logger.warning("Event supported but not handled %s", command.event_id) - return + event: Event + match command.event_id: + case EventId.VOLUME_CHANGED: + volume = await self.delegate.get_absolute_volume() + event = VolumeChangedEvent(volume) + case EventId.PLAYBACK_STATUS_CHANGED: + playback_status = await self.delegate.get_playback_status() + event = PlaybackStatusChangedEvent(play_status=playback_status) + case EventId.NOW_PLAYING_CONTENT_CHANGED: + event = NowPlayingContentChangedEvent() + case EventId.PLAYER_APPLICATION_SETTING_CHANGED: + settings = await self.delegate.get_current_player_app_settings() + event = PlayerApplicationSettingChangedEvent( + [ + PlayerApplicationSettingChangedEvent.Setting( + attribute, value # type: ignore + ) + for attribute, value in settings.items() + ] + ) + case EventId.AVAILABLE_PLAYERS_CHANGED: + event = AvailablePlayersChangedEvent() + case EventId.ADDRESSED_PLAYER_CHANGED: + event = AddressedPlayerChangedEvent( + AddressedPlayerChangedEvent.Player( + player_id=await self.delegate.get_addressed_player_id(), + uid_counter=await self.delegate.get_uid_counter(), + ) + ) + case EventId.UIDS_CHANGED: + event = UidsChangedEvent(await self.delegate.get_uid_counter()) + case EventId.TRACK_CHANGED: + event = TrackChangedEvent( + await self.delegate.get_current_track_uid() + ) + case _: + logger.warning( + "Event supported but not handled %s", command.event_id + ) + return self.send_avrcp_response( transaction_label, avc.ResponseFrame.ResponseCode.INTERIM, - response, + RegisterNotificationResponse(event), ) self._register_notification_listener(transaction_label, command) diff --git a/examples/run_avrcp.py b/examples/run_avrcp.py index 13b34d53..a05361e3 100644 --- a/examples/run_avrcp.py +++ b/examples/run_avrcp.py @@ -133,10 +133,10 @@ async def get_supported_events() -> None: utils.AsyncRunner.spawn(get_supported_events()) async def monitor_track_changed() -> None: - async for identifier in avrcp_protocol.monitor_track_changed(): - print("TRACK CHANGED:", identifier.hex()) + async for uid in avrcp_protocol.monitor_track_changed(): + print("TRACK CHANGED:", hex(uid)) websocket_server.send_message( - {"type": "track-changed", "params": {"identifier": identifier.hex()}} + {"type": "track-changed", "params": {"identifier": hex(uid)}} ) async def monitor_playback_status() -> None: diff --git a/tests/avrcp_test.py b/tests/avrcp_test.py index 755ff179..1c50f7f9 100644 --- a/tests/avrcp_test.py +++ b/tests/avrcp_test.py @@ -20,6 +20,7 @@ import asyncio import struct from collections.abc import Sequence +from unittest import mock import pytest @@ -118,8 +119,6 @@ async def create_with_avdtp(cls) -> TwoDevices: scope=avrcp.Scope.NOW_PLAYING, uid=0, uid_counter=1, - start_item=0, - end_item=0, attributes=[avrcp.MediaAttributeId.DEFAULT_COVER_ART], ), avrcp.GetTotalNumberOfItemsCommand(scope=avrcp.Scope.NOW_PLAYING), @@ -136,7 +135,7 @@ def test_command(command: avrcp.Command): "event,", [ avrcp.UidsChangedEvent(uid_counter=7), - avrcp.TrackChangedEvent(identifier=b'12356'), + avrcp.TrackChangedEvent(uid=12356), avrcp.VolumeChangedEvent(volume=9), avrcp.PlaybackStatusChangedEvent(play_status=avrcp.PlayStatus.PLAYING), avrcp.AddressedPlayerChangedEvent( @@ -581,6 +580,87 @@ async def test_get_supported_company_ids(): assert supported_company_ids == [avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID] +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_list_player_application_settings(): + two_devices: TwoDevices = await TwoDevices.create_with_avdtp() + + expected_settings = { + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: [ + avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, + avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT, + avrcp.ApplicationSetting.RepeatModeStatus.SINGLE_TRACK_REPEAT, + avrcp.ApplicationSetting.RepeatModeStatus.OFF, + ], + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: [ + avrcp.ApplicationSetting.ShuffleOnOffStatus.OFF, + avrcp.ApplicationSetting.ShuffleOnOffStatus.ALL_TRACKS_SHUFFLE, + avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE, + ], + } + two_devices.protocols[1].delegate = avrcp.Delegate( + supported_player_app_settings=expected_settings + ) + actual_settings = await two_devices.protocols[ + 0 + ].list_supported_player_app_settings() + assert actual_settings == expected_settings + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_get_set_player_app_settings(): + two_devices: TwoDevices = await TwoDevices.create_with_avdtp() + + delegate = two_devices.protocols[1].delegate + await two_devices.protocols[0].send_avrcp_command( + avc.CommandFrame.CommandType.CONTROL, + avrcp.SetPlayerApplicationSettingValueCommand( + attribute=[ + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF, + ], + value=[ + avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, + avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE, + ], + ), + ) + expected_settings = { + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE, + } + assert delegate.player_app_settings == expected_settings + + actual_settings = await two_devices.protocols[0].get_player_app_settings( + [ + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF, + ] + ) + assert actual_settings == expected_settings + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_play_item(): + two_devices: TwoDevices = await TwoDevices.create_with_avdtp() + + delegate = two_devices.protocols[1].delegate + + with mock.patch.object(delegate, delegate.play_item.__name__) as play_item_mock: + await two_devices.protocols[0].send_avrcp_command( + avc.CommandFrame.CommandType.CONTROL, + avrcp.PlayItemCommand( + scope=avrcp.Scope.MEDIA_PLAYER_LIST, uid=0, uid_counter=1 + ), + ) + + play_item_mock.assert_called_once_with( + scope=avrcp.Scope.MEDIA_PLAYER_LIST, uid=0, uid_counter=1 + ) + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_monitor_volume(): @@ -635,6 +715,102 @@ async def test_monitor_now_playing_content(): await anext(now_playing_iter) +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_track_changed(): + two_devices = await TwoDevices.create_with_avdtp() + + delegate = two_devices.protocols[1].delegate = avrcp.Delegate( + [avrcp.EventId.TRACK_CHANGED] + ) + delegate.current_track_uid = avrcp.TrackChangedEvent.NO_TRACK + track_iter = two_devices.protocols[0].monitor_track_changed() + + # Interim + assert (await anext(track_iter)) == avrcp.TrackChangedEvent.NO_TRACK + # Changed + two_devices.protocols[1].notify_track_changed(1) + assert (await anext(track_iter)) == 1 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_uid_changed(): + two_devices = await TwoDevices.create_with_avdtp() + + delegate = two_devices.protocols[1].delegate = avrcp.Delegate( + [avrcp.EventId.UIDS_CHANGED] + ) + delegate.uid_counter = 0 + uid_iter = two_devices.protocols[0].monitor_uids() + + # Interim + assert (await anext(uid_iter)) == 0 + # Changed + two_devices.protocols[1].notify_uids_changed(1) + assert (await anext(uid_iter)) == 1 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_addressed_player(): + two_devices = await TwoDevices.create_with_avdtp() + + delegate = two_devices.protocols[1].delegate = avrcp.Delegate( + [avrcp.EventId.ADDRESSED_PLAYER_CHANGED] + ) + delegate.uid_counter = 0 + delegate.addressed_player_id = 0 + addressed_player_iter = two_devices.protocols[0].monitor_addressed_player() + + # Interim + assert ( + await anext(addressed_player_iter) + ) == avrcp.AddressedPlayerChangedEvent.Player(player_id=0, uid_counter=0) + # Changed + two_devices.protocols[1].notify_addressed_player_changed( + avrcp.AddressedPlayerChangedEvent.Player(player_id=1, uid_counter=1) + ) + assert ( + await anext(addressed_player_iter) + ) == avrcp.AddressedPlayerChangedEvent.Player(player_id=1, uid_counter=1) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_player_app_settings(): + two_devices = await TwoDevices.create_with_avdtp() + + delegate = two_devices.protocols[1].delegate = avrcp.Delegate( + supported_events=[avrcp.EventId.PLAYER_APPLICATION_SETTING_CHANGED] + ) + delegate.player_app_settings = { + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT + } + settings_iter = two_devices.protocols[0].monitor_player_application_settings() + + # Interim + interim = await anext(settings_iter) + assert interim[0].attribute_id == avrcp.ApplicationSetting.AttributeId.REPEAT_MODE + assert ( + interim[0].value_id + == avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT + ) + + # Changed + two_devices.protocols[1].notify_player_application_settings_changed( + [ + avrcp.PlayerApplicationSettingChangedEvent.Setting( + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT, + ) + ] + ) + changed = await anext(settings_iter) + assert changed[0].attribute_id == avrcp.ApplicationSetting.AttributeId.REPEAT_MODE + assert changed[0].value_id == avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_frame_parser()