Spark Streaming - пишите в тему Кафки - PullRequest
0 голосов
/ 19 февраля 2019

Может кто-нибудь, пожалуйста, помогите мне в этом.У меня проблема с производительностью при использовании приведенного ниже кода для публикации сообщений на kafka

 message.foreachPartition{ part =>
  val producer = new KafkaProducer[String, String](props)
  part.foreach{ msg =>
    val message = new ProducerRecord[String, String](topic, msg._1, msg._2)
    producer.send(message)
  }
  producer.close()

}

Поэтому я использовал post для оптимизации производительности.Ниже приведен код, который я написал в своем коде.

val kafkaSink = sparkContext.broadcast(KafkaSink(kafkaProps))

resultRDD.foreach{message =>
    kafkaSink.value.send(outputTopic, message._1, message._2)
   }


class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, key:String, value: String): Unit = 
  producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(config: Map[String, Object]): KafkaSink = {
  val f = () => {
  val producer = new KafkaProducer[String, String](config.asJava)
  sys.addShutdownHook {
                        producer.close()
                      }
  producer
}
new KafkaSink(f)
}}

Но программа застревает, и даже Кафка не публикует ни одного сообщения.Я проверил журналы и смог найти только приведенную ниже информацию в файле журналов пряжи.

Manufacturer.KafkaProducer: Закрытие производителя Kafka с помощью timeoutMillis = 9223372036854775807 ms

Не могли бы вы дать мне знать, что я скучаю.Версия Spark 1.6.0.В настоящее время для публикации сообщений требуется около 8 секунд с интервалом в 20 секунд, составляющим около 300 тыс. Сообщений.

Заранее спасибо.

...