RabbitMQ, Пика и стратегия переподключения - PullRequest
15 голосов
/ 01 марта 2012

Я использую Pika для обработки данных из RabbitMQ. Поскольку у меня возникли проблемы различного рода, я решил написать небольшое тестовое приложение, чтобы посмотреть, как я могу обрабатывать разъединения.

Я написал это тестовое приложение, которое выполняет следующее:

  1. Подключиться к брокеру, повторить попытку до
  2. При подключении создайте очередь.
  3. Использовать эту очередь и поместить результат в очередь Python. Queue (0)
  4. Получить элемент из Queue.Queue (0) и вывести его обратно в очередь брокера.

Что я заметил, так это 2 проблемы:

  1. Когда я запускаю свой сценарий с одного хоста, подключенного к rabbitmq на другом хосте (внутри виртуальной машины), тогда этот сценарий завершается в случайные моменты времени без выдачи ошибки.
  2. Когда я запускаю свой скрипт на том же хосте, на котором установлен RabbitMQ, он работает нормально и продолжает работать.

Это может быть объяснено из-за проблем с сетью, пакеты отброшены, хотя я нахожу соединение не очень надежным.

Когда сценарий выполняется локально на сервере RabbitMQ, и я завершаю работу RabbitMQ, сценарий завершается с ошибкой: «ОШИБКА pika SelectConnection: Ошибка гнезда в 3: 104»

Похоже, я не могу заставить стратегию переподключения работать так, как должно быть. Может ли кто-нибудь взглянуть на код, чтобы увидеть, что я делаю не так?

Спасибо

Jay

#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock

class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)

        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected,  reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant connect. Reason: %s' % err)
                time.sleep(1)

        self.daemon=True
    def run(self):
        while True:
            self.submitData(self.from_broker.get(block=True))
        pass
    def on_connected(self,connection):
        connection.channel(self.on_channel_open)
    def on_channel_open(self,new_channel):
        self.channel = new_channel
        self.channel.queue_declare(queue='sandbox', durable=True)
        self.channel.basic_consume(self.processData, queue='sandbox')    
    def processData(self, ch, method, properties, body):
        self.logging.info('Received data from broker')
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        self.from_broker.put(body)
    def submitData(self,data):
        self.logging.info('Submitting data to broker.')
        self.channel.basic_publish(exchange='',
                    routing_key='sandbox',
                    body=data,
                    properties=self.properties)
if __name__ == '__main__':
    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)
    broker=Broker()
    broker.start()
    try:
        broker.connection.ioloop.start()
    except Exception as err:
        print err

1 Ответ

19 голосов
/ 10 августа 2012

Основная проблема вашего скрипта заключается в том, что он взаимодействует с одним каналом как из основного потока (в котором работает ioloop), так и из потока «Брокер» (вызывает submitData в цикле).Это небезопасно .

Кроме того, SimpleReconnectionStrategy, похоже, не делает ничего полезного.Это не вызывает переподключение, если соединение прервано.Я считаю, что это ошибка в Pika: https://github.com/pika/pika/issues/120

Я попытался реорганизовать ваш код, чтобы заставить его работать так, как я думаю, вы этого хотели, но столкнулся с другой проблемой.По-видимому, у Pika нет способа обнаружить сбой доставки, а это означает, что данные могут быть потеряны в случае разрыва соединения.Это кажется таким очевидным требованием!Как не может быть способа обнаружить, что basic_publish не удалось?Я пробовал все виды вещей, включая транзакции и add_on_return_callback (которые казались неуклюжими и чрезмерно сложными), но ничего не придумали.Если на самом деле пути нет, тогда pika может оказаться полезной только в ситуациях, которые могут допустить потерю данных, отправляемых в RabbitMQ, или в программах, которые нужно использовать только из RabbitMQ.

Это не надежно, но для справкиВот код, который решает вашу многопоточную проблему:

import logging
import pika
import Queue
import sys
import threading
import time
from functools import partial
from pika.adapters import SelectConnection, BlockingConnection
from pika.exceptions import AMQPConnectionError
from pika.reconnection_strategies import SimpleReconnectionStrategy

