Apache Airflow: печать запроса успешна при регистрации информации, ошибка запроса при logging.error - PullRequest
0 голосов
/ 20 ноября 2018

Мой вопрос касается регистрации успешных или неудачных запросов, выполненных BigQueryOperator из Apache Airflow 1.10.0. Мне интересно, возможно ли напечатать успешный запрос на logging.info, и если это не удалось распечатать наlogging.error?

from airflow.contrib.operators import bigquery_operator
# Query recent StackOverflow questions.
bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id='bq_recent_questions_query',
    bql="""
    SELECT owner_display_name, title, view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
        AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
    ORDER BY view_count DESC
    LIMIT 100
    """.format(max_date=max_query_date, min_date=min_query_date),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id)

https://cloud.google.com/composer/docs/how-to/using/writing-dags

Ответы [ 2 ]

0 голосов
/ 21 ноября 2018

Вы можете создать своего собственного оператора, просто скопировав BigQueryOperator и внеся следующие изменения в функции execute и on_kill внутри него, или вы также можете переопределить существующий BigQueryOperator.

def execute(self, context):
    if self.bq_cursor is None:
        self.log.info( "Beginnging Execution." )
        hook = BigQueryHook(
            bigquery_conn_id=self.bigquery_conn_id,
            use_legacy_sql=self.use_legacy_sql,
            delegate_to=self.delegate_to)
        conn = hook.get_conn()
        self.bq_cursor = conn.cursor()
    self.bq_cursor.run_query(
        self.sql,
        destination_dataset_table=self.destination_dataset_table,
        write_disposition=self.write_disposition,
        allow_large_results=self.allow_large_results,
        flatten_results=self.flatten_results,
        udf_config=self.udf_config,
        maximum_billing_tier=self.maximum_billing_tier,
        maximum_bytes_billed=self.maximum_bytes_billed,
        create_disposition=self.create_disposition,
        query_params=self.query_params,
        labels=self.labels,
        schema_update_options=self.schema_update_options,
        priority=self.priority,
        time_partitioning=self.time_partitioning
    )
    self.log.info( "Executed: %s" % self.sql )

def on_kill(self):
    super(BigQueryOperator, self).on_kill()
    self.log.error( "Failed to Execute: %s" % self.sql )
    if self.bq_cursor is not None:
        self.log.info('Canceling running query due to execution timeout')
        self.bq_cursor.cancel_query()

Вы должны добавить собственные операторы в каталог plugins .

0 голосов
/ 20 ноября 2018

Похоже, код регистрирует запрос до до выполнения , поэтому на момент записи журнала результат неизвестен.

...