У нас есть приложение Spark Streaming (Kafka), которое также выступает в роли продюсера, создавая сообщения для отдельной темы Kafka и реагируя на входящие сообщения. К сожалению, в течение довольно долгого времени мы получали too many open connection
оповещения для нашей работы. И я заметил, что предупреждения появляются после того, как мы обработали большую нагрузку (наши провайдеры работают в пакетном цикле, следовательно, мы получаем тяжелую нагрузку в течение примерно 8 часов в день), и предупреждения остаются активными в течение длительного времени после тяжелой нагрузки. был обработан.
Вот полный набор конфигураций Spark, которые мы имеем для нашей работы.
"hive.exec.dynamic.partition": "true"
"hive.exec.dynamic.partition.mode": "nonstrict"
"hive.exec.max.dynamic.partitions": "9000"
"spark.sql.crossJoin.enabled": "true"
"spark.sql.shuffle.partitions": "1000"
"spark.driver.memory": "22g"
"spark.driver.cores": "8"
"spark.executor.cores": "4"
"spark.executor.memory": "28g"
"spark.dynamicAllocation.executorIdleTimeout": "2000s"
"spark.network.timeout": "6000s"
"spark.dynamicAllocation.minExecutors": "4"
"spark.dynamicAllocation.maxExecutors": "60"
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
"spark.streaming.backpressure.enabled": "false"
"spark.streaming.kafka.maxRatePerPartition": "44"
"spark.streaming.kafka.allowNonConsecutiveOffsets": "true"
Для Kafka Consumer у нас есть следующие дополнительные конфигурации:
var kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaConfig.bootstrapServers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> appContext.appName,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> kafkaConfig.offsetResetConfig,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG -> (540000L: java.lang.Long)
)
Содержит конфигурацию connections.max.idle.ms
для закрытия незанятых соединений после 540000 Milliseconds
.
Также у нас есть такая же конфигурация и для источника. Пожалуйста, помогите нам понять, почему у нас все еще есть проблема открытого соединения.