Это вопрос из двух частей.
I. Код:
import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
import io.StdIn._
object StandaloneConsumer {
private final var topics = ""
private final val BOOTSTRAPSERVERS = "hostname:9092"
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
val consumer = new KafkaConsumer[String, String](props)
topics = readLine("Enter the topic name: ")
try {
val partitionInfos = consumer.partitionsFor(topics)
val topicPartitions = new util.ArrayList[TopicPartition]
if(partitionInfos != null) {
for (partitionInfo <- partitionInfos.asScala) {
topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
consumer.assign(topicPartitions)
while (true) {
val records = consumer.poll(Duration.ofMillis(500))
for(record <- records.asScala) {
println(s"key = ${record.key}, value = ${record.value()}")
}
consumer.commitAsync()
}
}
} catch {
case e:Exception => e.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
}
}
Когда я запускаю этот , он правильно потребляет все данные , но в конце выдает:
org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
эту ошибку. Почему?
II. Я не понял эту часть кода. Что происходит ?:
val topicPartitions = new util.ArrayList[TopicPartition]
if(partitionInfos != null) {
for (partitionInfo <- partitionInfos.asScala) {
topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
Может кто-нибудь, пожалуйста, объясните мне это. Спасибо.