Как использовать listen на basic.return в python-клиенте AMQP - PullRequest
2 голосов
/ 03 февраля 2010

Я хочу убедиться, что мое сообщение доставлено в очередь.

Для этого я добавляю обязательный параметр в basic_publish.Что еще я должен сделать, чтобы получить сообщение basic.return, если мое сообщение не было успешно доставлено?

Я не могу использовать channel.wait() для прослушивания basic.return, потому что когда мое сообщение успешно доставлено,wait() функция зависает навсегда.(Тайм-аута нет) С другой стороны.Когда я не звоню channel.wait(), channel.returned_messages остается пустым, даже если сообщение не доставлено.

Я использую py-amqplib версию 0.6.

Любое решение приветствуется.

Ответы [ 3 ]

1 голос
/ 05 июля 2011

Вы пробовали единственную библиотеку Python AMQP, которая в комплекте?Он не так широко используется, поскольку он не аккуратно упакован.

Шаг 1. Скомпилируйте библиотеку C - вам может понадобиться sudo apt-get install autotools-dev autoconf automake libtool

mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install

Шаг 2. Установите библиотеку Python

pip install pylibrabbitmq
1 голос
/ 13 октября 2011

Вы не можете делать это синхронно, так как это асинхронная система. Но вы можете решить эту проблему с помощью потоков.

Основная идея заключается в том, что вы запускаете поток, который выполняет ожидание на канале, всякий раз, когда он выходит из ожидания, он вызывает функцию call_back для любого возвращенного сообщения в очереди возвращенных сообщений. Затем вы можете обработать это сообщение, как захотите, в функции call_back

def registerCallback(channel, call_back):
    """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange.
    """
    def wait():
        try:
            channel.wait()
        except Exception, e:
            print("Problem waiting on publish channel: %s" % str(e))

        while not channel.returned_messages.empty():
            returnedMessage = channel.returned_messages.get()
            processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage))
            processReturnedMessageThread.start()

        wait()

    waiting = Thread(target=wait) 
    waiting.start()
1 голос
/ 11 февраля 2010

В настоящее время это невозможно, поскольку basic.return отправляется асинхронно, когда сообщение сбрасывается в брокере. Когда сообщение было успешно отправлено, с сервера не поступало никаких данных. Поэтому pyAMQP не может прослушивать такие сообщения.

Я прочитал несколько тем об этой проблеме. Возможные решения были:

  • использовать txAMQP, витую версию amqp, которая обрабатывает basic.return
  • использовать pyAMQP с ожиданием с таймаутом. (Я не уверен, что это возможно в настоящее время)
  • часто пингует сервер с синхронными командами, чтобы pyAMQP мог выбирать basic.return сообщения, когда они приходят.

Поскольку уровень поддержки pyAMQP и rabbitMQ в целом довольно низок, мы решили вообще не использовать amqp broker.

...