Разница между beam.ParDo и beam.Map в типе вывода? - PullRequest
0 голосов
/ 24 декабря 2018

Я использую Apache-Beam для запуска некоторых преобразований данных, включая извлечение данных из txt, csv и различных источников данных.Одна вещь, которую я заметил, это разница результатов при использовании beam.Map и beam.ParDo

В следующем примере:

ЯЧтение данных CSV, и в первом случае передать их в DoFn, используя beam.ParDo , который извлекает первый элемент, который является датой, а затем распечатывает его.Во втором случае я непосредственно использую beam.Map , чтобы сделать то же самое, а затем распечатать его.

class Printer(beam.DoFn):
    def process(self,data_item):
        print data_item

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return (str(data_item).split(','))[0]

data_from_source = (p
                    | 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 01' >> beam.ParDo(Printer())
                    )

copy_of_the_data =  (p
                    | 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    )

То, что я заметил в двух выходах, следующее:

##With beam.ParDo##
2
0
1
7
-
0
4
-
0
3
2
0
1
7

##With beam.Map##
2017-04-03
2017-04-03
2017-04-10
2017-04-10
2017-04-11
2017-04-12
2017-04-12

Я нахожу это странным.Мне интересно, если проблема в функции печати?Но после использования разных преобразований он показывает одинаковые результаты.Пример выполнения:

| 'Group it 01' >> beam.Map(lambda record: (record, 1))

, который все еще возвращает ту же проблему:

##With beam.ParDo##
('8', 1)
('2', 1)
('0', 1)
('1', 1)

##With beam.Map##
(u'2017-04-08', 1)
(u'2017-04-08', 1)
(u'2017-04-09', 1)
(u'2017-04-09', 1)

Любая идея, в чем причина?Чего мне не хватает в разнице между beam.Map и beam.ParDo ???

1 Ответ

0 голосов
/ 27 декабря 2018

Короткий ответ

Вам необходимо заключить возвращаемое значение ParDo в список.

Более длинная версия

ParDos в общем случае может возвращать любое количество выходов для одного входа, т. Е. Для одной строки ввода вы можете выдать ноль, один или несколько результатов.По этой причине Beam SDK обрабатывает вывод ParDo не как отдельный элемент, а как набор элементов.

В вашем случае ParDo выдает одну строку вместо коллекции.Beam Python SDK все еще пытается интерпретировать вывод этого ParDo, как если бы он был набором элементов.И делает это, интерпретируя строку, которую вы выпустили, как набор символов.По этой причине ваш ParDo теперь эффективно создает поток отдельных символов, а не поток строк.

Что вам нужно сделать, это заключить возвращаемое значение в список:

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return [(str(data_item).split(','))[0]]

Обратите внимание на квадратные скобки.См. руководство по программированию для получения дополнительных примеров.

Map, с другой стороны, может рассматриваться как особый случай ParDo.Ожидается, что Map будет производить ровно один вывод для каждого входа.Таким образом, в этом случае вы можете просто вернуть одно значение из лямбды, и оно работает как положено.

И вам, вероятно, не нужно заключать data_item в str. Согласно документам преобразование ReadFromText создает строки.

...