Я пытаюсь написать потребителя Kafka, чтобы он принимал сообщения с самого начала. Я мог бы сделать то же самое с консольного потребителя, используя --from-begin
Но я не смог найти соответствующие свойства в JAVA API.
def consumeFromKafka(topic: String) = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator)
println(data.value())
}
}
Также еще один вопрос о том, что должно быть значение.deserializer для сообщений Avro?