У меня есть простой конвейер потока данных (код задания 2018-05-15_06_17_40-8591349846083543299) с рабочим в 1 минуту и 7 рабочим максимумом, который выполняет следующие действия:
- Использование 4 тем Kafka с использованием KafkaIO.Каждая тема представлена по-разному и представляет собой отдельную PCollection
- Выполните преобразование на каждой PCollection для вывода стандартного представления PCollection.
- Объедините 4 PCollection с помощью
Flatten.pCollections
Окно ежечасно с использованием следующего триггера:
Repeatedly
.forever(
AfterFirst.of(
AfterPane.elementCountAtLeast(40000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
)
)
.orFinally(AfterWatermark.pastEndOfWindow())
Записать эти события в GCS с помощью оконных записей AvroIO с 14 шардами.
Когдаизначально конвейер запускается, все в порядке, но через несколько часов отставание системы резко возрастает на этапе AvroIO: GroupIntoShards.
При дальнейшем исследовании одна из тем отстает от многих часов (эта тема имеет наибольшие события в секунду по сравнению с другими 3).Глядя на логи, я вижу Closing idle reader for S12-000000000000000a
, что понятно.Однако смещения групп потребителей данной темы для 36 разделов находятся в состоянии, когда для некоторых разделов смещение очень низкое, а для некоторых очень высокое.Смещение конца журнала распределяется более или менее равномерно, и производимые нами записи имеют примерно одинаковый размер.
Вопросы:
- Если системная задержка высока в определенномшаг, это мешает потребителям Kafka потреблять?
- Есть ли какая-либо возможная причина неравномерного распределения в смещениях Kafka?
- Объединяемые PCollection имеют разные схемы трафика, некоторые низкие и некоторые высокие.Будет ли добавление триггера
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)
эффективно начинать запись в GCS для каждого (окна, шарда) через 5 минут, когда событие впервые появляется в окне?
Обновление конвейера с использованием того же кода / конфигурациивозвращает его в нормальное состояние, когда потребляемая скорость намного выше (из-за задержки до перезапуска), чем произведенная скорость.