Использование Tornado с Pika для мониторинга асинхронной очереди - PullRequest
8 голосов
/ 19 декабря 2010

У меня есть сервер AMQP ( RabbitMQ ), который я хотел бы опубликовать и прочитать с веб-сервера Tornado . Для этого я решил использовать асинхронную библиотеку amqp python; в частности Pika (вариант, который предположительно поддерживает Торнадо).

Я написал код, который, по-видимому, успешно читает из очереди, за исключением того, что в конце запроса я получаю исключение (браузер возвращает нормально):

[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
    Traceback (most recent call last):
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
        yield
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
        yield
      File "/usr/lib/python2.6/contextlib.py", line 113, in nested
        yield vars
      File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
        callback(*args, **kwargs)
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
        self._handle_read()
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
        self.on_data_available(chunk)
      File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
        self.channels[frame.channel_number].frame_handler(frame)
    KeyError: 1

Я не совсем уверен, что правильно использую эту библиотеку, поэтому я могу сделать что-то явно не так. Основной поток моего кода:

  1. Запрос приходит
  2. Создать соединение с RabbitMQ с помощью TornadoConnection; указать обратный звонок
  3. В обратном вызове соединения создайте канал, объявите / свяжите мою очередь и вызовите basic_consume; указать обратный звонок
  4. В обратном вызове потребления закройте канал и вызовите функцию завершения работы Торнадо.
  5. См. Исключение.

У меня несколько вопросов:

  1. Является ли этот поток даже правильным? Я не уверен, какова цель обратного вызова соединения, за исключением того, что он не работает, если я не использую его.
  2. Должен ли я создавать одно соединение AMQP для каждого веб-запроса? Документация RabbitMQ предполагает, что нет, я не должен, а должен придерживаться создания только каналов. Но где бы я мог создать соединение, и как мне попытаться восстановить соединение, если оно ненадолго прервется?
  3. Если я создаю одно соединение AMQP для каждого веб-запроса, где мне его закрывать? Вызов amqp.close () в моем обратном вызове, похоже, еще больше запутал.

Я постараюсь подготовить некоторый пример кода чуть позже, но шаги, которые я описал выше, довольно полно описывают потребляющую сторону вещей. У меня также есть проблемы с издательской стороной, но использование очередей более актуально.

Ответы [ 2 ]

8 голосов
/ 01 января 2011

Было бы полезно увидеть некоторый исходный код, но я использую этот же модуль pika с поддержкой торнадо без проблем в более чем одном производственном проекте.

Вы не хотите создавать соединение для каждого запроса.Создайте класс, который обернет все ваши операции AMQP, и создайте его экземпляр как одноэлементный на уровне приложения торнадо, который можно будет использовать между запросами (и между обработчиками запросов).Я делаю это в функции runapp (), которая выполняет такие вещи, а затем запускает основной торнадо ioloop.

Вот класс под названием «События».Это частичная реализация (в частности, я не определяю «self.handle_event» здесь. Это зависит от вас.

class Event(object):
  def __init__(self, config):
    self.host = 'localhost'
    self.port = '5672'
    self.vhost = '/'
    self.user = 'foo'
    self.exchange = 'myx'
    self.queue = 'myq'
    self.recv_routing_key = 'msgs4me'
    self.passwd = 'bar'

    self.connected = False 
    self.connect()


  def connect(self):

    credentials = pika.PlainCredentials(self.user, self.passwd)

    parameters = pika.ConnectionParameters(host = self.host,
                                         port = self.port,
                                         virtual_host = self.vhost,
                                         credentials = credentials)

    srs = pika.connection.SimpleReconnectionStrategy()

    logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host,
                                                              self.port))
    self.connection = tornado_adapter.TornadoConnection(parameters,
                                                      wait_for_open = False,
                                                      reconnection_strategy = srs,
                                                      callback = self.on_connected)

  def on_connected(self):

    # Open the channel
    logging.debug("Events: Opening a channel")
    self.channel = self.connection.channel()

    # Declare our exchange
    logging.debug("Events: Declaring the %s exchange" %  self.exchange)
    self.channel.exchange_declare(exchange = self.exchange,
                                type = "fanout",
                                auto_delete = False,
                                durable = True)

    # Declare our queue for this process
    logging.debug("Events: Declaring the %s queue" %  self.queue)
    self.channel.queue_declare(queue = self.queue,
                             auto_delete = False,
                             exclusive = False,
                             durable = True)


    # Bind to the exchange
    self.channel.queue_bind(exchange = self.exchange,
                          queue = self.queue,
                          routing_key = self.recv_routing_key)

    self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True)

    # We should be connected if we made it this far
    self.connected = True

И затем я помещаю это в файл под названием «events.py». Мой RequestHandlersи любой внутренний код использует модуль 'common.py', который оборачивает код, который полезен для обоих (мои RequestHandlers не вызывают напрямую какие-либо методы модуля amqp - тоже самое для db, cache и т. д.), поэтому я определяю 'events = None 'на уровне модуля в common.py, и я создаю экземпляр объекта Event примерно так:

import events

def runapp(config):
    if myapp.common.events is None: 
       myapp.common.events = myapp.events.Event(config)
    logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events)
    http_server = tornado.httpserver.HTTPServer(app,
                                            xheaders=config['HTTPServer']['xheaders'],
                                            no_keep_alive=config['HTTPServer']['no_keep_alive'])
    http_server.listen(port) 
    main_loop = tornado.ioloop.IOLoop.instance()
    logging.debug("MAIN IOLOOP: %s", main_loop)
    main_loop.start()

С новым годом :-D

0 голосов
/ 20 декабря 2010

Кто-то сообщил об успехе в слиянии Торнадо и Пика здесь .Насколько я могу судить, это не так просто, как просто вызвать Пику из Торнадо, поскольку обе библиотеки хотят иметь свои собственные циклы событий.

...