Воздушный поток BigQueryOperator: как сохранить результат запроса в секционированную таблицу? - PullRequest
0 голосов
/ 24 мая 2018

У меня есть простая группа DAG

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(dag_id='my_dags.my_dag') as dag:

    start = DummyOperator(task_id='start')

    end = DummyOperator(task_id='end')
    sql = """
             SELECT *
             FROM 'another_dataset.another_table'
          """
    bq_query = BigQueryOperator(bql=sql,
                            destination_dataset_table='my_dataset.my_table20180524'),
                            task_id='bq_query',
                            bigquery_conn_id='my_bq_connection',
                            use_legacy_sql=False,
                            write_disposition='WRITE_TRUNCATE',
                            create_disposition='CREATE_IF_NEEDED',
                            query_params={})
    start >> bq_query >> end

При выполнении задачи bq_query SQL-запрос сохраняется в заштрихованной таблице.Я хочу, чтобы он сохранялся в ежедневной многораздельной таблице.Для этого я изменил только destination_dataset_table на my_dataset.my_table$20180524.Я получил ошибку ниже при выполнении bq_task:

Partitioning specification must be provided in order to create partitioned table

Как я могу указать BigQuery для сохранения результата запроса в ежедневную многораздельную таблицу?Моим первым предположением было использование query_params в BigQueryOperator, но я не нашел ни одного примера использования этого параметра.

РЕДАКТИРОВАТЬ:

Я использую google-cloud==0.27.0 клиент Python ... и это тот, который используется в Prod: (

Ответы [ 2 ]

0 голосов
/ 14 июня 2018

Основная проблема заключается в том, что у меня нет доступа к новой версии Google Cloud Python API, продукт использует версию 0.27.0 .Итак, чтобы выполнить работу, я сделал что-то плохое и грязное:

  • сохранил результат запроса в заштрихованной таблице, пусть он будет table_sharded
  • got table_sharded 'Схема, пусть это будет table_schema
  • сохраненный " SELECT * FROM dataset.table_sharded" запрос к многораздельной таблице, обеспечивающий table_schema

Все это абстрагируется в одном операторе, который использует ловушку.Хук отвечает за создание / удаление таблиц / разделов, получение схемы таблиц и выполнение запросов в BigQuery.

Посмотрите на код .Если есть другое решение, пожалуйста, дайте мне знать.

0 голосов
/ 24 мая 2018

Сначала необходимо создать пустую многораздельную таблицу назначения.Следуйте инструкциям здесь: ссылка , чтобы создать пустую многораздельную таблицу

, а затем снова запустить под трубопроводом воздушного потока.Вы можете попробовать код:

import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date
with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    sql = """
         SELECT *
         FROM 'another_dataset.another_table'
          """
    bq_query = BigQueryOperator(bql=sql,
                        destination_dataset_table={{ params.t_name }}),
                        task_id='bq_query',
                        bigquery_conn_id='my_bq_connection',
                        use_legacy_sql=False,
                        write_disposition='WRITE_TRUNCATE',
                        create_disposition='CREATE_IF_NEEDED',
                        query_params={'t_name': table_name},
                        dag=dag
                        )
start >> bq_query >> end

Итак, я создал переменную динамического имени таблицы и передал ее оператору BQ.

...