Текущая проблема, с которой я сталкиваюсь, состоит в том, что у меня есть документы в коллекции MongoDB, каждый из которых должен обрабатываться и обновляться с помощью задач, которые должны выполняться в ациклическом графе зависимостей. Если задача, находящаяся выше по течению, не может обработать документ, то ни одна из зависимых задач не может обработать этот документ, так как этот документ не был обновлен с необходимой информацией.
Если бы я использовал Airflow, это оставило бы меня сдва решения:
Запуск DAG для каждого документа и передача идентификатора документа с помощью --conf
. Проблема в том, что это не тот способ, которым должен использоваться Airflow;Я бы никогда не запустил запланированный процесс, и исходя из того, как документы появляются в коллекции, я мог бы делать 1440 дагрунов в день.
Запуск DAG каждый период для обработки всех созданных документовв коллекции за этот период. Это следует из того, как ожидается, что Airflow будет работать, но проблема в том, что если задача не может обработать один документ, ни одна из зависимых задач не может обработать любой из других документов. Кроме того, если для обработки документа требуется больше времени, чем для других документов, эти другие документы ожидают этого единого документа для продолжения работы в группе обеспечения доступности баз данных.
Есть ли лучший способчем воздушный поток? Или есть лучший способ справиться с этим в Airflow, чем два метода, которые я сейчас вижу?