Как вернуть словарь в качестве PCollection? - PullRequest
0 голосов
/ 24 мая 2018

Мне дан URL-адрес облака Google.Я должен:

  1. Использовать URL-адрес для получения списка больших двоичных объектов в этом сегменте

  2. Для каждого большого двоичного объекта я выполняю несколько вызовов GCS API дляполучить информацию о BLOB-объекте (blob.size, blob.name и т. д.)

  3. Для каждого блоба я также должен прочитать его, найти что-то внутри него и добавить к полученным значениямиз вызовов GCS API

  4. Для каждого большого двоичного объекта я должен записать значения, найденные на шаге 2 и 3 для большого двоичного объекта, в BigQuery

Iу меня есть тысячи BLOB-объектов, так что это нужно сделать с помощью Apache Beam (я рекомендовал)

Моя идея конвейера выглядит примерно так:

GetUrlOfBucket и сделать PCollection

Используя эту коллекцию PCollection, получите список больших двоичных объектов в качестве новой коллекции PCollection.

Создайте коллекцию PCollection с метаданными этих больших объектов.

Выполните преобразование, которое примет PCollection, являющийся словарем метаданных.значения, входит в BLOB-объект, сканирует значение и возвращаетНовая PCollection, которая представляет собой словарь значений метаданных и этого нового значения

Запишите это в BigQuery.

Мне особенно трудно думать о том, как вернуть словарь в виде PCollection

[+] То, что я прочитал:

https://beam.apache.org/documentation/programming-guide/#composite-transforms

https://medium.com/@rajeshhegde/data-pipeline-using-apache-beam-python-sdk-on-dataflow-6bb8550bf366

Любые предложения, особенно о том, как взять в этом имени ивозвращаю PC коллекцию капель, с благодарностью.

1 Ответ

0 голосов
/ 30 мая 2018

Я решил эту проблему, прочитав больше о apache-beam и выяснив, что мне пришлось использовать функцию ParDo для разделения работы между моими ресурсами, в ParDo я вызываю свою функцию DoFn, которая принимает элемент и выполняет всю обработкунужно для этого и дает DIC.см. этот пост Apache Beam: как одновременно создать много PCollections, которые подвергаются одному и тому же PTransform?

    class ExtractMetadata(beam.DoFn):                                                                                                                                                                                                                                                  
def process(self, element):                                                                                                                                                                                                                                                    
    """                                                                                                                                                                                                                                                                        
    Takes in a blobName, fetches the blob and its values and returns a dictionary of values                                                                                                                                                                                    
    """                                                                                                                                                                                                                                                                        
    metadata = element.metadata                                                                                                                                                                                                                                                
    if metadata is not None:                                                                                                                                                                                                                                                   
        event_count = int(metadata['count'])                                                                                                                                                                                                                                   
    else:                                                                                                                                                                                                                                                                      
        event_count = None                                                                                                                                                                                                                                                     

    event_type = self.determine_event_type(element.id)                                                                                                                                                                                                                         
    cluster    = self.determine_cluster(element.id)                                                                                                                                                                                                                            
    customer   = self.determine_customer(element)                                                                                                                                                                                                                              
   # date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')                                                                                                                                                                                                   
    #date = date.isoformat()                                                                                                                                                                                                                                                   
    dic = {                                                                                                                                                                                                                                                                    
        'blob_name'       : element.name,                                                                                                                                                                                                                                      
        'event_path'      : element.path,                                                                                                                                                                                                                                      
        'size'            : int(element.size),                                                                                                                                                                                                                                 
        'time_of_creation': element.time_created.isoformat(),                                                                                                                                                                                                                  
        'event_count'     : event_count,                                                                                                                                                                                                                                       
        'event_type'      : event_type,                                                                                                                                                                                                                                        
        'cluster'         : cluster,                                                                                                                                                                                                                                           
        'customer'        : customer                                                                                                                                                                                                                                           
    }                                                                                                                                                                                                                                                                          
    yield dic
...