Как передать ввод в beam.Flatten ()? - PullRequest
0 голосов
/ 17 июня 2019

Я начал использовать Apache Beam с Python, и я застреваю каждые 30 минут.Я пытаюсь сгладить преобразование:

lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
output = ( lines
           | 'process' >> beam.Map(process_xmls) # returns list
           | 'jsons' >> beam.Map(lambda x: [beam.Create(jsons.dump(model)) for model in x])
           | 'flatten' >> beam.Flatten()
           | beam.WindowInto(window.FixedWindows(1, 0)))

Так что после запуска этого кода я получаю эту ошибку:

ValueError: Input to Flatten must be an iterable. Got a value of type <class 'apache_beam.pvalue.PCollection'> instead.

Что мне делать?

1 Ответ

0 голосов
/ 17 июня 2019

Операция beam.Flatten() берет итерацию PCollections и возвращает новую PCollection, которая содержит объединение всех элементов во входных PCollections. Невозможно иметь PCollection of PCollections.

Я думаю, что вы ищете здесь beam.FlatMap операция. Это отличается от beam.Map тем, что оно испускает несколько элементов на вход. Например, если у вас есть коллекция lines, содержащая элементы {'two', 'words'}, тогда

lines | beam.Map(list)

будет PCollection, состоящей из двух списков

{['t', 'w', 'o'], ['w', 'o', 'r', 'd', 's']}

тогда

lines | beam.FlatMap(list)

приведет к PCollection, состоящему из нескольких букв

{'t', 'w', 'o', 'w', 'o', 'r', 'd', 's'}.

Таким образом, ваша окончательная программа будет выглядеть примерно так:

lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
output = ( lines
           | 'process' >> beam.FlatMap(process_xmls) # concatinates all lists returned by process_xmls into a single PCollection
           | 'jsons' >> beam.Map(jsons.dumps)  # apply json.dumps to each element
           | beam.WindowInto(window.FixedWindows(1, 0)))

(также обратите внимание, что json.dumps - это, вероятно, то, что вам нужно, вместо json.dump, который принимает второй аргумент в качестве файла / потока для записи).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...