Может кто-нибудь, пожалуйста, помогите мне в этом.У меня проблема с производительностью при использовании приведенного ниже кода для публикации сообщений на kafka
message.foreachPartition{ part =>
val producer = new KafkaProducer[String, String](props)
part.foreach{ msg =>
val message = new ProducerRecord[String, String](topic, msg._1, msg._2)
producer.send(message)
}
producer.close()
}
Поэтому я использовал post для оптимизации производительности.Ниже приведен код, который я написал в своем коде.
val kafkaSink = sparkContext.broadcast(KafkaSink(kafkaProps))
resultRDD.foreach{message =>
kafkaSink.value.send(outputTopic, message._1, message._2)
}
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, key:String, value: String): Unit =
producer.send(new ProducerRecord(topic, key, value))
}
object KafkaSink {
def apply(config: Map[String, Object]): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config.asJava)
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}}
Но программа застревает, и даже Кафка не публикует ни одного сообщения.Я проверил журналы и смог найти только приведенную ниже информацию в файле журналов пряжи.
Manufacturer.KafkaProducer: Закрытие производителя Kafka с помощью timeoutMillis = 9223372036854775807 ms
Не могли бы вы дать мне знать, что я скучаю.Версия Spark 1.6.0.В настоящее время для публикации сообщений требуется около 8 секунд с интервалом в 20 секунд, составляющим около 300 тыс. Сообщений.
Заранее спасибо.