Я пытаюсь создать потребителя Scala, как показано ниже:
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import java.util.ArrayList
import scala.concurrent.duration._
object ScalaConsumer {
def subscribePartitions() = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringDeSerializer")
val consumer = new KafkaConsumer[String, String](props)
val partitionList:ArrayList[TopicPartition] = new ArrayList[TopicPartition]()
val topicPartition1 = new TopicPartition("topicr1p3", 0)
val topicPartition2 = new TopicPartition("othertopicr1p3", 2)
partitionList.add(topicPartition1)
partitionList.add(topicPartition2)
consumer.assign(partitionList)
try {
val records:ConsumerRecords[String, String] = consumer.poll(10)
} catch {
case e:Exception => e.printStackTrace()
}
}
def main(args: Array[String]): Unit = {
}
}
Приведенный выше пример неполон, потому что метод poll (), используемый в
val records:ConsumerRecords[String, String] = consumer.poll(10)
, лишен Scala.
В нем написано сообщение: Symbol poll is deprecated.
Поэтому я использую альтернативный вариант для опроса, отмеченный на изображении ниже:
в коде как:
val records:ConsumerRecords[String, String] = consumer.poll(Duration(1000, "millis"))
Но на этот раз сообщение об ошибке: Cannot resolve overloaded method 'poll'
build.sbt contents:
name := "KafkaScalaImplementation"
version := "0.1"
scalaVersion := "2.12.10"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.4.0"
Теперь я немного не понимаю, что мне здесь делать, чтобы использовать poll (). Есть ли новый способ использования метода опроса? Может ли кто-нибудь сообщить мне, как я могу исправить ошибку должным образом?