У меня есть исключение RabbitMQ pika.exceptions.ConnectionClosed, появляющееся время от времени, и я уже просматривал различные источники, в том числе
, где говорится об использованииheartbeats установлен в 0 и использует connection.process_data_events в цикле, который я использую в своем коде, но все равно получаю исключение.
Файл "/ home / developer5/testfiles/dreamlead/text_cleaner/rpc_client.py ", строка 50, в теле вызова = str (body)) Файл" /usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py ",строка 2077, в basic_publish обязательно, немедленно) Файл "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", строка 2164, в файле publish self._flush_output () "/ usr /local / lib / python3.5 / dist-packages / pika / adapters / blocking_connection.py ", строка 1250, в _flush_output * waiters) Файл" /usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py ", строка 474, в _flush_output result.reason_text) pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError (104, 'Connection reset by peer')")
Трассировка (последний вызов был последним): файл "app.py", строка 120, в handle_delivery registry_text = text_rpc_client.call (rpc_request_body) Файл "/home/developer5/testfiles/dreamlead/text_cleaner/rpc_client.py", строка 67, вызов файла self.channel.close () Файл "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", строка 1426, в закрытом файле self._cancel_all_consumers ()"/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", строка 1389, в файле _cancel_all_consumers self.basic_cancel (consumer_tag) "/usr/local/lib/python3.5/dist-packages / pika / adapters / blocking_connection.py ", строка 1675, в basic_cancel consumer_tag в собственной.MqLogs:
закрытие соединения AMQP <0.12978.0> (127.0.0.1:47224 -> 127.0.0.1:5672, клиент неожиданно закрыл соединение TCP пропустил тактовые импульсы от клиента, время ожидания: 60 с = ОТЧЕТ ОБ ОШИБКЕ ==== 18 мая 2018 года :: 17: 01: 40 === закрытие соединения AMQP <0.12942.0> (127.0.0.1:47214 -> 127.0.0.1:5672): пропущенные импульсы от клиента, время ожидания: 60 с =ОТЧЕТ ОБ ОШИБКАХ ==== 18 мая 2018 года :: 17: 09: 12 === закрытие соединения AMQP <0.12602.0> (127.0.0.1:46896 -> 127.0.0.1:5672):
Вот код:
import pika
import uuid
from socket import error as SocketError
import errno
import traceback
from logging_settings import app_log
class RpcClient(object):
def __init__(self, conf):
credentials = pika.PlainCredentials(conf.uname, conf.password)
parameters = pika.ConnectionParameters(host=conf.host,
credentials=credentials, connection_attempts=1000,
retry_delay=5,heartbeat_interval=25, socket_timeout=30000000)
self.connection = pika.BlockingConnection(parameters)
self.rpc_exchange = conf.rpc_exchange
self.rpc_key = conf.rpc_key
self.rpc_header_key = conf.rpc_header_key
self.rpc_header_value = conf.rpc_header_value
print('credentials: ',credentials)
print('Connection: ', self.connection)
print('rpc_key: ', self.rpc_header_key)
self.channel = self.connection.channel()
print('Channel: ', self.channel)
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
print('----------------')
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = str(body, 'utf-8')
def call(self, body):
try:
self.response = None
self.corr_id = str(uuid.uuid4())
print('Call Body: ', body)
self.channel.basic_publish(exchange=self.rpc_exchange,
routing_key=self.rpc_key,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
headers={self.rpc_header_key: self.rpc_header_value}
),
body=str(body))
while self.response is None:
self.connection.process_data_events()
return self.response
# except pika.exceptions.ConnectionClosed as err:
# print('----------Connection Closed------------')
# app_log.error("Connection closed error: " + traceback.format_exc())
# time.sleep(1)
# continue
# # raise err
# except pika.exceptions.ChannelClosed as err:
# print('----------Channel Closed------------')
# app_log.error("Channel Closed error: " + traceback.format_exc())
# time.sleep(1)
# continue
except:
if self.channel is not None:
self.channel.close()
if self.connection is not None:
self.connection.close()
Возможно, проблема связана с моим компьютером?"ConnectionResetError (104, 'Сброс соединения по пиру')")?
Или размещение self.connection.process_data_events () в моем коде?