Сельдерей отключается от брокера - PullRequest
0 голосов
/ 12 октября 2019

Я использую Python с RabbitMQ и Celery, чтобы распределить задачи по работнику. Задачи занимают около 15 минут каждая и на 99% связаны с процессором. В моей системе 24 ядра, и всякий раз, когда мой работник выполняет эту задачу, я получаю сообщение об ошибке с брокером.

[2019-10-12 08:49:57,695: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
[...]
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

Я обнаружил несколько других сообщений с этой проблемой, но ни одна из них не устранила ее. Особенно с большой нагрузкой на процессор, есть идеи, как мне это решить?

Windows 10 (рабочий)

macOS 10.14 (RabbitMQ Server)

Python 3.7

Сельдерей 4.3.0 (ревень)

RabbitMQ 3.7.16 (Erlang 22.0.7 )

Моя конфигурация позволяет работнику потреблять только 1 задание за раз , и даже рабочий процесс перезапускается после каждого задания, но все равно не повезло:

CELERYD_MAX_TASKS_PER_CHILD = 1,
CELERYD_CONCURRENCY = 1,
CELERY_TASK_RESULT_EXPIRES=3600,
CELERYD_PREFETCH_MULTIPLIER = 1,
CELERY_MAX_CACHED_RESULTS = 1,
CELERY_ACKS_LATE = True,

И это весь стек вызовов:

[2019-10-12 08:49:57,695: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "C:\Python37\lib\site-packages\celery\worker\consumer\consumer.py", line 318, in start
    blueprint.start(self)
File "C:\Python37\lib\site-packages\celery\bootsteps.py", line 119, in start
    step.start(parent)
File "C:\Python37\lib\site-packages\celery\worker\consumer\consumer.py", line 596, in start
    c.loop(*c.loop_args())
File "C:\Python37\lib\site-packages\celery\worker\loops.py", line 118, in synloop
    qos.update()
File "C:\Python37\lib\site-packages\kombu\common.py", line 442, in update
    return self.set(self.value)
File "C:\Python37\lib\site-packages\kombu\common.py", line 435, in set
    self.callback(prefetch_count=new_value)
File "C:\Python37\lib\site-packages\celery\worker\consumer\tasks.py", line 47, in set_prefetch_count
    apply_global=qos_global,
File "C:\Python37\lib\site-packages\kombu\messaging.py", line 558, in qos
    apply_global)
File "C:\Python37\lib\site-packages\amqp\channel.py", line 1853, in basic_qos
    wait=spec.Basic.QosOk,
File "C:\Python37\lib\site-packages\amqp\abstract_channel.py", line 68, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
File "C:\Python37\lib\site-packages\amqp\abstract_channel.py", line 88, in wait
    self.connection.drain_events(timeout=timeout)
File "C:\Python37\lib\site-packages\amqp\connection.py", line 504, in drain_events
    while not self.blocking_read(timeout):
File "C:\Python37\lib\site-packages\amqp\connection.py", line 509, in blocking_read
    frame = self.transport.read_frame()
File "C:\Python37\lib\site-packages\amqp\transport.py", line 252, in read_frame
    frame_header = read(7, True)
File "C:\Python37\lib\site-packages\amqp\transport.py", line 438, in _read
    s = recv(n - len(rbuf))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
...