Kafka Consumer не читает данные темы через Java - PullRequest
2 голосов
/ 07 марта 2019

Я отправляю входные данные JSON в тему Кафки. Я могу увидеть те же данные JSON в kafka при помощи команды ниже.

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mytopic --from-beginning

Вариант 1: Но когда я пытаюсь прочитать записи от потребителя kafka (java), не получаю никаких записей в консоли java. Я пробовал примеры, приведенные в этой ссылке https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Вариант 2: Если я отправляю какое-либо сообщение от производителя (командное окно), то же самое попаду в потребитель (командное окно) и в состоянии видеть те же записи в консоли Java. Этот сценарий работает.

Если я отправлю данные в тему через Java-программу. Затем те же данные JSON появляются в потребителе (командное окно). Но не входит в консоль Java. Case1 не работает.
Case2 работает. Пожалуйста, дайте мне знать, какую конфигурацию нужно выполнить?

Ответы [ 3 ]

0 голосов
/ 07 марта 2019

Если код производителя и потребителя правильный.

  • Остановить всех потребителей.
  • Сброс вашей группы потребителей

$ KAFKA_HOME / bin / kafka-consumer-groups.sh --bootstrap-server localhost: 9092 --group group_name --topic topic_name--reset-offsets --to-наиболее ранние --execute

  • Теперь запустите вашего потребителя

Это должно решить вашу проблему.

Вот некоторые потребительские свойства Kafka:

    bootstrap.servers: 'localhost:9092'
    group.id: 'group_id'
    auto.offset.reset: 'earliest'
    key.deserializer: 'org.apache.kafka.common.serialization.*' //Replace * with class
    value.deserializer: 'org.apache.kafka.common.serialization.*'

Спасибо

0 голосов
/ 08 марта 2019

Вы должны убедиться, что вы не отправляете пустые строки в конце и только потребляете latest - используйте auto.offset.reset: 'earliest', как указано выше, или properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

А потом, consumer.seekToBeginning(consumer.assignment()); чтобы убедиться.

0 голосов
/ 07 марта 2019

Вам нужно установить ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, чтобы читать с начала.

kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); 

А также убедитесь, что вы не запускаете разные процессы разных потребителей, использующие один и тот же идентификатор группы потребителей, поскольку данные из одного раздела могут быть прочитаны одним процессом, а другой ничего не увидит.

...