См. приведённый ниже пример:
Надеюсь, что это поможет.
# Defaults handed to the DAG via ``default_args``.
# ``start_date`` is expected to be defined earlier in the file.
args = dict(owner='airflow', start_date=start_date)

# schedule_interval=None: per the Airflow docs the DAG is never scheduled
# automatically and runs only when triggered.
dag = DAG(
    dag_id='test_dag',
    default_args=args,
    schedule_interval=None,
)

# Module-level placeholder; none of the task callables visible here read it.
y = 0
def LoadYaml(**kwargs):
    """Push the pipeline name to XCom under the key ``'name'``.

    Args:
        **kwargs: Task context supplied by the operator. Must contain
            ``ti``, an object exposing ``xcom_push(key=..., value=...)``.

    Returns:
        bool: Always ``True``. The truthy result matters when this callable
        is run by a ShortCircuitOperator, which skips downstream tasks on a
        falsy return value.

    Raises:
        KeyError: If ``ti`` is not present in ``kwargs``.
    """
    # Distinct local name so the module-level ``y`` is not shadowed.
    pipeline_name = 'df-12345567789'
    kwargs['ti'].xcom_push(key='name', value=pipeline_name)
    return True
def CreatePipeLine(**kwargs):
    """Placeholder pipeline-creation step; currently only prints a message.

    Args:
        **kwargs: Operator-supplied keyword arguments (``op_kwargs`` and,
            with ``provide_context=True``, the task context). Accepted but
            not used.

    Returns:
        bool: Always ``True``.
    """
    print("I m client")
    # Return an explicit truthy value: when run by a ShortCircuitOperator,
    # an implicit ``None`` would be treated as falsy and every downstream
    # task would be skipped.
    return True
def ActivatePipeline(client, pipelineId, **kwargs):
    """Print an activation message for the given client and pipeline id.

    Args:
        client: Value printed as the client.
        pipelineId: Value printed as the pipeline identifier.
        **kwargs: Extra keyword arguments, ignored. Needed because an
            Airflow 1.x operator created with ``provide_context=True``
            passes the whole task context as keyword arguments alongside
            ``op_kwargs``; without this catch-all the call raises
            ``TypeError``.

    Returns:
        None. NOTE(review): under a ShortCircuitOperator a ``None`` result
        skips downstream tasks; return a truthy value if tasks are ever
        added after this one.
    """
    print("activated", client, pipelineId)
# --- Task definitions -------------------------------------------------------
start_task = DummyOperator(task_id='Start_Task', dag=dag)

LoadYaml_task = ShortCircuitOperator(
    task_id='LoadYaml_task',
    python_callable=LoadYaml,
    provide_context=True,
    dag=dag,
)

CreatePipeLine_task = ShortCircuitOperator(
    task_id='CreatePipeLine_task',
    python_callable=CreatePipeLine,
    op_kwargs={'client': 'HeyImclient'},
    provide_context=True,
    dag=dag,
)

ActivatePipeline_task = ShortCircuitOperator(
    task_id='ActivatePipeline_task',
    python_callable=ActivatePipeline,
    op_kwargs={'client': 'You', 'pipelineId': '1234'},
    provide_context=True,
    dag=dag,
)

# --- Dependencies -----------------------------------------------------------
# Linear chain:
# Start_Task -> LoadYaml_task -> CreatePipeLine_task -> ActivatePipeline_task
start_task.set_downstream(LoadYaml_task)
LoadYaml_task.set_downstream(CreatePipeLine_task)
CreatePipeLine_task.set_downstream(ActivatePipeline_task)