Воздушный поток ExternalTaskSensor Stuck - PullRequest
1 голос
/ 25 сентября 2019

Я пытаюсь заставить Airflow ExternalTaskSensor работать, но до сих пор не удалось его завершить, он всегда зависает и никогда не завершает работу, поэтому группа обеспечения доступности баз данных может перейти к следующей задаче.

Вот код, который я использую для тестирования:


DEFAULT_ARGS = {
    'owner': 'NAME',
    'depends_on_past': False,
    'start_date': datetime(2019, 9, 9),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

external_watch_dag = DAG(
    'DAG-External_watcher-Test',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

start_op = DummyOperator(
    task_id='start_op',
    dag=external_watch_dag
)


trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag
)

external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_delta=timedelta(minutes=-1),
    # execution_date_fn=datetime(2019, 9, 25),
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)

end_op = DummyOperator(
    task_id='end_op',
    dag=external_watch_dag
)

start_op >> trigger_external >> external_watch_op >> end_op
# start_op >> [external_watch_op, trigger_external]
# external_watch_op >> end_op


# Below is the setup for the dummy DAG that is called above by the Trigger and watched by the TaskSensor
dummy_dag = DAG(
    'DAG-Dummy',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

dummy_task = BashOperator(
    task_id='dummy_task',
    bash_command='sleep 10',
    dag=dummy_dag
)

Я пытался настроить этот код несколькими способами, но не добился никакого успеха с ExternalTaskSensor.

Кто-нибудь знает, как решить эту проблему и заставить должным образом работать ExternalTaskSensor?Я также читал, что проблемы могут возникать через интервалы планирования при использовании ExternalTaskSensor. Возможно ли, что часть проблемы заключается в том, что обе группы обеспечения доступности баз данных имеют schedule_interval=None?

Я получил это для работы с обоими группами DAG, установленными на точно такой же schedule_interval, но это не будет работать в производстве.Цель состоит в том, чтобы основной DAG external-watch-dag работал по регулярному расписанию и включал DAG-Dummy во время своего запуска с помощью DAG-Dummy сама имеет schedule_interval=None.

Любая помощь очень ценится.

...