У меня есть требование прочитать таблицу BQ, применить преобразование и загрузить его в другую таблицу BQ. преобразование является общим для всех таблиц.
Мне просто интересно, можем ли мы читать несколько таблиц одновременно, применять преобразование и загружать разные таблицы назначения. Структура исходной таблицы и таблицы назначения будет одинаковой. Например
table_x --transformation(abc) -- table_x1
table_y --transformation(abc) -- table_y1
Ниже приведен пример кода, который я протестировал с одной таблицей:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
def get_schema(table_in):
"""
Function to pull the json schema for the table being copied. The schema should be in the schema/ folder, and be of the format:
'schema_"input_table_name".json'
"""
with open('schema/schema_'+table_in+'.json') as f:
data = f.read()
# Wrapping the schema in fields is required for the BigQuery API.
table_schema = '{"fields": ' + data + '}'
return parse_table_schema_from_json(table_schema) # # This code reads from a biq query table and loads in to another table on BQ, transformation still need to looked at
def run(argv=None):
"""
Function that will instantiate the pipeline options, define the pipeline, and then run it
"""
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
(p
| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource('projecttest:datasetx.table_x'))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink('projecttest:datasetx.table_x1',
schema=get_schema('table_x'),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
)
p.run().wait_until_finish()
if __name__ == '__main__':
run()