Цикл здесь просто показывает вам, как динамически создавать DAG.Порядок, в котором выполняются вещи, зависит от того, что вы устанавливаете как «восходящие» или «последующие» задачи.
Пример, на который вы ссылались, также можно выполнить, как в следующем примере.Однако что, если вы хотите добавить еще 10 задач?Чтобы добиться того же, вам придется сделать довольно много кодов копирования / вставки, лучше просто поместить их в цикл, как в связанном примере:
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag)
task_0 = PythonOperator(
task_id='sleep_for_' + 0,
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(0) / 10},
dag=dag)
task_1 = PythonOperator(
task_id='sleep_for_' + 1,
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(1) / 10},
dag=dag)
task_2 = PythonOperator(
task_id='sleep_for_' + 2,
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(2) / 10},
dag=dag)
task_3 = PythonOperator(
task_id='sleep_for_' + 3,
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(3) / 10},
dag=dag)
task_4 = PythonOperator(
task_id='sleep_for_' + 4,
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(4) / 10},
dag=dag)
task_0.set_upstream(run_this)
task_1.set_upstream(run_this)
task_2.set_upstream(run_this)
task_3.set_upstream(run_this)
task_4.set_upstream(run_this)