Я настроил периодическую задачу в связке Flask + Celery + RabbitMQ + Flower, которая каждые 15 минут синхронизирует запись из MySQL в MongoDB. Моя среда выглядит следующим образом:
Celery 4.2.1
RabbitMQ 3.7.7
peewee 3.0.19
Flask 1.0.2
Flower 0.9.2
Я установил параметр wait_timeout в peewee равным 600 секундам.
Celery worker и Celery beat запустились в отдельных терминалах, как и ожидалось. Примерно через 5 минут в соответствующих окнах терминала возникли ошибки:
Celery worker
[2018-10-19 13:48:13,997: ERROR/MainProcess] Control command error: error(54, 'Connection reset by peer')
Traceback (most recent call last):
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/celery/worker/pidbox.py", line 46, in on_message
self.node.handle_message(body, message)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 129, in handle_message
return self.dispatch(**body)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 112, in dispatch
ticket=ticket)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 135, in reply
serializer=self.mailbox.serializer)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 265, in _publish_reply
**opts
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 194, in _publish
[maybe_declare(entity) for entity in declare]
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 102, in maybe_declare
return maybe_declare(entity, self.channel, retry, **retry_policy)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/common.py", line 129, in maybe_declare
return _maybe_declare(entity, declared, ident, channel, orig)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/common.py", line 135, in _maybe_declare
entity.declare(channel=channel)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/entity.py", line 185, in declare
nowait=nowait, passive=passive,
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/channel.py", line 614, in exchange_declare
wait=None if nowait else spec.Exchange.DeclareOk,
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/abstract_channel.py", line 59, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/abstract_channel.py", line 79, in wait
self.connection.drain_events(timeout=timeout)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/connection.py", line 491, in drain_events
while not self.blocking_read(timeout):
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/connection.py", line 496, in blocking_read
frame = self.transport.read_frame()
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/transport.py", line 243, in read_frame
frame_header = read(7, True)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/transport.py", line 418, in _read
s = recv(n - len(rbuf))
error: [Errno 54] Connection reset by peer
Flower
[E 181019 12:34:28 gen:888] Multiple exceptions in yield list
Traceback (most recent call last):
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/tornado/gen.py", line 883, in callback
result_list.append(f.result())
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/concurrent/futures/_base.py", line 455, in result
return self.__get_result()
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/concurrent/futures/thread.py", line 63, in run
result = self.fn(*self.args, **self.kwargs)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/celery/app/control.py", line 130, in active_queues
return self._request('active_queues')
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/celery/app/control.py", line 95, in _request
timeout=self.timeout, reply=True,
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/celery/app/control.py", line 454, in broadcast
limit, callback, channel=channel,
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
serializer=serializer)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
serializer=serializer,
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/channel.py", line 1732, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/transport.py", line 275, in write
self._write(s)
File "/anaconda3/envs/snakes/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
Может ли кто-нибудь помочь и объяснить, как решить эту проблему?