Python Apache Beam Datapiple для чтения нескольких таблиц BQ - PullRequest
0 голосов
/ 13 марта 2019

У меня есть требование прочитать таблицу BQ, применить преобразование и загрузить его в другую таблицу BQ. преобразование является общим для всех таблиц.

Мне просто интересно, можем ли мы читать несколько таблиц одновременно, применять преобразование и загружать разные таблицы назначения. Структура исходной таблицы и таблицы назначения будет одинаковой. Например

table_x --transformation(abc) -- table_x1
table_y --transformation(abc) -- table_y1

Ниже приведен пример кода, который я протестировал с одной таблицей:

import apache_beam as beam
        from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

        def get_schema(table_in): 
            """
            Function to pull the json schema for the table being copied. The schema should be in the schema/ folder, and be of the format:
            'schema_"input_table_name".json'
            """
            with open('schema/schema_'+table_in+'.json') as f:
                        data = f.read()
                        # Wrapping the schema in fields is required for the BigQuery API.
                        table_schema = '{"fields": ' + data + '}'        
            return parse_table_schema_from_json(table_schema) # # This code reads from a biq query table and loads in to another table on BQ, transformation still need to looked at

        def run(argv=None):
            """
            Function that will instantiate the pipeline options, define the pipeline, and then run it
            """
            pipeline_options = PipelineOptions()
            p = beam.Pipeline(options=pipeline_options)    
            (p 
             | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource('projecttest:datasetx.table_x'))
             | 'Write to BigQuery' >> beam.io.Write(
                                         beam.io.BigQuerySink('projecttest:datasetx.table_x1',

    schema=get_schema('table_x'),
                                         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                                         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
                                     )
            p.run().wait_until_finish()    

        if __name__ == '__main__':
            run()

1 Ответ

0 голосов
/ 13 марта 2019

У вас есть (обобщенная) строка:

 p | beam.io.Read(beam.io.BigQuerySource("x")
   | beam.io.Write(beam.io.BigQuerySink("x1")

Почему бы просто не добавить следующую (обобщенную) строку?

 p | beam.io.Read(beam.io.BigQuerySource("y")
   | beam.io.Write(beam.io.BigQuerySink("y1")
...