Я пытаюсь выполнить последнее преобразование в моем конвейере лучей (используя Google Dataflow в качестве бегуна, но для тестирования DirectRunner) и пытаюсь понять, как изменить мои строки с кортежей на словари. Я попытался сделать функцию ParDo с пользовательским классом, который преобразует кортеж в dict. Отладка, которую я добавил в функцию TupToDict, выдает следующее, так что я думаю, что она должна работать? Любая помощь приветствуется ... Я собираюсь облысеть от того, что вырываю волосы :). Для проверки я перешел на directrunner. У меня также есть «xxxxx» информация о проекте, et c.
Вывод из функции TupToDict.
(u'A3VZVMI3JWCFPD', 5.0)
product_id= A3VZVMI3JWCFPD
avg_rating= 5.0
{'avg_rating': 5.0, 'product_id': u'A3VZVMI3JWCFPD'}
вывод в string.txt от directrunner
avg_rating
product_id
avg_rating
product_id
avg_rating
product_id
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import sys
import logging
PROJECT='xxxx'
BUCKET='xxxxxxxx'
schema = 'product_id:STRING,avg_rating:FLOAT'
class Split(beam.DoFn):
#
# This class returns each row as a dictionary
#
def process(self, element):
user_id, product_id, rating = element.split(",")
return [{
'user_id': user_id,
'product_id': product_id,
'rating': float(rating),
}]
class CollectProductID(beam.DoFn):
def process(self, element):
#returns a list of tuples containing product_id and rating
result = [(element['product_id'], element['rating'])]
return result
class TupToDict(beam.DoFn):
def process(self, element):
print element
print "product_id=",element[0]
print "avg_rating=",element[1]
di = {'product_id': element[0],'avg_rating': element[1]}
print di
return di
def run():
argv = [
'--project={0}'.format(PROJECT),
'--staging_location=gs://{0}/'.format(BUCKET),
'--temp_location=gs://{0}/'.format(BUCKET),
#'--runner=DataflowRunner'
'--runner=DirectRunner'
]
p = beam.Pipeline(argv=argv)
(p
| 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/xxxxx_10k.csv'.format(BUCKET))
# Next Transform splits the read PCollection CSV into rows of dictionaries
| 'ParseCSV' >> beam.ParDo(Split())
# Next Transform breaks each row into a tuple and the following one groups by product ID's
| 'CombineProduct' >> beam.ParDo(CollectProductID())
| 'Grouping Product IDs' >> beam.GroupByKey()
# Next Transform averages the ratings per product_id
| 'Calculating Average' >> beam.CombineValues(beam.combiners.MeanCombineFn())
# Next Transform converts to a dictionary
| 'Convert To Dict' >> beam.ParDo(TupToDict())
# Next Transform maps keys to BigQuery Columns
#| 'Map to BQ COL' >> beam.Map(lamda line: dict(record=line))
# Next two are to toggle local writing to to BQ
| 'Write to file' >> beam.io.WriteToText('strings', file_name_suffix='.txt')
#| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:xxxxxx.ecomm_product_ratings_avg'.format(PROJECT), schema=schema))
p.run()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()```