From 5682eebaac96d6de5191b6341a21d6faae013a75 Mon Sep 17 00:00:00 2001 From: UnexDev <26cgould2@gmail.com> Date: Thu, 7 Mar 2024 10:46:17 -0500 Subject: [PATCH 1/4] Init BLE driver --- cflib/crtp/__init__.py | 3 ++- cflib/crtp/bledriver.py | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 cflib/crtp/bledriver.py diff --git a/cflib/crtp/__init__.py b/cflib/crtp/__init__.py index 756e9a66c..8d49126c9 100644 --- a/cflib/crtp/__init__.py +++ b/cflib/crtp/__init__.py @@ -33,6 +33,7 @@ from .tcpdriver import TcpDriver from .udpdriver import UdpDriver from .usbdriver import UsbDriver +from .bledriver import BLEDriver __author__ = 'Bitcraze AB' __all__ = [] @@ -59,7 +60,7 @@ def init_drivers(enable_debug_driver=False, enable_serial_driver=False): if enable_serial_driver: CLASSES.append(SerialDriver) - CLASSES.extend([UdpDriver, PrrtDriver, TcpDriver]) + CLASSES.extend([UdpDriver, PrrtDriver, TcpDriver, BLEDriver]) def scan_interfaces(address=None): diff --git a/cflib/crtp/bledriver.py b/cflib/crtp/bledriver.py new file mode 100644 index 000000000..43434ddcd --- /dev/null +++ b/cflib/crtp/bledriver.py @@ -0,0 +1,41 @@ +""" +CRTP BLE driver. +""" + +from .crtpdriver import CRTPDriver +from .crtpstack import CRTPPacket + +__author__ = 'UnexDev' +__all__ = ['CRTPDriver'] + + +class BLEDriver: + def __init__(self): + pass + + def connect(self, uri: str, link_quality_callback, link_error_callback): + pass + + def send_packet(self, pk: CRTPPacket): + pass + + def receive_packet(self, wait=0): + pass + + def get_status(self): + pass + + def get_name(self): + pass + + def scan_interface(self, address: str=None): + pass + + def enum(self): + pass + + def get_help(self): + pass + + def close(self): + pass From dd559aa8e19aab9c84170a0c00b99c658d6784af Mon Sep 17 00:00:00 2001 From: UnexDev <26cgould2@gmail.com> Date: Thu, 7 Mar 2024 11:01:13 -0500 Subject: [PATCH 2/4] Partial connect logic and async disclaimer. --- cflib/crtp/bledriver.py | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/cflib/crtp/bledriver.py b/cflib/crtp/bledriver.py index 43434ddcd..ec609be0c 100644 --- a/cflib/crtp/bledriver.py +++ b/cflib/crtp/bledriver.py @@ -2,19 +2,45 @@ CRTP BLE driver. """ +import platform +import re +import asyncio + +from bleak import BleakClient as BLEClient, BleakScanner as BLEScanner + from .crtpdriver import CRTPDriver from .crtpstack import CRTPPacket - +from .exceptions import WrongUriType __author__ = 'UnexDev' __all__ = ['CRTPDriver'] class BLEDriver: - def __init__(self): - pass + """ + Driver to interface with a CRTP-capable device over Bluetooth Low Energy (BLE). + """ + address: str + client: BLEClient + def __init__(self, loop: asyncio.AbstractEventLoop): + self.address = '' - def connect(self, uri: str, link_quality_callback, link_error_callback): - pass + async def connect(self, uri: str, link_quality_callback, link_error_callback): + if not uri.startswith('ble://'): + raise WrongUriType('Not a BLE URI') + + address = uri.removeprefix('ble://') + + is_valid_address = False + if platform.platform() == 'Darwin' and re.fullmatch(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', address): + is_valid_address = True + else: + if re.fullmatch(r'([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2})'): + is_valid_address = True + + if not is_valid_address: + raise WrongUriType('Not a BLE address') + + self.address = address def send_packet(self, pk: CRTPPacket): pass From d3418e7189d45a69202937a739c779e6f725d33b Mon Sep 17 00:00:00 2001 From: UnexDev <26cgould2@gmail.com> Date: Fri, 8 Mar 2024 09:02:47 -0500 Subject: [PATCH 3/4] Connect logic & partial sending. --- cflib/crtp/bledriver.py | 54 +++++++++++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/cflib/crtp/bledriver.py b/cflib/crtp/bledriver.py index ec609be0c..5066f8537 100644 --- a/cflib/crtp/bledriver.py +++ b/cflib/crtp/bledriver.py @@ -8,22 +8,32 @@ from bleak import BleakClient as BLEClient, BleakScanner as BLEScanner -from .crtpdriver import CRTPDriver -from .crtpstack import CRTPPacket -from .exceptions import WrongUriType +from crtpdriver import CRTPDriver +from crtpstack import CRTPPacket +from exceptions import WrongUriType __author__ = 'UnexDev' __all__ = ['CRTPDriver'] +SERVICE_UUID = '00000201-1C7F-4F9E-947B-43B7C00A9A08' +CRTPUP_UUID = '00000203-1C7F-4F9E-947B-43B7C00A9A08' +CRTPDOWN_UUID = '00000204-1C7F-4F9E-947B-43B7C00A9A08' + class BLEDriver: """ Driver to interface with a CRTP-capable device over Bluetooth Low Energy (BLE). + The BLE driver is asynchronous by nature, and is designed to be used with the `asyncio` package. + To use the BLE driver, you must call `asyncio.run(main())`, where `main` is the name of your main function. + This will allow you to mark the `main` function as `async`, thus allowing you to use the `await` keyword on methods in this class. """ address: str client: BLEClient + packet_id: int + def __init__(self, loop: asyncio.AbstractEventLoop): self.address = '' - + self.packet_id = 0, + async def connect(self, uri: str, link_quality_callback, link_error_callback): if not uri.startswith('ble://'): raise WrongUriType('Not a BLE URI') @@ -34,7 +44,7 @@ async def connect(self, uri: str, link_quality_callback, link_error_callback): if platform.platform() == 'Darwin' and re.fullmatch(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', address): is_valid_address = True else: - if re.fullmatch(r'([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2})'): + if re.fullmatch(r'([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2})', address): is_valid_address = True if not is_valid_address: @@ -42,8 +52,22 @@ async def connect(self, uri: str, link_quality_callback, link_error_callback): self.address = address - def send_packet(self, pk: CRTPPacket): - pass + self.client = BLEClient(address=self.address) + await self.client.connect() + + + async def send_packet(self, pk: CRTPPacket): + data1: bytearray + data2: bytearray | None = None + + + if (pk.size > 19): + + if len(pk._get_data()[19:]) > 19: + raise ValueError('Packet too large to send. Only packets up to 48 bytes in length are supported') + else: + data1 = pk._get_data() + def receive_packet(self, wait=0): pass @@ -65,3 +89,19 @@ def get_help(self): def close(self): pass + + @staticmethod + def _split_data(pk_data: bytearray) -> list[bytearray]: + datas: list[bytearray] = [] + + if len(pk_data > 19): + datas.append(pk_data[:19]) + datas = datas + BLEDriver._split_data(pk_data[19:]) + else: + datas.append(pk_data) + + return datas + + +def dec_to_bin(n): + return int(bin(n).replace("0b", "")) \ No newline at end of file From b520e8728b9add84e924890cc3c768f69bcd8f0a Mon Sep 17 00:00:00 2001 From: UnexDev <26cgould2@gmail.com> Date: Sat, 9 Mar 2024 23:44:31 -0500 Subject: [PATCH 4/4] Very soft impl of send/receive. --- cflib/crtp/bledriver.py | 145 ++++++++++++++++++++++++---------------- 1 file changed, 89 insertions(+), 56 deletions(-) diff --git a/cflib/crtp/bledriver.py b/cflib/crtp/bledriver.py index 5066f8537..a7062b288 100644 --- a/cflib/crtp/bledriver.py +++ b/cflib/crtp/bledriver.py @@ -1,107 +1,140 @@ """ CRTP BLE driver. """ +from __future__ import annotations import platform import re import asyncio from bleak import BleakClient as BLEClient, BleakScanner as BLEScanner +from bleak.backends.device import BLEDevice from crtpdriver import CRTPDriver from crtpstack import CRTPPacket + from exceptions import WrongUriType + + + __author__ = 'UnexDev' -__all__ = ['CRTPDriver'] +__all__ = ['BLEDriver'] -SERVICE_UUID = '00000201-1C7F-4F9E-947B-43B7C00A9A08' +CRTP_SERVICE_UUID = '00000201-1C7F-4F9E-947B-43B7C00A9A08' +CRTP_UUID = '00000202-1C7F-4F9E-947B-43B7C00A9A08' CRTPUP_UUID = '00000203-1C7F-4F9E-947B-43B7C00A9A08' CRTPDOWN_UUID = '00000204-1C7F-4F9E-947B-43B7C00A9A08' -class BLEDriver: +class BLEDriver(CRTPDriver): """ Driver to interface with a CRTP-capable device over Bluetooth Low Energy (BLE). + The BLE driver is asynchronous by nature, and is designed to be used with the `asyncio` package. To use the BLE driver, you must call `asyncio.run(main())`, where `main` is the name of your main function. This will allow you to mark the `main` function as `async`, thus allowing you to use the `await` keyword on methods in this class. """ - address: str + client: BLEClient - packet_id: int - def __init__(self, loop: asyncio.AbstractEventLoop): - self.address = '' - self.packet_id = 0, + def __init__(self): + # self.packet_id = 0 # For use with CRTPUP/DOWN. + pass + + @staticmethod + def parse_uri(uri: str) -> tuple[str, int] | None: + regex: str = r'ble:\/\/([0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2})\?connect_timeout=([0-9]+)' + if platform.platform().startswith('macOS'): + # MacOS uses random UUIDs instead of exposing the BT address of the device. + regex = r'ble:\/\/([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})\?connect_timeout=([0-9]+)' + print(regex) + result = re.fullmatch(regex, uri, re.RegexFlag.I) + if result == None: return None + + groups = result.groups() + if len(groups) != 2: return None # If not all params were provided. + + return ((groups[0]), int(groups[1])) async def connect(self, uri: str, link_quality_callback, link_error_callback): - if not uri.startswith('ble://'): - raise WrongUriType('Not a BLE URI') - - address = uri.removeprefix('ble://') - - is_valid_address = False - if platform.platform() == 'Darwin' and re.fullmatch(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', address): - is_valid_address = True - else: - if re.fullmatch(r'([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2}):([0-9A-F]{2})', address): - is_valid_address = True - - if not is_valid_address: - raise WrongUriType('Not a BLE address') - - self.address = address + _uri = BLEDriver.parse_uri(uri) + if _uri == None: raise WrongUriType(f'Invalid BLE URI: {uri}') + (address, timeout) = _uri - self.client = BLEClient(address=self.address) + self.client = BLEClient(address_or_ble_device=address, timeout=timeout, services=[CRTP_SERVICE_UUID]) # Used to interface with the device. await self.client.connect() + if self.client.is_connected: link_quality_callback(100) + else: link_error_callback() async def send_packet(self, pk: CRTPPacket): - data1: bytearray - data2: bytearray | None = None + if (pk.size > 20): + # TODO: Can use CRTPUP and CRTPDOWN characteristics to send larger than 20 bytes packets. + raise OverflowError('Sending packets greater than 20 bytes is currently not supported. Try a different driver.') + + if not self.client.is_connected: + raise RuntimeError('Not connected to Crazyflie.') + packet = bytearray([pk.get_header()]) + pk._get_data() + await self.client.write_gatt_char(CRTP_UUID, packet, False) + - if (pk.size > 19): - - if len(pk._get_data()[19:]) > 19: - raise ValueError('Packet too large to send. Only packets up to 48 bytes in length are supported') - else: - data1 = pk._get_data() + async def receive_packet(self, wait=2): + return await self.client.read_gatt_char(CRTP_UUID) + # if wait == 0: + # try: return ... + # except: return None + # elif wait == -1: + # try: return ... + # except: return None + # else: + # try: return ... + # except: return None - def receive_packet(self, wait=0): - pass def get_status(self): - pass + raise NotImplementedError() def get_name(self): - pass + return 'Bluetooth Low Energy (BLE)' + + async def scan_interface(self, address: str=None): + """ + Returns an async generator of devices. + """ + async with BLEScanner() as scanner: + async for (device, ad) in scanner.advertisement_data(): + if ad.service_uuids[0] != CRTP_SERVICE_UUID: continue + elif address == device.address: yield device + elif address == None: yield device - def scan_interface(self, address: str=None): - pass def enum(self): - pass + raise NotImplementedError() def get_help(self): - pass + raise NotImplementedError() - def close(self): - pass + async def close(self): + await self.client.disconnect() - @staticmethod - def _split_data(pk_data: bytearray) -> list[bytearray]: - datas: list[bytearray] = [] - - if len(pk_data > 19): - datas.append(pk_data[:19]) - datas = datas + BLEDriver._split_data(pk_data[19:]) - else: - datas.append(pk_data) - - return datas +# Control byte not needed for the CRTP characteristic; if we plan to support packets with a length > 20 bytes, +# we need to implement the CRTPUP and CRTPDOWN characteristics, which require a control byte. +class ControlByte: + raw: int + + def __init__(self, start: bool, pid: int, length: int) -> None: + self.raw = (0x80 if start else 0x00) | ((pid & 0x03) << 5) | ((length - 1) & 0x1f) + + @property + def pid(self) -> int: + return ((self.raw & 0b0110_0000) >> 5) + @property + def start(self) -> bool: + return ((self.raw & 0x80) != 0) -def dec_to_bin(n): - return int(bin(n).replace("0b", "")) \ No newline at end of file + @property + def length(self) -> int: + return (self.raw & 0b0001_1111) + 1