как убедиться, что последующие задания потока данных будут выполняться на той же машине - PullRequest
0 голосов
/ 25 апреля 2019

Я могу загрузить несколько CSV-файлов в bigquery по потоку данных, используя цикл for.Но в этом случае каждый раз запускается новый поток данных, что приводит к дополнительным издержкам.

DataFlow часть моего кода:

def run(abs_csv_file_name="", table_name="", argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_csv_file',
                        dest='input_csv_file',
               default='gs://{0}/{1}'.format(bucket_name,abs_csv_file_name),
                        help='Input file to process.')
    parser.add_argument('--output_stage_bq',
                        dest='output_stage_bq',
                        default='{0}:{1}.{2}'.format(project_id,stage_dataset_name,table_name),
                        help='Output file to write results to.')

    parser.add_argument('--output_target_bq',
                        dest='output_target_bq',
                        default='{0}:{1}.{2}'.format(project_id,dataset_name,table_name),
                        help='Output file to write results to.')

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    # delete_a_bq_table(table_name)
    table_spec = "{0}:{1}.{2}".format(project_id, stage_dataset_name, table_name)

    with beam.Pipeline(options=pipeline_options) as p1:
        data_csv = p1 | 'Read CSV file' >> ReadFromText(known_args.input_csv_file)
        dict1 = (data_csv | 'Format to json' >> (beam.ParDo(Split())))
        (dict1 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                                            known_args.output_stage_bq,
                                            schema=product_revenue_schema
                                            ))
        fullTable = (p1 | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(table_spec)))
        (fullTable | 'writeToBQ another dataset' >> beam.io.WriteToBigQuery(known_args.output_target_bq,
                                schema = product_revenue_schema))

Я уверен, что должен быть лучший способ, чем каждый раз вызывать функцию запуска.

for i in range(len(table_names)):
    find_product_revenue_schema_and_column_name(table_name=table_names[i])

    run(abs_csv_file_name=abs_file_names[i], table_name=table_names[i])

Мне нужнонаписать код, который может гарантировать, что последующие задания потока данных будут выполняться на том же компьютере, чтобы можно было сэкономить время настройки компьютера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...