повторное выполнение воздушного потока - PullRequest
0 голосов
/ 30 ноября 2018

У меня есть следующий DAG с несколькими простыми задачами:

hour_list = [“0:00”, “1:00", “2:00”]

 for hour in hour_list:
    bash_op = BashOperator(
                task_id=‘task1_op1’+hour
                ,bash_command=“date”
                ,dag=dag
         )


    py_op = PythonOperator(
            task_id='doit’+hour,
            provide_context=True,
            python_callable=python_method,
            dag=dag)

    py_op.set_upstream(bash_op)

Теперь я вижу, что dag выполняется параллельно для всех часов с 0:00 до 2:00.Это ожидаемое поведение.Но я хочу запустить дагов один час за другим, так как выполнение второго часа зависит от первого часа.Я не уверен, что любое изменение в настройках могло бы помочь здесь.Я ценю ваши мысли.спасибо.

1 Ответ

0 голосов
/ 30 ноября 2018

Вы можете сделать это, используя airflow.operators.sensors.TimeSensor "между" задачами.Нечто похожее на следующее:

from datetime import time

from airflow.operators.sensors import TimeSensor

[...]

for hour in ["00:00", "01:00", "02:00"]:
    TimeSensor(
        dag=dag,
        task_id="wait_{}".format(hour),
        target_time=time(*map(int, hour.split(":")))
    ) >> BashOperator(
        dag=dag,
        task_id="task1_op1_{}".format(hour),
        bash_command="date"
    ) >> PythonOperator(
        dag=dag,
        task_id="doit_{}".format(hour),
        provide_context=True,
        python_callable=python_method
    )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...