Airflow XCom - как разделить вары между DAG с помощью TriggerDagRunOperator? - PullRequest
0 голосов
/ 10 июля 2020

Предположим, у меня есть группа DAG, которая выполняет некоторую обработку данных, и результат этой обработки записывается в переменную dest_path. Есть ли способ передать sh эту переменную в другой DAG, используя TriggerDagRunOperator

def trigger(context, dag_run_obj):
    dest_path = context['ti'].xcom_pull(task_ids='download_data')
    return str(dest_path)

...
    trigger_next_dag = TriggerDagRunOperator(
        task_id="trigger_next_dag",
        trigger_dag_id="send_mined_data",  # Ensure this equals the dag_id of the DAG to trigger
        provide_context=True,
        python_callable=trigger,
        dag=dag,
    )

Но эта задача не выполняется с AttributeError: 'str' object has no attribute 'run_id'

Предполагается второй DAG (dag_id="send_mined_data") чтобы вытащить эту переменную обычным способом:

    ti = kwargs['ti']
    pulled_string = ti.xcom_pull(task_ids='trigger_next_dag')

1 Ответ

0 голосов
/ 10 июля 2020

Исходный код TriggerDagRunOperator необходимо расширить для вашего варианта использования. Здесь вы увидите исходный код .

Что вам нужно сделать, так это создать подкласс этого оператора и расширить его, вставив код вашей функции trigger внутри execute перед вызовом функции trigger_dag.

...