Kafka Consumer пропускает сообщения при перезагрузке - PullRequest
0 голосов
/ 21 апреля 2020

У меня работает кластер Kafka, и при перезапуске приложения (потребителя) он пропускает некоторые сообщения, которые были отправлены в topi c во время его работы.

Когда приложение работает, я вижу что он читает сообщение со смещением 100, а затем переводит смещение 101 в __consumer_offsets. Затем, когда приложение не работает, сообщения со смещением 101, 102 and 103 отправляются в топи c. После перезапуска приложения оно читает 101 и устанавливает его смещение на 104, таким образом, пропуская 102 and 103.

Это моя конфигурация:

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
config.put(ConsumerConfig.GROUP_ID_CONFIG, my-consumer);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

1 Ответ

0 голосов
/ 21 апреля 2020

При просмотре доступной информации не похоже, что ваш потребитель пропускает какие-либо сообщения.

После использования сообщения со смещением 100 во внутренней топике c __consumer_offsets сохраняется смещение. 101 для этого потребителя. Смещение 101 - это смещение следующее , которое потребитель будет читать в этом топи c.

после перезапуска, и имея еще 3 сообщения в топи c, потребитель начинает обработку смещения 101, а затем должен обрабатывать другие сообщения. Однако, основываясь на вашей стратегии фиксации, он сообщает внутренней топи c __consumer_offsets, что следующее прочитанное сообщение имеет смещение 104. Он не будет конкретно указывать 101, 102 и 103, так как все сообщения опрашиваются одновременно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...