Испытание Dag на Airflow 1.9 в тесте - PullRequest
0 голосов
/ 26 июня 2018

Я реализовал тестовый пример для запуска отдельного дага, но он, похоже, не работает в 1.9 и может быть из-за более жесткого пула, который был введен в поток воздуха 1.8 , Я пытаюсь запустить тестовый пример ниже:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

class DAGTest(unittest.TestCase):
    def make_tasks(self):
        dag = DAG('test_dag', description='a test',
                  schedule_interval='@once',
                  start_date=datetime(2018, 6, 26),
                  catchup=False)


        du1 = DummyOperator(task_id='dummy1', dag=dag)
        du2 = DummyOperator(task_id='dummy2', dag=dag)
        du3 = DummyOperator(task_id='dummy3', dag=dag)
        du1 >> du2 >> du3
        dag.run()

    def test_execute(self):
        self.make_tasks() 

исключение:

Dependencies not met for <TaskInstance: test_dag.dummy3 2018-06-26 00:00:00  [upstream_failed]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all
    upstream tasks to have succeeded, but found 1 non-success(es).
    upstream_tasks_state={'skipped': 0L, 'successes': 0L, 'failed': 0L,'upstream_failed': 1L, 'done': 1L, 'total': 1}, upstream_task_ids=['dummy2']

Что я делаю не так? Я пробовал как LocalExecutor, так и SequentialExecutor

Окружающая среда:

Python 2.7
Airflow 1.9

Я полагаю, что он пытается выполнить все задачи одновременно, без учета зависимостей. Примечание: аналогичный код используется для работы в Airflow 1.7

Ответы [ 2 ]

0 голосов
/ 05 сентября 2018

Вот функция, которую вы можете использовать в pytest тестовом примере, который будет запускать задачи вашей DAG по порядку.

from datetime import timedelta
import pytest
from unittest import TestCase


@pytest.fixture
def test_dag(dag):
    dag._schedule_interval = timedelta(days=1)  # override cuz @once gets skipped
    done = set([])

    def run(key):
        task = dag.task_dict[key]
        for k in task._upstream_task_ids:
            run(k)
        if key not in done:
            print(f'running task {key}...')
            date = dag.default_args['start_date']
            task.run(date, date, ignore_ti_state=True)
            done.add(key)
    for k, _ in dag.task_dict.items():
        run(k)

Затем вы можете использовать test_dag (dag) вместо dag.run() в вашем тесте.

Вам необходимо убедиться, что при входе в систему в пользовательских операторах используется self.log.info() вместо logging.info() или print(), иначе они не будут отображаться.

Вам также может потребоваться запустить тест с использованием python -m pytest -s test_my_dag.py, так как без флага -s стандартный вывод Airflow не будет захвачен.

Я все еще пытаюсь выяснить, как обрабатывать зависимости между DAG.

0 голосов
/ 27 июня 2018

Я не знаком с Airflow 1.7, но, думаю, у него не было той же концепции "DagBag", как у Airflow1.8 и выше.

Вы не можете запустить созданную вами группу DAG следующим образом: dag.run() запускает новый процесс python, и ему нужно будет найти DAG из папки dag, которую он анализирует на диске, но не может. Это было включено как сообщение в вывод (но вы не включили полное сообщение об ошибке / вывод)

Что вы пытаетесь проверить, создав в тестовых файлах метку? Это пользовательский оператор? Тогда вам лучше проверить это напрямую. Например, вот как я тестирую автономный оператор:

class MyPluginTest(unittest.TestCase)
    def setUp(self):
        dag = DAG(TEST_DAG_ID, schedule_interval='* * * * Thu', default_args={'start_date': DEFAULT_DATE})
        self.dag = dag
        self.op = myplugin.FindTriggerFileForExecutionPeriod(
            dag=dag,
            task_id='test',
            prefix='s3://bucket/some/prefix',
        )
        self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)

        # Other S3 setup here, specific to my test


    def test_execute_no_trigger(self):
        with self.assertRaises(RuntimeError):
            self.ti.run(ignore_ti_state=True)

        # It shouldn't have anything in XCom
        self.assertEqual(
            self.ti.xcom_pull(task_ids=self.op.task_id),
            None
        )
...