Воздушный поток не распознает zip-файл DAG, созданный с помощью приспособления pytest - PullRequest
0 голосов
/ 16 мая 2019

Мы используем Google Composer (управляемый сервис Airflow) с airflow v1.10 и Python 3.6.8. Для развертывания нашей DAGS мы используем пакетную DAG (https://airflow.apache.org/concepts.html?highlight=zip#packaged-dags) метод.

Все хорошо, когда zip-файл создается из строки cmd, например

zip -r dag_under_test.zip test_dag.py

но когда я пытаюсь сделать это с помощью прибора pytest, поэтому я загружаю в DagBag и проверяю целостность моего DAG, airflow вообще не распознает этот zip-файл. вот код моего приспособления pytest

@fixture
def setup(config):
    os.system("zip -r dag_under_test.zip test_zip.py")


def test_import_dags(setup):
    dagbag = DagBag(include_examples=False)
    noOfDags = len(dagbag.dags)
    dagbag.process_file("dag_under_test.zip")
    assert len(dagbag.dags) == noOfDags + 1, 'DAG import failures. Errors: {}'.format(dagbag.import_errors)

Я скопировал этот zip-файл в папку DAGs, но airflow его вообще не распознает, сообщений об ошибках нет. Но zip-файл, созданный с помощью той же команды из cmdline, загружается потоком воздуха !! Кажется, я что-то упускаю здесь очевидное, не могу понять.

Ответы [ 2 ]

0 голосов
/ 30 мая 2019

Так что оказалось, что где я создаю zip-файл, важно.Как и в этом случае, я создаю zip-файл из тестовой папки и архивирую файлы в папках src.Хотя окончательный почтовый файл выглядит идеально невооруженным глазом, поток воздуха отклоняет его.Я попытался добавить '-j' к команде zip (чтобы спрятать имена каталогов), и мой тест начал работать.тот же сценарий, когда в моем проекте DAG есть полная структура папок.Файл dag на верхнем уровне, который ссылается на множество модулей Python в проекте.Я не мог заставить это работать вышеупомянутым уловкой, но придумал обходной путь.Я создал небольшой скрипт оболочки, который выполняет zip-часть, например так:

SCRIPT_PATH=${0%/*/*}
cd $SCRIPT_PATH

zip -r -q test/dag_under_test.zip DagRunner.py
zip -r -q test/dag_under_test.zip tasks dag common resources

Этот скрипт оболочки меняет currentdir на проект home и архивирует оттуда.Я вызываю эту оболочку из приспособления pytest, как это

@fixture
def setup():
    os.system('rm {}'.format(DAG_UNDER_TEST))
    os.system('sh {}'.format(PACKAGE_SCRIPT))
    yield
    print("-------- clean up -----------")
    os.system('rm {}'.format(DAG_UNDER_TEST))

Это прекрасно работает с моим интеграционным тестом.

def test_conversionDAG(setup):
    configuration.load_test_config()
    dagbag = DagBag(include_examples=False)
    noOfDags = len(dagbag.dags)
    dagbag.process_file(DAG_UNDER_TEST)
    assert len(dagbag.dags) == noOfDags + 1, 'DAG import failures. Errors: {}'.format(dagbag.import_errors)
    assert dagbag.get_dag("name of the dag")
0 голосов
/ 17 мая 2019

В этом случае, похоже, что существует несоответствие между рабочим каталогом os.system и тем, куда смотрит загрузчик DagBag.Если вы проверяете код airflow/dagbag.py, путь, принятый process_file, передается os.path.isfile:

def process_file(self, filepath, only_if_updated=True, safe_mode=True):
  if filepath is None or not os.path.isfile(filepath):
    ...

Это означает, что в вашем тесте вы, вероятно, можете сделать некоторыетестирование, чтобы убедиться, что все они соответствуют:

# Make sure this works
os.path.isfile(filepath)

# Make sure these are equal
os.system('pwd')
os.getcwd()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...