Выполнить запрос с информацией о текущем времени в BigQuery с CloudDataflow - PullRequest
0 голосов
/ 07 апреля 2020

Я хочу создать конвейер, который читает таблицы BigQuery, обрабатывает их и выводит их в BigQuery с помощью GoogleCloudDataflow. Входная таблица BigQuery отсортирована по дате, поэтому я хочу запускать конвейер каждый день и выводить по дате. Я пытаюсь использовать API CloudDataflow, используя cron AppEngine для его регулярного запуска. Я хочу получить дату выполнения cron, опубликовать ее в API в качестве пользовательского параметра и использовать дату для запроса BigQuery и имени выходной таблицы в конвейере потока данных, но хорошего способа нет. Я получаю параметр даты в виде RunValueProvider, но не могу использовать его в качестве запроса. Я использую Python для реализации конвейера AppEngine и Dataflow. Есть хороший способ?

Я хочу исправить этот конвейерный код

from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions

import json
import datetime

class UserOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--date', type=str,default='')

def run(argv=None):
  parser = argparse.ArgumentParser()
  pipeline_options = UserOptions()
  pipeline_options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
  pipeline_options.view_as(SetupOptions).save_main_session = True
  google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  google_cloud_options.project = PROJECT
  google_cloud_options.template_location = 'gs://test/template'
  google_cloud_options.temp_location= 'gs://test/temp'
  google_cloud_options.staging_location = 'gs://test/staging'
  query = f'SELECT timestamp FROM `dataset.*`'
  with beam.Pipeline(options=pipeline_options) as p:
    (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECT, use_standard_sql=True, query=query))
      | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        f'dataflow_test.dataflow_test',
        schema='timestamp:TIMESTAMP',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
...