Мое требование - прочитать данные из CSV-файла вместе с заголовком и создать ту же структуру в Google Dat a Store, используя Python с Dataflow. Я попытался создать пример кода, как показано ниже.
Мой образец CSV ниже,
First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"
Мой фрагмент кода Pyhton 2.7 такой, как показано ниже
import csv
import datetime
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper
from apache_beam.io.filesystems import FileSystems
from apache_beam import pvalue
class CSVtoDict(beam.DoFn):
"""Converts line into dictionary"""
def process(self, element, header):
rec = ""
element = element.encode('utf-8')
for line in csv.reader([element]):
rec = line
if len(rec) == len(header):
data = {header.strip(): val.strip() for header, val in zip(header, rec)}
return [data]
logging.info("row contains bad data")
except Exception:
class CreateEntities(beam.DoFn):
"""Creates Datastore entity"""
def process(self, element):
entity = entity_pb2.Entity()
sku = int(element.pop('sku'))
element[1] = float(element[1])
element['salePrice'] = float(element['salePrice'])
element['name'] = unicode(element['name'].decode('utf-8'))
element['type'] = unicode(element['type'].decode('utf-8'))
element['url'] = unicode(element['url'].decode('utf-8'))
element['image'] = unicode(element['image'].decode('utf-8'))
element['inStoreAvailability'] = unicode(element['inStoreAvailability'])
datastore_helper.add_key_path(entity.key, 'Productx', sku)
datastore_helper.add_properties(entity, element)
return [entity]
class ProcessOptions(PipelineOptions):
def _add_argparse_args(cls, parser):
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
def read_header_from_filename(filename):
# note that depending on your newline character/file encoding, this may need to be modified
file_handle = FileSystems.open(filename)
header = file_handle.readline()
return header.split(',')
process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
# Create PCollection containing header line
header = (p
| beam.Create(process_options.input)
| beam.Map(read_header_from_filename))
def dataflow(argv=None):
process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
| 'Reading input file' >> beam.io.ReadFromText(process_options.input)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
if __name__ == '__main__':
Я могу возможность загружать объекты, используя поток данных, однако я хотел бы проанализировать файл CSV из заголовка, а затем строки вместо жесткого кодирования значений в классе CreateEntities и записать то же самое в объектах хранилища данных.
По сути, выгрузите тот же CSV-файл, который задан в качестве входных данных для задания потока данных со строками. Может кто-нибудь помочь, пожалуйста?
Required Output in Data Store for Key Actor:
First Name Last Name Date of Birth
Tom,Cruise "July 3, 1962"
Bruce,Willis "March 19, 1955"
Morgan,Freeman "June 1, 1937"
John,Wayne "May 26, 1907"
Редактировать : Я включил код, указанный вами, и теперь получаю сообщение об ошибке ниже. Я использую Python 2.7 и импортировал соответствующие библиотеки. Извините, я очень новичок в Python.
File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
"__main__", fname, loader, pkg_name)
File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/home/gurusankar_p/upload-data-datastore-dataflow/upload2.py", line 70, in <module>
| beam.Map(read_header_from_filename))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/core.py", line 2423, in __init__
self.values = tuple(values)
TypeError: 'RuntimeValueProvider' object is not iterable
Спасибо, GS