Я работаю над простым GATT-приложением для Bluetooth, которое запускается на Raspberry Pi 3 Model B. Приложение использует BlueZ (версия 5.49) для анонсирования (advertising) сервисов и характеристик. Приложение основано на следующем примере: https://github.com/Jumperr-labs/python-gatt-server
На данный момент сервер почти готов: клиенты могут читать и записывать характеристики, а сервисы анонсируются. Проблема возникает, когда клиенты пытаются отключиться от сервера. Сервер, по-видимому, отклоняет запросы на отключение, поэтому клиент не может отключиться. Единственный способ отключить клиента — это либо выключить сервер, либо выключить адаптер Bluetooth на клиенте.
Клиент — это приложение Xamarin, использующее следующую библиотеку Bluetooth: https://github.com/aritchie/bluetoothle. Проблемы на стороне клиента исключены, так как та же проблема воспроизводится и при проверке с помощью приложений для тестирования Bluetooth, таких как LightBlue (https://itunes.apple.com/us/app/lightblue-explorer/id557428110?mt=8).
Базовый сервер GATT выглядит следующим образом:
from __future__ import print_function
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import array
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
import advertising
import gatt_server
import argparse
def main():
    """Parse the command line and run advertising plus the GATT server.

    The single option ``-a/--adapter-name`` (default ``''``) is forwarded to
    both ``advertising.advertising_main`` and ``gatt_server.gatt_server_main``.
    The call then blocks in the GLib main loop.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '-a', '--adapter-name',
        type=str,
        help='Adapter name',
        default='',
    )
    adapter_name = arg_parser.parse_args().adapter_name

    # Install the GLib main-loop integration as the default before the bus
    # connection is created.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    loop = GObject.MainLoop()

    # Both components share the same loop and bus connection.
    advertising.advertising_main(loop, system_bus, adapter_name)
    gatt_server.gatt_server_main(loop, system_bus, adapter_name)

    loop.run()


if __name__ == '__main__':
    main()
В модуле advertising:
from __future__ import print_function
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import functools
import exceptions
import adapters
# Bus name passed to bus.get_object() when addressing BlueZ objects.
BLUEZ_SERVICE_NAME = 'org.bluez'
# D-Bus interface-name strings (BlueZ and org.freedesktop.DBus).
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
class Advertisement(dbus.service.Object):
    """
    D-Bus object serving the org.bluez.LEAdvertisement1 properties.

    The object is exported at ``PATH_BASE + str(index)``. The optional
    payload fields (service UUIDs, solicit UUIDs, manufacturer data, service
    data, TX-power flag) start as ``None`` and are omitted from the property
    map until they are set.
    """

    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index, advertising_type):
        """Record the advertisement type and export the object on ``bus``."""
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        # Optional payload fields; None keeps a field out of get_properties().
        self.include_tx_power = None
        self.service_data = None
        self.solicit_uuids = None
        self.manufacturer_data = None
        self.service_uuids = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return ``{LE_ADVERTISEMENT_IFACE: props}``.

        ``props`` always holds 'Type' and additionally every optional field
        whose attribute is not ``None``, wrapped in its D-Bus type.
        """
        def string_array(values):
            return dbus.Array(values, signature='s')

        def typed_dict(signature):
            return lambda mapping: dbus.Dictionary(mapping, signature=signature)

        # (property name, current value, converter), in output order.
        optional_fields = (
            ('ServiceUUIDs', self.service_uuids, string_array),
            ('SolicitUUIDs', self.solicit_uuids, string_array),
            ('ManufacturerData', self.manufacturer_data, typed_dict('qv')),
            ('ServiceData', self.service_data, typed_dict('sv')),
            ('IncludeTxPower', self.include_tx_power, dbus.Boolean),
        )

        props = {'Type': self.ad_type}
        for key, value, convert in optional_fields:
            if value is not None:
                props[key] = convert(value)
        return {LE_ADVERTISEMENT_IFACE: props}

    def get_path(self):
        """Return the export path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        """Append ``uuid`` to the service UUID list, creating it if unset or empty."""
        uuids = self.service_uuids or []
        uuids.append(uuid)
        self.service_uuids = uuids

    def add_solicit_uuid(self, uuid):
        """Append ``uuid`` to the solicit UUID list, creating it if unset or empty."""
        uuids = self.solicit_uuids or []
        uuids.append(uuid)
        self.solicit_uuids = uuids

    def add_manufacturer_data(self, manuf_code, data):
        """Store ``data`` as a byte array under ``manuf_code``."""
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature='qv')
        payload = dbus.Array(data, signature='y')
        self.manufacturer_data[manuf_code] = payload

    def add_service_data(self, uuid, data):
        """Store ``data`` as a byte array under the service ``uuid``."""
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature='sv')
        payload = dbus.Array(data, signature='y')
        self.service_data[uuid] = payload

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the advertisement properties for ``interface``.

        Raises ``exceptions.InvalidArgsException`` for any interface other
        than ``LE_ADVERTISEMENT_IFACE``.
        """
        print('GetAll')
        if interface == LE_ADVERTISEMENT_IFACE:
            print('returning props')
            return self.get_properties()[LE_ADVERTISEMENT_IFACE]
        raise exceptions.InvalidArgsException()

    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
                         in_signature='',
                         out_signature='')
    def Release(self):
        """Log that the advertisement was released."""
        print('%s: Released!' % self.path)

    def __del__(self):
        # Unexport the object when the Python instance is collected.
        self.remove_from_connection()
