Вызов beam.io.WriteToBigQuery в луче.DoFn - PullRequest
0 голосов
/ 28 мая 2020

Я создал шаблон потока данных с некоторыми параметрами. Когда я записываю данные в BigQuery, я хотел бы использовать эти параметры, чтобы определить, в какую таблицу они должны записывать. Я пробовал вызвать WriteToBigQuery в ParDo, как предлагается в следующей ссылке.

Как я могу писать в Big Query, используя поставщик значений времени выполнения в Apache Beam?

Конвейер работал успешно, но не создает и не загружает данные в BigQuery . Есть идеи, в чем может быть проблема?

def run():
  pipeline_options = PipelineOptions()
  pipeline_options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

  with beam.Pipeline(options=pipeline_options) as p:
    custom_options = pipeline_options.view_as(CustomOptions)

    _ = (
      p
      | beam.Create([None])
      | 'Year to periods' >> beam.ParDo(SplitYearToPeriod(custom_options.year))
      | 'Read plan data' >> beam.ParDo(GetPlanDataByPeriod(custom_options.secret_name))
      | 'Transform record' >> beam.Map(transform_record)
      | 'Write to BQ' >> beam.ParDo(WritePlanDataToBigQuery(custom_options.year))
    )

if __name__ == '__main__':
  run()
class CustomOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--year', type=int)
    parser.add_value_provider_argument('--secret_name', type=str)
class WritePlanDataToBigQuery(beam.DoFn):
  def __init__(self, year_vp):
    self._year_vp = year_vp

  def process(self, element):
    year = self._year_vp.get()

    table = f's4c.plan_data_{year}'
    schema = {
      'fields': [ ...some fields properties ]
    }

    beam.io.WriteToBigQuery(
      table=table,
      schema=schema,
      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
      method=beam.io.WriteToBigQuery.Method.FILE_LOADS
    )

1 Ответ

0 голосов
/ 29 мая 2020

Вы создали экземпляр PTransform beam.io.gcp.bigquery.WriteToBigQuery внутри метода process вашего DoFn. Здесь есть пара проблем:

  • Метод process вызывается для каждого элемента входной коллекции PCollection. Он не используется для построения конвейерного графа. Этот подход к динамическому построению графика не работает.
  • После того, как вы переместите его из DoFn, вам нужно применить PTransform beam.io.gcp.bigquery.WriteToBigQuery к PCollection, чтобы он имел какой-либо эффект. См. Beam pydo c или учебную документацию Beam .

Чтобы создать поставщик производных значений для имени вашей таблицы, вам понадобится " вложенный "поставщик значений". К сожалению, это не поддерживается для Python SDK . Тем не менее, вы можете использовать опцию поставщика значений напрямую.

В качестве расширенного варианта вам может быть интересно попробовать «гибкие шаблоны», которые, по сути, упаковывают всю вашу программу как изображение docker и запускают его с параметры.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...