Мне нужно делать запросы к API с некоторыми параметрами и обрабатывать данные из запросов один за другим. Но иногда данные ответа разбиваются на страницы, что означает, что мне нужно сделать дополнительный запрос с такими же параметрами. Celery group
позволяет нам запускать задачи одну за другой, но если задачи производят дочерние задачи, а дочерние задачи могут создавать больше дочерних задач ...,
Есть ли способ дождаться завершения всех дочерних задач перед запуском следующее задание в группе? Или, может быть, сельдерей лучше подойдёт к решению моей задачи?
def some_api_call(start_date=None, end_date=None, token_value=None):
pass
@celery_app.task(name='run_task', max_retries=None, bind=True)
def run_task(self):
group_items = [
task1.s('2020-01-02', '2020-01-03'),
task1.s('2020-01-05', '2020-01-01'),
task1.s('2020-01-010', '2020-01-04'),
]
group(group_items)()
@celery_app.task(name='task1', max_retries=None, bind=True)
def task1(self, start_date=None, end_date=None, token_value=None, *args, **kwargs):
res = some_api_call(start_date, end_date, token_value)
if res['token_value']:
# NEXT ELEMENT IN THE GROUP SHOULD WAIT UNTIL NESTED CHILD TASKS DONE
task1.delay(token_value=token_value)
Брокер - Redis
. Мои возможные решения [псевдокод]:
- Подождите, пока дочерние задачи будут выполнены в родительской задаче
res = task1.delay(token_value=token_value)
res.get()
Решение плохое - мы блокируем протектор. Не уверен, что у сельдерея есть альтернатива.
Пользователь
task.retry()
, чтобы проверить, завершена ли дочерняя задача.
taskid = task1.delay(token_value=token_value)
if AsyncResult(taskid).state != "successes":
self.retry()
Таким образом, мы будем повторять родительские задачи до тех пор, пока дочерние задачи не завершатся, и не блокируем поток. Но как и в первом решении: родительские задачи обработали свои данные, но состояние будет повторяться.