Длинная пауза GC на структурированной потоковой передаче Apache Spark в Kubernetes - PullRequest
0 голосов
/ 27 августа 2018

Я пытаюсь масштабировать конвейер структурированной потоковой передачи с помощью API-интерфейса Apache Spark 2.3 Scala, работающего на Kubernetes. Основной поток работы выглядит так:

  • Считывание статического набора данных, содержащего ~ 1 000 000 записей, которые сопоставляют отдельные исходные идентификаторы с выходной агрегацией
  • Чтение потокового набора данных из Kafka, который содержит метрики временных рядов, которые должны быть агрегированы, сопоставлены с их исходным идентификатором
  • Разделить каждый набор данных на основе идентификатора источника
  • Объединение 2 наборов данных по идентификатору источника (это сопоставляет метрики с правильной агрегацией выходных данных, а также отфильтровывает данные из kafka, которые не должны агрегироваться)
  • Применить водяной знак
  • Удалить дубликаты
  • Агрегировать данные
  • Запись в выходной сток Kafka

Я работаю в Kubernetes и настроил кластер с 30 исполнителями, каждый с 3 ядрами. В настоящее время Kafka передает 600000 метрик в секунду для каждого идентификатора источника и настроена на 600 разделов. Я пытаюсь объединить их все в 10 различных выходных данных (то есть каждое выходное объединение состоит из 60000 различных исходных идентификаторов). У меня есть конвейерный триггер каждые 10 секунд для обработки ~ 6 000 000 записей из Кафки. Мои окна агрегации не перекрываются в течение 1 минуты, а мой водяной знак установлен на 30 секунд. В идеале, я бы хотел, чтобы более длинный водяной знак учитывал запоздавшие данные, но стадия удаления дубликатов / водяных знаков кажется узким местом, особенно когда вызывается сборщик мусора. Вот некоторые данные из недавнего запуска моего конвейера:

Обработанные и входные строки в секунду

График показывает, что конвейер идет в ногу со входными строками в течение примерно 8-9 минут, но затем оранжевая линия опускается ниже зеленой линии (~ 10: 01 на оси времени), и конвейер имеет жесткую время идет в ногу со скоростью ввода данных. Я заглянул в Spark UI, чтобы понять, почему произошло замедление, и обнаружил, что одному исполнителю потребовалось 55 секунд для выполнения GC на этапе удаления дубликатов / водяных знаков. Вот сводная статистика со сцены и увеличение масштаба события:

Я пробовал несколько методов, предложенных здесь , со смешанными результатами. В частности:

  • Сериализация Kryo, казалось, имела небольшой эффект.
  • Использование этих настроек -XX: + UseG1GC -XX: MaxGCPauseMillis = 500, уменьшает частоту длинных пауз, но они все еще происходят.
  • Я включил журналы GC и обработал их с помощью gceasy и попытался следовать их рекомендациям. Это указывало на то, что из-за события Full GC наступает длительная пауза, и в журналах отсутствует признак того, что увеличение числа потоков GC поможет. Средняя скорость создания составляет 182,18 МБ / с, а средняя скорость продвижения составляет 49,8 МБ / с
  • Я пытался уменьшить NewRatio до 1, но это приводило к более частым длительным паузам с меньшей длительностью (то есть ~ 25 секунд на паузу вместо 50+ секунд)
  • Трудно понять, сколько памяти использует мой потоковый набор данных, потому что, если я пытаюсь его кэшировать, возникает ошибка.

Остальная часть совета памяти напоминает «попробуйте изменить этот параметр или этот параметр», но сложно попробовать каждую перестановку, и она не указывает, какое поведение мне следует ожидать. Может ли кто-нибудь указать мне направление дальнейших шагов? Я чувствую, что 55 секунд для сборщика мусора неразумны, и должен быть какой-то способ настроить его так, чтобы 1 исполнитель не мешал моей работе.

1 Ответ

0 голосов
/ 10 декабря 2018

Так что я должен был ответить на это раньше, пока решение было свежим в моей памяти, но в итоге я сделал несколько вещей, которые способствовали сокращению времени сбора мусора. Я не помню всех источников документации, которые помогли мне решить эту проблему, но я потратил много времени на изучение SO, рекомендаций gceasy и общей литературы по Java GC. В любом случае вот что помогло:

  • Ограничено количество ядер, участвующих в полном событии GC: * ​​1005 * Я полагаю, что это был основной вклад в повышение производительности. Я заметил, что некоторые исполнители будут иметь большие времена GC во время данной микропакета, а другие исполнители на той же виртуальной машине kubernetes будут иметь большие времена вычислений, которые были близки (если не точно) к длительности паузы GC. Эта корреляция привела меня на путь исследования, где я в конечном итоге обнаружил, что JVM (по крайней мере для Java 8) получает свои значения по умолчанию для GC от базовой виртуальной машины kubernetes, а не от ограниченных ресурсов, выделенных для контейнера, на котором работает JVM. Поскольку у каждого контейнера был свой экземпляр JVM, каждый исполнитель имел параметры GC по умолчанию, предполагая, что это была единственная JVM, работающая на базовой виртуальной машине kubernetes. Параметр GC, который указывает количество потоков, доступных для события Full GC, - это ParallelGCThreads. Это устанавливается по умолчанию JVM как процент от общего количества ядер на виртуальной машине. Для 32-х ядерной виртуальной машины kubernetes, если я правильно помню, она составила 23. Поэтому, когда происходит событие Full GC, GC вызывает конфликты на процессорах, используемых другими исполнителями, которые выполняли обычные вычисления. Моя теория состоит в том, что этот конфликт приводил к увеличению времени выполнения GC / вычислений, которые происходили на той же базовой виртуальной машине kubernetes. Для моего конкретного теста я переопределил параметры по умолчанию для ConcGCThreads (до 1) и ParallelGCThreads (до 5), так как я запускал 6 исполнителей на 32-ядерную виртуальную машину kubernetes.
  • Увеличение памяти у каждого исполнителя: Графики gceasy никогда не показывали плато памяти. Это только увеличилось, поскольку трубопровод продолжал работать. В итоге я увеличил объем памяти, выделенной каждому исполнителю, до ~ 15 ГБ с 8 ГБ, и после этого получал плато примерно на 10 ГБ. Фактический объем памяти, который вам необходим, вероятно, будет зависеть от вашего кода.
  • Включена дедупликация строк: Большая часть моего набора данных была строками, поэтому это помогло уменьшить общий объем памяти для моего приложения
  • Изменено начальное заполнение кучи: Это было рекомендовано в gceasy, а также в некоторых потоках SO.

Итак, вот окончательный набор параметров JVM, которые я использую после всего этого. Надеюсь, это поможет.

-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:InitiatingHeapOccupancyPercent=35 -XX:+UseStringDeduplication -XX:ConcGCThreads=1 -XX:ParallelGCThreads=5
...