Почему мой простой пример Kafka Consumer не работает - PullRequest
0 голосов
/ 29 мая 2018

У меня проблемы с привлечением к работе самого простого потребителя кафки.Я использую kafka-clients-1.1.0.jar

Вот все, что я сделал.

  1. Запущен zookeeper в командной строке (все команды запускаются из)

zookeeper-server-start.bat ../../config/zookeeper.properties

Запущен сервер Kafka

kafka-server-start.bat ../../config/server.properties

Создал новую тему 'hellotopic' и подтвердил ее, перечислив темы

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hellotopic Created topic "hellotopic".

Подтвердите, перечислив темы

D:\RC\Softwares\kafka_2.12-1.1.0\kafka_2.12-1.1.0\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181 hellotopic

Отправить сообщение в тему и проверить то же самое на консоли потребителя

kafka-console-producer.bat --broker-list localhost:9092 --topic hellotopic --property "parse.key=true" --property "key.separator=:"

Ключ сообщения и значение, введенные как показано ниже

key1:value1

Вы можете видеть, что на потребителе консоли мы можем видеть сообщение в теме 'hellotopic'

kafka-console-consumer.bat --zookeeper localhost:2181 --topic hellotopic --from-beginning

Вывод команды выше приведен ниже.Мы можем видеть значение сообщения 'value1', которое было отправлено

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. value1

Теперь, когда у нас есть тема с сообщением, я запускаю свой простой потребительский код Java kafka, чтобы получить всесообщения в теме 'hellotopic'.Ниже приведен код

import java.util.Arrays\;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SampleConsumer {
    public static void main(String[] args) {
        System.out.println("Start consumer code");
        Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test-consumer-group");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("hellotopic"));
         //while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         //}
         System.out.println("End consumer code");
    }
}

Когда мы запустим вышеприведенный класс, мы увидим вывод

Start consumer code
End consumer code

Много пытался найти проблему, но пока не повезло.Большое спасибо за помощь в этом простом примере.

1 Ответ

0 голосов
/ 29 мая 2018

Я вижу две проблемы с кодом:

  1. Вам не хватает определенной конфигурации, которая заставляет потребителя начинать с самого раннего смещения: props.put("auto.offset.reset", "earliest"); Фактически переведено --from-beginning в потребителе командной строки.к этому конфигу.Этот конфиг указывает потребителю начинать с самого раннего смещения, если не найдено зафиксированного смещения для соответствующей темы и раздела в группе.
  2. Фактическое poll должно быть в цикле.Один poll может не дать потребителю достаточно времени для подписки, а также для получения данных.Один из распространенных способов сделать опрос заключается в следующем:

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    } finally {
        consumer.close();
    }
    
...