Можно ли использовать BigqueryOperator для хранения результатов данных в нескольких разделах за один вызов? - PullRequest
0 голосов
/ 06 апреля 2020

В настоящее время у меня есть этот query_to_table_x. sql

SELECT column_a, column_b, column_c
FROM table_x
WHERE _PARTITIONTIME = {{ execution_date }}

У меня есть airflow_dag.py, как показано ниже

def func(dag):
  day_no_dash = {{ ds_nodash }}
  day = {{ ds }}
  transform_op = BigQueryOperator(
            sql='query_to_table_x.sql',
            params={
                'execution_date': day
            },
            destination_dataset_table='project.dataset.result_table_x' + '$' + day_no_dash,
                time_macros
            ),
            task_id='job_to_get_result_table_x',
            create_disposition='CREATE_NEVER',
            write_disposition='WRITE_TRUNCATE'
  return operators

dag = DAG(
    'daily_job',
    default_args=default_args,
    schedule_interval="00 02 * * *",
)
result_table_x = func(dag)

В приведенном выше случае я буду запускать query_to_table_x. sql и сохраните его в project.dataset.result_table_x $ yyyyMMdd. Пример: сегодня 2020-04-06, поэтому я запустите команду query_to_table_x. sql с фильтром _PARTITIONTIME = '2020-04-06', затем сохраните результат в project.dataset.result_table_x$20200406

Я планирую запускать эту группу DAG не ежедневно , но раз в две недели. Вопрос, возможно ли сделать один вызов BigQueryOperator, но выбрать несколько дат, а затем сохранить результат в другом PARTITIONTIME. Так что у меня будет такой запрос

SELECT column_a, column_b, column_c
FROM table_x
WHERE _PARTITIONTIME BETWEEN TIMESTAMP_SUB({{ execution_date }}, INTERVAL 14 DAY) AND {{ execution_date }}

Но я не знаю, доступен ли для BigqueryOperator такой параметр для установки.

Заранее спасибо.

1 Ответ

0 голосов
/ 07 апреля 2020

Вы можете динамически создавать операторы внутри вашего airflow_dag.py, используя циклы.

Если вы переведете следующее SQL logi c в python TIMESTAMP_SUB({{ execution_date }}, INTERVAL 14 DAY) AND {{ execution_date }}, вы можете создать итератор, который будет использоваться для создания потока DAG.

Таким образом, ваша DAG будет выглядеть примерно так:

for date in ['2020-04-06', '2020-04-07', '...']:
    transform_op = BigQueryOperator(...)

Это создаст N задач BigQueryOperator, где каждая из них будет запрашивать и перезаписывать одну дату. (Убедитесь, что 'task_id' уникален для каждого экземпляра BigQueryOperator)

Обратите внимание, что эти N операторов будут запрашивать BigQuery несколько раз (в вашем случае 14 раз), но, поскольку вы работаете с разбиением по времени приема, затраты должны оставаться так же, как вы сканируете только один день с каждой задачей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...