Надежное соединение aio-pika с несколькими хостами RabbitMQ - PullRequest
0 голосов
/ 06 июля 2018

В нашей настройке у нас есть центральный экземпляр RabbitMQ, работающий на трех хостах, каждый со своим собственным URL.

В целях обслуживания любой из этих хостов может выйти из строя в любое время на несколько часов. Когда это происходит, мы хотели бы подключиться к одному из других хостов.

Мы использовали aio_pika.connect_robust для подключения, но он принимает только один хост в качестве параметра.

Было бы идеально, если бы переподключение могло происходить незаметно в фоновом режиме. Рабочий может получить сообщение от соединения с одним хостом, поработать над ним и затем подтвердить его через другое соединение.

Каков наилучший способ решить эту проблему?

Ответы [ 2 ]

0 голосов
/ 10 июля 2018

Итак, в конце концов, я нашел способ сделать это. Как я описал в комментарии к ответу Люка, я не смог исправить сломанный канал. Поэтому я решил сохранить выходные данные работника, прежде чем пытаться подтвердить сообщение. Это происходит сбой при разрыве соединения, поэтому сообщение никогда не подтверждается и будет отправлено работнику снова, запуская новую сопрограмму, которая затем может повторно использовать предыдущий результат.

Надеюсь, этот код прояснит:

def on_foo_message(connection_loss_event):
    async def context_aware_on_message(message: aio_pika.IncomingMessage):

        try:
            # Obtain a message-specific lock, so only one
            # coroutine can work on a message

            # Check if the result was already calculated
            # If yes, just ack here and return

            # Work on the message

            # Save the result

            # Ack the message

        except pika.exceptions.ConnectionClosed:
            # This can happen while interacting with channel or message
            connection_loss_event.set()

    return context_aware_on_message


async def worker():
    rabbit_urls = get_rabbitmq_connection_uris()
    url_index = 0

    while True:

        try:
            # Connect to rabbit
            url_index = url_index % len(rabbit_urls)
            url = rabbit_urls[url_index]
            connection = await aio_pika.connect(url)
            channel = await connection.channel()
            logger.info(f'Connected to rabbit at {url}')

            # Configure queues
            await channel.set_qos(prefetch_count=MAX_MESSAGES_IN_PARALLEL)
            foo_queue = await channel.declare_queue('foo', durable=True)
            bar_queue = await channel.declare_queue('bar', durable=True)

            # Start listening to queues
            connection_loss_event = asyncio.Event()
            await foo_queue.consume(
                on_foo_message(connection_loss_event))
            logger.info(f'Now listening to queue "foo"')
            await bar_queue.consume(
                on_bar_message(connection_loss_event))
            logger.info(f'Now listening to queue "bar"')

            # Wait for connection loss
            await connection_loss_event.wait()
            raise ConnectionRefusedError()

        except ConnectionRefusedError:
            logger.info('No connection to rabbit, will try next url')
            url_index += 1
            await asyncio.sleep(2)


def main():
    loop = asyncio.get_event_loop()
    loop.create_task(worker())
    loop.run_forever()    

if __name__ == '__main__':
    main()
0 голосов
/ 07 июля 2018

Рабочий может получить сообщение от соединения с одним хостом, работать на нем и затем подтвердить его через другое соединение

Это невозможно, так как acks привязаны к каналам. Когда первый канал закрывается, RabbitMQ повторно помещает сообщение в очередь и повторно доставляет его другому потребителю.

Похоже, что aio-pika не поддерживает несколько хостов, с которых можно выбрать подключение. Я рекомендую либо самостоятельно перехватывать исключения, связанные с подключением, чтобы выбрать другой хост, либо поместить haproxy между вашим приложением и RabbitMQ.

...