Переименование столбцов BigQuery в Apache Beam - PullRequest
1 голос
/ 09 апреля 2020

Я пытаюсь переименовать строки больших запросов в Apache Beam Pipeline в Python, как в следующем примере: Наличие 1 PCollection с полными данными и 1 другого только с 3 полями, переименованными в col1 в col1.2, col2 в col2 .2 ...

Как правильно применить мой фильтр для получения второй коллекции PC с переименованными строками?

def is_filtered(row):
    row['col1'] == row['col1.2']
    row['col2'] == row['col2.2']
    row['col3'] == row['col3.2']
    yield row


with beam.Pipeline() as pipeline:
    query = open('query.sql', 'r')
    bq_source = beam.io.BigQuerySource(query=query.read(),
                                       use_standard_sql=True)    
    main_table = \
        pipeline \
        | 'ReadBQData' >> beam.io.Read(bq_source) \

    cycle_table = (
        pipeline 
        | 'FilterMainTable' >> beam.Filter(is_filtered, main_table))

Я также подумал об использовании раздела, но примеры разделов, которые я нашел, были более о разделении содержимого строк, а не самой строки

1 Ответ

1 голос
/ 09 апреля 2020

Оператор Filter используется для создания PCollection со строками, удаленными из источника (ожидается, что он возвращает логическое значение). Используйте оператор Map , если вы хотите создать PCollection с преобразованными строками 1: 1. Вот пример:

def filter_columns(row):
    return {'col1.2': row['col1'],
            'col2.2': row['col2'],
            'col3.2': row['col3']}


with beam.Pipeline() as pipeline:
    query = open('query.sql', 'r')
    bq_source = beam.io.BigQuerySource(query=query.read(),
                                       use_standard_sql=True)    
    main_table = \
        pipeline \
        | 'ReadBQData' >> beam.io.Read(bq_source)

    cycle_table = (
        main_table 
        | 'FilterMainTable' >> beam.Map(filter_columns))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...