Как использовать MySqlOperator с xcom в Airflow? - PullRequest
0 голосов
/ 04 октября 2018

Я прочитал это Как использовать XCMS с воздушным потоком с MySqlOperator , и хотя он имеет похожее название, он не решает мою проблему.

У меня есть следующий код:

def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if int(xcom) > int(LAST_IMPORTED_ORDER_ID)
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id  = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = MySqlOperator(
        task_id='query_get_max_order_id',
        sql= query_get_max_order_id,
        mysql_conn_id=MyCon,
        xcom_push=True,
        dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation

MySqlOperator возвращает число в соответствии с номером, который BranchPythonOperator выбирает следующую задачу.Гарантируется, что MySqlOperator вернул значение больше 0.

Моя проблема в том, что ничто не подталкивается к XCOM в пользовательском интерфейсе MySqlOperator при переходе на XCOM Я вижуничего такого.BranchPythonOperator очевидно ничего не читает, поэтому мой код не работает.

Почему XCOM здесь не работает?

1 Ответ

0 голосов
/ 04 октября 2018

Оператор MySQL в настоящее время (airflow 1.10.0 на момент написания) не поддерживает возврат чего-либо в XCom, поэтому на данный момент для вас исправление - написать небольшой оператор самостоятельно.Вы можете сделать это прямо в файле DAG (не проверено, поэтому могут быть глупые ошибки):

from airflow.operators.mysql_operator import MySqlOperator as BaseMySqlOperator
from airflow.hooks.mysql_hook import MySqlHook

class ReturningMySqlOperator(BaseMySqlOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        return hook.get_first(
            self.sql,
            parameters=self.parameters)


def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if str(xcom) == 'NewRecords':
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id  = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = ReturningMySqlOperator(
        task_id='query_get_max_order_id',
        sql= query_get_max_order_id,
        mysql_conn_id=MyCon,
        # xcom_push=True,
        dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
...