Я реализовал тестовый пример для запуска отдельного дага, но он, похоже, не работает в 1.9 и может быть из-за более жесткого пула, который был введен в поток воздуха 1.8
,
Я пытаюсь запустить тестовый пример ниже:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
class DAGTest(unittest.TestCase):
def make_tasks(self):
dag = DAG('test_dag', description='a test',
schedule_interval='@once',
start_date=datetime(2018, 6, 26),
catchup=False)
du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1 >> du2 >> du3
dag.run()
def test_execute(self):
self.make_tasks()
исключение:
Dependencies not met for <TaskInstance: test_dag.dummy3 2018-06-26 00:00:00 [upstream_failed]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all
upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'skipped': 0L, 'successes': 0L, 'failed': 0L,'upstream_failed': 1L, 'done': 1L, 'total': 1}, upstream_task_ids=['dummy2']
Что я делаю не так?
Я пробовал как LocalExecutor, так и SequentialExecutor
Окружающая среда:
Python 2.7
Airflow 1.9
Я полагаю, что он пытается выполнить все задачи одновременно, без учета зависимостей.
Примечание: аналогичный код используется для работы в Airflow 1.7