У меня работает кластер Kafka, и при перезапуске приложения (потребителя) он пропускает некоторые сообщения, которые были отправлены в topi c во время его работы.
Когда приложение работает, я вижу что он читает сообщение со смещением 100
, а затем переводит смещение 101
в __consumer_offsets
. Затем, когда приложение не работает, сообщения со смещением 101, 102 and 103
отправляются в топи c. После перезапуска приложения оно читает 101
и устанавливает его смещение на 104
, таким образом, пропуская 102 and 103
.
Это моя конфигурация:
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
config.put(ConsumerConfig.GROUP_ID_CONFIG, my-consumer);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);