Мы используем Kafka 0.10 со Spark 2.1, и я обнаружил, что публикация нашего производителя всегда выполнялась медленно. Я могу достичь скорости около 1 к / с после того, как отдаю 8 ядер исполнителям Spark, в то время как в других постах говорилось, что машина легко достигает миллионов в секунду.
Я попытался настроить linger.ms и batch.size, чтобы выяснить это. Однако я обнаружил, что linger.ms = 0 выглядит оптимально для меня, и batch.size не дает особого эффекта. И я отправлял 160 тыс. Событий за итерацию. Похоже, я должен позволить метрикам производителя Kafka точно знать, что именно происходит. Но, похоже, включить его в Spark Executor нелегко.
Может ли кто-нибудь поделиться со мной светом?
Мои коды такие:
private def publishMessagesAttempt(producer: KafkaProducer[String, String], topic: String, messages: Iterable[(String, String)], producerMaxDelay: Long,
individualMessageMaxDelay: Long, logger: (String, Boolean) => Unit = KafkaClusterUtils.DEFAULT_LOGGER): Iterable[(String, String)] = {
val futureMessages = messages.map(message => (message, producer.send(new ProducerRecord[String, String](topic, message._1, message._2))))
val messageSentTime = System.currentTimeMillis
val awaitedResults = futureMessages.map { case (message, future) =>
val waitFor = Math.max(producerMaxDelay - (System.currentTimeMillis - messageSentTime), individualMessageMaxDelay)
val failed = Try(future.get(waitFor, TimeUnit.MILLISECONDS)) match {
case Success(_) => false
case Failure(f) =>
logger(s"Error happened when publish to Kafka: ${f.getStackTraceString}", true)
true
}
(message, failed)
}
awaitedResults.filter(_._2).map(_._1)
}