Чтение из файла CSV и загрузка в Google Data Store Python - PullRequest
0 голосов
/ 08 января 2020

Мое требование - прочитать данные из CSV-файла вместе с заголовком и создать ту же структуру в Google Dat a Store, используя Python с Dataflow. Я попытался создать пример кода, как показано ниже.

Мой образец CSV ниже,

First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"

Мой фрагмент кода Pyhton 2.7 такой, как показано ниже

import csv
import datetime
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper
from apache_beam.io.filesystems import FileSystems
from apache_beam import pvalue


class CSVtoDict(beam.DoFn):
    """Converts line into dictionary"""
    def process(self, element, header):
        rec = ""
        element = element.encode('utf-8')
        try:
            for line in csv.reader([element]):
                rec = line

            if len(rec) == len(header):
                data = {header.strip(): val.strip() for header, val in zip(header, rec)}
                return [data]
            else:
                logging.info("row contains bad data")
        except Exception:
            pass

class CreateEntities(beam.DoFn):
    """Creates Datastore entity"""
    def process(self, element):
        entity = entity_pb2.Entity()
        sku = int(element.pop('sku'))
        element[1] = float(element[1])
        element['salePrice'] = float(element['salePrice'])
        element['name'] = unicode(element['name'].decode('utf-8'))
        element['type'] = unicode(element['type'].decode('utf-8'))
        element['url'] = unicode(element['url'].decode('utf-8'))
        element['image'] = unicode(element['image'].decode('utf-8'))
        element['inStoreAvailability'] = unicode(element['inStoreAvailability'])

        datastore_helper.add_key_path(entity.key, 'Productx', sku)
        datastore_helper.add_properties(entity, element)
        return [entity]


class ProcessOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--input',
                dest='input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

def read_header_from_filename(filename):
  # note that depending on your newline character/file encoding, this may need to be modified
  file_handle = FileSystems.open(filename)  
  header = file_handle.readline()
  return header.split(',')

process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
# Create PCollection containing header line
header = (p
          | beam.Create(process_options.input)
          | beam.Map(read_header_from_filename))

def dataflow(argv=None):
    process_options = PipelineOptions().view_as(ProcessOptions)
    p = beam.Pipeline(options=process_options)

    (p
    | 'Reading input file' >> beam.io.ReadFromText(process_options.input)
    | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
    | 'Create entities' >> beam.ParDo(CreateEntities())
    | 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
    )    
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    dataflow()

Я могу возможность загружать объекты, используя поток данных, однако я хотел бы проанализировать файл CSV из заголовка, а затем строки вместо жесткого кодирования значений в классе CreateEntities и записать то же самое в объектах хранилища данных.

По сути, выгрузите тот же CSV-файл, который задан в качестве входных данных для задания потока данных со строками. Может кто-нибудь помочь, пожалуйста?

Required Output in Data Store for Key Actor:

First Name Last Name Date of Birth
Tom,Cruise "July 3, 1962"
Bruce,Willis "March 19, 1955"
Morgan,Freeman "June 1, 1937"
John,Wayne "May 26, 1907"

Редактировать : Я включил код, указанный вами, и теперь получаю сообщение об ошибке ниже. Я использую Python 2.7 и импортировал соответствующие библиотеки. Извините, я очень новичок в Python.

Error:
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/home/gurusankar_p/upload-data-datastore-dataflow/upload2.py", line 70, in <module>
    | beam.Map(read_header_from_filename))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/core.py", line 2423, in __init__
    self.values = tuple(values)
TypeError: 'RuntimeValueProvider' object is not iterable

Спасибо, GS

1 Ответ

1 голос
/ 08 января 2020

Apache Beam распараллеливает обработку ваших данных, распределяя чтение файла по многим работникам, что означает, что большинство работников вообще никогда не будут читать строку заголовка.

Что вы хотите сделать, это объединить строки, прочитанные строкой заголовка. Поскольку строка заголовка представляет собой небольшой объем данных, вы можете прочитать его как отдельную коллекцию PC и передать его в качестве побочного ввода в CSVtoDict .

Пример кода для чтения строки заголовка. :

def read_header_from_filename(filename):
  # note that depending on your newline character/file encoding, this may need to be modified
  file_handle = FileSystems.open(filename)  
  header = file_handle.readline()
  return header.split(',')

# Create PCollection containing header line
header = (p
          | beam.Create(process_options.input) 
          | beam.Map(read_header_from_filename))

Ваш код строительства трубопровода становится:

    (p 
    | 'Reading input file' >> beam.io.ReadFromText(process_options.input)
    | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
    | 'Create entities' >> beam.ParDo(CreateEntities())
    | 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
     )    
    p.run().wait_until_finish()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...