Воздушный поток xcom_pull не предоставляет данные того же самого запуска экземпляра задачи, а предоставляет самые последние данные - PullRequest
1 голос
/ 07 июня 2019

Я создаю Airflow @daily DAG, у него есть восходящая задача get_daily_data BigQueryGetDataOperator, которая извлекает данные на основе execute_date и для зависимой задачи downstream (PythonOperator) использует данные, основанные на дате выше, через xcom_pull.Когда я запускаю команду airf backfill, нижестоящую задачу process_data_from_bq, где я выполняю xcom_pull, она получает только последние данные, а не данные той же даты выполнения, которую ожидает последующая задача.В документации Airfow говорится, что если мы передадим Если xcom_pull передается одна строка для task_ids, то возвращается самое последнее значение XCom из этой задачи Однако в нем не говорится, как получить данные того же экземпляра выполнения DAG,

Я прошел один и тот же вопрос Как получить значение xcom из другого экземпляра задачи в том же прогоне DAG (не самого последнего)? однако, единственное решение, которое есть, это то, чтоЯ уже делаю.но, кажется, это не правильный ответ.

Определение DAG:

dag = DAG(
    'daily_motor',
    default_args=default_args,
    schedule_interval='@daily'
)

#This task creates data in a BigQuery table based on execution date
extract_daily_data  = BigQueryOperator(
    task_id='daily_data_extract',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    sql=policy_by_transaction_date_sql('{{ ds }}'), 
    destination_dataset_table='Test.daily_data_tmp',
    dag=dag)


get_daily_data = BigQueryGetDataOperator(
    task_id='get_daily_data',
    dataset_id='Test',
    table_id='daily_data_tmp',
    max_results='10000',
    dag=dag

)


#This is where I need to pull the data of the same execution date/same instance of DAG run not the most recent task run

def process_bq_data(**kwargs):
    bq_data = kwargs['ti'].xcom_pull(task_ids = 'get_daily_data')
    #This bq_data is most recent one not of the same execution date
    obj_creator = IibListToObject()
    items = obj_creator.create(bq_data, 'daily')
    save_daily_date_wise(items)


process_data = PythonOperator(
    task_id='process_data_from_bq',
    python_callable=process_bq_data,
    provide_context=True,
    dag = dag
)

get_daily_data.set_upstream(extract_daily_data)
process_data.set_upstream(get_daily_data)
...