Потребитель командной строки Kafka читает, но не может читать через Java - PullRequest
0 голосов
/ 13 декабря 2018

Я создал тему вручную test с помощью этой команды:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

и с помощью этой команды:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Я вставил следующие записи:

This is a message
This is another message
This is a message2

Во-первых, я получаю сообщения через командную строку следующим образом:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

, и все записи успешно отображаются.Затем я пытаюсь реализовать потребитель на Java, используя этот код:

public class KafkaSubscriber {

    public void consume() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        // also with this command
        // consumer.subscribe(Arrays.asList("test"));

        System.out.println("Starting to read data...");

        try {
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    System.out.println("Number of records found: " + records.count());
                    for (ConsumerRecord rec : records) {
                        System.out.println(rec.value());
                    }
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
        catch (Exception e) {
                e.printStackTrace();
        } finally {
            consumer.close();
        }
}

Но вывод:

Starting to read data...
0
0
0
0
0
....

Это означает, что он не находит никаких записей в теме test.Я также пытался опубликовать некоторые записи после того, как запустил потребитель Java, но снова то же самое.Есть идеи, что может пойти не так?


EDIT : После добавления следующей строки:

 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

потребитель теперь читает только когда я пишу новые записик теме.Он не читает все записи с самого начала.

1 Ответ

0 голосов
/ 13 декабря 2018

По умолчанию, если для группы ранее не было смещений, потребитель начинает с завершающих тем.

Следовательно, если вы запускаете его после создания записей, он не получит их.

Обратите внимание, что в вашем kafka-console-consumer.sh есть флаг --from-beginning, который заставляет потребителя вместо этого начинать с начала темы.

Один обходной путь, как предлагается в комментарии, заключается вустановите ConsumerConfig.AUTO_OFFSET_RESET_CONFIG на earliest.Однако я был бы осторожен с этим параметром, так как ваш потребитель будет использовать с самого начала тем, и это может быть много данных в реальном случае использования.

Самое простое решение теперь, когда вы запустилиВаш потребитель один раз и он создал группу, вы можете просто повторно запустить производителя.После этого, когда вы снова запустите потребителя, он получит свою последнюю позицию, которая будет перед сообщениями нового производителя.

С другой стороны, если вы хотите всегда повторять все сообщения, у вас есть 2 варианта:

  • явно использовать seekToBeginning(), когда ваш потребитель начинает перемещать свою позицию в начало тем

  • установить auto.offset.reset в earliestи отключите автоматическую фиксацию смещения, установив enable.auto.commit в false

...