«FlatMap можно использовать только с вызываемыми объектами ...» - PullRequest
0 голосов
/ 29 августа 2018

Мой код прилагается. Есть два CSV, которые мне нужно прочитать. Я читаю в первом CSV, а затем хочу передать эту коллекцию PC в качестве побочного ввода в другой файл CSV, который я собираюсь читать построчно. Затем я хочу получить два элемента, соединенных с функцией FlatMap. Probelm, я не могу заставить его передавать данные в функцию (я использую Python). Я просмотрел множество примеров в Интернете, другие делали это в более ранних версиях. Я знаю, что это на самом деле делает что-то, потому что я могу, по крайней мере, записать левый CSV в текстовый файл и вижу, что он изменил каждую строку в пару ключ-значение. Буду очень признателен за помощь здесь, спасибо за чтение.

from __future__ import absolute_import
import logging
import csv
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class append_lr(beam.DoFn):
    def __init__(self, lineup):
        self._lineup=(1,2)

    def process(self, left, right):
        bla=left
        burp=right
        both=left+right
        yield both


class MyCsvFileSource(beam.io.filebasedsource.FileBasedSource):
    def read_records(self, file_name, range_tracker):
        self._file = self.open_file(file_name)
        reader = csv.DictReader(self._file)
        for rec in reader:
            yield rec

def combine_lines():
    with beam.Pipeline(options=PipelineOptions()) as p:

        left_side = p | 'Read_Left_Side' >> beam.io.Read(MyCsvFileSource('/folder/left_side.csv'))
        left_and_right = (p | 'Read_Rght_Side' >> beam.io.Read(MyCsvFileSource('/folder/right_side.csv'))
                     | beam.FlatMap(append_lr, beam.pvalue.AsIter(left_side)))
        left_and_right | 'Write' >> beam.io.WriteToText('/folder/', file_name_suffix='test_output.csv')

def run(argv=None):
    combine_lines()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(None)

1 Ответ

0 голосов
/ 30 августа 2018

Оказывается, что я должен был реализовать DoFn как «вызываемый» объект, фрагмент кода документа Apache Beam ниже:

def FlatMap(fn, *args, **kwargs):  # pylint: disable=invalid-name

"" ": func: FlatMap похоже на: class: ParDo за исключением того, что вызывается для определения преобразования.

Так что я изменил функцию с класса на def, работал как шарм. Несколько других изменений кода, которые также показывают, как в основном построить цикл for с боковым вводом из другой PCollection (ПРИМЕЧАНИЕ: если вы запускаете это локально, убедитесь, что ваши тестовые файлы для левой и правой сторон довольно малы, это дает огромный размер выходы):

from __future__ import absolute_import
import logging
import csv
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyCsvFileSource(beam.io.filebasedsource.FileBasedSource):
    def read_records(self, file_name, range_tracker):
        self._file = self.open_file(file_name)
        reader = csv.DictReader(self._file)
        for rec in reader:
            yield rec

def append_lr(left_, right_):
    for thingy in right_:
        yield (left_, thingy)

def combine_sides():
    with beam.Pipeline(options=PipelineOptions()) as p:
        left_file = '/path/to/file/left.csv'
        right_file = '/path/to/file/right.csv'
        test_output = '/path/to/file/outputs/'

        left_side = p | 'Read_Left_Side' >> beam.io.Read(MyCsvFileSource(left_file))
        right_side = p | 'Read_Right_Side' >> beam.io.Read(MyCsvFileSource(right_file))
        all_combos = left_side | beam.FlatMap(append_lr, beam.pvalue.AsIter(right_side))
        all_combos | 'Write' >> beam.io.WriteToText(test_output, file_name_suffix='purple_nurple.csv')

def run(argv=None):
    combine_sides()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(None)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...