Я пытаюсь запустить Celery для управления задачами и у меня возникают проблемы при выполнении нескольких задач в группе. После того, как все задачи в группе выполнены, я хочу собрать результаты. Рабочий процесс работает нормально, если в группе только 1 задание, и он ожидает завершения всех заданий. Однако, это терпит неудачу, если есть 2 или больше задач в группе, или я не выполняю это правильно. Ниже приведен пример кода
@celery2.task(name='square')
def square(a):
log.info(f'In square group {a}')
return a**a
@celery2.task(name='add_one')
def add_one(a):
b = a+1
return b
@celery2.task(name='add_one_and_square')
def add_one_and_square(a):
return (add_one.s(a) | square.s())
@celery2.task(name='collect')
def collect(a):
return a
@celery2.task(name='group-task')
def group_square(num):
return group([(add_one_and_square(i)) for i in range(num)])
Рабочий процесс сельдерея:
res = (add.s(2,3) | group_square.s()|collect.s())
res.apply_async()
Ниже приведены данные, полученные из выходных данных. Я вижу, что подписи создаются не уверенно, если это правильно подход или как запустить цепочку задач в одной группе, чтобы она была похожа на одну задачу в группе.
{'task': 'celery.group',
'args': [],
'kwargs': {'tasks': [{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [0],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [1],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [2],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [3],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [4],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'group',
'immutable': False,
'chord_size': None}
Буду признателен за любую проницательность. Спасибо!