Воздушный поток Dagrun для каждого датума вместо запланированного - PullRequest
0 голосов
/ 16 октября 2019

Текущая проблема, с которой я сталкиваюсь, состоит в том, что у меня есть документы в коллекции MongoDB, каждый из которых должен обрабатываться и обновляться с помощью задач, которые должны выполняться в ациклическом графе зависимостей. Если задача, находящаяся выше по течению, не может обработать документ, то ни одна из зависимых задач не может обработать этот документ, так как этот документ не был обновлен с необходимой информацией.

Если бы я использовал Airflow, это оставило бы меня сдва решения:

  1. Запуск DAG для каждого документа и передача идентификатора документа с помощью --conf. Проблема в том, что это не тот способ, которым должен использоваться Airflow;Я бы никогда не запустил запланированный процесс, и исходя из того, как документы появляются в коллекции, я мог бы делать 1440 дагрунов в день.

  2. Запуск DAG каждый период для обработки всех созданных документовв коллекции за этот период. Это следует из того, как ожидается, что Airflow будет работать, но проблема в том, что если задача не может обработать один документ, ни одна из зависимых задач не может обработать любой из других документов. Кроме того, если для обработки документа требуется больше времени, чем для других документов, эти другие документы ожидают этого единого документа для продолжения работы в группе обеспечения доступности баз данных.

Есть ли лучший способчем воздушный поток? Или есть лучший способ справиться с этим в Airflow, чем два метода, которые я сейчас вижу?

Ответы [ 2 ]

0 голосов
/ 17 октября 2019

Я бы делал 1440 дагрунов в день.

При хорошей архитектуре воздушного потока это вполне возможно. Точки удушения могут быть

  1. executor - используйте Celery Executor вместо Local Executor, например
  2. backend database - отслеживайте и настраивайте по мере необходимости (индексы, правильное хранение и т. Д.)
  3. веб-сервер - ну, для тысяч dagruns, задач и т. д. возможно, используйте webeserver только для сред dev / qa, а не для производственной, где у вас более высокая скорость отправки задач / dagruns. Вы можете использовать Cli и т. Д. Вместо.

Другой подход заключается в масштабировании путем запуска нескольких экземпляров Airflow - документов разделов, скажем, до десяти сегментов, и присвоения документов каждого раздела только одному экземпляру Airflow.

0 голосов
/ 16 октября 2019

Вы можете изменить trigger_rule с "all_success" на "all_done"

https://github.com/apache/airflow/blob/62b21d747582d9d2b7cdcc34a326a8a060e2a8dd/airflow/example_dags/example_latest_only_with_trigger.py#L40

, а также создать ветку, которая обрабатывает сбойные документы с установленным trigger_ruleв "one_failed", чтобы переместить процессы этих сбойных документов как-то по-другому (например, переместиться в "сбойную" папку и отправить уведомление)

...