Spark Streaming на Kafka медленная производительность - PullRequest
1 голос
/ 02 июня 2019

Я пишу потоковое задание, которое считывает данные из Kafka, вносит некоторые изменения в записи и отправляет результаты в другой кластер Kafka.

Производительность задания кажется очень низкой, скорость обработки составляет около 70 000 записей в секунду. Выборка показывает, что 30% времени уходит на чтение данных и их обработку, а остальные 70% - на отправку данных в Kafka.

Я пытался настроить конфигурации Kafka, добавить память, изменить интервалы между партиями, но единственное, что работает, - это добавить больше ядер.

профилировщика: profiler snapshot

Сведения о работе Spark:

max.cores 30
driver memory 6G
executor memory 16G
batch.interval 3 minutes
ingres rate 180,000 messages per second

Свойства производителя (я пробовал разные варианты)

def buildProducerKafkaProperties: Properties = {
  val producerConfig = new Properties
  producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBrokers)
  producerConfig.put(ProducerConfig.ACKS_CONFIG, "all")
  producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "200000")
  producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "2000")
  producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
  producerConfig.put(ProducerConfig.RETRIES_CONFIG, "0")
  producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "13421728")
  producerConfig.put(ProducerConfig.SEND_BUFFER_CONFIG, "13421728")
  producerConfig
}

Отправка кода

 stream
    .foreachRDD(rdd => {    
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd
          .map(consumerRecord => doSomething(consumerRecord))
          .foreachPartition(partitionIter => {
            val producer = kafkaSinkBroadcast.value    
            partitionIter.foreach(row => {
              producer.send(kafkaTopic, row)
              producedRecordsAcc.add(1)
            })

        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      })

Версия

Spark Standalone cluster 2.3.1
Destination Kafka cluster 1.1.1
Kafka topic has 120 partitions 

Кто-нибудь может подсказать, как увеличить пропускную способность отправки?


Обновление июль 2019

размер : 150 тыс. Сообщений в секунду, каждое сообщение имеет около 100 столбцов.

основные настройки :

spark.cores.max = 30 # the cores balanced between all the workers.
spark.streaming.backpressure.enabled = true
ob.ingest.batch.duration= 3 minutes 

Я пытался использовать rdd.repartition (30), но это замедлило выполнение на ~ 10%

Спасибо

1 Ответ

0 голосов
/ 24 июня 2019

Попробуйте использовать перераспределение, как показано ниже -

val numPartitons = (Количество исполнителей * Количество ядер исполнителей)

stream
    .repartition(numPartitons)
    .foreachRDD(rdd => {    
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd
          .map(consumerRecord => doSomething(consumerRecord))
          .foreachPartition(partitionIter => {
            val producer = kafkaSinkBroadcast.value    
            partitionIter.foreach(row => {
              producer.send(kafkaTopic, row)
              producedRecordsAcc.add(1)
            })

        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      })

Это даст вам оптимальную производительность.

Надеюсь, это поможет.

...