Слияние операторов вместе - PullRequest
0 голосов
/ 14 ноября 2018

Я все еще в процессе развертывания Airflow, и я уже почувствовал необходимость объединения operator с.Наиболее распространенным вариантом использования будет соединение оператора и соответствующий sensor.Например, можно связать воедино EmrStepOperator и EmrStepSensor.


Я создаю DAG s программно , и самый большой из нихсодержит 150+ (идентичных) ветвей , каждая из которых выполняет одну и ту же серию операций над разными битами данных (таблицами).Поэтому объединение задач, составляющих один логический шаг в моем DAG, очень помогло бы.

Вот 2 конкурирующих примера из моего проекта, чтобы дать мотивацию для моего аргумента.

1.Удаление данных из пути S3 и последующая запись новых данных

Этот шаг состоит из 2 операторов

  • DeleteS3PathOperator: расширяется от BaseOperator и использует S3Hook
  • HadoopDistcpOperator: расширяется от SSHOperator

2.Условное выполнение MSCK REPAIR для Hive таблицы

Этот шаг содержит 4 оператора

  • BranchPythonOperator: проверяет, является ли таблица Hive секционированной
  • MsckRepairOperator: продолжается от HiveOperator и выполняет MSCK REPAIR для ( разделенных ) таблиц
  • Dummy(Branch)Operator: создается альтернативный путь ветвления MsckRepairOperator (для однораздельных таблиц)
  • Dummy(Join)Operator: включает шаг объединения для обеих ветвей

Использование операторов в изоляция , конечно, предлагает меньшие модули и более мелкозернистые протоколирование / отладку, но в больших группах обеспечения доступности баз данных уменьшение помех может быть желательным.Из моего нынешнего понимания есть 2 способа объединить операторы вместе

  1. Hook s

    Записать фактическую логику обработки в хуках, а затем использовать каксколько нужно хуков в одном операторе (на мой взгляд, конечно, лучше)

  2. SubDagOperator

    A рискованно и спорный способ ведения дел;Кроме того, соглашение о присвоении имен для SubDagOperator вызывает недовольство .


Мои вопросы

  • Если операторы состоят из вообще или лучше иметь дискретных шагов?
  • Какие-нибудь подводные камни, улучшения в вышеуказанных подходах?
  • Любые другие способы объединения операторы вместе?
  • В таксономии Воздушного потока, является ли основной мотив крючков таким же, как указано выше, или они также служат некоторым другим целям?

1 Ответ

0 голосов
/ 15 ноября 2018

Я объединил различные хуки, чтобы создать единого оператора в зависимости от моих потребностей. Простой пример - я ударил gcs delete, copy, list method и get_size методы в hook, чтобы создать единственный оператор с именем GcsDataValidationOperator. Эмпирическое правило будет иметь: Идемпотентность , т. Е. Если вы запускаете несколько раз, он должен давать тот же результат.

Должны ли операторы быть составлены вообще или лучше иметь дискретные шаги?

Единственный подводный камень - это удобство сопровождения, иногда, когда перехваты меняются в основной ветке, вам нужно будет обновить весь оператор вручную, если есть какие-либо критические изменения.

Есть ли подводные камни, улучшения в вышеперечисленных подходах?

Вы можете использовать PythonOperator и использовать встроенные хуки с методом .execute, но это все равно будет означать много деталей в файле DAG. Следовательно, я все еще пошел бы для нового операторского подхода

Есть ли другие способы объединения операторов?

Хуки - это просто интерфейсы для внешних платформ и баз данных, таких как Hive, GCS и т. Д., И они образуют строительные блоки для операторов. Это позволяет создавать новые операторы. Кроме того, это означает, что вы можете настроить шаблонное поле, добавить слабое уведомление на каждом гранулированном шаге внутри вашего нового оператора и иметь свои собственные данные регистрации.

В таксономии воздушного потока, основной мотив крючков такой же, как указано выше, или они также служат некоторым другим целям?

FWIW: я член PMC и участник проекта Airflow.

...