DAG-дизайн Airflow для ориентированного на пользователя рабочего процесса - PullRequest
0 голосов
/ 12 декабря 2018

Мы рассматриваем возможность использования воздушного потока для замены нашего в настоящее время настраиваемого рабочего процесса на основе RQ, но я не уверен в том, как лучше его спроектировать.Или, если даже имеет смысл использовать воздушный поток.Вариант использования:

  1. Мы получаем загрузку данных от пользователя.
  2. Учитывая полученные типы данных, мы по выбору запускаем ноль или более заданий
  3. Каждое задание выполняетсяесли была получена определенная комбинация типов данных.Он выполняется для этого пользователя в течение периода времени, определенного по полученным данным
  4. Задание считывает данные из базы данных и записывает результаты в базу данных.
  5. В результате этих заданий потенциально могут запускаться другие задания.

например,

После загрузки данных мы помещаем элемент в очередь:

загрузка:

user: 'a'
data:
 - type: datatype1
   start: 1
   end: 3
 - type: datatype2
   start: 2
   end: 3

И это сработало бы:

  • job1, пользователь 'a', начало: 1, конец: 3
  • job2, пользователь'a', начало: 2, конец: 3

, а затем, возможно, в job1 будет выполнено некоторое задание по очистке, которое будет выполняться после него.(Также было бы хорошо, если бы можно было ограничить выполнение заданий только при отсутствии других заданий для того же пользователя.)

Подходы, которые я рассмотрел:

1.

Запуск группы обеспечения доступности баз данных, когда загрузка данных поступает в очередь сообщений.

Затем эта группа обеспечения доступности баз данных определяет, какие зависимые задания выполнять, и передает в качестве аргументов (или xcom) пользователя и диапазон времени.

2.

Запуск группы обеспечения доступности баз данных, когда выгрузка данных поступает в очередь сообщений.

Затем эта группа обеспечения доступности баз данных динамически создает группы обеспечения доступности баз данных для заданий на основе типов данных и шаблонов пользователя и таймфрейма.,

Таким образом, вы получаете динамические группы доступности базы данных для каждого пользователя, задания, комбинированного списка времени.


Я даже не уверен, как вызывать группы доступности базы данных из очереди сообщений ... И это сложночтобы найти примеры, похожие на этот вариант использования.Может быть, это потому, что Airflow не подходит?

Любая помощь / мысли / советы будет принята с благодарностью.

Спасибо.

1 Ответ

0 голосов
/ 18 декабря 2018

Воздушный поток строится вокруг расписаний, основанных на времени.Он не предназначен для запуска прогонов на основе данных.Есть и другие системы, предназначенные для этого.Я слышал что-то вроде pachyderm.io или, возможно, dvs.org.Даже перепрофилирование инструмента CI или настройка установки Jenkins могут запускаться в зависимости от событий изменения файла или очереди сообщений.

Однако вы можете попробовать работать с Airflow, если прослушиватель внешней очереди использует rest API callк потоку воздуха , чтобы вызвать DAG.Например, если очередь представляет собой очередь SNS AWS, вы можете сделать это для прослушивателя лямбда-AWS в простом Python.

Я бы порекомендовал одну группу обеспечения доступности баз данных для каждого типа задания (или пользователя, в зависимости от того, что меньше), который использует логику запуска.определяет правильно на основе очереди.Если есть общие задачи очистки и т.п., группа обеспечения доступности баз данных может использовать TriggerDagRunOperator для их запуска, или у вас может быть просто общая библиотека этих задач очистки, включенная в каждую группу обеспечения доступности баз данных.Я думаю, что последнее чище.

Задачи DAG могут быть ограничены определенными пулами.Вы можете создать пул для каждого пользователя, чтобы ограничить количество заданий для каждого пользователя.В качестве альтернативы, если у вас есть группа доступности базы данных для каждого пользователя, вы можете установить максимальное число одновременных прогонов группы доступности базы данных для этой группы доступности.

...