Я использую TriggerDagRunOperator, чтобы один DAG контроллера мог вызвать целевой DAG.Однако после того, как группа DAG контроллера запускает целевую группу DAG, целевая группа DAG переключается на «выполнение», но ни одна из ее задач не запланирована.Я хотел бы, чтобы задачи целевой группы обеспечения доступности баз данных планировались, как только целевая группа обеспечения доступности баз данных запускается контроллером группы доступности базы данных.
# Controller DAG's callable
def conditionally_trigger(context, dag_run_object):
condition_param = context['params']['condition_param']
if condition_param:
return dag_run_obj
return None
# Target DAG's callable
def say_hello():
print("Hello")
# Controller DAG
controller_dag = DAG(
dag_id="controller",
default_args = {
"owner":"Patrick Stump",
"start_date":datetime.utcnow(),
},
schedule_interval='@once',
)
# Target DAG
target_dag = DAG(
dag_id="target",
default_args = {
"owner":"Patrick Stump",
"start_date":datetime.utcnow(),
},
schedule_interval=None,
)
# Controller DAG's task
controller_task = TriggerDagRunOperator(
task_id="trigger_dag",
trigger_dag_id="target",
python_callable=conditionally_trigger,
params={'condition_param':True},
dag=controller_dag,
)
# Target DAG's task -- never scheduled!
target_task = PythonOperator(
task_id="print_hello",
python_callable=say_hello,
dag=target_dag,
)
Заранее спасибо:)