Как сплющить несколько Pcollections в Python Apache Beam - PullRequest
3 голосов
/ 16 мая 2019

Как реализовать следующую логику, расположенную в https://beam.apache.org/documentation/pipelines/design-your-pipeline/:

//merge the two PCollections with Flatten//me 
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

, в результате чего несколько коллекций PC могут быть объединены в одну коллекцию PC в Apache Beam Python API?

1 Ответ

1 голос
/ 16 мая 2019

Вы также можете использовать преобразование Flatten. Например:

data1 = ['one', 'two', 'three']
data2 = ['four','five']

input1 = p | 'Create PCollection1' >> beam.Create(data1)
input2 = p | 'Create PCollection2' >> beam.Create(data2)

merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())

объединенная PCollection будет содержать:

INFO:root:one
INFO:root:two
INFO:root:three
INFO:root:four
INFO:root:five

полный код:

import argparse, logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class LogFn(beam.DoFn):
  """Prints information"""
  def process(self, element):
    logging.info(element)
    return element


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  p = beam.Pipeline(options=pipeline_options)

  data1 = ['one', 'two', 'three']
  data2 = ['four','five']

  input1 = p | 'Create PCollection1' >> beam.Create(data1)
  input2 = p | 'Create PCollection2' >> beam.Create(data2)

  merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())

  merged | 'Check Results' >> beam.ParDo(LogFn())

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
...