Неожиданные ошибки произошли в Celery + RabbitMQ для периодических задач - PullRequest
0 голосов
/ 19 октября 2018

Я настроил периодическую задачу в Flask + Celery + RabbitMQ + Flower, задача которой заключается в том, чтобы синхронизировать запись из MySQL в MongoDB каждые 15 минут.Моя среда выглядит следующим образом:

сельдерей 4.2.1

RabbitMQ 3.7.7

peewee 3.0.19

колба 1.0.2

Цветок 0.9.2

Я установил параметр wait_timeout в peewee равным 600 секундам.

Рабочий сельдерея и биение сельдерея запускались в отдельных терминалах, как и ожидалось.Примерно через 5 минут в соответствующих окнах терминала возникли ошибки:

работник сельдерея

[2018-10-19 13:48:13,997: ERROR/MainProcess] Control command error: error(54, 'Connection reset by peer')
Traceback (most recent call last):
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/celery/worker/pidbox.py", line 46, in on_message
    self.node.handle_message(body, message)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 129, in handle_message
    return self.dispatch(**body)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 112, in dispatch
    ticket=ticket)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 135, in reply
    serializer=self.mailbox.serializer)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 265, in _publish_reply
    **opts
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/common.py", line 129, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel, orig)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/common.py", line 135, in _maybe_declare
    entity.declare(channel=channel)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/entity.py", line 185, in declare
    nowait=nowait, passive=passive,
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/channel.py", line 614, in exchange_declare
    wait=None if nowait else spec.Exchange.DeclareOk,
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/abstract_channel.py", line 59, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/abstract_channel.py", line 79, in wait
    self.connection.drain_events(timeout=timeout)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/transport.py", line 243, in read_frame
    frame_header = read(7, True)
  File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/transport.py", line 418, in _read
    s = recv(n - len(rbuf))
error: [Errno 54] Connection reset by peer

Цветок

[E 181019 12:34:28 gen:888] Multiple exceptions in yield list
    Traceback (most recent call last):
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/tornado/gen.py", line 883, in callback
        result_list.append(f.result())
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/concurrent/futures/_base.py", line 455, in result
        return self.__get_result()
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/concurrent/futures/thread.py", line 63, in run
        result = self.fn(*self.args, **self.kwargs)
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/celery/app/control.py", line 130, in active_queues
        return self._request('active_queues')
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/celery/app/control.py", line 95, in _request
        timeout=self.timeout, reply=True,
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/celery/app/control.py", line 454, in broadcast
        limit, callback, channel=channel,
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
        serializer=serializer)
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
        serializer=serializer,
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
        exchange_name, declare,
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/kombu/messaging.py", line 203, in _publish
        mandatory=mandatory, immediate=immediate,
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/channel.py", line 1732, in _basic_publish
        (0, exchange, routing_key, mandatory, immediate), msg
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/abstract_channel.py", line 50, in send_method
        conn.frame_writer(1, self.channel_id, sig, args, content)
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/method_framing.py", line 166, in write_frame
        write(view[:offset])
      File "/anaconda3/envs/snakes/lib/python2.7/site-packages/amqp/transport.py", line 275, in write
        self._write(s)
      File "/anaconda3/envs/snakes/lib/python2.7/socket.py", line 228, in meth
        return getattr(self._sock,name)(*args)
    error: [Errno 32] Broken pipe

Может ли кто-нибудь помочь и объяснить, как решить эту проблему?

...