Поток данных GCS в BigQuery - Как вывести несколько строк на вход? - PullRequest
0 голосов
/ 06 ноября 2018

В настоящее время я использую предоставленный Google шаблон gcs-text-to-bigquery и добавляю функцию преобразования для преобразования моего файла jsonl. Jsonl довольно вложенный, и я хотел иметь возможность выводить несколько строк на одну строку json с разделителями новой строки, выполняя некоторые преобразования.

Например:

{'state': 'FL', 'metropolitan_counties':[{'name': 'miami dade', 'population':100000}, {'name': 'county2', 'population':100000}…], 'rural_counties':{'name': 'county1', 'population':100000}, {'name': 'county2', 'population':100000}….{}], 'total_state_pop':10000000,….}

Очевидно, что округов будет больше, чем 2, и в каждом штате будет одна из этих строк. Вывод, который хочет мой босс:

Output

Когда я выполняю преобразование текста из gcs-в-bq, я получаю только одну строку на штат (поэтому я получу округ Майами-Дейд из FL, а затем независимо от того, какой первый округ находится в моем преобразовании для следующего штата ). Я прочитал немного, и я думаю, что это из-за сопоставления в шаблоне, который ожидает один вывод на jsonline. Кажется, я могу сделать pardo (DoFn?), Не уверенный, что это такое, или есть похожая опция с beam.Map в python. В преобразованиях есть некоторая бизнес-логика (сейчас это около 25 строк кода, так как у json больше столбцов, чем я показал, но это довольно просто).

Есть предложения по этому поводу? данные поступят сегодня вечером / завтра, и в таблице BQ будут сотни тысяч строк.

шаблон, который я использую, в настоящее время находится в Java, но я могу довольно легко перевести его на python, так как в Python есть много примеров в Интернете. я лучше знаю python и думаю, что он легче, учитывая различные типы (иногда поле может быть пустым), и это выглядит менее устрашающе, учитывая, что примеры, которые я видел, выглядят проще, однако открыты либо для

1 Ответ

0 голосов
/ 07 ноября 2018

Решить это в Python довольно просто. Вот одна возможность (не полностью проверенная):

from __future__ import absolute_import                                                               

import ast                                                                      

import apache_beam as beam                                                      
from apache_beam.io import ReadFromText                                            
from apache_beam.io import WriteToText                                             

from apache_beam.options.pipeline_options import PipelineOptions                   

import os                                                                       
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service_account.json'      

pipeline_args = [                                                                  
    '--job_name=test'                                                              
]                                                                                  

pipeline_options = PipelineOptions(pipeline_args)                                  


def jsonify(element):                                                              
    return ast.literal_eval(element)                                               


def unnest(element):                                                            
    state = element.get('state')                                                
    state_pop = element.get('total_state_pop')                                  
    if state is None or state_pop is None:                                                   
        return                                                                  
    for type_ in ['metropolitan_counties', 'rural_counties']:                   
        for e in element.get(type_, []):                                        
            name = e.get('name')                                                
            pop = e.get('population')                                           
            county_type = (                                                     
                'Metropolitan' if type_ == 'metropolitan_counties' else 'Rural' 
            )                                                                   
            if name is None or pop is None:                                     
                continue                                                        
            yield {                                                             
                'State': state,                                                 
                'County_Type': county_type,                                     
                'County_Name': name,                                            
                'County_Pop': pop,                                              
                'State_Pop': state_pop                                          
            }

with beam.Pipeline(options=pipeline_options) as p:                              
    lines = p | ReadFromText('gs://url to file')                                        

    schema = 'State:STRING,County_Type:STRING,County_Name:STRING,County_Pop:INTEGER,State_Pop:INTEGER'                                                                      

    data = (                                                                    
        lines                                                                   
        | 'Jsonify' >> beam.Map(jsonify)                                        
        | 'Unnest' >> beam.FlatMap(unnest)                                      
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(                  
            'project_id:dataset_id.table_name', schema=schema,                     
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,    
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)       
        )                                                                       
    )

Это будет успешным, только если вы работаете с пакетными данными. Если у вас есть потоковые данные, просто измените beam.io.Write(beam.io.BigquerySink(...)) на beam.io.WriteToBigQuery.

...