Порядок исполнения в Airflow - PullRequest
0 голосов
/ 11 июня 2018

Я рассматриваю пример кода здесь

Есть две функции "операции":

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

и:

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

Для каждого прогона my_sleeping_function мы запускаем print_context?

Что я не понимаю, так это порядок.Это график и дерево .. порядок выполнения не совпадает:

enter image description here enter image description here

Что происходитпервый?Что происходит после?Почему?

Я предполагаю, что в соответствии с этим:

for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)

    task.set_upstream(run_this)

run_this выполняется, а затем выполняется задача, но цикл смущает меня.

Ответы [ 3 ]

0 голосов
/ 11 июня 2018

Я думаю, что ваша путаница основана на том, что вы ожидаете, что графическое представление и древовидное представление будут двумя отдельными визуализациями одного и того же.Однако они используются для визуализации различных вещей.

Графическое представление показывает порядок, в котором задачи будут выполняться в вашем рабочем процессе.В вашем случае print_the_context будет работать, а после завершения sleep_for_0, sleep_for_1, sleep_for_2, sleep_for_3, sleep_for_4 будет работать параллельно (или, по крайней мере, настолько параллельно, насколько позволяет конфигурация воздушного потока)

Древовидное представление представляет визуализацию DAG в глубину (и состояние каждой задачи с течением времени в квадратах справа).То есть первый уровень узлов в дереве - это конечные задачи в dag (конечные узлы), где dag будет считаться успешно выполненным.Он разветвляется для каждой зависимой задачи, которая должна быть запущена для запуска.

Иными словами, порядок выполнения одинаков в обоих представлениях, он просто визуализируется с разных сторон.

0 голосов
/ 21 ноября 2018

@ cwurtz прав!

Чтобы установить порядок задач, вы можете использовать priority_weight параметр задачи

Параметр пула можно использовать вместе с priority_weight для определения приоритетовв очереди, и какие задачи выполняются первыми, когда в пуле открываются слоты.( Читать дальше )

0 голосов
/ 11 июня 2018

Цикл здесь просто показывает вам, как динамически создавать DAG.Порядок, в котором выполняются вещи, зависит от того, что вы устанавливаете как «восходящие» или «последующие» задачи.

Пример, на который вы ссылались, также можно выполнить, как в следующем примере.Однако что, если вы хотите добавить еще 10 задач?Чтобы добиться того же, вам придется сделать довольно много кодов копирования / вставки, лучше просто поместить их в цикл, как в связанном примере:

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

task_0 = PythonOperator(
    task_id='sleep_for_' + 0,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(0) / 10},
    dag=dag)

task_1 = PythonOperator(
    task_id='sleep_for_' + 1,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(1) / 10},
    dag=dag)

task_2 = PythonOperator(
    task_id='sleep_for_' + 2,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(2) / 10},
    dag=dag)


task_3 = PythonOperator(
    task_id='sleep_for_' + 3,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(3) / 10},
    dag=dag)

task_4 = PythonOperator(
    task_id='sleep_for_' + 4,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(4) / 10},
    dag=dag)


task_0.set_upstream(run_this)
task_1.set_upstream(run_this)
task_2.set_upstream(run_this)
task_3.set_upstream(run_this)
task_4.set_upstream(run_this)
...