Я сталкиваюсь с
org.apache.kafka.common.errors.InvalidGroupIdException: Чтобы использовать
API управления группами или смещения, вы должны предоставить действительный
group.id в потребительской конфигурации.
ошибка при запуске приведенного ниже API-интерфейса пользователя kafka из командной строки, написанного на языке scala. в чем может быть проблема?
object KafkaAggregateConsumerApp extends App{
try {
val properties: Properties = new Properties()
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "0:9092")
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer")
properties.put("group.id", "console-consumer-myapp")
val consumerApp = new KafkaConsumer[String, Int](properties)
consumerApp.subscribe(Pattern.compile("kafkaaggregationsource1"))
try {
while (true) {
val consumerRecord: ConsumerRecords[String, Int] = consumerApp.poll(Duration.ofMinutes(10))
consumerRecord.forEach((each) => println(each.key() + " " + each.value()))
}
} finally {
consumerApp.close()
}
}
catch{
case e: Exception => e.printStackTrace()
}
}