Я создал шаблон потока данных с некоторыми параметрами. Когда я записываю данные в BigQuery, я хотел бы использовать эти параметры, чтобы определить, в какую таблицу они должны записывать. Я пробовал вызвать WriteToBigQuery в ParDo, как предлагается в следующей ссылке.
Как я могу писать в Big Query, используя поставщик значений времени выполнения в Apache Beam?
Конвейер работал успешно, но не создает и не загружает данные в BigQuery . Есть идеи, в чем может быть проблема?
def run():
pipeline_options = PipelineOptions()
pipeline_options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
with beam.Pipeline(options=pipeline_options) as p:
custom_options = pipeline_options.view_as(CustomOptions)
_ = (
p
| beam.Create([None])
| 'Year to periods' >> beam.ParDo(SplitYearToPeriod(custom_options.year))
| 'Read plan data' >> beam.ParDo(GetPlanDataByPeriod(custom_options.secret_name))
| 'Transform record' >> beam.Map(transform_record)
| 'Write to BQ' >> beam.ParDo(WritePlanDataToBigQuery(custom_options.year))
)
if __name__ == '__main__':
run()
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--year', type=int)
parser.add_value_provider_argument('--secret_name', type=str)
class WritePlanDataToBigQuery(beam.DoFn):
def __init__(self, year_vp):
self._year_vp = year_vp
def process(self, element):
year = self._year_vp.get()
table = f's4c.plan_data_{year}'
schema = {
'fields': [ ...some fields properties ]
}
beam.io.WriteToBigQuery(
table=table,
schema=schema,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
method=beam.io.WriteToBigQuery.Method.FILE_LOADS
)