Ответ Зака был полезен в решении этой проблемы (поскольку оператор ветвления был необходим), но решение, с которым я собираюсь, заключается в использовании TriggerDagRunOperator
.
Вот даги, которые я построил для проверки этого подхода.
Целевой перевод
def alert(ti, **kwargs):
message = f"Execution date is {ti.execution_date}"
print(message)
with target_dag:
PythonOperator(
python_callable=alert,
task_id='target_task',
provide_context=True,
)
Запуск триггера
def check_day(ti, **kwargs):
execution_date = ti.execution_date
if execution_date.minute % 7 == 0:
return ['weekday_trigger', 'saturday_trigger', 'sunday_trigger']
elif execution_date.minute % 7 in range(1, 5):
return ['weekday_trigger']
else:
return []
with trigger_dag:
check_day_task = BranchPythonOperator(
task_id='check_day_task',
python_callable=check_day,
provide_context=True,
)
weekday_trigger = TriggerDagRunOperator(
task_id='weekday_trigger',
trigger_dag_id='target_dag',
execution_date='{{ execution_date }}'
)
saturday_trigger = TriggerDagRunOperator(
task_id='saturday_trigger',
trigger_dag_id='target_dag',
execution_date='{{ execution_date + macros.timedelta(days=-1) }}'
)
sunday_trigger = TriggerDagRunOperator(
task_id='sunday_trigger',
trigger_dag_id='target_dag',
execution_date='{{ execution_date + macros.timedelta(days=-2) }}'
)
check_day_task >> [weekday_trigger, saturday_trigger, sunday_trigger]
Почему бы просто не использовать оператор ветвления?
Причина, по которой я предпочитаю этот подход, заключается в том, что мой целевой даг не должен заботиться о сложном планировании.Все, что нужно заботиться, это дата исполнения.Бывает, что по понедельникам мы хотим выполнить execution_date - 1
и execution_date - 2
в дополнение к execution_date
.Но целевой dag работает одинаково, несмотря ни на что: он делает определенную вещь, основываясь на execution_date
.
Если я попытаюсь включить оператор перехода в целевой dag, он очень быстро запутается.Например, если ваш целевой даг имеет 4 задания, вам нужно дублировать эти 2 раза по понедельникам.Кроме того, древовидное представление истории запусков dag было бы уродливым, с большим количеством пропущенных задач и обратной засыпкой, вероятно, было бы странно.В понедельник по будням наш триггер dag вызывает target_dag
с той же датой выполнения, что и триггер dag.На выходных спусковой крючок не срабатываетИ в понедельник, он выполняет 3 прогона target_dag
, для понедельника и предыдущих двух дней.
Примечание. Я использовал минуты для имитации дней при тестировании запланированных прогонов.
Вот график с триггером: dag:
Деревовид основной задачи остается чистым и простым: