используйте xcom pull для извлечения переменных, извлеченных из другого dag - PullRequest
0 голосов
/ 04 ноября 2019

Я очень плохо знаком с потоком воздуха и использую функции "xcom_push" и "xcom_pull".

У меня есть два знака d1 с заданием t1 и второй знак d2 с заданием t2.

Теперь я выталкиваю значения из dag d1, используя:

kwargs['ti'].xcom_push(key='start_date',value=start_date)
kwargs['ti'].xcom_push(key='end_date',value=end_date)

и извлекаю одинаковые start_date и end_date в dag d2, используя:

start_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t1',key="start_date")
end_date = kwargs['ti'].xcom_pull(dag_id = 'd1', task_ids='t2' , key="end_Date")

Однако получаю ошибку "NONETYPE" во время xcom_pull. Может кто-нибудь, пожалуйста, помогите мне, как я могу вытащить значения из DAG D1 в DAG D2

1 Ответ

0 голосов
/ 05 ноября 2019

Есть ли у вас что-нибудь на самом деле установить переменные xcom?

Попробуйте следующие значения:

d1

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

args = {
    'owner': 'znovak',
    'email': ['me@me.com'],
    'depends_on_past': False,
    'email_on_retry': False,
    'start_date': datetime(2019, 11, 4)
}

dag = DAG(
    dag_id='d1',
    default_args=args,
    catchup=False,
    schedule_interval=None
    )

###############################
##### Create DAG Parameters ###
###############################
def set_xcom_params(**kwargs):
    kwargs['ti'].xcom_push(key='start_date',value=start_date)
    kwargs['ti'].xcom_push(key='end_date',value=end_date)

t1 = PythonOperator(
    task_id='t1',
    python_callable=set_xcom_params,
    dag=dag,
    provide_context=True
    )

d2

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

args = {
    'owner': 'znovak',
    'email': ['me@me.com'],
    'depends_on_past': False,
    'email_on_retry': False,
    'start_date': datetime(2019, 11, 4)
}

dag = DAG(
    dag_id='d2',
    default_args=args,
    catchup=False,
    schedule_interval=None
    )

def pull_xcom_params(**kwargs):
    start_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="start_date")
    end_date = kwargs['ti'].xcom_pull(dag_id='d1',task_ids='t1',key="end_date")
    print(start_date)
    print(end_date)

t2 = PythonOperator(
    task_id='t2',
    python_callable=pull_xcom_params,
    dag=dag,
    provide_context=True
    )
...