I have an embedded device built around a Nordic Semiconductor MCU, running firmware I wrote with their Zephyr SDK. It uses "Just Works" pairing. When I click a button on the device, it begins advertising with an accept list. This works as expected, and the device pairs with the Windows laptop I'm using.
My goal is to connect this device robustly to a Python GUI whose BLE communication is handled by the bleak library. The main difficulty is that bleak does not tell me whether a device has finished pairing. To work around this, I retry reading an attribute up to 10 times, 2 seconds apart, before giving up. Pairing itself is handled by the operating system, but the GUI stays open while it happens, which gives the user roughly 20 seconds before they have to click the connect button in the GUI again.
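The retry idea can be written as a small generic helper. This is only a sketch: `wait_until_readable`, `try_read`, `attempts`, and `delay_s` are hypothetical names, and `try_read` stands in for whatever coroutine performs the attribute read and reports success.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until_readable(
    try_read: Callable[[], Awaitable[bool]],
    attempts: int = 10,
    delay_s: float = 2.0,
) -> bool:
    """Call try_read up to `attempts` times, sleeping `delay_s` between tries.

    Returns True as soon as one call succeeds, otherwise False once all
    attempts are used up.
    """
    for attempt in range(attempts):
        if await try_read():
            return True
        # No point sleeping after the final failed attempt.
        if attempt < attempts - 1:
            await asyncio.sleep(delay_s)
    return False
```

With the defaults this sleeps at most 9 times, so the pairing window is about 18 seconds plus however long the failed reads themselves take.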
The flow is as follows:
- Power-on embedded device & start the GUI
- Click the button on the embedded device to start BLE advertising
- Click connect on the GUI to initiate a connection attempt
- Pair via Windows settings when the OS popup appears, within 20 seconds (this step is skipped if the device is already paired)
- Paired connection established
Everything works well except when I power off the device while the GUI is still running. When I plug it back in and click the button, the device automatically pairs and connects with the Windows laptop. This reconnection seems to be completely hidden from bleak. Neither using a new thread nor manually calling disconnect seems to help. The only fix is closing the GUI program and restarting it, which I don't want to do.
When this happens, the notification-enable write that I send immediately after connecting also appears to be sent again, yet none of my program's print statements run. This is weird to me.
On the other hand, if I click the button to start advertising again after the GUI has been closed, the device stays in advertising mode and makes no connection with Windows despite being paired. This is the correct behavior.
Does anyone have ideas on why this occurs and whether there's a workaround for it using bleak? In summary, once an initial connection has been established while the GUI is running, the device automatically pairs with Windows again, and this connection is invisible to the bleak program.
Here's a stripped-down version of the BLE class I've made:
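import asyncio
from dataclasses import dataclass, field
from typing import Callable, Optional

from bleak import BleakClient, BleakScanner

# ConnectionStatus is defined elsewhere (omitted from this stripped-down version)


@dataclass
class ConnectionInfo:
    device_address: str = "None"
    _connection_status: ConnectionStatus = field(
        default=ConnectionStatus.IDLE, init=False
    )
    num_attribute_reads: int = 0
    num_attribute_writes: int = 0
    num_notifs_received: int = 0
    callback: Optional[Callable[["ConnectionInfo"], None]] = None

    # callback stuff for status changes
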
class BLEDevice:
    DEVICE_NAME = "Device"
    SOME_CHARACTERISTIC_UUID = "stuff"
    NOTIFICATION_UUID = "stuff"

    def __init__(self, info_cb, data_cb, cam_complete_cb):
        self.info = ConnectionInfo(callback=info_cb)
        self.client = None
        self._running = True
        self.attempt_connection = False
        self.data_cb = data_cb

    def stop(self) -> None:
        self._running = False

    def connect(self) -> None:
        self.attempt_connection = True

    async def run(self) -> None:
        self.info.connection_status = ConnectionStatus.IDLE
        while self._running:
            if self.attempt_connection:
                self.attempt_connection = False
                print("Attempting connection...")
                self.info.connection_status = ConnectionStatus.SEARCHING
                device = await BleakScanner.find_device_by_name(
                    BLEDevice.DEVICE_NAME, timeout=2
                )
                if device:
                    print(f"Found device: {device.address}")
                    self.info.connection_status = ConnectionStatus.CONNECTING
                    if await self._connect_device(device):
                        # Writing these is necessary to receive status updates & data
                        await self._enable_data_notifications()
                        self.info.device_address = device.address
                        self.info.connection_status = ConnectionStatus.CONNECTED
                    else:
                        self.info.connection_status = ConnectionStatus.DISCONNECTED
                else:
                    self.info.connection_status = ConnectionStatus.NO_DEVICE_FOUND
                    self.info.device_address = "None"
                    print("No device found")
            await asyncio.sleep(2)

    async def _connect_device(self, device) -> bool:
        # `device` is the bleak device returned by the scanner, not this class
        self.client = BleakClient(
            device, disconnected_callback=self._disconnected_callback
        )
        print("Connecting...")
        await self.client.connect()
        attempts = 0
        while True:
            pair_success = await self._attempt_read()
            if pair_success:
                print(f"Connected: {device.address}")
                return True
            attempts += 1
            if attempts >= 10:
                return False
            print("Retrying to read attribute...")
            await asyncio.sleep(2)

    def _disconnected_callback(self, _) -> None:
        self.info.connection_status = ConnectionStatus.DISCONNECTED
        print("Disconnect event")

    async def _attempt_read(self) -> bool:
        """Jank fix. Reads from a characteristic to verify if pairing finished"""
        try:
            await self.client.read_gatt_char(
                BLEDevice.SOME_CHARACTERISTIC_UUID
            )
            return True
        except Exception:
            return False

    def _data_cb(self, sender, data) -> None:
        # stuff
        pass

    async def _enable_data_notifications(self) -> None:
        await self.client.start_notify(
            BLEDevice.NOTIFICATION_UUID, self._data_cb
        )
        self.info.num_attribute_writes += 1
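
`ConnectionStatus` and the status-change callback are not shown above. For anyone trying to reproduce this, a minimal sketch could look like the following, assuming `ConnectionStatus` is a plain `Enum` with the six members the class references and that setting `connection_status` simply stores the value and calls `callback`. The enum values and the property body are guesses for illustration, not the original code, and the counters are left out for brevity.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, Optional


class ConnectionStatus(Enum):
    # Member names come from the class above; the values are arbitrary.
    IDLE = auto()
    SEARCHING = auto()
    CONNECTING = auto()
    CONNECTED = auto()
    DISCONNECTED = auto()
    NO_DEVICE_FOUND = auto()


@dataclass
class ConnectionInfo:
    device_address: str = "None"
    _connection_status: ConnectionStatus = field(
        default=ConnectionStatus.IDLE, init=False
    )
    callback: Optional[Callable[["ConnectionInfo"], None]] = None

    @property
    def connection_status(self) -> ConnectionStatus:
        return self._connection_status

    @connection_status.setter
    def connection_status(self, status: ConnectionStatus) -> None:
        # Hypothetical behaviour: store the status, then notify the GUI.
        self._connection_status = status
        if self.callback is not None:
            self.callback(self)
```

With this in place, every assignment such as `self.info.connection_status = ConnectionStatus.CONNECTED` triggers the GUI callback once.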