class TestAdvertisement(Advertisement):
    """'peripheral' advertisement pre-filled with fixed sample values."""

    def __init__(self, bus, index):
        """Create the advertisement and load the sample payload."""
        Advertisement.__init__(self, bus, index, 'peripheral')

        for short_uuid in ('b5fd', 'dfe8', '8ce1'):
            self.add_service_uuid(short_uuid)

        # Same five sample bytes for both payloads; a fresh list is passed
        # to each call.
        sample = (0x00, 0x01, 0x02, 0x03, 0x04)
        self.add_manufacturer_data(0xffff, list(sample))
        self.add_service_data('9999', list(sample))

        self.include_tx_power = True
def register_ad_cb():
    """Reply handler for ``RegisterAdvertisement``: report success on stdout."""
    message = 'Advertisement registered'
    print(message)
def register_ad_error_cb(mainloop, error):
    """Error handler for ``RegisterAdvertisement``.

    Prints the failure reason and stops ``mainloop`` by calling its
    ``quit()`` method.
    """
    reason = str(error)
    print('Failed to register advertisement: ' + reason)
    mainloop.quit()
def advertising_main(mainloop, bus, adapter_name):
    """Register a ``TestAdvertisement`` with the adapter's advertising manager.

    Looks up the adapter exposing ``LE_ADVERTISING_MANAGER_IFACE``, toggles
    its ``Powered`` property off and back on, then issues an asynchronous
    ``RegisterAdvertisement`` call. On failure the error handler stops
    ``mainloop``.

    Raises:
        Exception: if no matching adapter is found.
    """
    adapter = adapters.find_adapter(bus, LE_ADVERTISING_MANAGER_IFACE, adapter_name)
    print('adapter: %s' % (adapter,))
    if not adapter:
        raise Exception('LEAdvertisingManager1 interface not found')

    properties_proxy = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, adapter),
        "org.freedesktop.DBus.Properties",
    )
    # Power-cycle the adapter: off first, then on.
    for powered in (0, 1):
        properties_proxy.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(powered))

    advertising_manager = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, adapter),
        LE_ADVERTISING_MANAGER_IFACE,
    )

    advertisement = TestAdvertisement(bus, 0)
    advertisement_path = advertisement.get_path()
    on_error = functools.partial(register_ad_error_cb, mainloop)
    advertising_manager.RegisterAdvertisement(
        advertisement_path,
        {},
        reply_handler=register_ad_cb,
        error_handler=on_error,
    )
