Поток данных - запись в BigQuery - кортеж в словарь - PullRequest
0 голосов
/ 30 января 2020

Я пытаюсь выполнить последнее преобразование в моем конвейере лучей (используя Google Dataflow в качестве бегуна, но для тестирования DirectRunner) и пытаюсь понять, как изменить мои строки с кортежей на словари. Я попытался сделать функцию ParDo с пользовательским классом, который преобразует кортеж в dict. Отладка, которую я добавил в функцию TupToDict, выдает следующее, так что я думаю, что она должна работать? Любая помощь приветствуется ... Я собираюсь облысеть от того, что вырываю волосы :). Для проверки я перешел на directrunner. У меня также есть «xxxxx» информация о проекте, et c.

Вывод из функции TupToDict.

(u'A3VZVMI3JWCFPD', 5.0)
product_id= A3VZVMI3JWCFPD
avg_rating= 5.0
{'avg_rating': 5.0, 'product_id': u'A3VZVMI3JWCFPD'}

вывод в string.txt от directrunner

avg_rating
product_id
avg_rating
product_id
avg_rating
product_id
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    import sys
    import logging

    PROJECT='xxxx'
    BUCKET='xxxxxxxx'
    schema = 'product_id:STRING,avg_rating:FLOAT'

    class Split(beam.DoFn):
    #
    # This class returns each row as a dictionary
    #
        def process(self, element):
            user_id, product_id, rating = element.split(",")

            return [{
                'user_id': user_id,
                'product_id': product_id,
                'rating': float(rating),
            }]


    class CollectProductID(beam.DoFn):
        def process(self, element):
            #returns a list of tuples containing product_id and rating
            result = [(element['product_id'], element['rating'])]
            return result


    class TupToDict(beam.DoFn):
        def process(self, element):
                print element
            print "product_id=",element[0]
            print "avg_rating=",element[1]
            di = {'product_id': element[0],'avg_rating': element[1]}
            print di
            return di


    def run():
       argv = [
          '--project={0}'.format(PROJECT),
          '--staging_location=gs://{0}/'.format(BUCKET),
          '--temp_location=gs://{0}/'.format(BUCKET),
          #'--runner=DataflowRunner'
          '--runner=DirectRunner'
       ]

       p = beam.Pipeline(argv=argv)

       (p
          | 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/xxxxx_10k.csv'.format(BUCKET))
          # Next Transform splits the read PCollection CSV into rows of dictionaries
          | 'ParseCSV' >> beam.ParDo(Split())
          # Next Transform breaks each row into a tuple and the following one groups by product ID's
          | 'CombineProduct' >> beam.ParDo(CollectProductID())
          | 'Grouping Product IDs' >> beam.GroupByKey()
          # Next Transform averages the ratings per product_id
          | 'Calculating Average' >> beam.CombineValues(beam.combiners.MeanCombineFn())
          # Next Transform converts to a dictionary
          | 'Convert To Dict' >> beam.ParDo(TupToDict())
          # Next Transform maps keys to BigQuery Columns
          #| 'Map to BQ COL' >> beam.Map(lamda line: dict(record=line))
          # Next two are to toggle local writing to to BQ
          | 'Write to file' >> beam.io.WriteToText('strings', file_name_suffix='.txt')
          #| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:xxxxxx.ecomm_product_ratings_avg'.format(PROJECT), schema=schema))

       p.run()

    if __name__ == '__main__':
           logging.getLogger().setLevel(logging.INFO)
           run()```

Ответы [ 2 ]

0 голосов
/ 30 января 2020

Цаман - герой. Мне нужно было добавить [] в мою функцию TupToDict, чтобы вернуть фактический словарь.

Я изменил его на поток данных в качестве бегуна, и он отлично вошел в BQ! Спасибо!

0 голосов
/ 30 января 2020

Можете ли вы попробовать выдать свой словарь вместо возврата

def process(self, element):
    print (element)
    print("product_id=",element[0])
    print("avg_rating=",element[1])
    di = {'product_id': element[0],'avg_rating': element[1]}
    print (di)
    yield di

Функция процесса должна возвращать итеративное значение, означающее, что если вы возвращаете dict, она будет повторяться над dict, поэтому она будет хранить только ключи в вашем выводе. Если вы дадите свой дикт, он вернет генератор, содержащий ваши дикты в качестве элементов.

...