Предотвращение слияния в конвейерах Apache Beam / Dataflow (python) для устранения узкого места в конвейере - PullRequest
0 голосов
/ 20 февраля 2019

В настоящее время мы работаем над потоковым конвейером на Apache Beam с DataflowRunner.Мы читаем сообщения из Pub / Sub и выполняем некоторую их обработку, после чего мы помещаем их в окна скольжения (в настоящее время размер окна составляет 3 секунды, а интервал также равен 3 секундам).Как только окно запущено, мы выполняем некоторую постобработку элементов внутри окна.Этот шаг постобработки значительно больше размера окна, он занимает около 15 секунд.

Код луча apache конвейера:

input = ( pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
                   | beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
                                  trigger=AfterCount(30), 
                                  accumulation_mode = AccumulationModel.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)

Как вы знаете, Dataflow пытается выполнить некоторые оптимизации шагов конвейера.В нашем случае он объединяет все вместе, начиная от оконного режима (кластеризованные операции: 1 / обработка 2 / управление окнами + постобработка), что вызывает медленную последовательную постобработку всех окон всего одним рабочим.Каждые 15 секунд мы видим логи того, что конвейер обрабатывает следующее окно.Однако нам хотелось бы, чтобы несколько рабочих выбирали отдельные окна вместо рабочей нагрузки, передаваемой одному рабочему.

Поэтому мы искали способы предотвратить это объединение, чтобы поток данных отделял окно от постобработки.из окон.Таким образом, мы ожидаем, что Dataflow сможет снова назначить несколько рабочих для последующей обработки запущенных окон.

То, что мы пробовали до сих пор:

  • Увеличение числарабочих до 20, 30 или даже 40, но без эффекта.Только этапы, предшествующие оконному выполнению, назначаются нескольким рабочим
  • Запуск конвейера в течение 5 или 10 минут, но мы не заметили перераспределения рабочих, чтобы помочь на этом более крупном шаге постобработки после создания окна
  • После создания окна поместите их обратно в глобальное окно
  • Имитируйте другой ключ GroupByKey с помощью фиктивного ключа (как упомянуто в https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion), но безуспешно.

последние два действия действительно создали третью кластерную операцию (1 / обработка 2 / управление окнами 3 / постобработка) , но мы заметили, что все тот же рабочий выполняет все после обработки окна.

Есть ли решение, которое может решить эту проблему?

Текущий обходной путь, который мы сейчас рассматриваем, заключается в создании другого потокового конвейера, который получает окна, чтобы эти работники могли обрабатывать окна параллельно, но это громоздко ..

1 Ответ

0 голосов
/ 12 марта 2019

Вы сделали правильную вещь, чтобы сломать слияние в ваших элементах.Я подозреваю, что может быть проблема, из-за которой у вас возникают проблемы.

Для потоковой передачи один ключ всегда обрабатывается одним и тем же рабочим .Случайно, все или большинство ваших записей назначены на один ключ?Если это так, ваша обработка будет выполняться одним рабочим.

Чтобы предотвратить это, вы можете сделать окно частью ключа, чтобы элементы для нескольких окон могли обрабатываться в разных окнах.работники, даже если они имеют один и тот же ключ:

class KeyIntoKeyPlusWindow(core.DoFn):
  def process(self, element, window=core.DoFn.WindowParam):
    key, values = element
    yield ((key, window), element)

group = windows | beam.ParDo(KeyIntoKeyPlusWindow() | beam.GroupByKey()

И как только вы это сделаете, вы можете применить свою постобработку:

group | beam.Map(post_processing_fn)
...