Проверка DAG Airflow: добавление пользовательских операторов - PullRequest
0 голосов
/ 11 июня 2019

Я работаю над тестовым конвейером, который проверяет группы доступности воздушного потока и возвращает сообщения об ошибках / успехах. Пример кода выглядит следующим образом:

dag_folder = os.getenv('AIRFLOW_DAGS', False)
dagbag = DagBag(include_examples=False, dag_folder=dag_folder)
self.assertFalse(len(dagbag.import_errors),'DAG import failures. Errors: {}'.format(dagbag.import_errors))

Теперь этот скрипт отлично работает с группами обеспечения доступности баз данных, которые используют собственные операторы. Однако, когда мне нужно протестировать группу доступности базы данных с пользовательскими операторами, они имеют следующий импорт:

from airflow.operators.custom import MyOperator

Эти группы обеспечения доступности баз данных не работают, поскольку MyOperator недоступен во время выполнения сценария. Есть ли способ, которым я могу динамически добавлять пользовательские операторы в airflow.operators (просматривая, скажем, каталог) перед запуском моего тестового сценария?

Кроме того, этот сценарий запускается из образа докера, и все каталоги предоставляются в виде томов, поэтому я не думаю, что изменение airflow.cfg - это вариант для меня, поскольку не запущен экземпляр воздушного потока.

...