Воздушный поток - использование задачи вверх по течению для нескольких задач вниз по течению - PullRequest
0 голосов
/ 16 мая 2018

Хорошо, я прошу прощения, если это глупый вопрос, потому что кажется настолько очевидным, что он должен работать.Но я не могу найти это документированным, и, поскольку мы рассматриваем наши варианты в процессе создания нового конвейера данных, я действительно хочу, чтобы это было функцией ...

Могут ли несколько последующих процессов зависеть отодин восходящий процесс, где восходящий процесс выполняется только один раз.Другими словами, могу ли я извлечь таблицу один раз, а затем загрузить ее в свое хранилище данных и получить несколько агрегаций, зависящих от того, что загрузка завершена?

Для получения дополнительной информации мы пытаемсяперейдите к асинхронному извлечению-загрузке-преобразованию, где начинается извлечение, а затем загрузки и преобразования завершаются, как только у них есть подмножество таблиц, которые им нужны из извлечения.

Ответы [ 2 ]

0 голосов
/ 16 мая 2018

Если я понимаю вопрос, да, вы можете настроить последующие задачи в зависимости от успеха вышестоящей задачи.

Мы используем dummyOperators во многих случаях, подобных этому DAG: enter image description here

В случае, если мы хотим, чтобы dummyOperator сначала сработал и что-то сделал, прежде чем сработают последующие задачи. Это облегчает очистку неудачных запусков, так как мы можем просто очистить фиктивный оператор и последующие задачи одновременно.

Вы можете использовать параметр depends_on_past=True, чтобы требовать выполнения вышестоящих задач до того, как последующие задачи будут поставлены в очередь, в противном случае их можно пропустить на основе логики в вышестоящей задаче.

0 голосов
/ 16 мая 2018

Мне кажется, что это обычный DAG с необычной формулировкой. Я понимаю необходимую структуру следующим образом:

extract_table_task \
                   |- task1_do_stuff
                   |- task2_do_some_other_stuff
                   |- task3_...

Или в коде воздушного потока:

extract_table_task.set_downstream(task1_do_stuff)
extract_table_task.set_downstream(task2_do_some_other_stuff)
extract_table_task.set_downstream(task3_...)

Затем убедитесь, что вы выбрали правильные правила запуска для вашего рабочего процесса, например, если некоторые задачи должны выполняться, даже если что-то пошло не так: https://airflow.apache.org/concepts.html#trigger-rules

...