Этот ответ, вероятно, усугубит некоторых, но, тем не менее, это один из возможных путей, поэтому его стоит документировать.Основная особенность, которая делает Airflow более мощным, чем его конкуренты, заключается в том, что все определяется с помощью кода.В конце концов, если Airflow не предоставляет нам функцию, мы всегда можем просто создать эту функцию самостоятельно, используя Python.
Вы хотите иметь возможность объединять задачи в группу DAG, но только для этого конкретного запуска DAG.Поэтому постарайтесь просто создать собственный пул для ваших задач.Вот какой-то псевдокод на макушке моей головы
List<String> tasksPoolQueue = new ArrayList<String>();
def taskOnesFunction()
while true:
if tasksPoolQueue.get(0) == "taskOnesTurn":
print("Do some work it's your turn")
# Delete this run from the list and shift the list over to the left one index
# So that the next value is now the first value in the list
tasksPoolQueue.delete(0)
return 0
else:
sleep(10 seconds)
def taskTwosFunction()
while true:
if tasksPoolQueue.get(0) == "taskTwosTurn":
print("Do some work it's your turn")
# Delete this run from the list and shift the list over to the left one index
# So that the next value is now the first value in the list
tasksPoolQueue.delete(0)
return 0
else:
sleep(10 seconds)
def createLogicalOrderingOfTaskPoolQueue():
if foobar == true:
tasksPoolQueue[0] = "taskOnesTurn"
tasksPoolQueue[1] = "taskTwosTurn"
else:
tasksPoolQueue[0] = "taskTwosTurn"
tasksPoolQueue[1] = "taskOnesTurn"
return 0
determine_pool_queue_ordering = PythonOperator(
task_id='determine_pool_queue_ordering',
retries=0,
dag=dag,
provide_context=True,
python_callable=createLogicalOrderingOfTaskPoolQueue,
op_args=[])
task1 = PythonOperator(
task_id='task1',
retries=0,
dag=dag,
provide_context=True,
python_callable=taskOnesFunction,
op_args=[])
task2= PythonOperator(
task_id='task2',
retries=0,
dag=dag,
provide_context=True,
python_callable=taskTwosFunction,
op_args=[])
determine_pool_queue_ordering.set_downstream(task1)
determine_pool_queue_ordering.set_downstream(task2)
Так что, надеюсь, каждый сможет следовать моему псевдокоду.Я не знаю, каков будет лучший способ создания пользовательского пула, в котором нет «условия гонки», поэтому идея с очередью списка была тем, что я придумал на первый взгляд.Но главное здесь заключается в том, что task1 и task2 будут работать одновременно, НО внутри их функции. Я могу сделать так, чтобы функция не делала ничего значимого, пока не пройдет оператор if, препятствующий запуску реального кода.
Первая задача будет динамически определять, какие задачи будут выполняться первыми и в каком порядке, используя список.Затем все функции, которые должны быть в этом пользовательском пуле, ссылаются на этот список.Поскольку наши операторы if равны true, когда их имя-задачи первое в списке, это означает, что одновременно может выполняться только одна задача.Первая задача в списке удалит себя из списка, как только она выполнит обработку того, что ей нужно сделать.Тогда другие задачи будут находиться в спящем режиме, пока они ожидают, что их имя будет первым в списке.
Так что просто сделайте некоторую пользовательскую логику, похожую на мою.