В настоящее время мы работаем над потоковым конвейером на Apache Beam с DataflowRunner.Мы читаем сообщения из Pub / Sub и выполняем некоторую их обработку, после чего мы помещаем их в окна скольжения (в настоящее время размер окна составляет 3 секунды, а интервал также равен 3 секундам).Как только окно запущено, мы выполняем некоторую постобработку элементов внутри окна.Этот шаг постобработки значительно больше размера окна, он занимает около 15 секунд.
Код луча apache конвейера:
input = ( pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
| beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
trigger=AfterCount(30),
accumulation_mode = AccumulationModel.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)
Как вы знаете, Dataflow пытается выполнить некоторые оптимизации шагов конвейера.В нашем случае он объединяет все вместе, начиная от оконного режима (кластеризованные операции: 1 / обработка 2 / управление окнами + постобработка), что вызывает медленную последовательную постобработку всех окон всего одним рабочим.Каждые 15 секунд мы видим логи того, что конвейер обрабатывает следующее окно.Однако нам хотелось бы, чтобы несколько рабочих выбирали отдельные окна вместо рабочей нагрузки, передаваемой одному рабочему.
Поэтому мы искали способы предотвратить это объединение, чтобы поток данных отделял окно от постобработки.из окон.Таким образом, мы ожидаем, что Dataflow сможет снова назначить несколько рабочих для последующей обработки запущенных окон.
То, что мы пробовали до сих пор:
- Увеличение числарабочих до 20, 30 или даже 40, но без эффекта.Только этапы, предшествующие оконному выполнению, назначаются нескольким рабочим
- Запуск конвейера в течение 5 или 10 минут, но мы не заметили перераспределения рабочих, чтобы помочь на этом более крупном шаге постобработки после создания окна
- После создания окна поместите их обратно в глобальное окно
- Имитируйте другой ключ GroupByKey с помощью фиктивного ключа (как упомянуто в https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion), но безуспешно.
последние два действия действительно создали третью кластерную операцию (1 / обработка 2 / управление окнами 3 / постобработка) , но мы заметили, что все тот же рабочий выполняет все после обработки окна.
Есть ли решение, которое может решить эту проблему?
Текущий обходной путь, который мы сейчас рассматриваем, заключается в создании другого потокового конвейера, который получает окна, чтобы эти работники могли обрабатывать окна параллельно, но это громоздко ..