Я пытаюсь разрушить слияние с GroupByKey. Это создает одно огромное окно, и, поскольку моя работа большая, я бы предпочел начать излучать.
С прямым бегуном, использующим что-то вроде того, что я нашел здесь , похоже, работает. Однако при запуске в облачном потоке данных кажется, что он объединяет GBK и не генерирует выходные данные до тех пор, пока исходные узлы не «преуспеют».
Я делаю ограниченную / пакетную работу. Я извлекаю содержимое архивных файлов и затем записываю их в gcs.
Все работает, за исключением того, что это занимает больше времени, чем я ожидал, и загрузка процессора низкая. Я подозреваю, что это происходит из-за слияния - моя гипотеза состоит в том, что извлечение сливается с операцией записи, и поэтому существует шаблон извлечения / загрузки с более высоким ЦП, за которым следует меньше ЦП, потому что мы выполняем сетевые вызовы и возвращаемся снова.
Код выглядит так:
.apply("Window",
Window.<MyType>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
)
.apply("Add key", MapElements...)
.apply(GroupByKey.create())
Локально я проверяю с помощью журналов отладки, чтобы видеть, что работа выполняется после GBK. Временная метка между первым завершением извлечения и первой операцией после GBK обычно отражает продолжительность в 5 с (или другие значения, которые я изменяю на (1,5,10,20,30)).
В GCP я проверяю, просматривая структуру конвейера, и вижу, что все после GBK «не запущено», а выходной набор GBK пуст («-»), в то время как входной набор содержит миллионы элементов.
Редактировать: