Операция beam.Flatten()
берет итерацию PCollections и возвращает новую PCollection, которая содержит объединение всех элементов во входных PCollections. Невозможно иметь PCollection of PCollections.
Я думаю, что вы ищете здесь beam.FlatMap
операция. Это отличается от beam.Map
тем, что оно испускает несколько элементов на вход. Например, если у вас есть коллекция lines
, содержащая элементы {'two', 'words'}
, тогда
lines | beam.Map(list)
будет PCollection, состоящей из двух списков
{['t', 'w', 'o'], ['w', 'o', 'r', 'd', 's']}
тогда
lines | beam.FlatMap(list)
приведет к PCollection, состоящему из нескольких букв
{'t', 'w', 'o', 'w', 'o', 'r', 'd', 's'}
.
Таким образом, ваша окончательная программа будет выглядеть примерно так:
lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
output = ( lines
| 'process' >> beam.FlatMap(process_xmls) # concatinates all lists returned by process_xmls into a single PCollection
| 'jsons' >> beam.Map(jsons.dumps) # apply json.dumps to each element
| beam.WindowInto(window.FixedWindows(1, 0)))
(также обратите внимание, что json.dumps
- это, вероятно, то, что вам нужно, вместо json.dump
, который принимает второй аргумент в качестве файла / потока для записи).