Я хочу создать конвейер, который читает таблицы BigQuery, обрабатывает их и выводит их в BigQuery с помощью GoogleCloudDataflow. Входная таблица BigQuery отсортирована по дате, поэтому я хочу запускать конвейер каждый день и выводить по дате. Я пытаюсь использовать API CloudDataflow, используя cron AppEngine для его регулярного запуска. Я хочу получить дату выполнения cron, опубликовать ее в API в качестве пользовательского параметра и использовать дату для запроса BigQuery и имени выходной таблицы в конвейере потока данных, но хорошего способа нет. Я получаю параметр даты в виде RunValueProvider, но не могу использовать его в качестве запроса. Я использую Python для реализации конвейера AppEngine и Dataflow. Есть хороший способ?
Я хочу исправить этот конвейерный код
from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
import json
import datetime
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--date', type=str,default='')
def run(argv=None):
parser = argparse.ArgumentParser()
pipeline_options = UserOptions()
pipeline_options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
pipeline_options.view_as(SetupOptions).save_main_session = True
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT
google_cloud_options.template_location = 'gs://test/template'
google_cloud_options.temp_location= 'gs://test/temp'
google_cloud_options.staging_location = 'gs://test/staging'
query = f'SELECT timestamp FROM `dataset.*`'
with beam.Pipeline(options=pipeline_options) as p:
(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECT, use_standard_sql=True, query=query))
| 'write' >> beam.io.Write(beam.io.BigQuerySink(
f'dataflow_test.dataflow_test',
schema='timestamp:TIMESTAMP',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()