Сглаживание PCollection в Tuple Order - PullRequest
0 голосов
/ 27 апреля 2018

Я пытаюсь добавить заголовок, используя функцию Flatten в Apache Beam. Однако, как представляется, нет способа установить порядок в соответствии с документацией: https://beam.apache.org/documentation/sdks/pydoc/2.4.0/apache_beam.transforms.core.html?highlight=flatten#apache_beam.transforms.core.Flatten.

Иногда заголовок находится в конце данных, а другие в верхней части. Есть ли способ установить порядок? Интересно, если я что-то упустил?

with beam.Pipeline(options=options) as p:


  header = [
      ('name', 'number'),
  ]
  phones_list = [
      ('amy', '111-222-3333'),
      ('james', '222-333-4444'),
      ('amy', '333-444-5555'),
      ('carl', '444-555-6666'),
  ]

  header = p | 'Header' >> beam.Create(header)
  phones = p | 'CreatePhones' >> beam.Create(phones_list)  

  merged = ((phones,header)
            | 'MergedPColl' >> beam.Flatten())

  output = merged

  output | 'Write' >> beam.io.WriteToText('./_output')

Выход 1:

('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')
('name', 'number')

Выход 2:

('name', 'number')
('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')

1 Ответ

0 голосов
/ 22 мая 2018

Flatten - это преобразователь, который работает с PCollections. Чтобы слияние работало параллельно, я не думаю, что они могут гарантировать сохранение порядка; что согласуется с неупорядоченным характером результирующего PCollection.

Но если ваша единственная цель - добавить заголовок вверху, вы можете использовать header аргумент textio.WriteToText().

> header (str): строка для записи в начале файла в качестве заголовка. Если нет: данные: None и append_trailing_newlines установлены, будет добавлено `\ n``.

phones | 'Write' >> beam.io.WriteToText(
  # Feel free to make your own header format.
  './_output', header="('name', 'number')")

В целом, чтобы сохранить последовательность исходного ввода, я бы добавил входные данные к порядковому номеру. После параллельного преобразования луча (содержащего порядковый номер для каждого элемента) вы всегда можете «восстановить» исходный порядок, отсортировав этот порядковый номер в качестве шага последующей обработки (в непараллельном режиме).

...