Мой код прилагается. Есть два CSV, которые мне нужно прочитать. Я читаю в первом CSV, а затем хочу передать эту коллекцию PC в качестве побочного ввода в другой файл CSV, который я собираюсь читать построчно. Затем я хочу получить два элемента, соединенных с функцией FlatMap. Probelm, я не могу заставить его передавать данные в функцию (я использую Python). Я просмотрел множество примеров в Интернете, другие делали это в более ранних версиях. Я знаю, что это на самом деле делает что-то, потому что я могу, по крайней мере, записать левый CSV в текстовый файл и вижу, что он изменил каждую строку в пару ключ-значение. Буду очень признателен за помощь здесь, спасибо за чтение.
from __future__ import absolute_import
import logging
import csv
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class append_lr(beam.DoFn):
def __init__(self, lineup):
self._lineup=(1,2)
def process(self, left, right):
bla=left
burp=right
both=left+right
yield both
class MyCsvFileSource(beam.io.filebasedsource.FileBasedSource):
def read_records(self, file_name, range_tracker):
self._file = self.open_file(file_name)
reader = csv.DictReader(self._file)
for rec in reader:
yield rec
def combine_lines():
with beam.Pipeline(options=PipelineOptions()) as p:
left_side = p | 'Read_Left_Side' >> beam.io.Read(MyCsvFileSource('/folder/left_side.csv'))
left_and_right = (p | 'Read_Rght_Side' >> beam.io.Read(MyCsvFileSource('/folder/right_side.csv'))
| beam.FlatMap(append_lr, beam.pvalue.AsIter(left_side)))
left_and_right | 'Write' >> beam.io.WriteToText('/folder/', file_name_suffix='test_output.csv')
def run(argv=None):
combine_lines()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run(None)