Асинхронная обработка сообщений клиентом Pika RabbitMQ - PullRequest
5 голосов
/ 06 марта 2012

После Пример получения Pika , я бы хотел, чтобы клиент обрабатывал больше одновременных запросов.У меня такой вопрос: можно ли как-то вызывать handle_delivery каждый раз, когда новое сообщение получено и не ожидает предыдущего возврата handle_delivery?

1 Ответ

3 голосов
/ 27 апреля 2012

Похоже, что вызов handle_delivery блокируется, но вы могли бы добавить вторичный обработчик в цикл событий ввода / вывода, используя add_timeout. Я думаю, это то, что вы хотите сделать:

"""
Asyncronous amqp consumer; do our processing via an ioloop timeout
"""

import sys
import time

from pika.adapters import SelectConnection
from pika.connection import ConnectionParameters

connection = None
channel = None


def on_connected(connection):
    print "timed_receive: Connected to RabbitMQ"
    connection.channel(on_channel_open)


def on_channel_open(channel_):
    global channel
    channel = channel_
    print "timed_receive: Received our Channel"
    channel.queue_declare(queue="test", durable=True,
                          exclusive=False, auto_delete=False,
                          callback=on_queue_declared)

class TimingHandler(object):
    count = 0
    last_count = 0

    def __init__(self, delay=0):
        self.start_time = time.time()
        self.delay = delay

    def handle_delivery(self, channel, method, header, body):
        connection.add_timeout(self.delay, self)

    def __call__(self):
        self.count += 1
        if not self.count % 1000:
            now = time.time()
            duration = now - self.start_time
            sent = self.count - self.last_count
            rate = sent / duration
            self.last_count = self.count
            self.start_time = now
            print "timed_receive: %i Messages Received, %.4f per second" %\
                  (self.count, rate)

def on_queue_declared(frame):
    print "timed_receive: Queue Declared"
    channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True)


if __name__ == '__main__':

    # Connect to RabbitMQ
    host = (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1'
    connection = SelectConnection(ConnectionParameters(host),
                                  on_connected)
    # Loop until CTRL-C
    try:
        # Start our blocking loop
        connection.ioloop.start()

    except KeyboardInterrupt:

        # Close the connection
        connection.close()

        # Loop until the connection is closed
        connection.ioloop.start()
...