Итак, в конце концов, я нашел способ сделать это. Как я описал в комментарии к ответу Люка, я не смог исправить сломанный канал. Поэтому я решил сохранить выходные данные работника, прежде чем пытаться подтвердить сообщение. Это происходит сбой при разрыве соединения, поэтому сообщение никогда не подтверждается и будет отправлено работнику снова, запуская новую сопрограмму, которая затем может повторно использовать предыдущий результат.
Надеюсь, этот код прояснит:
def on_foo_message(connection_loss_event):
async def context_aware_on_message(message: aio_pika.IncomingMessage):
try:
# Obtain a message-specific lock, so only one
# coroutine can work on a message
# Check if the result was already calculated
# If yes, just ack here and return
# Work on the message
# Save the result
# Ack the message
except pika.exceptions.ConnectionClosed:
# This can happen while interacting with channel or message
connection_loss_event.set()
return context_aware_on_message
async def worker():
rabbit_urls = get_rabbitmq_connection_uris()
url_index = 0
while True:
try:
# Connect to rabbit
url_index = url_index % len(rabbit_urls)
url = rabbit_urls[url_index]
connection = await aio_pika.connect(url)
channel = await connection.channel()
logger.info(f'Connected to rabbit at {url}')
# Configure queues
await channel.set_qos(prefetch_count=MAX_MESSAGES_IN_PARALLEL)
foo_queue = await channel.declare_queue('foo', durable=True)
bar_queue = await channel.declare_queue('bar', durable=True)
# Start listening to queues
connection_loss_event = asyncio.Event()
await foo_queue.consume(
on_foo_message(connection_loss_event))
logger.info(f'Now listening to queue "foo"')
await bar_queue.consume(
on_bar_message(connection_loss_event))
logger.info(f'Now listening to queue "bar"')
# Wait for connection loss
await connection_loss_event.wait()
raise ConnectionRefusedError()
except ConnectionRefusedError:
logger.info('No connection to rabbit, will try next url')
url_index += 1
await asyncio.sleep(2)
def main():
loop = asyncio.get_event_loop()
loop.create_task(worker())
loop.run_forever()
if __name__ == '__main__':
main()