Получать результаты от BigQueryOperator в потоке воздуха - PullRequest
0 голосов
/ 01 декабря 2018

Я пытаюсь получить результаты из BigQueryOperator, используя поток воздуха, но я не смог найти способ сделать это.Я попытался вызвать метод next() в элементе bq_cursor (доступен в 1.10), однако он возвращает None.Вот как я пытался это сделать

import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

Взглянув на bigquery_hook.py и bigquery_operator.py , похоже, это единственный доступный способ получитьрезультаты, достижения.

Ответы [ 3 ]

0 голосов
/ 02 декабря 2018

Я создаю свой собственный оператор с помощью ловушки BigQuery всякий раз, когда мне нужно получить данные из запроса BigQuery и использовать их для чего-то. Я обычно называю это BigQueryToXOperator, и у нас есть несколько таких для отправки данных BigQuery в другие внутренние системы,

Например, у меня есть оператор BigQueryToPubSub, который может оказаться полезным в качестве примера того, как запрашивать BigQuery, а затем обрабатывать результаты построчно, отправляя их в Google PubSub.Рассмотрим следующий обобщенный пример кода, как это сделать самостоятельно:

class BigQueryToXOperator(BaseOperator):
    template_fields = ['sql']
    ui_color = '#000000'

    @apply_defaults
    def __init__(
            self,
            sql,
            keys,
            bigquery_conn_id='bigquery_default',
            delegate_to=None,
            *args,
            **kwargs):
        super(BigQueryToXOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.keys = keys # A list of keys for the columns in the result set of sql
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to


    def execute(self, context):
        """
        Run query and handle results row by row.
        """
        cursor = self._query_bigquery()
        for row in cursor.fetchall():
            # Zip keys and row together because the cursor returns a list of list (not list of dicts)
            row_dict = dumps(dict(zip(self.keys,row))).encode('utf-8')

            # Do what you want with the row...
            handle_row(row_dict)


    def _query_bigquery(self):
        """
        Queries BigQuery and returns a cursor to the results.
        """
        bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                          use_legacy_sql=False)
        conn = bq.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql)
        return cursor
0 голосов
/ 03 декабря 2018

Спасибо @kaxil и @Mike за ответы.Я нашел проблему.В BigQueryCursor есть какая-то ошибка (на мой взгляд).Как часть run_with_configuration, running_job_id возвращается, но никогда не присваивается job_id, который используется для получения результатов в методе next.Обходной путь (не очень элегантный, но хороший, если вы не хотите все заново реализовывать), назначает job_id на основе running_job_id в хуке, вот так

big_query_count.execute(context=kwargs)
#workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())

Одна проблема, которую нужно решитьисправлено run_with_configuration назначение правильного job_id в конце процесса, строка после обхода может быть удалена

0 голосов
/ 01 декабря 2018

Вы можете использовать BigQueryOperator для сохранения результатов во временной таблице назначения, а затем использовать BigQueryGetDataOperator для извлечения результатов, как показано ниже, а затем используйте BigQueryTableDeleteOperator для удаления таблицы:

get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account'
)

Документы:

...