Производить контент бесконечно в отдельном потоке для всех подключений? - PullRequest
2 голосов
/ 17 января 2012

У меня есть проект Twisted, который стремится по существу ретранслировать собранные данные по TCP в JSON. По сути, у меня есть библиотека USB, на которую мне нужно подписаться и синхронно читать в цикле while неопределенно долго, например:

while True:
    for line in usbDevice.streamData():
        data = MyBrandSpankingNewUSBDeviceData(line)
        # parse the data, convert to JSON
        output = convertDataToJSON(data)
        # broadcast the data
        ...

Проблема, конечно же, в .... По сути, мне нужно запустить этот процесс, как только сервер запустится, и завершить его, когда сервер завершает работу (Protocol.doStart и Protocol.doStop), и он постоянно работает и передает output каждому подключенному транспорту.

Как я могу сделать это в Twisted? Очевидно, мне нужно было бы запустить цикл while в своем собственном потоке, но как я могу «подписать» клиентов на прослушивание вывода? Также важно, чтобы сбор данных USB запускался только один раз, поскольку это может серьезно испортить ситуацию, если он будет запущен более одного раза.

В двух словах, вот моя архитектура:

  1. Сервер имеет USB-концентратор, который постоянно передает данные . Сервер постоянно подписан на этот USB-концентратор и постоянно читает данные.
  2. Клиенты будут приходить и уходить, подключаться и отключаться по желанию.

Мы хотим отправлять данные всем подключенным клиентам, когда они доступны. Как я могу сделать это в Twisted?

1 Ответ

2 голосов
/ 18 января 2012

Одна вещь, которую вы, вероятно, захотите сделать, это попытаться расширить общий протокол / независимость от транспорта.Даже если вам нужен поток с длительным циклом, вы можете скрыть это от протокола.Преимущество такое же, как обычно: протокол становится проще для тестирования, и, если вам когда-либо удастся реализовать не-поточную реализацию чтения событий USB, вы можете просто изменить транспорт без изменения протокола.

from threading import Thread

class USBThingy(Thread):
    def __init__(self, reactor, device, protocol):
        self._reactor = reactor
        self._device = device
        self._protocol = protocol

    def run(self):
        while True:
            for line in self._device.streamData():
                self._reactor.callFromThread(self._protocol.usbStreamLineReceived, line)

Использование callFromThread является частью того, что делает это решение пригодным для использования.Он гарантирует, что метод usbStreamLineReceived вызывается в потоке реактора, а не в потоке, считывающем данные с устройства USB.Таким образом, с точки зрения этого объекта протокола, нет ничего особенного в отношении потоков: у него просто есть метод, вызываемый время от времени, когда нужно обработать данные.

В этом случае ваш протокол просто должен реализовать usbStreamLineReceived как-нибудь, и реализуйте свою другую прикладную логику, например, ведение списка наблюдателей:

class SomeUSBProtocol(object):
    def __init__(self):
        self.observers = []

    def usbStreamLineReceived(self, line):
        data = MyBrandSpankingNewUSBDeviceData(line)
        # broadcast the data
        for obs in self.observers[:]:
            obs(output)

И тогда наблюдатели могут зарегистрировать себя в экземпляре этого класса и делать с данными все, что они захотят:

class USBObserverThing(Protocol):
    def connectionMade(self):
        self.factory.usbProto.observers.append(self.emit)

    def connectionLost(self):
        self.factory.usbProto.observers.remove(self.emit)

    def emit(self, output):
        # parse the data, convert to JSON
        output = convertDataToJSON(data)
        self.transport.write(output)

Соедините все вместе:

usbDevice = ...
usbProto = SomeUSBProtocol()
thingy = USBThingy(reactor, usbDevice, usbProto)
thingy.start()

factory = ServerFactory()
factory.protocol = USBObserverThing
factory.usbProto = usbProto
reactor.listenTCP(12345, factory)
reactor.run()

Вы можете представить себе лучший интерфейс регистрации / отмены регистрации наблюдателя (например, использующий реальные методы вместо прямого доступа к этому списку).Вы могли бы также представить, чтобы USBThingy метод для выключения, чтобы SomeUSBProtocol мог контролировать, когда он останавливается (так что ваш процесс действительно сможет выйти).

...