最小 Python 脚本:使用 Bleak 列出并读取 BLE 设备的特征值

此脚本使用 Python 的 Bleak 库,通过 MAC 地址连接到指定的蓝牙低功耗 (BLE) 设备,列出其所有服务和特征值,并读取其中可读特征值的内容。它是一个最小示例,用于演示如何连接 BLE 设备并浏览其 GATT 服务。

connect_ble_device.py
#!/usr/bin/env python3
"""
BLE device connection and service explorer.

This script connects to a specific BLE device by its MAC address and lists
all available services together with their characteristics/attributes.

Requirements:
    - bleak library: pip install bleak

Usage:
    python connect_ble_device.py [MAC_ADDRESS]

Example:
    python connect_ble_device.py 24:EC:4A:76:00:32
"""

import asyncio
import sys
import argparse
from bleak import BleakClient
from bleak.exc import BleakError
from datetime import datetime

def _print_gatt_value(value, indent, show_raw=False):
    """Print a GATT value as text when it is printable UTF-8, otherwise as hex.

    Args:
        value: The bytes-like payload returned by a GATT read.
        indent (str): Prefix put in front of every printed line.
        show_raw (bool): Also print the raw bytes object after the hex dump.
    """
    try:
        text = value.decode("utf-8")
    except UnicodeDecodeError:
        text = None

    # Device-supplied bytes can decode cleanly and still contain control
    # characters (NUL, ESC, ...); those are shown as hex rather than being
    # written verbatim to the terminal.
    if text is not None and text.isprintable():
        print(f"{indent}Value (string): {text}")
        return

    hex_value = " ".join(f"{b:02x}" for b in value)
    print(f"{indent}Value (hex): {hex_value}")
    if show_raw:
        print(f"{indent}Value (raw bytes): {value}")


async def explore_device_services(client, show_descriptors=False):
    """
    Print every service and characteristic of a connected BLE device.

    For each characteristic whose properties include "read", the value is
    read and printed. Read failures are reported inline and do not stop the
    walk over the remaining characteristics.

    Args:
        client (BleakClient): A connected BLE client.
        show_descriptors (bool): When True, also list and read the
            descriptors of every characteristic.

    Returns:
        None. All results are written to stdout; any error raised while
        walking the services is caught and printed.
    """
    try:
        # Materialise the service collection so it can be counted.
        services = list(client.services)
        if not services:
            print("No services found on this device.")
            return

        print(f"Found {len(services)} service(s):")
        print("=" * 80)

        for service in services:
            print(f"\nService: {service.uuid}")
            print(f"Description: {service.description}")
            print(f"Handle: {service.handle}")

            characteristics = service.characteristics

            if characteristics:
                print(f"  Characteristics ({len(characteristics)}):")
                print("  " + "-" * 76)

                for char in characteristics:
                    print(f"    UUID: {char.uuid}")
                    print(f"    Description: {char.description}")
                    print(f"    Handle: {char.handle}")
                    print(f"    Properties: {', '.join(char.properties)}")

                    if "read" in char.properties:
                        try:
                            # Pass the characteristic object rather than its
                            # UUID: a UUID lookup is ambiguous when the device
                            # exposes the same UUID on several characteristics.
                            value = await client.read_gatt_char(char)
                        except Exception as e:
                            # Best effort: one unreadable characteristic must
                            # not abort the listing.
                            print(f"    Value: <Could not read - {e}>")
                        else:
                            _print_gatt_value(value, "    ", show_raw=True)

                    # Descriptors are only listed on request.
                    if show_descriptors and char.descriptors:
                        descriptors = char.descriptors
                        print(f"    Descriptors ({len(descriptors)}):")
                        for desc in descriptors:
                            print(f"      UUID: {desc.uuid}")
                            print(f"      Description: {desc.description}")
                            print(f"      Handle: {desc.handle}")
                            try:
                                desc_value = await client.read_gatt_descriptor(desc.handle)
                            except Exception as e:
                                print(f"      Value: <Could not read - {e}>")
                            else:
                                _print_gatt_value(desc_value, "      ")

                    # Always separate characteristics with a blank line, also
                    # when descriptors were requested but none exist.
                    print()
            else:
                print("    No characteristics found for this service.")

            print("-" * 80)

    except Exception as e:
        print(f"Error exploring services: {e}")


