Как увидеть результат MySqlHook в журнале - PullRequest
0 голосов
/ 08 января 2019

Я использую MySqlHook для установления соединения с airflow_db , и я выполняю какой-то запрос, но мне нужно где-нибудь увидеть результат запроса (скажем, в журнале), как можно посмотреть?

Вот пример кода

t1 = MySqlOperator(
    task_id='basic_mysql',
    mysql_conn_id='airflow_db',
    sql="select * from xcom",
    dag=dag)

Ответы [ 3 ]

0 голосов
/ 08 января 2019

Как правило, с Airflow ваш запрос должен быть написан так, чтобы результаты помещались во временную таблицу (возможно, включая results_name_{{ds_nodash}}). Затем вы можете использовать MySqlTo SomethingElse Operator для перемещения результатов временной таблицы. Затем уберитесь, уронив стол.

Я не вижу причин, по которым регистрация результатов в журналах Airflow была бы достаточной работой для DAG.

0 голосов
/ 09 января 2019

Оператор MySQL в настоящее время (airflow 1.10.1 на момент написания) не поддерживает возврат чего-либо в XCom, поэтому сейчас вам нужно написать небольшой оператор самостоятельно. Вы можете сделать это прямо в файле DAG:

from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.hooks.mysql_hook import MySqlHook

class ReturningMySqlOperator(MySqlOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        return hook.get_records(
            self.sql,
            parameters=self.parameters)

t1 = ReturningMySqlOperator(
    task_id='basic_mysql',
    mysql_conn_id='airflow_db',
    sql="select * from xcom",
    dag=dag)

def get_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='basic_mysql')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    # Get data in your logs
    logging.info(string_to_print)

t2 = PythonOperator(
    task_id='records',
    provide_context=True,
    python_callable=get_records,
    dag=dag)

t1 >> t2
0 голосов
/ 08 января 2019

AFAIK, MySqlOperator служит для выполнения запроса UPDATE / DELETE и т. Д .; другими словами запросы, которые:

  • не возвращает никакого результата
  • возвращает результат, но вы не беспокоитесь об этом

Чтобы получить фактический результат, вы должны использовать MySqlHook. Вот небольшой фрагмент кода (Python 3.6+) для начала (не проверено, но только для подсказок)

from typing import List, Optional, Any
from airflow.hooks.mysql_hook import MySqlHook

# instantiate a MySqlHook
mysql_hook: MySqlHook = MySqlHook(mysql_conn_id="airflow_db")

# get records (this method comes from airflow.hooks.db_api_hook.DbApiHook)
records: List[List[Optional[Any]]] = mysql_hook.get_records(sql="select * from xcom")

# print records
print(records)

# alternatively, you can write records to task's logger
# note that here 'operator' = reference to your Operator
# operator.log.info("\n".join(records))

Вывод print() / log.info() появится в журнале задач на интерфейсе пользователя

...