Существуют некоторые расхождения в том, как работает холст Celery в асинхронном и активном режиме. Я заметил, что группа, за которой следует цепочка в динамической задаче, которая заменяет себя, не отправляет результаты следующему в цепочке.
Ну, это кажется сложным, позвольте мне показать пример:
Дано следующее задание:
@shared_task(bind=True)
def grouped(self, val):
task = (
group(asum.s(val, n) for n in range(val)) | asum.s(val)
)
raise self.replace(task)
когда оно сгруппировано на другом холсте, как это:
@shared_task(bind=True)
def flow(self, val):
workflow = (asum.s(1, val) |
asum.s(2) |
grouped.s() |
amul.s(3))
return self.replace(workflow)
задание amul не будет получать результаты от сгруппированных в активном режиме.
Чтобы по-настоящему проиллюстрировать проблему, я создал пример проекта на github, где вы можете погрузиться в проблему и помочь мне с некоторыми быстрыми решениями и, возможно, с некоторыми пиарщиками в проекте по сельдерею.
https://github.com/gutomaia/celery_equation
---- отредактировано ----
В проекте я заявляю о различном поведении обоих способов использования сельдерея. В асинхронном режиме эти задачи работают должным образом.
>>> from equation.main import *
>>> from equation.tasks import *
>>> flow.delay(1).get()
78
>>> flow.delay(2).get()
120
>>> flow.delay(100).get()
47895