В gatt_server:
from __future__ import print_function
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import array
import functools
import configreadwrite
import startstoprestart
import wlanconfig
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
from random import randint
import exceptions
import adapters
# Bus name passed to bus.get_object() when addressing BlueZ objects.
BLUEZ_SERVICE_NAME = 'org.bluez'
# D-Bus interface-name strings (BlueZ and org.freedesktop.DBus).
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
# Interface names for the GATT manager and the exported GATT object types.
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation.

    Root object exported at ``/``. It owns the list of GATT services and
    answers ``GetManagedObjects`` with every service, characteristic and
    descriptor it holds.
    """

    def __init__(self, bus):
        """Export the root object and attach the three application services."""
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)

        # Each service receives its position in this tuple as its index.
        service_classes = (
            ConfigurationService,
            StartStopRestartService,
            WlanConfigService,
        )
        for index, service_cls in enumerate(service_classes):
            self.add_service(service_cls(bus, index))

    def get_path(self):
        """Return the export path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Append ``service`` to the list reported by ``GetManagedObjects``."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return a mapping of object path to properties for the whole tree."""
        managed = {}
        print('GetManagedObjects')

        def record(exported):
            managed[exported.get_path()] = exported.get_properties()

        for service in self.services:
            record(service)
            for characteristic in service.get_characteristics():
                record(characteristic)
                for descriptor in characteristic.get_descriptors():
                    record(descriptor)
        return managed

    def __del__(self):
        # Unexport the object when the Python instance is collected.
        self.remove_from_connection()
class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation.

    Holds the service UUID, the primary flag and the characteristic objects
    attached through ``add_characteristic``. The object is exported at
    ``PATH_BASE + str(index)``.
    """

    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        """Store the service attributes and export the object on ``bus``."""
        self.path = self.PATH_BASE + str(index)
        self.characteristics = []
        self.primary = primary
        self.uuid = uuid
        self.bus = bus
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return ``{GATT_SERVICE_IFACE: {...}}`` describing this service."""
        characteristic_paths = dbus.Array(
            self.get_characteristic_paths(), signature='o')
        service_props = {
            'UUID': self.uuid,
            'Primary': self.primary,
            'Characteristics': characteristic_paths,
        }
        return {GATT_SERVICE_IFACE: service_props}

    def get_path(self):
        """Return the export path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Attach ``characteristic`` to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all attached characteristics."""
        return [chrc.get_path() for chrc in self.characteristics]

    def get_characteristics(self):
        """Return the list of attached characteristic objects."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the service properties for ``interface``.

        Raises ``exceptions.InvalidArgsException`` for any interface other
        than ``GATT_SERVICE_IFACE``.
        """
        if interface == GATT_SERVICE_IFACE:
            return self.get_properties()[GATT_SERVICE_IFACE]
        raise exceptions.InvalidArgsException()
class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation.

    Base class for a GATT characteristic exported beneath its owning
    service at ``service.path + '/char' + str(index)``. The default
    ``ReadValue``, ``WriteValue``, ``StartNotify`` and ``StopNotify``
    handlers print a message and raise
    ``exceptions.NotSupportedException``; subclasses are expected to
    override the operations they support.
    """

    def __init__(self, bus, index, uuid, flags, service):
        """Store the characteristic attributes and export the object.

        Args:
            bus: D-Bus connection the object is exported on.
            index: value appended to ``'/char'`` to build the object path.
            uuid: value reported as the 'UUID' property.
            flags: value reported as the 'Flags' property.
            service: owning service; its ``path`` is the path prefix and its
                ``get_path()`` result is reported as the 'Service' property.
        """
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return ``{GATT_CHRC_IFACE: {...}}`` describing this characteristic."""
        return {
            GATT_CHRC_IFACE: {
                'Service': self.service.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
                'Descriptors': dbus.Array(
                    self.get_descriptor_paths(),
                    signature='o')
            }
        }

    def get_path(self):
        """Return the export path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    # The three descriptor helpers below were absent from this class even
    # though get_properties() and Application.GetManagedObjects() call them;
    # without them those calls raise AttributeError.
    def add_descriptor(self, descriptor):
        """Attach ``descriptor`` to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all attached descriptors."""
        return [desc.get_path() for desc in self.descriptors]

    def get_descriptors(self):
        """Return the list of attached descriptor objects."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the characteristic properties for ``interface``.

        Raises ``exceptions.InvalidArgsException`` for any interface other
        than ``GATT_CHRC_IFACE``.
        """
        if interface != GATT_CHRC_IFACE:
            raise exceptions.InvalidArgsException()
        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises ``NotSupportedException``."""
        print('Default ReadValue called, returning error')
        raise exceptions.NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises ``NotSupportedException``."""
        print('Default WriteValue called, returning error')
        raise exceptions.NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        """Default handler: always raises ``NotSupportedException``."""
        print('Default StartNotify called, returning error')
        raise exceptions.NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        """Default handler: always raises ``NotSupportedException``."""
        print('Default StopNotify called, returning error')
        raise exceptions.NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Signal stub; the decorator emits the signal when it is called."""
        pass
