Сельдерей: как дождаться завершения вложенных задач sh в группе - PullRequest
4 голосов
/ 21 июня 2020

Мне нужно делать запросы к API с некоторыми параметрами и обрабатывать данные из запросов один за другим. Но иногда данные ответа разбиваются на страницы, что означает, что мне нужно сделать дополнительный запрос с такими же параметрами. Celery group позволяет нам запускать задачи одну за другой, но если задачи производят дочерние задачи, а дочерние задачи могут создавать больше дочерних задач ...,

Есть ли способ дождаться завершения всех дочерних задач перед запуском следующее задание в группе? Или, может быть, сельдерей лучше подойдёт к решению моей задачи?

def some_api_call(start_date=None, end_date=None, token_value=None):
    pass


@celery_app.task(name='run_task', max_retries=None, bind=True)
def run_task(self):
    group_items = [
        task1.s('2020-01-02', '2020-01-03'),
        task1.s('2020-01-05', '2020-01-01'),
        task1.s('2020-01-010', '2020-01-04'),
    ]
    group(group_items)()

@celery_app.task(name='task1', max_retries=None, bind=True)
def task1(self, start_date=None, end_date=None, token_value=None, *args, **kwargs):
    res = some_api_call(start_date, end_date, token_value)
    if res['token_value']:
        # NEXT ELEMENT IN THE GROUP SHOULD WAIT UNTIL NESTED CHILD TASKS DONE
        task1.delay(token_value=token_value)

Брокер - Redis. Мои возможные решения [псевдокод]:

  1. Подождите, пока дочерние задачи будут выполнены в родительской задаче
    res = task1.delay(token_value=token_value) 

    res.get()

Решение плохое - мы блокируем протектор. Не уверен, что у сельдерея есть альтернатива.

Пользователь task.retry(), чтобы проверить, завершена ли дочерняя задача.
    taskid = task1.delay(token_value=token_value)
    if AsyncResult(taskid).state != "successes": 

      self.retry()

Таким образом, мы будем повторять родительские задачи до тех пор, пока дочерние задачи не завершатся, и не блокируем поток. Но как и в первом решении: родительские задачи обработали свои данные, но состояние будет повторяться.

1 Ответ

0 голосов
/ 21 июня 2020

Если вам нужно дождаться завершения задач в группе sh, вы должны использовать примитив Chord . Кроме того, если вам нужно, чтобы ваши задачи выполнялись последовательно (одна за другой), используйте примитив Chain. Аккорд - это, по сути, цепочка Групп, и финальная задача ...

...