Запуск задач в произвольном порядке в воздушном потоке - PullRequest
0 голосов
/ 08 мая 2020

В настоящее время у меня есть список задач, которые все необходимо запускать в одно и то же время каждый день, однако все они независимы друг от друга. Я знаю, что могу настроить их запуск в определенном порядке, например t1 >> t2 >> t3, однако я бы хотел, чтобы порядок был случайным, чтобы порядок их завершения sh не всегда был одинаковым. Как запустить список задач обдува в случайном порядке?

1 Ответ

2 голосов
/ 09 мая 2020

Вы только что сказали, что они независимы друг от друга, почему бы вам просто не запустить их все одновременно?

enter image description here

Этого можно достичь, просто не используя какие-либо операторы сдвига, например:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(0)
}

dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)

first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)

Но если вы действительно хотите иметь случайный порядок задач и сделать их выполняемыми в какой-то случайной очереди, вы можете добавить все ваши задачи в список и их просто перемешать. Затем выполните итерацию по задачам и сделайте текущую задачу зависимой от следующей, например:
enter image description here

Для этого используйте random.shuffle(), который перемешивает список на месте:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
import random

args = {
    'owner': 'Airflow',
    'start_date': days_ago(0)
}

dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)

first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)

tasks_list = [first_operator, second_operator, third_operator]
random.shuffle(tasks_list)

i = 0
while i < len(tasks_list) - 1:
    tasks_list[i] << tasks_list[i + 1]
    i += 1

Удачи!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...