Сначала task_id
в leader_dag
называется print_date
, но вы настраиваете dependent_dag
с задачей wait_for_task
, которая ожидает задачи leader_dag
с именем t1
.Нет задачи с именем t1
.То, что вы назначили в файле py
, не имеет значения и не используется в БД "Воздушный поток" и поперек датчиком.Он должен ждать имени задачи print_date
.
Во-вторых, ваши журналы не выстраиваются в очередь, в которой запускает leader_dag, вы показываете, что ожидает зависимый_даг.
Наконец, я не могуРекомендуем использовать Airflow для планирования задач каждую минуту.Конечно, не две зависимые задачи вместе.Рассмотрите возможность создания потоковых заданий в другой системе, такой как Spark, или используйте для этого собственную среду Celery или Dask.
Вы также можете избежать использования ExternalTaskSensor
, добавив TriggerDagRunOperator
в конце вашего leader_dag кзапустите зависимый_даг и удалите из него расписание, установив для schedule_interval
значение None
.
. В ваших журналах я вижу журнал для лидера 2018-10-13T19: 08: 11.В лучшем случае это будет dagrun для execute_date 2018-10-13 19:07:00, потому что минутный период, начинающийся с 19:07, заканчивается в 19:08, что является самым ранним из запланированных периодов.И я вижу некоторую задержку между планированием и выполнением около 11 секунд , если это так .Однако может быть несколько минут задержки планирования в Airflow.
Я также вижу журнал из dependent_dag
, который работает с 19:14:04 до 19:14:34 и ищет завершениесоответствующий 19:13:00 дагрун.Нет никаких признаков того, что ваш планировщик достаточно свободен для запуска dagrun 19:13:00 из leader_dag
к 19:14:34.Вы могли бы лучше убедить меня, если бы показывали, что тыкаешь 5 минут или около того.Конечно, никогда не почувствует leader_dag.t1 , потому что это не то, что вы назвали показанными задачами.
Итак, у Airflow есть задержка планирования, если у вас былонесколько тысяч дагов в системе, это может быть больше, чем 1 минута, так что при catchup=False
вы получите несколько прогонов, следующих друг за другом IE 19:08, 19:09 и некоторые прогоны, которые пропускают минуту (или 6) может произойти, например, 19:10, за которым следует 19:16, и поскольку задержка является случайной на основе принципа dag-by-dag, вы можете получить трассы без выравнивания с датчиком, ожидающим вечно, даже если у вас есть правильныйИдентификатор задачи для ожидания:
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='leader_dag',
- external_task_id='t1',
+ external_task_id='print_date',
dag=dag)