(...) Реализация сервисов и характеристик (...)
def register_app_cb():
    """Reply handler for ``RegisterApplication``: report success on stdout."""
    message = 'GATT application registered'
    print(message)
def register_app_error_cb(mainloop, error):
    """Error handler for ``RegisterApplication``.

    Prints the failure reason and stops ``mainloop`` by calling its
    ``quit()`` method.
    """
    reason = str(error)
    print('Failed to register application: ' + reason)
    mainloop.quit()
def gatt_server_main(mainloop, bus, adapter_name):
    """Create the GATT ``Application`` and register it with BlueZ.

    Looks up the adapter exposing ``GATT_MANAGER_IFACE`` and issues an
    asynchronous ``RegisterApplication`` call. On failure the error handler
    stops ``mainloop``.

    Raises:
        Exception: if no matching adapter is found.
    """
    adapter = adapters.find_adapter(bus, GATT_MANAGER_IFACE, adapter_name)
    if not adapter:
        raise Exception('GattManager1 interface not found')

    adapter_object = bus.get_object(BLUEZ_SERVICE_NAME, adapter)
    gatt_manager = dbus.Interface(adapter_object, GATT_MANAGER_IFACE)

    gatt_app = Application(bus)

    print('Registering GATT application...')

    app_path = gatt_app.get_path()
    on_error = functools.partial(register_app_error_cb, mainloop)
    gatt_manager.RegisterApplication(
        app_path,
        {},
        reply_handler=register_app_cb,
        error_handler=on_error,
    )
И, наконец, в модуле adapters:
from __future__ import print_function
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
# Bus name passed to bus.get_object() when addressing BlueZ objects.
BLUEZ_SERVICE_NAME = 'org.bluez'
# D-Bus interface-name strings (BlueZ and org.freedesktop.DBus).
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
# Interface names for the GATT manager and the GATT object types.
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
def find_adapter(bus, adapter_interface_name, adapter_name):
    """Return the path of the first BlueZ object matching interface and name.

    Every object reported by BlueZ's ``GetManagedObjects`` is inspected. A
    match needs ``adapter_interface_name`` among the object's interfaces and
    ``'/' + adapter_name`` somewhere in its path. With an empty
    ``adapter_name`` the first object carrying the interface is returned.

    Returns:
        The matching object path, or ``None`` if nothing matches.
    """
    bluez_root = bus.get_object(BLUEZ_SERVICE_NAME, '/')
    object_manager = dbus.Interface(bluez_root, DBUS_OM_IFACE)
    managed_objects = object_manager.GetManagedObjects()

    for object_path, interfaces in managed_objects.items():
        print('checking adapter %s, keys: %s' % (object_path, interfaces.keys()))
        if adapter_interface_name not in interfaces:
            continue

        print('found adapter %s' % (object_path,))
        # NOTE(review): this is a substring test, so 'hci1' would also match
        # a path containing '/hci10' -- confirm that is acceptable.
        if '/' + adapter_name not in object_path:
            continue

        print('returning adapter %s' % (object_path,))
        return object_path

    return None
Буду рад любым предложениям. Я бьюсь над этой ошибкой уже несколько недель и сильно расстроен.