Есть ли у вас что-нибудь на самом деле установить переменные xcom?
Попробуйте следующие значения:
d1
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
args = {
'owner': 'znovak',
'email': ['me@me.com'],
'depends_on_past': False,
'email_on_retry': False,
'start_date': datetime(2019, 11, 4)
}
dag = DAG(
dag_id='d1',
default_args=args,
catchup=False,
schedule_interval=None
)
###############################
##### Create DAG Parameters ###
###############################
def set_xcom_params(**kwargs):
kwargs['ti'].xcom_push(key='start_date',value=start_date)
kwargs['ti'].xcom_push(key='end_date',value=end_date)
t1 = PythonOperator(
task_id='t1',
python_callable=set_xcom_params,
dag=dag,
provide_context=True
)
d2
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
args = {
'owner': 'znovak',
'email': ['me@me.com'],
'depends_on_past': False,
'email_on_retry': False,
'start_date': datetime(2019, 11, 4)
}
dag = DAG(
dag_id='d2',
default_args=args,
catchup=False,
schedule_interval=None
)
def pull_xcom_params(**kwargs):
start_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="start_date")
end_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="end_date")
print(start_date)
print(end_date)
t2 = PythonOperator(
task_id='t2',
python_callable=pull_xcom_params,
dag=dag,
provide_context=True
)