Мне нужно использовать вывод oracleOperator в другой задаче для дальнейшего выполнения. У меня проблема в том, что когда я перетаскиваю данные в другое задание и печатаю их, это дает результат как None. Ошибка не выдана, но данные не переданы. Также на вкладке xcom в пользовательском интерфейсе задачи отображается пустое поле для ключей и значений.
Мой код выглядит следующим образом:
from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'xyz',
'start_date': days_ago(2),
}
dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])
def puller(**kwargs):
ti = kwargs['ti']
# get value_1
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
print("VALUE IN PULLER ")
print(pulled_value_1)
pull = PythonOperator(
task_id='pullee',
dag=dag,
python_callable=puller,
provide_context=True,
)
push = OracleOperator(
task_id='data',
sql='SELECT * FROM CUSTOMERS',
oracle_conn_id='1',
provide_context=True,
dag=dag,
)
push>>pull