KafkaIO неравномерное потребление разделов через некоторое время - PullRequest
0 голосов
/ 16 мая 2018

У меня есть простой конвейер потока данных (код задания 2018-05-15_06_17_40-8591349846083543299) с рабочим в 1 минуту и ​​7 рабочим максимумом, который выполняет следующие действия:

  1. Использование 4 тем Kafka с использованием KafkaIO.Каждая тема представлена ​​по-разному и представляет собой отдельную PCollection
  2. Выполните преобразование на каждой PCollection для вывода стандартного представления PCollection.
  3. Объедините 4 PCollection с помощью Flatten.pCollections
  4. Окно ежечасно с использованием следующего триггера:

    Repeatedly
    .forever(
      AfterFirst.of(
        AfterPane.elementCountAtLeast(40000),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
      )
    )
    .orFinally(AfterWatermark.pastEndOfWindow()) 
    
  5. Записать эти события в GCS с помощью оконных записей AvroIO с 14 шардами.

Когдаизначально конвейер запускается, все в порядке, но через несколько часов отставание системы резко возрастает на этапе AvroIO: GroupIntoShards.

При дальнейшем исследовании одна из тем отстает от многих часов (эта тема имеет наибольшие события в секунду по сравнению с другими 3).Глядя на логи, я вижу Closing idle reader for S12-000000000000000a, что понятно.Однако смещения групп потребителей данной темы для 36 разделов находятся в состоянии, когда для некоторых разделов смещение очень низкое, а для некоторых очень высокое.Смещение конца журнала распределяется более или менее равномерно, и производимые нами записи имеют примерно одинаковый размер.

Вопросы:

  • Если системная задержка высока в определенномшаг, это мешает потребителям Kafka потреблять?
  • Есть ли какая-либо возможная причина неравномерного распределения в смещениях Kafka?
  • Объединяемые PCollection имеют разные схемы трафика, некоторые низкие и некоторые высокие.Будет ли добавление триггера AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5) эффективно начинать запись в GCS для каждого (окна, шарда) через 5 минут, когда событие впервые появляется в окне?

Обновление конвейера с использованием того же кода / конфигурациивозвращает его в нормальное состояние, когда потребляемая скорость намного выше (из-за задержки до перезапуска), чем произведенная скорость.

1 Ответ

0 голосов
/ 18 мая 2018

Отвечая на 3 вопроса (я оставил комментарий о конкретной работе):

  1. Нет, системная задержка не препятствует потреблению Kafka.
    • В целом, если имеется много работы для последующих этапов, готовых к обработке, это может задержать начало восходящей работы.Но это не специфично для KafkaIO.
  2. Кажется, что это не так.В общем, если предположить, что между самими разделами Kafka нет перекосов, сильный перекос при обработке Beam может привести к тому, что читатели будут назначены работникам, которые выполняют больше работы, чем другие.
  3. Я думаю, что да.Я думаю, что firstElementInPane() относится к элементу из любого источника.
...