Я пытаюсь масштабировать конвейер структурированной потоковой передачи с помощью API-интерфейса Apache Spark 2.3 Scala, работающего на Kubernetes. Основной поток работы выглядит так:
- Считывание статического набора данных, содержащего ~ 1 000 000 записей, которые сопоставляют отдельные исходные идентификаторы с выходной агрегацией
- Чтение потокового набора данных из Kafka, который содержит метрики временных рядов, которые должны быть агрегированы, сопоставлены с их исходным идентификатором
- Разделить каждый набор данных на основе идентификатора источника
- Объединение 2 наборов данных по идентификатору источника (это сопоставляет метрики с правильной агрегацией выходных данных, а также отфильтровывает данные из kafka, которые не должны агрегироваться)
- Применить водяной знак
- Удалить дубликаты
- Агрегировать данные
- Запись в выходной сток Kafka
Я работаю в Kubernetes и настроил кластер с 30 исполнителями, каждый с 3 ядрами. В настоящее время Kafka передает 600000 метрик в секунду для каждого идентификатора источника и настроена на 600 разделов. Я пытаюсь объединить их все в 10 различных выходных данных (то есть каждое выходное объединение состоит из 60000 различных исходных идентификаторов). У меня есть конвейерный триггер каждые 10 секунд для обработки ~ 6 000 000 записей из Кафки. Мои окна агрегации не перекрываются в течение 1 минуты, а мой водяной знак установлен на 30 секунд. В идеале, я бы хотел, чтобы более длинный водяной знак учитывал запоздавшие данные, но стадия удаления дубликатов / водяных знаков кажется узким местом, особенно когда вызывается сборщик мусора. Вот некоторые данные из недавнего запуска моего конвейера:
Обработанные и входные строки в секунду
График показывает, что конвейер идет в ногу со входными строками в течение примерно 8-9 минут, но затем оранжевая линия опускается ниже зеленой линии (~ 10: 01 на оси времени), и конвейер имеет жесткую время идет в ногу со скоростью ввода данных. Я заглянул в Spark UI, чтобы понять, почему произошло замедление, и обнаружил, что одному исполнителю потребовалось 55 секунд для выполнения GC на этапе удаления дубликатов / водяных знаков. Вот сводная статистика со сцены и увеличение масштаба события:
Я пробовал несколько методов, предложенных здесь , со смешанными результатами. В частности:
- Сериализация Kryo, казалось, имела небольшой эффект.
- Использование этих настроек -XX: + UseG1GC -XX: MaxGCPauseMillis = 500, уменьшает частоту длинных пауз, но они все еще происходят.
- Я включил журналы GC и обработал их с помощью gceasy и попытался следовать их рекомендациям. Это указывало на то, что из-за события Full GC наступает длительная пауза, и в журналах отсутствует признак того, что увеличение числа потоков GC поможет. Средняя скорость создания составляет 182,18 МБ / с, а средняя скорость продвижения составляет 49,8 МБ / с
- Я пытался уменьшить NewRatio до 1, но это приводило к более частым длительным паузам с меньшей длительностью (то есть ~ 25 секунд на паузу вместо 50+ секунд)
- Трудно понять, сколько памяти использует мой потоковый набор данных, потому что, если я пытаюсь его кэшировать, возникает ошибка.
Остальная часть совета памяти напоминает «попробуйте изменить этот параметр или этот параметр», но сложно попробовать каждую перестановку, и она не указывает, какое поведение мне следует ожидать. Может ли кто-нибудь указать мне направление дальнейших шагов? Я чувствую, что 55 секунд для сборщика мусора неразумны, и должен быть какой-то способ настроить его так, чтобы 1 исполнитель не мешал моей работе.