Я пытаюсь настроить группу обеспечения доступности баз данных, где задача запускается каждую минуту, а затем другая задача запускается на 5-й минуте (прямо перед 1-минутной задачей).Это действительно просто тестирование, я не планирую запускать задания с такими короткими интервалами.
Визуально мой DAG выглядит так:

И сам код такой:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 10, 9)
}
now = datetime.now()
minute_check = now.minute % 5
dag = DAG(
dag_id='test3',
default_args=default_args,
schedule_interval='* * * * *',
dagrun_timeout=timedelta(minutes=5),
catchup=False,
max_active_runs=99
)
def check_minute():
if minute_check == 0:
return "branch_fiveminute"
else:
return "branch_minute"
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=check_minute,
trigger_rule='all_done',
dag=dag)
branch_minute = BashOperator(
task_id='branch_minute',
bash_command='test1min.sh ',
trigger_rule='all_done',
dag=dag)
branch_fiveminute = BashOperator(
task_id='branch_fiveminute',
bash_command='test5min.sh ',
trigger_rule='all_done',
dag=dag)
branch_task.set_downstream(branch_minute)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)
Проблема, которую я получаю, состоит в том, что на 5-й минуте воздушный поток пропускает 1-минутное задание:

Я попытался поиграть с настройками trigger_rule без особого успеха.
Есть идеи, что не так?Я использую Airflow 1.10, если это имеет значение.