Позвольте RabbitMQ и Pika поддерживать связь всегда открытой - PullRequest
0 голосов
/ 27 мая 2019

У меня есть сценарий Python, который считывает данные из потока, а когда читается новая строка, она помещает свое содержимое (строку) в очередь RabbitMQ.

Дело в том, что поток может не отправлять сообщения в течение 1, 2 или 9 часов или около того, поэтому я хотел бы, чтобы соединение RabbitMQ всегда открывалось .

Проблема в том, что когда я создаю соединение и канал:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
channel = self.connection.channel()
channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')

... и если через час приходит сообщение, я получаю эту ошибку:

  File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/var/opt/rabbitmq-agent.py", line 34, in push_to_queue
    raise Exception("Error sending the message to the queue: " + format(e))
Exception: Error sending the message to the queue: Send message to publisher error: Channel allocation requires an open connection: <SelectConnection CLOSED socket=None params=<ConnectionParameters host=x port=xvirtual_host=/ ssl=False>>

Предполагается, что соединение между сервером rabbitmq и клиентом было закрыто.

Как я могу избежать этого? Я хотел бы иметь «, пожалуйста, поддерживайте соединение всегда ». Может быть, установка супер-большого пульса в параметрах подключения Пика? Примерно так:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials, heartbeat=6000))

Любые другие более крутые решения будут высоко оценены.

Заранее спасибо

Ответы [ 2 ]

0 голосов
/ 02 июля 2019

Вы можете попробовать добавить heartbeat к вашему ConnectionParameters. Это создаст легкий трафик, посылая сердцебиение каждые указанные секунды. Это будет осуществлять связи. Некоторые брандмауэры или прокси-серверы имеют тенденцию очищать простаивающие соединения. Даже у RabbitMQ есть тайм-аут на незанятых соединениях.

import pika

# Set the connection parameters to connect to rabbit-server1 on port 5672
# on the / virtual host using the username "guest" and password "guest"
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('rabbit-server1',
                                       5672,
                                       '/',
                                       heartbeat=60,
                                       credentials)

См. здесь для документации pika.

Кроме того, у вас должен быть код, который предотвращает отключение от сети. Такое всегда может случиться и будет. Так что appart from the heartbeat имеет некоторую обработку исключений, готовую к повторному открытию закрытых соединений.

0 голосов
/ 27 мая 2019

Я бы посоветовал вам проверять соединение каждый раз перед отправкой сообщения, и если соединение закрыто, просто переподключите.

if not self.connection or self.connection.is_closed:
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
    channel = self.connection.channel()
    channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')
...