Оконное управление работает при работе в DirectRunner, но не при работе в облачном потоке данных - PullRequest
0 голосов
/ 05 апреля 2019

Я пытаюсь разрушить слияние с GroupByKey. Это создает одно огромное окно, и, поскольку моя работа большая, я бы предпочел начать излучать.

С прямым бегуном, использующим что-то вроде того, что я нашел здесь , похоже, работает. Однако при запуске в облачном потоке данных кажется, что он объединяет GBK и не генерирует выходные данные до тех пор, пока исходные узлы не «преуспеют».

Я делаю ограниченную / пакетную работу. Я извлекаю содержимое архивных файлов и затем записываю их в gcs.

Все работает, за исключением того, что это занимает больше времени, чем я ожидал, и загрузка процессора низкая. Я подозреваю, что это происходит из-за слияния - моя гипотеза состоит в том, что извлечение сливается с операцией записи, и поэтому существует шаблон извлечения / загрузки с более высоким ЦП, за которым следует меньше ЦП, потому что мы выполняем сетевые вызовы и возвращаемся снова.

Код выглядит так:

.apply("Window",
  Window.<MyType>into(new GlobalWindows())
    .triggering(
      Repeatedly.forever(                             
        AfterProcessingTime.pastFirstElementInPane()                                
          .plusDelayOf(Duration.standardSeconds(5))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes()
)
.apply("Add key", MapElements...)
.apply(GroupByKey.create())

Локально я проверяю с помощью журналов отладки, чтобы видеть, что работа выполняется после GBK. Временная метка между первым завершением извлечения и первой операцией после GBK обычно отражает продолжительность в 5 с (или другие значения, которые я изменяю на (1,5,10,20,30)).

В GCP я проверяю, просматривая структуру конвейера, и вижу, что все после GBK «не запущено», а выходной набор GBK пуст («-»), в то время как входной набор содержит миллионы элементов.

Редактировать:

  • это на луче v2.10.0.

  • извлечение выполняется SplittableDoFn (не уверен, что это актуально)

1 Ответ

1 голос
/ 05 апреля 2019

Похоже, что ответ, который вы упоминали, был для потокового конвейера (неограниченный ввод).Для пакетной конвейерной обработки ограниченного ввода GroupByKey не будет выдавать, пока не будут обработаны все данные для данного ключа.Пожалуйста, смотрите здесь для более подробной информации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...