Исполнитель Spark Structured Streaming завершается с ошибкой OutOfMemoryError
Проверка выделения кучи с помощью VirtualVM указывает на то, что использование памяти JMX Mbean Server линейно растет со временем.
После дальнейшего исследования кажется, что JMX Mbeanзаполнены тысячами экземпляров объектов KafkaMbean с метриками для потребителя - (\ d +), которые исчисляются тысячами (равным количеству задач, созданных на исполнителе).
Запуск потребителя Kafka с журналами DEBUG на исполнителе показываетчто исполнитель добавляет тысячи датчиков метрик и часто вообще не удаляет их или только удаляет некоторые
Я использую HDP Spark 2.3.0.2.6.5.0-292 с HDP Kafka 1.0.0.2.6.5.0-292.
Вот как я инициализирую структурированную потоковую передачу:
sparkSession
.readStream
.format("kafka")
.options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
"subscribePattern" -> INPUT_TOPIC,
"startingOffsets" -> "earliest",
"failOnDataLoss" -> "false"))
.mapPartitions(processData)
.writeStream
.format("kafka")
.options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
"checkpointLocation" -> CHECKPOINT_LOCATION))
.queryName("Process Data")
.outputMode("update")
.trigger(Trigger.ProcessingTime(1000))
.load()
.start()
.awaitTermination()
Я ожидал, что Spark / Kafka должным образом очистит MBeans при завершении задачи, но, похоже, это не так.