Я выполняю миграцию из приложения DStreams Spark в приложение структурированной потоковой передачи.Во время тестирования я обнаружил, что объем памяти на вкладке «Исполнители» в пользовательском интерфейсе Spark продолжает расти.Он даже превосходит выделенную память, при этом разлив на диск отсутствует, а кэшированные RDD составляют всего несколько МБ.Я использую Spark версии 2.4.3 и использую данные из Kafka версии 2.1.
В приведенном ниже примере показано приложение с драйвером и одним исполнителем.Драйверу выделено 3 ГБ памяти, а исполнителю - 5 ГБ (и 3 ядра).
Как видно, пользовательский интерфейс показываетчто каждый процесс (исполнитель и драйвер) потребляет около 8 ГБ памяти, тогда как выделенные значения намного меньше.Это также показывает, что нет разлива на диск.На следующем рисунке также показано, что размер кэшированных RDD составляет около 100 МБ:
Я попытался проверить использование памяти, сообщенное пользовательским интерфейсом, с помощью системы.ценности.Я использовал команду ps, которая показывает, что драйвер потребляет около 2 ГБ памяти, а исполнитель - 5 ГБ, что соответствует выделенным значениям.
Я также использовал REST API Spark для получения статуса исполнителей.,Ответ показывает, что значение «memoryUsed» - это то, которое отображается в пользовательском интерфейсе.Вот ответ JSON:
{
"id": "driver",
"hostPort": "ip:41214",
"isActive": true,
"rddBlocks": 0,
"memoryUsed": 7909526598,
"diskUsed": 0,
"totalCores": 0,
"maxTasks": 0,
"activeTasks": 0,
"failedTasks": 0,
"completedTasks": 0,
"totalTasks": 0,
"totalDuration": 0,
"totalGCTime": 0,
"totalInputBytes": 0,
"totalShuffleRead": 0,
"totalShuffleWrite": 0,
"isBlacklisted": false,
"maxMemory": 1529452953,
"addTime": "2019-05-31T14:46:51.563GMT",
"executorLogs": {},
"memoryMetrics": {
"usedOnHeapStorageMemory": 7909526598,
"usedOffHeapStorageMemory": 0,
"totalOnHeapStorageMemory": 1529452953,
"totalOffHeapStorageMemory": 0
},
"blacklistedInStages": []
},
{
"id": "0",
"hostPort": "ip:40787",
"isActive": true,
"rddBlocks": 24,
"memoryUsed": 7996553955,
"diskUsed": 0,
"totalCores": 3,
"maxTasks": 3,
"activeTasks": 0,
"failedTasks": 0,
"completedTasks": 710401,
"totalTasks": 710401,
"totalDuration": 306845440,
"totalGCTime": 8128264,
"totalInputBytes": 733395681216,
"totalShuffleRead": 475652972265,
"totalShuffleWrite": 354298278067,
"isBlacklisted": false,
"maxMemory": 2674812518,
"addTime": "2019-05-31T14:46:53.680GMT",
"executorLogs": {
"stdout": "http://ip:8081/logPage/?appId=app-20190531164651-0027&executorId=0&logType=stdout",
"stderr": "http://ip:8081/logPage/?appId=app-20190531164651-0027&executorId=0&logType=stderr"
},
"memoryMetrics": {
"usedOnHeapStorageMemory": 7996553955,
"usedOffHeapStorageMemory": 0,
"totalOnHeapStorageMemory": 2674812518,
"totalOffHeapStorageMemory": 0
},
"blacklistedInStages": []
}
Кажется, что значения «memoryUsed» и «usedOnHeapStorageMemory» совпадают со значением, показанным в пользовательском интерфейсе.
То естьесть ли ошибка на том, как Spark показывает используемую память для структурированной потоковой передачи?Указанные значения не соответствуют системным значениям.
Обратите внимание, что в моем приложении я использую агрегацию с водяным знаком и режим добавления.Я думал, что это может быть проблемой, и что государство не очищается должным образом.Однако я использовал метод query.lastProgress для мониторинга состояния потокового запроса, и он показывает, что состояние действительно очищено.Я даже удалил агрегацию и использовал режим добавления, чтобы запрос не сохранял состояния и поведение было таким же.
Заранее спасибо.