Вот так выглядит конфигурация сельдерея
from celery import Celery, group
celery = Celery('grouptest', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
celery.conf.CELERY_TASK_SERIALIZER = 'pickle'
celery.conf.CELERY_RESULT_SERIALIZER ='pickle'
celery.conf.CELERY_ACCEPT_CONTENT = {'json', 'pickle'}
@celery.task
def add(self, x,y):
print('Executing with arguments', x, y)
return x+y
Я запускаю демона работника сельдерея с помощью команды (в том же рабочем каталоге)
$celery worker -A grouptest -l info -c 5
в терминале.
Далее из другого процесса я вызываю службу следующим образом.
bigList=[(randint(60, 90), randint(60, 90)) for _ in range(10)]
jobresult=group([add.s(celery, i[0], i[1]) for i in bigList]).apply_async()
#Basically adding a ten pairs of random numbers
Забавно, только некоторые задачи выполняются даже после долгого ожидания. Например, jobresult[0].result
дает сумму двух чисел просто отлично, но jobresult[1].result
говорит Task of kind 'grouptest.add' never registered, please make sure it's imported.
Я даже проверяю, установлен ли jobresult.ready()
True
в REPL для Python. Иногда он выдает ошибку, иногда нет, в том же сеансе REPL. (После некоторого наблюдения, как я догадался, оно переходит от False
к ошибке до True
.)
Я новичок в сельдерее и следую некоторым шаблонам, но как убедиться, что я правильно выполнил задание зарегистрировано (что бы это ни значило), и все из них выполняются последовательно. Если мой код был неправильным, по крайней мере, ошибки были бы последовательными, не так ли?