|
23 | 23 | )
|
24 | 24 | from bumble.device import Device
|
25 | 25 | from bumble.hci import Address, HCI_Write_Page_Timeout_Command
|
| 26 | +from bumble.l2cap import ClassicChannel, L2CAP_Connection_Response |
26 | 27 | from bumble.sdp import (
|
27 | 28 | SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
28 | 29 | SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
|
@@ -888,6 +889,59 @@ async def sdp_sa_discover_multiple_records(hci_port, shell, dut, address) -> Non
|
888 | 889 | assert found is True
|
889 | 890 |
|
890 | 891 |
|
| 892 | +async def sdp_ssa_discover_fail(hci_port, shell, dut, address) -> None: |
| 893 | + def on_app_connection_request(self, request) -> None: |
| 894 | + logger.info('Force L2CAP connection failure') |
| 895 | + self.destination_cid = request.source_cid |
| 896 | + self.send_control_frame( |
| 897 | + L2CAP_Connection_Response( |
| 898 | + identifier=request.identifier, |
| 899 | + destination_cid=self.source_cid, |
| 900 | + source_cid=self.destination_cid, |
| 901 | + result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, |
| 902 | + status=0x0000, |
| 903 | + ) |
| 904 | + ) |
| 905 | + |
| 906 | + # Save the origin method |
| 907 | + on_connection_request = ClassicChannel.on_connection_request |
| 908 | + # Replace the origin method with a new one to force L2CAP connection failure |
| 909 | + ClassicChannel.on_connection_request = on_app_connection_request |
| 910 | + |
| 911 | + logger.info('<<< SDP Discovery ...') |
| 912 | + async with await open_transport_or_link(hci_port) as hci_transport: |
| 913 | + device = Device.with_hci( |
| 914 | + 'Bumble', |
| 915 | + Address('F0:F1:F2:F3:F4:F5'), |
| 916 | + hci_transport.source, |
| 917 | + hci_transport.sink, |
| 918 | + ) |
| 919 | + device.classic_enabled = True |
| 920 | + device.le_enabled = False |
| 921 | + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: |
| 922 | + device.host.snooper = BtSnooper(snoop_file) |
| 923 | + await device_power_on(device) |
| 924 | + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) |
| 925 | + |
| 926 | + target_address = address.split(" ")[0] |
| 927 | + logger.info(f'=== Connecting to {target_address}...') |
| 928 | + try: |
| 929 | + connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) |
| 930 | + logger.info(f'=== Connected to {connection.peer_address}!') |
| 931 | + except Exception as e: |
| 932 | + logger.error(f'Fail to connect to {target_address}!') |
| 933 | + raise e |
| 934 | + |
| 935 | + # Discover SDP Record |
| 936 | + shell.exec_command("sdp_client ssa_discovery_fail") |
| 937 | + found, lines = await wait_for_shell_response(dut, "test pass") |
| 938 | + logger.info(f'{lines}') |
| 939 | + assert found is True |
| 940 | + |
| 941 | + # Restore the origin method |
| 942 | + ClassicChannel.on_connection_request = on_connection_request |
| 943 | + |
| 944 | + |
891 | 945 | class TestSdpServer:
|
892 | 946 | def test_sdp_ssa_discover_no_record(self, shell: Shell, dut: DeviceAdapter, sdp_client_dut):
|
893 | 947 | """Test case to request SDP records. No SDP record registered."""
|
@@ -966,3 +1020,9 @@ def test_sdp_sa_discover_multiple_records(
|
966 | 1020 | logger.info(f'test_sdp_sa_discover_multiple_records {sdp_client_dut}')
|
967 | 1021 | hci, iut_address = sdp_client_dut
|
968 | 1022 | asyncio.run(sdp_sa_discover_multiple_records(hci, shell, dut, iut_address))
|
| 1023 | + |
| 1024 | + def test_sdp_ssa_discover_fail(self, shell: Shell, dut: DeviceAdapter, sdp_client_dut): |
| 1025 | + """Test case to request SDP records. but the L2CAP connecting fail.""" |
| 1026 | + logger.info(f'test_sdp_ssa_discover_fail {sdp_client_dut}') |
| 1027 | + hci, iut_address = sdp_client_dut |
| 1028 | + asyncio.run(sdp_ssa_discover_fail(hci, shell, dut, iut_address)) |
0 commit comments