
python - BLE connection issues with bleak using 'just works' pairing - Stack Overflow


I have an embedded device built around a Nordic Semiconductor MCU, running firmware I wrote using their Zephyr SDK. It uses 'just works' pairing. When I click a button on the device, it begins advertising with an accept list. This works as expected, and the device pairs with the Windows laptop I'm using.

My goal is to robustly connect this device to a Python GUI whose BLE communication is handled by the bleak library. The main difficulty is that bleak does not support determining whether a device is paired. To work around this, I retry reading an attribute up to 10 times, 2 seconds apart, before giving up. The pairing itself is handled by the operating system, but the GUI remains open while this occurs, which gives the user about 20 seconds to pair before they must click the connect button in the GUI again.
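The retry budget described above (10 reads, 2 seconds apart) can be sketched independently of bleak. This is a minimal illustration of the polling pattern only; `poll_until` and `make_flaky_probe` are hypothetical names, and the probe stands in for a GATT read that keeps failing until pairing has finished.

```python
import asyncio
from typing import Awaitable, Callable


async def poll_until(
    probe: Callable[[], Awaitable[bool]],
    attempts: int = 10,
    delay: float = 2.0,
) -> bool:
    """Call `probe` up to `attempts` times, sleeping `delay` seconds between tries."""
    for attempt in range(attempts):
        if await probe():
            return True
        if attempt < attempts - 1:  # no need to sleep after the final failure
            await asyncio.sleep(delay)
    return False


def make_flaky_probe(succeed_on: int) -> Callable[[], Awaitable[bool]]:
    """Hypothetical stand-in for a read that only succeeds on call number `succeed_on`."""
    calls = 0

    async def probe() -> bool:
        nonlocal calls
        calls += 1
        return calls >= succeed_on

    return probe
```

With the defaults this spends at most 9 sleeps of 2 seconds plus the time of the reads themselves, which is roughly the 20 second window mentioned above.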

The flow is as follows:

  1. Power-on embedded device & start the GUI
  2. Click the button on the embedded device to start BLE advertising
  3. Click connect on the GUI to initiate a connection attempt
  4. Pair via Windows settings within 20 seconds of the OS popup appearing (this step is skipped if the device is already paired)
  5. Paired connection established
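The steps above correspond to the `ConnectionStatus` values that the class further down assigns (`IDLE`, `SEARCHING`, `CONNECTING`, `CONNECTED`, `DISCONNECTED`, `NO_DEVICE_FOUND`). The listing leaves out both the enum and the status-change callback, so the following is only a guess at their shape: a minimal sketch in which assigning `connection_status` notifies a listener. `StatusHolder` and its setter are hypothetical, not the code actually used.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, Optional


class ConnectionStatus(Enum):
    # Member names are taken from the listing; the values are arbitrary.
    IDLE = auto()
    SEARCHING = auto()
    CONNECTING = auto()
    CONNECTED = auto()
    DISCONNECTED = auto()
    NO_DEVICE_FOUND = auto()


@dataclass
class StatusHolder:
    """Hypothetical: stores the status and calls `callback` whenever it is assigned."""

    _connection_status: ConnectionStatus = field(
        default=ConnectionStatus.IDLE, init=False
    )
    callback: Optional[Callable[["StatusHolder"], None]] = None

    @property
    def connection_status(self) -> ConnectionStatus:
        return self._connection_status

    @connection_status.setter
    def connection_status(self, status: ConnectionStatus) -> None:
        self._connection_status = status
        if self.callback is not None:
            self.callback(self)  # let the GUI react to the new status
```

A GUI could pass a callback that updates a status label, so every assignment made by the BLE task is reflected on screen.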

Everything works well except when I power off the device while the GUI is still running. When I plug it back in and click the button, the device automatically pairs and connects with the Windows laptop. This reconnection seems to be completely hidden from bleak. Neither using a new thread nor manually calling disconnect seems to work. The only fix is closing the GUI program and restarting it, which I don't want to do.

When this occurs, the notification-enable message that I send immediately after connecting also seems to be resent. However, none of my program's print statements fire, which seems strange to me.

On the other hand, if I click the button to start advertising again after the GUI is closed, the device remains in advertising mode and makes no connection with Windows despite being paired. This is the correct behavior.

Does anyone have ideas on why this occurs and whether there's a workaround for it using bleak? In summary, once an initial connection has been established while the GUI is running, the device automatically pairs with Windows again, and this connection is invisible to the bleak program.
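To narrow down what "invisible" means here, it may help to log what the client object itself reports while the device is power-cycled. The sketch below polls an `is_connected` attribute (bleak's `BleakClient` exposes a property of that name) and reports every change. It is a diagnostic aid under that assumption, not a fix; `watch_link` and `ScriptedClient` are hypothetical names, and the scripted client only exists so the example runs without hardware.

```python
import asyncio
from typing import Any, Callable, Iterable, Optional


async def watch_link(
    client: Any,
    on_change: Callable[[bool], None],
    interval: float = 1.0,
    checks: int = 10,
) -> None:
    """Poll `client.is_connected` `checks` times and report each change of state."""
    last: Optional[bool] = None
    for _ in range(checks):
        current = bool(client.is_connected)
        if current != last:
            on_change(current)
            last = current
        await asyncio.sleep(interval)


class ScriptedClient:
    """Stand-in client whose link state follows a scripted sequence."""

    def __init__(self, states: Iterable[bool]) -> None:
        self._states = iter(states)
        self._last = False

    @property
    def is_connected(self) -> bool:
        # Advance through the script; repeat the final state once it is exhausted.
        self._last = next(self._states, self._last)
        return self._last
```

If the real client keeps reporting a disconnected link while the notification-enable write is visibly being resent, that would suggest the traffic is coming from below the Python layer.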

Here's a stripped-down version of the BLE class I've made:

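import asyncio
from dataclasses import dataclass, field
from typing import Callable, Optional

from bleak import BleakClient, BleakScanner

# ConnectionStatus is defined elsewhere and omitted from this stripped-down listing.

@dataclass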
class ConnectionInfo:
    device_address: str = "None"
    _connection_status: ConnectionStatus = field(
        default=ConnectionStatus.IDLE, init=False
    )
    num_attribute_reads: int = 0
    num_attribute_writes: int = 0
    num_notifs_received: int = 0

    callback: Optional[Callable[["ConnectionInfo"], None]] = None

    # callback stuff for status changes

class BLEDevice:
    DEVICE_NAME = "Device"

    SOME_CHARACTERISTIC_UUID = "stuff"
    NOTIFICATION_UUID = "stuff"

    def __init__(self, info_cb, data_cb, cam_complete_cb):
        self.info = ConnectionInfo(callback=info_cb)
        self.client = None
        self._running = True
        self.attempt_connection = False

        self.data_cb = data_cb

    def stop(self) -> None:
        self._running = False

    def connect(self) -> None:
        self.attempt_connection = True

    async def run(self) -> None:
        self.info.connection_status = ConnectionStatus.IDLE
        while self._running:
            if self.attempt_connection:
                self.attempt_connection = False
                print("Attempting connection...")

                self.info.connection_status = ConnectionStatus.SEARCHING
                device = await BleakScanner.find_device_by_name(
                    BLEDevice.DEVICE_NAME, timeout=2
                )

                if device:
                    print(f"Found device: {device.address}")
                    self.info.connection_status = ConnectionStatus.CONNECTING
                    if await self._connect_device(device):
                        await self._enable_data_notifications()  # Writing these is necessary to receive status updates & data
                        self.info.device_address = device.address
                        self.info.connection_status = ConnectionStatus.CONNECTED
                    else:
                        self.info.connection_status = ConnectionStatus.DISCONNECTED
                else:
                    self.info.connection_status = ConnectionStatus.NO_DEVICE_FOUND
                    self.info.device_address = "None"
                    print("No device found")

            await asyncio.sleep(2)

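    async def _connect_device(self, device: "bleak.backends.device.BLEDevice") -> bool:
        # `device` is the bleak device returned by the scanner, not this class.
        # Returns True once a read succeeds, False after 10 failed attempts.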
        self.client = BleakClient(device, self._disconnected_callback)
        print("Connecting...")
        await self.client.connect()
        attempts = 0
        while True:
            pair_success = await self._attempt_read()

            if pair_success:
                print(f"Connected: {device.address}")
                return True

            attempts += 1

            if attempts >= 10:
                return False

            print("Retrying to read attribute...")
            await asyncio.sleep(2)

    def _disconnected_callback(self, _) -> None:
        self.info.connection_status = ConnectionStatus.DISCONNECTED
        print("Disconnect event")

    async def _attempt_read(self) -> bool:
        """Jank fix. Reads from a characteristic to verify if pairing finished"""
        try:
            await self.client.read_gatt_char(
                BLEDevice.SOME_CHARACTERISTIC_UUID
            )
            return True
        except Exception:  # any failure is treated as "not paired yet"
            return False

    def _data_cb(self, sender, data) -> None:
        # stuff
        pass  # handler body omitted here

    async def _enable_data_notifications(self) -> None:
        await self.client.start_notify(
            BLEDevice.NOTIFICATION_UUID, self._data_cb
        )
        self.info.num_attribute_writes += 1
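One idea that might be worth trying, untested against this hardware: treat every disconnect event as a reason to throw the old client away and build a new one, instead of only updating the status. The sketch below is written against a duck-typed client with `connect()` and `disconnect()` coroutines so that it runs without a radio; with bleak, the factory would presumably be something like `lambda cb: BleakClient(device, cb)`. `ReconnectSupervisor` and `DroppingClient` are hypothetical names.

```python
import asyncio
from typing import Any, Callable, Optional


class ReconnectSupervisor:
    """Hypothetical helper: builds a brand-new client after every disconnect."""

    def __init__(self, make_client: Callable[[Callable[[Any], None]], Any]) -> None:
        self._make_client = make_client
        self._dropped: Optional[asyncio.Event] = None
        self.client: Optional[Any] = None
        self.connect_count = 0

    def _on_disconnect(self, _client: Any) -> None:
        # Passed to the client as its disconnected callback.
        if self._dropped is not None:
            self._dropped.set()

    async def _connect_fresh(self) -> None:
        if self.client is not None:
            try:
                await self.client.disconnect()  # release the stale client first
            except Exception:
                pass  # the link is already gone; nothing more to clean up
        self.client = self._make_client(self._on_disconnect)
        await self.client.connect()
        self.connect_count += 1

    async def run(self, max_sessions: int) -> None:
        self._dropped = asyncio.Event()  # created inside the running loop
        for _ in range(max_sessions):
            self._dropped.clear()
            await self._connect_fresh()
            await self._dropped.wait()  # sleep until the link drops


class DroppingClient:
    """Stand-in client that loses its link shortly after connecting."""

    def __init__(self, on_disconnect: Callable[[Any], None]) -> None:
        self._on_disconnect = on_disconnect
        self.disconnected = False

    async def connect(self) -> None:
        # Simulate the peripheral being powered off 10 ms after connecting.
        asyncio.get_running_loop().call_later(0.01, self._on_disconnect, self)

    async def disconnect(self) -> None:
        self.disconnected = True
```

In the real class a fresh scan would probably precede each new connection, and notifications would need to be enabled again on the new client. Whether this makes the OS-level reconnection visible to bleak on Windows is an open question.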