Воздушный поток: правильный способ запуска DAG для каждого файла - PullRequest
0 голосов
/ 05 февраля 2020

У меня есть следующая задача:

Файлы отправляются через нерегулярное время через конечную точку и хранятся локально. Мне нужно запустить DAG для каждого из этих файлов. Для каждого файла будут выполняться одинаковые задачи

В целом потоки выглядят следующим образом: Для каждого файла выполните задачи A-> B -> C -> D

Файлы обрабатываются партиями. Хотя эта задача казалась мне тривиальной, я нашел несколько способов сделать это, и меня смущает, какой из них является «правильным» (если есть).

Первый шаблон: используйте экспериментальный REST API для запуска dag .

То есть предоставить веб-сервис, который принимает запрос и файл, сохраняет его в папке и использует экспериментальный REST api для запуска DAG, передавая file_id как conf

Минусы : API REST по-прежнему экспериментальные , не уверен, как Airflow может обрабатывать нагрузочный тест с большим количеством запросов, приходящих в одну точку (что не должно происходить, но что, если это произойдет?)

Второй шаблон: 2 знака. Один распознает и запускает с помощью TriggerDagOperator, другой обрабатывает.

Всегда использует тот же ws, как описано выше, но на этот раз он сохраняет файл. Тогда у нас есть:

  • Первый символ: Использует FileSensor вместе с TriggerDagOperator для запуска N пакетов с данными N файлами
  • Второй код: Задача A-> B -> C

Минусы : необходимо избегать отправки одних и тех же файлов на два разных прогона DAG. Пример:

Файлы в папке x. json Датчик находит x, запускает DAG (1)

Датчик возвращается в исходное состояние и снова запускается по расписанию. Если DAG (1) не обработал / не переместил файл, DAG датчика может перенести новый прогон DAG с тем же файлом. Что нежелательно.

Третий шаблон: для файла в файлах, задача A-> B -> C

Как видно из этого вопроса .

Минусы : Это может сработать, однако мне не нравится, что пользовательский интерфейс, вероятно, будет испорчен, потому что каждый прогон DAG не будет выглядеть одинаково, но будет меняться в зависимости от количества обрабатываемых файлов. Кроме того, если нужно обработать 1000 файлов, запуск, вероятно, будет очень трудным для чтения

Четвертый шаблон: используйте подделки

Я пока не уверен, как они полностью работают, как я видел они не поощряются (в конце), однако должна быть возможность создавать подпакет для каждого файла и запускать его. Похоже на этот вопрос .

Минусы : похоже, субдаги можно использовать только с последовательным исполнителем.


Я что-то упустил и обдумывание чего-то, что должно быть (на мой взгляд) довольно простым? Спасибо

Ответы [ 2 ]

1 голос
/ 06 февраля 2020

Похоже, что вы должны иметь возможность запустить пакетный процессор с оператором bash, чтобы очистить папку, просто убедитесь, что вы установили depends_on_past=True для своего пакета, чтобы убедиться, что папка успешно очищена, прежде чем в следующий раз Даг запланирован.

0 голосов
/ 08 февраля 2020

Я нашел эту статью: https://medium.com/@igorlubimov / dynamici c -scheduling-in-airflow-52979b3e6b13

, где используется новый оператор, а именно TriggerMultiDagRunOperator. Я думаю, что это соответствует моим потребностям.

...