У Spark Streaming Kafka слишком много открытых проблем с подключением - PullRequest
0 голосов
/ 26 октября 2019

У нас есть приложение Spark Streaming (Kafka), которое также выступает в роли продюсера, создавая сообщения для отдельной темы Kafka и реагируя на входящие сообщения. К сожалению, в течение довольно долгого времени мы получали too many open connection оповещения для нашей работы. И я заметил, что предупреждения появляются после того, как мы обработали большую нагрузку (наши провайдеры работают в пакетном цикле, следовательно, мы получаем тяжелую нагрузку в течение примерно 8 часов в день), и предупреждения остаются активными в течение длительного времени после тяжелой нагрузки. был обработан.

Вот полный набор конфигураций Spark, которые мы имеем для нашей работы.

  "hive.exec.dynamic.partition": "true"
  "hive.exec.dynamic.partition.mode": "nonstrict"
  "hive.exec.max.dynamic.partitions": "9000"
  "spark.sql.crossJoin.enabled": "true"
  "spark.sql.shuffle.partitions": "1000"
  "spark.driver.memory": "22g"
  "spark.driver.cores": "8"
  "spark.executor.cores": "4"
  "spark.executor.memory": "28g"
  "spark.dynamicAllocation.executorIdleTimeout": "2000s"
  "spark.network.timeout": "6000s"
  "spark.dynamicAllocation.minExecutors": "4"
  "spark.dynamicAllocation.maxExecutors": "60"
  "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
  "spark.streaming.backpressure.enabled": "false"
  "spark.streaming.kafka.maxRatePerPartition": "44"
  "spark.streaming.kafka.allowNonConsecutiveOffsets": "true"

Для Kafka Consumer у нас есть следующие дополнительные конфигурации:

var kafkaParams = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaConfig.bootstrapServers,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
        ConsumerConfig.GROUP_ID_CONFIG -> appContext.appName,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> kafkaConfig.offsetResetConfig,
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
        ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG -> (540000L: java.lang.Long)
      )

Содержит конфигурацию connections.max.idle.ms для закрытия незанятых соединений после 540000 Milliseconds.

Также у нас есть такая же конфигурация и для источника. Пожалуйста, помогите нам понять, почему у нас все еще есть проблема открытого соединения.

...