Проблема воздушного потока с задачами ветвления - PullRequest
0 голосов
/ 18 октября 2018

Я пытаюсь настроить группу обеспечения доступности баз данных, где задача запускается каждую минуту, а затем другая задача запускается на 5-й минуте (прямо перед 1-минутной задачей).Это действительно просто тестирование, я не планирую запускать задания с такими короткими интервалами.

Визуально мой DAG выглядит так:

DAG

И сам код такой:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9)
}

now = datetime.now()
minute_check = now.minute % 5

dag = DAG(
    dag_id='test3',
    default_args=default_args,
    schedule_interval='* * * * *',
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
    max_active_runs=99
        )

def check_minute():
    if minute_check == 0:
        return "branch_fiveminute"
    else:
        return "branch_minute"

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=check_minute,
    trigger_rule='all_done',
    dag=dag)

branch_minute = BashOperator(
    task_id='branch_minute',
    bash_command='test1min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_fiveminute = BashOperator(
    task_id='branch_fiveminute',
    bash_command='test5min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_task.set_downstream(branch_minute)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)

Проблема, которую я получаю, состоит в том, что на 5-й минуте воздушный поток пропускает 1-минутное задание:

enter image description here

Я попытался поиграть с настройками trigger_rule без особого успеха.

Есть идеи, что не так?Я использую Airflow 1.10, если это имеет значение.

1 Ответ

0 голосов
/ 18 октября 2018

Поскольку вы выполняете другой путь выполнения для 5-минутного задания, одно-минутное задание пропускается.Это немного противоречит интуиции на диаграмме, но только 1 путь с execute.

Так что вам нужно сделать, это иметь ветку в начале, один путь приводит к фиктивному оператору для false, а один путь ведет к5-минутное задание, однако и 5-минутное задание, и фиктивный оператор приведут к 1-минутному заданию.

Таким образом, фиктивная задача будет пропущена, но поток выполнения заканчивается в 1-минутном задании, независимо от того, какойпуть выполнения выбран.

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator  import DummyOperator
from airflow.operators.bash_operator   import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9)
}

now = datetime.now()
minute_check = now.minute % 5

dag = DAG(
    dag_id='test3',
    default_args=default_args,
    schedule_interval='* * * * *',
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
    max_active_runs=99
        )

def check_minute():
    if minute_check == 0:
        return "branch_fiveminute"
    else:
        return "branch_false_1"

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=check_minute,
    trigger_rule='all_done',
    dag=dag)

branch_minute = BashOperator(
    task_id='branch_minute',
    bash_command='test1min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_fiveminute = BashOperator(
    task_id='branch_fiveminute',
    bash_command='test5min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_false_1 = DummyOperator( task_id= "branch_false_1", dag=dag )

branch_task.set_downstream(branch_false_1)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)
branch_false_1.set_downstream(branch_minute)
...