Кафка потребительских свойств с самого начала в топике - PullRequest
1 голос
/ 06 апреля 2020

Я пытаюсь написать потребителя Kafka, чтобы он принимал сообщения с самого начала. Я мог бы сделать то же самое с консольного потребителя, используя --from-begin

Но я не смог найти соответствующие свойства в JAVA API.

 def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }

Также еще один вопрос о том, что должно быть значение.deserializer для сообщений Avro?

1 Ответ

2 голосов
/ 06 апреля 2020

Влияние --from-beginning, которое используется в kafka-console-consumer, может быть достигнуто путем установки auto.offset.reset в earliest. В сочетании с уникальным / новым group.id он имеет тот же эффект.

По сути, вы хотите создать новую группу потребителей (через group.id), и поскольку брокер Kafka не знает эту группу потребителей, он автоматически сбрасывает смещение для этой группы потребителей на основе конфигурации auto.offset.reset. Если установлено значение earliest, оно начнется с начала. Когда установлено значение latest, оно ожидает новых поступающих данных.

Что касается десериализации Avro, это здесь может помочь.

...