RabbitMQ pika.exceptions.ConnectionClosed и ChannelClosed heartbeats - PullRequest
0 голосов
/ 18 мая 2018

У меня есть исключение RabbitMQ pika.exceptions.ConnectionClosed, появляющееся время от времени, и я уже просматривал различные источники, в том числе

сообщение StackOverflow

, где говорится об использованииheartbeats установлен в 0 и использует connection.process_data_events в цикле, который я использую в своем коде, но все равно получаю исключение.

Вот исключение:

Файл "/ home / developer5/testfiles/dreamlead/text_cleaner/rpc_client.py ", строка 50, в теле вызова = str (body)) Файл" /usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py ",строка 2077, в basic_publish обязательно, немедленно) Файл "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", строка 2164, в файле publish self._flush_output () "/ usr /local / lib / python3.5 / dist-packages / pika / adapters / blocking_connection.py ", строка 1250, в _flush_output * waiters) Файл" /usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py ", строка 474, в _flush_output result.reason_text) pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError (104, 'Connection reset by peer')")

Во время обработки вышеуказанного исключения произошло другое исключение:

Трассировка (последний вызов был последним): файл "app.py", строка 120, в handle_delivery registry_text = text_rpc_client.call (rpc_request_body) Файл "/home/developer5/testfiles/dreamlead/text_cleaner/rpc_client.py", строка 67, вызов файла self.channel.close () Файл "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", строка 1426, в закрытом файле self._cancel_all_consumers ()"/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", строка 1389, в файле _cancel_all_consumers self.basic_cancel (consumer_tag) "/usr/local/lib/python3.5/dist-packages / pika / adapters / blocking_connection.py ", строка 1675, в basic_cancel consumer_tag в собственной.MqLogs:

закрытие соединения AMQP <0.12978.0> (127.0.0.1:47224 -> 127.0.0.1:5672, клиент неожиданно закрыл соединение TCP пропустил тактовые импульсы от клиента, время ожидания: 60 с = ОТЧЕТ ОБ ОШИБКЕ ==== 18 мая 2018 года :: 17: 01: 40 === закрытие соединения AMQP <0.12942.0> (127.0.0.1:47214 -> 127.0.0.1:5672): пропущенные импульсы от клиента, время ожидания: 60 с =ОТЧЕТ ОБ ОШИБКАХ ==== 18 мая 2018 года :: 17: 09: 12 === закрытие соединения AMQP <0.12602.0> (127.0.0.1:46896 -> 127.0.0.1:5672):

Вот код:

import pika
import uuid
from socket import error as SocketError
import errno
import traceback
from logging_settings import app_log

class RpcClient(object):
    def __init__(self, conf):
        credentials = pika.PlainCredentials(conf.uname, conf.password)
        parameters = pika.ConnectionParameters(host=conf.host, 
        credentials=credentials, connection_attempts=1000,
        retry_delay=5,heartbeat_interval=25, socket_timeout=30000000)
        self.connection = pika.BlockingConnection(parameters)
        self.rpc_exchange = conf.rpc_exchange
        self.rpc_key = conf.rpc_key
        self.rpc_header_key = conf.rpc_header_key
        self.rpc_header_value = conf.rpc_header_value

        print('credentials: ',credentials)
        print('Connection: ', self.connection)
        print('rpc_key: ', self.rpc_header_key)

        self.channel = self.connection.channel()
        print('Channel: ', self.channel)

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
        print('----------------')

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = str(body, 'utf-8')

    def call(self, body):
        try:
            self.response = None
            self.corr_id = str(uuid.uuid4())
            print('Call Body: ', body)
            self.channel.basic_publish(exchange=self.rpc_exchange,
                                       routing_key=self.rpc_key,
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.corr_id,
                                           headers={self.rpc_header_key: self.rpc_header_value}
                                       ),
                                       body=str(body))
            while self.response is None:
                self.connection.process_data_events()
            return self.response
        # except pika.exceptions.ConnectionClosed as err:
        #     print('----------Connection Closed------------')
        #     app_log.error("Connection closed error: " + traceback.format_exc())
        #     time.sleep(1)
        #     continue
        #     # raise err
        # except pika.exceptions.ChannelClosed as err:
        #     print('----------Channel Closed------------')
        #     app_log.error("Channel Closed error: " + traceback.format_exc())
        #     time.sleep(1)
        #     continue
        except:
            if self.channel is not None:
                self.channel.close()
            if self.connection is not None:
                self.connection.close()

Возможно, проблема связана с моим компьютером?"ConnectionResetError (104, 'Сброс соединения по пиру')")?

Или размещение self.connection.process_data_events () в моем коде?

...