Ниже фрагмент кода - это настройка сельдерея, которая у меня есть. Здесь я запускаю скрученный реактор в каждом дочернем рабочем процессе сельдерея.
import os
from threading import Thread
from celery import Celery
from twisted.internet import threads, reactor
from celery import signals
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
thread = Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
print('STARTED NEW WORKER', os.getpid())
@signals.worker_process_shutdown.connect()
def shutdown_reactor(**kwargs):
reactor.callFromThread(reactor.stop)
print('REACTOR SHUTDOWN', os.getpid())
class OrgUnitEventHandler():
@inlineCallbacks
def process(self, *args, **kwargs):
val = sum(args)
yield val
returnValue(val)
def inThread(x, y):
obj = OrgUnitEventHandler()
output = threads.blockingCallFromThread(reactor,
obj.process, x, y)
return output
@app.task
def add(x, y):
print('ADD --> '+str(os.getpid()))
result = inThread(x, y)
print("FINAL RESULT--> "+str(result)+"-->"+str(os.getpid()))
return result
С помощью приведенного выше кода я большую часть времени вижу рабочего сельдерея завис и был освобожден, когда новая задача снова выполняется на том же воркере. см. журналы ниже ..
[2020-05-07 20:12:26,481: WARNING/ForkPoolWorker-1] STARTED NEW WORKER
[2020-05-07 20:12:26,481: WARNING/ForkPoolWorker-1] 11651
[2020-05-07 20:12:26,498: WARNING/ForkPoolWorker-2] STARTED NEW WORKER
[2020-05-07 20:12:26,499: WARNING/ForkPoolWorker-2] 11653
[2020-05-07 20:12:26,515: WARNING/ForkPoolWorker-3] STARTED NEW WORKER
[2020-05-07 20:12:26,515: WARNING/ForkPoolWorker-3] 11655
[2020-05-07 20:12:26,533: WARNING/ForkPoolWorker-4] STARTED NEW WORKER
[2020-05-07 20:12:26,534: WARNING/ForkPoolWorker-4] 11659
[2020-05-07 20:12:32,239: WARNING/ForkPoolWorker-1] ADD --> 11651 # task-1 hang
[2020-05-07 20:12:36,611: WARNING/ForkPoolWorker-2] ADD --> 11653 # task-2 hang
[2020-05-07 20:13:00,858: WARNING/ForkPoolWorker-3] ADD --> 11655 # task-3 hang
[2020-05-07 20:13:00,859: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->11653 # task-2 Done
[2020-05-07 20:13:00,859: WARNING/ForkPoolWorker-3] FINAL RESULT--> 3-->11655 # task-3 done
Я никогда не вижу проблемы с зависанием сельдерея, когда я использую Loopingcall
при запуске реактора.
...
class EventLoop(object):
def _startLoopingCall(self, reactor):
lc = LoopingCall(print)
lc.start(1, False)
def setup(self, reactor):
reactor.callFromThread(self._startLoopingCall, reactor)
thread = Thread(target=lambda: reactor.run(installSignalHandlers=False),name="reactor ").start()
@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
EventLoop().setup(reactor)
#thread = Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
print('STARTED NEW WORKER', os.getpid())
...
Журналы, подтверждающие, что рабочие сельдерея никогда не висели с указанным выше кодом:
[2020-05-07 20:22:08,824: WARNING/ForkPoolWorker-1] STARTED NEW WORKER
[2020-05-07 20:22:08,824: WARNING/ForkPoolWorker-1] 12302
[2020-05-07 20:22:08,841: WARNING/ForkPoolWorker-2] STARTED NEW WORKER
[2020-05-07 20:22:08,841: WARNING/ForkPoolWorker-2] 12304
[2020-05-07 20:22:08,857: WARNING/ForkPoolWorker-3] STARTED NEW WORKER
[2020-05-07 20:22:08,857: WARNING/ForkPoolWorker-3] 12306
[2020-05-07 20:22:08,873: WARNING/ForkPoolWorker-4] STARTED NEW WORKER
[2020-05-07 20:22:08,874: WARNING/ForkPoolWorker-4] 12308
[2020-05-07 20:22:12,446: WARNING/ForkPoolWorker-1] ADD --> 12302
[2020-05-07 20:22:12,826: WARNING/ForkPoolWorker-1] FINAL RESULT--> 3-->12302
[2020-05-07 20:22:15,489: WARNING/ForkPoolWorker-2] ADD --> 12304
[2020-05-07 20:22:15,842: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->12304
[2020-05-07 20:22:17,828: WARNING/ForkPoolWorker-1] ADD --> 12302
[2020-05-07 20:22:17,828: WARNING/ForkPoolWorker-1] FINAL RESULT--> 3-->12302
[2020-05-07 20:22:20,475: WARNING/ForkPoolWorker-2] ADD --> 12304
[2020-05-07 20:22:20,842: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->12304
Я не могу понять, что такое маг c LoopingCall тут делает с закрученным реактором, чтобы он работал нормально. Поделитесь своими идеями