У меня есть цепочка задач Celery, которая извлекает кучу документов через HTTP-запросы и выглядит примерно так:
- Получить блокировку для fetch-batch
- Получить все элементы для пакета
- Снять блокировку для пакетной выборки
Шаг 2 - это группа, состоящая из нескольких отдельных задач выборки, каждая из которых обрабатывает один элемент в пакете. Все выборки выполняются параллельно.
Шаг 3 должен быть выполнен после того, как все задачи в шаге 2 выполнены.
В коде это примерно выглядит так:
(acquire_lock.s() |
celery.group([fetch.s(item) for item in batch]) |
release_lock.s()).delay()
Теперь проблема в том, что выбранный элемент может содержать ссылки на дополнительные элементы, которые необходимо получить. Я хотел бы также запускать эти дополнительные извлечения как отдельные задачи, чтобы их можно было распараллеливать.
Есть ли способ для моей fetch
-задачи вызывать больше задач таким образом, чтобы они заканчивалисьв той же группе, что и fetch
-задача, то есть такая, что моя release_lock
-задача не запускается до тех пор, пока все эти задачи также не будут выполнены?
Я пытался просто порождать эти задачи независимо и иметьродительская fetch
-задача ждет их, используя get()
, и это вроде работает, но это связывает одного работника на родительскую задачу и, таким образом, может фактически привести к тупикам, поэтому, вероятно, вызов get()
из задачи агрессивноне одобряется API и документацией.