Шаблон для фона Twisted сервер, который заполняет очередь входящих сообщений и очищает очередь исходящих сообщений? - PullRequest
5 голосов
/ 12 ноября 2010

Я хотел бы сделать что-то вроде этого:

twistedServer.start() # This would be a nonblocking call

while True:
   while twistedServer.haveMessage():
      message = twistedServer.getMessage()
      response = handleMessage(message)
      twistedServer.sendResponse(response)
   doSomeOtherLogic()

Главное, что я хочу сделать, - запустить сервер в фоновом потоке.Я надеюсь сделать это с потоком, а не через многопроцессорность / очередь, потому что у меня уже есть один уровень обмена сообщениями для моего приложения, и я бы хотел избежать двух.Я поднимаю этот вопрос, потому что уже вижу, как сделать это в отдельном процессе, но я хотел бы знать, как это сделать в потоке, или, если смогу.Или, если, возможно, есть какой-то другой шаблон, который я могу использовать, который выполняет то же самое, например, возможно, написание моего собственного метода pipeline.run.Спасибо за любую помощь.:)

1 Ответ

10 голосов
/ 12 ноября 2010

Ключевым моментом, который я хочу сделать, является запуск сервера в фоновом потоке.

Вы не объясняете, почему это ключ.Обычно такие вещи, как «использовать потоки», являются деталями реализации.Возможно, темы уместны, а может и нет, но фактическая цель не зависит от сути дела.Какова ваша цель?Чтобы обрабатывать несколько клиентов одновременно?Чтобы обрабатывать сообщения такого рода одновременно с событиями из другого источника (например, веб-сервера)?Не зная конечной цели, невозможно определить, сработает ли предложенная мной стратегия реализации.

Имея это в виду, есть две возможности.

Во-первых, вы можете забыть опотоки.Это повлечет за собой определение вышеупомянутой логики обработки событий как только частей обработки событий.Часть, которая пытается получить событие, будет делегирована другой части приложения, вероятно, в конечном итоге на основе одного из API-интерфейсов реактора (например, вы можете настроить TCP-сервер, который принимает сообщения и превращает их в события, которые вы 'переработка, и в этом случае вы начнете с вызова какого-либо типа реактора.listenTCP).

Таким образом, ваш пример может превратиться в нечто подобное (с некоторой дополнительной спецификой, чтобы попытаться увеличить инструктивное значение):

from twisted.internet import reactor

class MessageReverser(object):
    """
    Accept messages, reverse them, and send them onwards.
    """
    def __init__(self, server):
        self.server = server

    def messageReceived(self, message):
        """
        Callback invoked whenever a message is received.  This implementation
        will reverse and re-send the message.
        """
        self.server.sendMessage(message[::-1])
        doSomeOtherLogic()

def main():
    twistedServer = ...
    twistedServer.start(MessageReverser(twistedServer))
    reactor.run()

main()

Несколько замечаний по поводу этого примера:

  • Я не уверен, как определяется ваш twistedServer.Я представляю, что он каким-то образом взаимодействует с сетью.Ваша версия кода будет получать сообщения и буферизировать их, пока они не будут удалены из буфера вашим циклом для обработки.Эта версия, вероятно, не будет иметь буфера, но вместо этого просто вызовите метод messageReceived объекта, переданного в start, как только появится сообщение.Вы все еще можете добавить буферизацию, если хотите, добавив ее в метод messageReceived.

  • Теперь есть вызов reactor.run, который будет блокироваться.Вместо этого вы можете написать этот код в виде плагина twistd или файла .tac, в этом случае вы не будете нести прямую ответственность за запуск реактора.Однако кто-то должен запустить реактор, иначе большинство API от Twisted ничего не сделают.Конечно, reactor.run блокируется до тех пор, пока кто-то не вызовет reactor.stop.

  • В этом подходе нет потоков, используемых.Подход Twisted к многозадачности, основанный на совместной работе, означает, что вы все равно можете делать несколько вещей одновременно, если вы готовы сотрудничать (что обычно означает возврат к реактору время от времени).

  • Точное время, когда вызывается функция doSomeOtherLogic, немного изменилось, потому что нет понятия «буфер пока пуст», отдельно от «Я только что обработал сообщение».Вы можете изменить это так, чтобы функция вызывалась один раз в секунду, или после каждых N сообщений, или что угодно.

Второй возможностью было бы действительно использовать потоки.Это может выглядеть очень похоже на предыдущий пример, но вы бы вызвали reactor.run в другом потоке, а не в основном потоке.Например,

from Queue import Queue
from threading import Thread

class MessageQueuer(object):
    def __init__(self, queue):
        self.queue = queue

    def messageReceived(self, message):
        self.queue.put(message)

def main():
    queue = Queue()
    twistedServer = ...
    twistedServer.start(MessageQueuer(queue))
    Thread(target=reactor.run, args=(False,)).start()

    while True:
        message = queue.get()
        response = handleMessage(message)
        reactor.callFromThread(twistedServer.sendResponse, response)

main()

В этой версии предполагается, что twistedServer работает аналогично, но использует поток, чтобы иметь цикл while True:.Примечание:

  • Вы должны вызвать reactor.run(False), если используете поток, чтобы Twisted не пытался установить какие-либо обработчики сигналов, которые Python позволяет устанавливать только в основном потоке.Это означает, что обработка Ctrl-C будет отключена и reactor.spawnProcess не будет работать надежно.

  • MessageQueuer имеет тот же интерфейс, что и MessageReverser, только его реализация messageReceived это отличается.Он использует потокобезопасный объект Queue для связи между потоком реактора (в котором он будет вызываться) и вашим основным потоком, где работает цикл while True:.

  • Вы должны использовать reactor.callFromThread для отправки сообщения обратно в поток реактора (при условии, что twistedServer.sendResponse фактически основано на Twisted API). Скрученные API, как правило, не являются потокобезопасными и должны вызываться в потоке реактора. Вот что reactor.callFromThread делает для вас.

  • Возможно, вы захотите реализовать какой-нибудь способ остановить цикл и реактор, полагают вы. Процесс python не завершится чисто до тех пор, пока вы не вызовете reactor.stop.

Обратите внимание, что хотя многопоточная версия дает вам знакомую, желаемую петлю while True, на самом деле она не работает намного лучше, чем не поточная версия. Это просто сложнее. Итак, подумайте, действительно ли вам нужны потоки или это просто метод реализации, который можно заменить на что-то другое.

...