Задание потока данных застряло и не читает сообщения из PubSub - PullRequest
0 голосов
/ 07 февраля 2020

У меня есть задание потока данных, которое читает JSON из 3 тем PubSub, объединяя их в одну, применяя некоторые преобразования и сохраняя в BigQuery.

Я использую GlobalWindow со следующей конфигурацией.

.apply(Window.<PubsubMessage>into(new GlobalWindows()).triggering(AfterWatermark.pastEndOfWindow()
                            .withEarlyFirings(AfterFirst.of(AfterPane.elementCountAtLeast(20000),
                                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(durations))))
                            .discardingFiredPanes());

Задание выполняется со следующей конфигурацией

Max Workers : 20
Disk Size: 10GB
Machine Type : n1-standard-4
Autoscaling Algo: Throughput Based

enter image description here

Проблема, с которой я сталкиваюсь, заключается в том, что после обработки нескольких сообщений ( около 80k) задание перестает читать сообщения из PubSub. В одной из этих тем имеется очередь из почти 10 миллионов сообщений, но задание потока данных не читает сообщения и не выполняет автоматическое масштабирование.

Я также проверил загрузку ЦП каждого работника, и это также зависает в одном ди git после первоначального пакета.

enter image description here

Я пытался изменить тип машины и максимальную рабочую конфигурацию, но, похоже, ничего не работает.

Как мне подойти к этой проблеме?

1 Ответ

0 голосов
/ 08 февраля 2020

Я подозреваю, что виноват оконная функция. GlobalWindow не подходит для потоковых заданий (что я предполагаю, что это задание связано с использованием PubSub), потому что оно не будет запускать окно до тех пор, пока не будут присутствовать все элементы, что никогда не происходит в потоковом контексте.

В вашей ситуации кажется, что окно будет срабатывать раньше, когда оно достигнет либо количества элементов, либо продолжительности, но после этого окно застрянет, ожидая, когда все элементы наконец появятся. Быстрое исправление, чтобы проверить, в этом ли дело, состоит в том, чтобы обернуть ранние увольнения в триггер Repeatedly.forever, например так:

withEarlyFirings(
    Repeatedly.forever(
        AfterFirst.of(
            AfterPane.elementCountAtLeast(20000),
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(durations)))))

Это должно позволить раннему увольнению многократно срабатывать, предотвращая появление окна застрял.

Однако для более постоянного решения я рекомендую отказаться от использования GlobalWindow в потоковых конвейерах. Использование фиксированного времени windows с ранними увольнениями на основе количества элементов даст вам такое же поведение, но без риска застрять.

...