Как работает task_ids, если указано несколько задач?
В этом конкретном примере кода я ожидал получить load_cycle_id_2 от обеих задач в кортеже (5555,22222), но вместо этого он вышел (None, 22222).
Почему это?
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow',
'start_date': datetime.now(),
'provide_context': True
}
demo_dag = DAG(dag_id='first', start_date=datetime.now(), schedule_interval='@once',default_args=args)
def push_load_id(**kwargs):
kwargs['ti'].xcom_push(key='load_cycle_id_2',value=22222)
kwargs['ti'].xcom_push(key='load_cycle_id_3',value=44444)
def another_push_load_id(**kwargs):
kwargs['ti'].xcom_push(key='load_cycle_id_2',value=5555)
kwargs['ti'].xcom_push(key='anotherload_cycle_id_3',value=6666)
def pull_load_id(**kwargs):
ti = kwargs['ti'].xcom_pull(key='load_cycle_id_2', task_ids=['another_push_load_id','push_load_id'])
print(ti)
push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)
push_operator >> pull_operator