async def connect_and_explore(mac_address, show_descriptors=False):
    """
    Connect to a BLE device and explore its services.

    Args:
        mac_address (str): Address of the device to connect to.
        show_descriptors (bool): Forwarded to explore_device_services; when
            True the descriptors of every characteristic are listed too.

    Returns:
        bool: True when the device was connected, explored and disconnected;
        False when the connection failed or any error was raised.
    """
    print(f"\nAttempting to connect to {mac_address} ...")
    try:
        async with BleakClient(mac_address) as client:
            if not client.is_connected:
                print(f"Failed to connect to {mac_address}")
                return False

            print(f"Successfully connected to {mac_address}")
            print(f"Connected at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print()
            # Walk all services and characteristics while the link is up.
            await explore_device_services(client, show_descriptors=show_descriptors)

        # Only report the disconnect once the context manager has exited,
        # i.e. after the connection has actually been closed.
        print(f"\nDisconnected from {mac_address}")
        return True
    except BleakError as e:
        print(f"Bluetooth error: {e}")
        return False
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False


# Characters accepted as hexadecimal digits in a device address.
_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def _is_valid_device_address(address):
    """Return True if *address* looks like a MAC address or a macOS device UUID.

    A MAC address is 12 hexadecimal digits, optionally separated by ":" or
    "-". A device UUID (used instead of a MAC address by Bleak on macOS) is
    32 hexadecimal digits grouped 8-4-4-4-12 with "-".
    """
    digits = address.replace(":", "").replace("-", "")
    if not digits or not all(ch in _HEX_DIGITS for ch in digits):
        return False
    if len(digits) == 12:
        return True
    return [len(group) for group in address.split("-")] == [8, 4, 4, 4, 12]


async def main():
    """
    Run the BLE device connector and explorer.

    Parses the command line, validates the device address and then connects
    to the device and lists its services.

    Returns:
        int: 0 when the exploration succeeded, 1 when the address was
        rejected or the exploration failed.
    """
    print("BLE Device Connection and Service Explorer")
    print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    parser = argparse.ArgumentParser(description="Connect to a BLE device and list all services and attributes.")
    parser.add_argument("mac_address", nargs="?", default="24:EC:4A:76:00:32", help="MAC address of the BLE device (default: 24:EC:4A:76:00:32)")
    parser.add_argument("-d", "--descriptors", action="store_true", help="Show individual descriptors for each characteristic")
    args = parser.parse_args()

    mac_address = args.mac_address
    show_descriptors = args.descriptors

    print(f"Using MAC address: {mac_address}")
    if show_descriptors:
        print("Descriptor display enabled.")

    # Reject addresses that are not hexadecimal or have the wrong length
    # before attempting a connection.
    if not _is_valid_device_address(mac_address):
        print(f"Invalid MAC address format: {mac_address}")
        print("Expected format: XX:XX:XX:XX:XX:XX or XX-XX-XX-XX-XX-XX")
        print("On macOS, pass the device UUID (XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX) instead.")
        return 1

    print()

    # Connect to the device and explore it.
    success = await connect_and_explore(mac_address, show_descriptors=show_descriptors)

    if success:
        print("\nDevice exploration completed successfully.")
        return 0

    print("\nDevice exploration failed.")
    return 1


if __name__ == "__main__":
    try:
        # Forward whatever main() returns as the process exit status;
        # sys.exit treats None as success (status 0).
        sys.exit(asyncio.run(main()))
    except KeyboardInterrupt:
        print("\nOperation interrupted by user.")
        # 128 + SIGINT, the conventional status for an interrupted command.
        sys.exit(130)
    except Exception as e:
        # Last-resort boundary: report on stderr and signal failure.
        print(f"An error occurred: {e}", file=sys.stderr)
        sys.exit(1)

Check out similar posts by category: Bluetooth, Python