Spark Structured Streaming - растет объем памяти пользовательского интерфейса - PullRequest
1 голос
/ 04 июня 2019

Я выполняю миграцию из приложения DStreams Spark в приложение структурированной потоковой передачи.Во время тестирования я обнаружил, что объем памяти на вкладке «Исполнители» в пользовательском интерфейсе Spark продолжает расти.Он даже превосходит выделенную память, при этом разлив на диск отсутствует, а кэшированные RDD составляют всего несколько МБ.Я использую Spark версии 2.4.3 и использую данные из Kafka версии 2.1.

В приведенном ниже примере показано приложение с драйвером и одним исполнителем.Драйверу выделено 3 ГБ памяти, а исполнителю - 5 ГБ (и 3 ядра).

Spark UI Executors tab

Как видно, пользовательский интерфейс показываетчто каждый процесс (исполнитель и драйвер) потребляет около 8 ГБ памяти, тогда как выделенные значения намного меньше.Это также показывает, что нет разлива на диск.На следующем рисунке также показано, что размер кэшированных RDD составляет около 100 МБ:

Spark UI Storage tab

Я попытался проверить использование памяти, сообщенное пользовательским интерфейсом, с помощью системы.ценности.Я использовал команду ps, которая показывает, что драйвер потребляет около 2 ГБ памяти, а исполнитель - 5 ГБ, что соответствует выделенным значениям.

Я также использовал REST API Spark для получения статуса исполнителей.,Ответ показывает, что значение «memoryUsed» - это то, которое отображается в пользовательском интерфейсе.Вот ответ JSON:

{
  "id": "driver",
  "hostPort": "ip:41214",
  "isActive": true,
  "rddBlocks": 0,
  "memoryUsed": 7909526598,
  "diskUsed": 0,
  "totalCores": 0,
  "maxTasks": 0,
  "activeTasks": 0,
  "failedTasks": 0,
  "completedTasks": 0,
  "totalTasks": 0,
  "totalDuration": 0,
  "totalGCTime": 0,
  "totalInputBytes": 0,
  "totalShuffleRead": 0,
  "totalShuffleWrite": 0,
  "isBlacklisted": false,
  "maxMemory": 1529452953,
  "addTime": "2019-05-31T14:46:51.563GMT",
  "executorLogs": {},
  "memoryMetrics": {
    "usedOnHeapStorageMemory": 7909526598,
    "usedOffHeapStorageMemory": 0,
    "totalOnHeapStorageMemory": 1529452953,
    "totalOffHeapStorageMemory": 0
  },
  "blacklistedInStages": []
},
{
  "id": "0",
  "hostPort": "ip:40787",
  "isActive": true,
  "rddBlocks": 24,
  "memoryUsed": 7996553955,
  "diskUsed": 0,
  "totalCores": 3,
  "maxTasks": 3,
  "activeTasks": 0,
  "failedTasks": 0,
  "completedTasks": 710401,
  "totalTasks": 710401,
  "totalDuration": 306845440,
  "totalGCTime": 8128264,
  "totalInputBytes": 733395681216,
  "totalShuffleRead": 475652972265,
  "totalShuffleWrite": 354298278067,
  "isBlacklisted": false,
  "maxMemory": 2674812518,
  "addTime": "2019-05-31T14:46:53.680GMT",
  "executorLogs": {
    "stdout": "http://ip:8081/logPage/?appId=app-20190531164651-0027&executorId=0&logType=stdout",
    "stderr": "http://ip:8081/logPage/?appId=app-20190531164651-0027&executorId=0&logType=stderr"
  },
  "memoryMetrics": {
    "usedOnHeapStorageMemory": 7996553955,
    "usedOffHeapStorageMemory": 0,
    "totalOnHeapStorageMemory": 2674812518,
    "totalOffHeapStorageMemory": 0
  },
  "blacklistedInStages": []
}

Кажется, что значения «memoryUsed» и «usedOnHeapStorageMemory» совпадают со значением, показанным в пользовательском интерфейсе.

То естьесть ли ошибка на том, как Spark показывает используемую память для структурированной потоковой передачи?Указанные значения не соответствуют системным значениям.

Обратите внимание, что в моем приложении я использую агрегацию с водяным знаком и режим добавления.Я думал, что это может быть проблемой, и что государство не очищается должным образом.Однако я использовал метод query.lastProgress для мониторинга состояния потокового запроса, и он показывает, что состояние действительно очищено.Я даже удалил агрегацию и использовал режим добавления, чтобы запрос не сохранял состояния и поведение было таким же.

Заранее спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...