log = logging.getLogger(__name__)

DEFAULT_PROPERTIES = pika.BasicProperties(delivery_mode=2)


class Broker(object):

    def __init__(self, parameters, on_channel_open, name='broker'):
        self.parameters = parameters
        self.on_channel_open = on_channel_open
        self.name = name

    def connect(self, forever=False):
        name = self.name
        while True:
            try:
                connection = SelectConnection(
                    self.parameters, self.on_connected)
                log.debug('%s connected', name)
            except Exception:
                if not forever:
                    raise
                log.warning('%s cannot connect', name, exc_info=True)
                time.sleep(10)
                continue

            try:
                connection.ioloop.start()
            finally:
                try:
                    connection.close()
                    connection.ioloop.start() # allow connection to close
                except Exception:
                    pass

            if not forever:
                break

    def on_connected(self, connection):
        connection.channel(self.on_channel_open)


def setup_submitter(channel, data_queue, properties=DEFAULT_PROPERTIES):
    def on_queue_declared(frame):
        # PROBLEM pika does not appear to have a way to detect delivery
        # failure, which means that data could be lost if the connection
        # drops...
        channel.confirm_delivery(on_delivered)
        submit_data()

    def on_delivered(frame):
        if frame.method.NAME in ['Confirm.SelectOk', 'Basic.Ack']:
            log.info('submission confirmed %r', frame)
            # increasing this value seems to cause a higher failure rate
            time.sleep(0)
            submit_data()
        else:
            log.warn('submission failed: %r', frame)
            #data_queue.put(...)

    def submit_data():
        log.info('waiting on data queue')
        data = data_queue.get()
        log.info('got data to submit')
        channel.basic_publish(exchange='',
                    routing_key='sandbox',
                    body=data,
                    properties=properties,
                    mandatory=True)
        log.info('submitted data to broker')

    channel.queue_declare(
        queue='sandbox', durable=True, callback=on_queue_declared)


def blocking_submitter(parameters, data_queue,
        properties=DEFAULT_PROPERTIES):
    while True:
        try:
            connection = BlockingConnection(parameters)
            channel = connection.channel()
            channel.queue_declare(queue='sandbox', durable=True)
        except Exception:
            log.error('connection failure', exc_info=True)
            time.sleep(1)
            continue
        while True:
            log.info('waiting on data queue')
            try:
                data = data_queue.get(timeout=1)
            except Queue.Empty:
                try:
                    connection.process_data_events()
                except AMQPConnectionError:
                    break
                continue
            log.info('got data to submit')
            try:
                channel.basic_publish(exchange='',
                            routing_key='sandbox',
                            body=data,
                            properties=properties,
                            mandatory=True)
            except Exception:
                log.error('submission failed', exc_info=True)
                data_queue.put(data)
                break
            log.info('submitted data to broker')


def setup_receiver(channel, data_queue):
    def process_data(channel, method, properties, body):
        log.info('received data from broker')
        data_queue.put(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def on_queue_declared(frame):
        channel.basic_consume(process_data, queue='sandbox')

    channel.queue_declare(
        queue='sandbox', durable=True, callback=on_queue_declared)


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print 'usage: %s RABBITMQ_HOST' % sys.argv[0]
        sys.exit()

    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)

    host = sys.argv[1]
    log.info('connecting to host: %s', host)
    parameters = pika.ConnectionParameters(host=host, heartbeat=True)
    data_queue = Queue.Queue(0)
    data_queue.put('message') # prime the pump

    # run submitter in a thread

    setup = partial(setup_submitter, data_queue=data_queue)
    broker = Broker(parameters, setup, 'submitter')
    thread = threading.Thread(target=
         partial(broker.connect, forever=True))

    # uncomment these lines to use the blocking variant of the submitter
    #thread = threading.Thread(target=
    #    partial(blocking_submitter, parameters, data_queue))

    thread.daemon = True
    thread.start()

    # run receiver in main thread
    setup = partial(setup_receiver, data_queue=data_queue)
    broker = Broker(parameters, setup, 'receiver')
    broker.connect(forever=True)
...