Отправить вывод OracleOperator для другой задачи в потоке - PullRequest
0 голосов
/ 23 марта 2020

Мне нужно использовать вывод oracleOperator в другой задаче для дальнейшего выполнения. У меня проблема в том, что когда я перетаскиваю данные в другое задание и печатаю их, это дает результат как None. Ошибка не выдана, но данные не переданы. Также на вкладке xcom в пользовательском интерфейсе задачи отображается пустое поле для ключей и значений.

Мой код выглядит следующим образом:

from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("VALUE IN PULLER ")
    print(pulled_value_1)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)
push = OracleOperator(
    task_id='data',
    sql='SELECT * FROM CUSTOMERS', 
    oracle_conn_id='1',
    provide_context=True,
    dag=dag,
)


push>>pull

Ответы [ 2 ]

1 голос
/ 24 марта 2020

Вы можете использовать следующий код. В основном используется PythonOperator с OracleHook.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.oracle_hook import OracleHook
from airflow.utils.dates import days_ago

args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(task_ids='data')
    print("VALUE IN PULLER : ", pulled_value_1)


def get_data_from_oracle(**kwargs):
    oracle_hook = OracleHook(oracle_conn_id=kwargs['oracle_conn_id'])
    return oracle_hook.get_records(sql=kwargs['sql'])

push = PythonOperator(
    task_id='data',
    op_kwargs={'oracle_conn_id': 'oracle_conn_id', 'sql': 'SELECT * FROM CUSTOMERS'}
    provide_context=True,
    python_callable=get_data_from_oracle,
    dag=dag,
)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)


push >> pull

0 голосов
/ 23 марта 2020

Ваш DAG, кажется, здесь в порядке. Однако, изучая исходный код airflow.operators.oracle_operator , вы можете понять, что основной метод execute на самом деле использует OracleHook() помимо airflow.hooks.dbapi_hook расширений модуля:

def execute(self, context):
  self.log.info('Executing: %s', self.sql)
  hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
  hook.run(
      self.sql,
      autocommit=self.autocommit,
      parameters=self.parameters)

И виновником здесь является hook.run() метод, который выполняет запрос SQL, но ничего не возвращает Xcom, таким образом, метод xcom_pull() не получает ни одной записи.

run (self, sql, autocommit = False, параметры = нет) [source]

Запускает команду или список команд. Передайте список операторов sql параметру sql, чтобы они выполнялись последовательно

В качестве решения вы можете создать собственный Airflow Operator , дублирующий исходный код подлинного OracleOperator() и замену hook.run() на hook.get_records () , тогда вы ожидаете выполнить вложенный запрос, извлекая результирующие записи для дальнейшего перехода к следующей задаче.

Hope Вы находите это полезным.

...