Потребитель консоли Kafka фиксирует неправильное смещение при использовании --max-messages - PullRequest
1 голос
/ 28 февраля 2020

У меня есть потребитель консоли kafka в версии 1.1.0, который я использую для получения сообщений от Kafka. Когда я использую скрипт kafka-console-consumer. sh с опцией --max-messages, кажется, что он совершает неправильные смещения.

Я создал топи c и группу потребителей и прочитал несколько сообщений:

/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.offset     1          374             374             0               -               -               -
test.offset     0          0               375             375             -               -               -

Чем я прочитал 10 таких сообщений:

/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
1 var_1
3 var_3
5 var_5
7 var_7
9 var_9
11 var_11
13 var_13
15 var_15
17 var_17
19 var_19
Processed a total of 10 messages

Но теперь смещения показывают, что он читает все сообщения в топи c

/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'my-consumer-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.offset     1          374             374             0               -               -               -
test.offset     0          375             375             0               -               -               -

И теперь, когда я хочу прочитать больше сообщений, я получаю сообщение об ошибке, что в топи c больше нет сообщений :

/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
[2020-02-28 08:27:54,782] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
kafka.consumer.ConsumerTimeoutException
        at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:98)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:129)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

Что я делаю не так? Почему смещение переместилось на последнее сообщение в topi c, а не просто на 10 сообщений?

1 Ответ

0 голосов
/ 28 февраля 2020

Это автоматическая фиксация потребителя Kafka. Как упоминалось в по этой ссылке :

Самый простой способ зафиксировать смещения - это позволить потребителю сделать это за вас. Если вы настроите enable.auto.commit = true, то каждые пять секунд потребитель будет фиксировать наибольшее смещение, полученное вашим клиентом от poll (). Пятисекундный интервал является значением по умолчанию и управляется настройкой auto .commit.interval.ms. Как и все остальное в потребителе, автоматические коммиты c управляются опросом l oop. Всякий раз, когда вы проводите опрос, потребитель проверяет, настало ли время для фиксации, и если это так, он фиксирует смещения, которые он возвратил в последнем опросе.

Так что в вашем случае, когда ваш опрос потребителей, он получает сообщения до 500 (значение по умолчанию max.poll.records) и через 5 секунд фиксирует наибольшее смещение, возвращаемое из последнего опроса (375 в вашем случае), даже если вы указали максимальное количество сообщений как 10.

- max-messages: Максимальное количество сообщений, потребляемых перед выходом. Если не установлено, потребление будет постоянным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...