Одна вещь, которую вы, вероятно, захотите сделать, это попытаться расширить общий протокол / независимость от транспорта.Даже если вам нужен поток с длительным циклом, вы можете скрыть это от протокола.Преимущество такое же, как обычно: протокол становится проще для тестирования, и, если вам когда-либо удастся реализовать не-поточную реализацию чтения событий USB, вы можете просто изменить транспорт без изменения протокола.
from threading import Thread
class USBThingy(Thread):
def __init__(self, reactor, device, protocol):
self._reactor = reactor
self._device = device
self._protocol = protocol
def run(self):
while True:
for line in self._device.streamData():
self._reactor.callFromThread(self._protocol.usbStreamLineReceived, line)
Использование callFromThread
является частью того, что делает это решение пригодным для использования.Он гарантирует, что метод usbStreamLineReceived
вызывается в потоке реактора, а не в потоке, считывающем данные с устройства USB.Таким образом, с точки зрения этого объекта протокола, нет ничего особенного в отношении потоков: у него просто есть метод, вызываемый время от времени, когда нужно обработать данные.
В этом случае ваш протокол просто должен реализовать usbStreamLineReceived
как-нибудь, и реализуйте свою другую прикладную логику, например, ведение списка наблюдателей:
class SomeUSBProtocol(object):
def __init__(self):
self.observers = []
def usbStreamLineReceived(self, line):
data = MyBrandSpankingNewUSBDeviceData(line)
# broadcast the data
for obs in self.observers[:]:
obs(output)
И тогда наблюдатели могут зарегистрировать себя в экземпляре этого класса и делать с данными все, что они захотят:
class USBObserverThing(Protocol):
def connectionMade(self):
self.factory.usbProto.observers.append(self.emit)
def connectionLost(self):
self.factory.usbProto.observers.remove(self.emit)
def emit(self, output):
# parse the data, convert to JSON
output = convertDataToJSON(data)
self.transport.write(output)
Соедините все вместе:
usbDevice = ...
usbProto = SomeUSBProtocol()
thingy = USBThingy(reactor, usbDevice, usbProto)
thingy.start()
factory = ServerFactory()
factory.protocol = USBObserverThing
factory.usbProto = usbProto
reactor.listenTCP(12345, factory)
reactor.run()
Вы можете представить себе лучший интерфейс регистрации / отмены регистрации наблюдателя (например, использующий реальные методы вместо прямого доступа к этому списку).Вы могли бы также представить, чтобы USBThingy
метод для выключения, чтобы SomeUSBProtocol
мог контролировать, когда он останавливается (так что ваш процесс действительно сможет выйти).