Какая альтернатива деприминированному методу poll () в Kafka Scala Consumer? - PullRequest
1 голос
/ 29 мая 2020

Я пытаюсь создать потребителя Scala, как показано ниже:

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import java.util.ArrayList
import scala.concurrent.duration._

object ScalaConsumer {

  def subscribePartitions() = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringDeSerializer")

    val consumer = new KafkaConsumer[String, String](props)
    val partitionList:ArrayList[TopicPartition] = new ArrayList[TopicPartition]()
    val topicPartition1 = new TopicPartition("topicr1p3", 0)
    val topicPartition2 = new TopicPartition("othertopicr1p3", 2)
    partitionList.add(topicPartition1)
    partitionList.add(topicPartition2)
    consumer.assign(partitionList)
    try {
        val records:ConsumerRecords[String, String] = consumer.poll(10)
    } catch {
       case e:Exception => e.printStackTrace()
    }
  }

  def main(args: Array[String]): Unit = {

  }
}

Приведенный выше пример неполон, потому что метод poll (), используемый в

    val records:ConsumerRecords[String, String] = consumer.poll(10) 

, лишен Scala.

enter image description here

В нем написано сообщение: Symbol poll is deprecated. Поэтому я использую альтернативный вариант для опроса, отмеченный на изображении ниже: enter image description here

в коде как:

val records:ConsumerRecords[String, String] = consumer.poll(Duration(1000, "millis"))

Но на этот раз сообщение об ошибке: Cannot resolve overloaded method 'poll' enter image description here

build.sbt contents:

name := "KafkaScalaImplementation"
version := "0.1"
scalaVersion := "2.12.10"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.4.0"

Теперь я немного не понимаю, что мне здесь делать, чтобы использовать poll (). Есть ли новый способ использования метода опроса? Может ли кто-нибудь сообщить мне, как я могу исправить ошибку должным образом?

1 Ответ

1 голос
/ 29 мая 2020

Для меня сработало следующее:

import java.time.Duration
val records:ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(1000))

Согласно Java Docs класса Duration у вас есть следующие варианты:

static Duration     of(long amount, TemporalUnit unit)
Obtains a Duration representing an amount in the specified unit.
static Duration     ofDays(long days)
Obtains a Duration representing a number of standard 24 hour days.
static Duration     ofHours(long hours)
Obtains a Duration representing a number of standard hours.
static Duration     ofMillis(long millis)
Obtains a Duration representing a number of milliseconds.
static Duration     ofMinutes(long minutes)
Obtains a Duration representing a number of standard minutes.
static Duration     ofNanos(long nanos)
Obtains a Duration representing a number of nanoseconds.
static Duration     ofSeconds(long seconds)
...