Создание большего количества заданий из сельдерея в группе в цепочке - PullRequest
1 голос
/ 19 октября 2019

У меня есть цепочка задач Celery, которая извлекает кучу документов через HTTP-запросы и выглядит примерно так:

  1. Получить блокировку для fetch-batch
  2. Получить все элементы для пакета
  3. Снять блокировку для пакетной выборки

Шаг 2 - это группа, состоящая из нескольких отдельных задач выборки, каждая из которых обрабатывает один элемент в пакете. Все выборки выполняются параллельно.

Шаг 3 должен быть выполнен после того, как все задачи в шаге 2 выполнены.

В коде это примерно выглядит так:

   (acquire_lock.s() |
    celery.group([fetch.s(item) for item in batch]) |
    release_lock.s()).delay()

Теперь проблема в том, что выбранный элемент может содержать ссылки на дополнительные элементы, которые необходимо получить. Я хотел бы также запускать эти дополнительные извлечения как отдельные задачи, чтобы их можно было распараллеливать.

Есть ли способ для моей fetch -задачи вызывать больше задач таким образом, чтобы они заканчивалисьв той же группе, что и fetch -задача, то есть такая, что моя release_lock -задача не запускается до тех пор, пока все эти задачи также не будут выполнены?

Я пытался просто порождать эти задачи независимо и иметьродительская fetch -задача ждет их, используя get(), и это вроде работает, но это связывает одного работника на родительскую задачу и, таким образом, может фактически привести к тупикам, поэтому, вероятно, вызов get() из задачи агрессивноне одобряется API и документацией.

...