Сельдерей бегущая цепочка заданий в группе - PullRequest
2 голосов
/ 24 апреля 2020

Я пытаюсь запустить Celery для управления задачами и у меня возникают проблемы при выполнении нескольких задач в группе. После того, как все задачи в группе выполнены, я хочу собрать результаты. Рабочий процесс работает нормально, если в группе только 1 задание, и он ожидает завершения всех заданий. Однако, это терпит неудачу, если есть 2 или больше задач в группе, или я не выполняю это правильно. Ниже приведен пример кода

@celery2.task(name='square')
def square(a):
    log.info(f'In square group {a}')
    return a**a

@celery2.task(name='add_one')
def add_one(a):
    b = a+1
    return b

@celery2.task(name='add_one_and_square')
def add_one_and_square(a):
    return (add_one.s(a) | square.s())

@celery2.task(name='collect')
def collect(a):
    return a

@celery2.task(name='group-task')
def group_square(num):
    return group([(add_one_and_square(i)) for i in range(num)])

Рабочий процесс сельдерея:

res = (add.s(2,3) | group_square.s()|collect.s())
res.apply_async()

Ниже приведены данные, полученные из выходных данных. Я вижу, что подписи создаются не уверенно, если это правильно подход или как запустить цепочку задач в одной группе, чтобы она была похожа на одну задачу в группе.

{'task': 'celery.group',
 'args': [],
 'kwargs': {'tasks': [{'task': 'celery.chain',
    'args': [],
    'kwargs': {'tasks': [{'task': 'add_one',
       'args': [0],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None},
      {'task': 'square',
       'args': [],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None}]},
    'options': {},
    'subtask_type': 'chain',
    'immutable': False,
    'chord_size': None},
   {'task': 'celery.chain',
    'args': [],
    'kwargs': {'tasks': [{'task': 'add_one',
       'args': [1],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None},
      {'task': 'square',
       'args': [],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None}]},
    'options': {},
    'subtask_type': 'chain',
    'immutable': False,
    'chord_size': None},
   {'task': 'celery.chain',
    'args': [],
    'kwargs': {'tasks': [{'task': 'add_one',
       'args': [2],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None},
      {'task': 'square',
       'args': [],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None}]},
    'options': {},
    'subtask_type': 'chain',
    'immutable': False,
    'chord_size': None},
   {'task': 'celery.chain',
    'args': [],
    'kwargs': {'tasks': [{'task': 'add_one',
       'args': [3],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None},
      {'task': 'square',
       'args': [],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None}]},
    'options': {},
    'subtask_type': 'chain',
    'immutable': False,
    'chord_size': None},
   {'task': 'celery.chain',
    'args': [],
    'kwargs': {'tasks': [{'task': 'add_one',
       'args': [4],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None},
      {'task': 'square',
       'args': [],
       'kwargs': {},
       'options': {},
       'subtask_type': None,
       'immutable': False,
       'chord_size': None}]},
    'options': {},
    'subtask_type': 'chain',
    'immutable': False,
    'chord_size': None}]},
 'options': {},
 'subtask_type': 'group',
 'immutable': False,
 'chord_size': None}

Буду признателен за любую проницательность. Спасибо!

1 Ответ

1 голос
/ 24 апреля 2020

Я думаю, что запуск задачи сельдерея в рамках другой задачи сельдерея является плохой практикой и в некоторых случаях может привести к тупику (я думаю, что это также где-то в документации). Если вы хотите сделать это - может быть безопаснее запустить его ayn c.

В вашем сценарии я предлагаю добавить вызов сбора в задачу group_square. Что-то вроде:

@celery2.task(name='group-task')
def group_square(num):
    canvas_flow = group([(add_one_and_square.si(i)) for i in range(num)]) | collect.s()
    return canvas_flow.apply_async()

теперь результат group_square будет ResultAsync чего-то подобного. Вы можете проверить всякий раз, когда он .ready(), а затем получить .result().

...