У меня есть потребитель kakfa, для которого enable.auto.commit имеет значение false . Всякий раз, когда я перезапускаю свое потребительское приложение, оно всегда снова читает последнее зафиксированное смещение, а затем следующие смещения.
Например Последнее зафиксированное смещение равно 50. Когда я перезапускаю потребителя, сначала снова читается смещение 50, а затем следующие смещения.
Я выполняю commitsync, как показано ниже.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("sometopic", partition), new OffsetAndMetadata(offset));
kafkaconsumer.commitSync(offsets);
Я пытался установить auto.offset.reset до самое раннее и самое последнее но это не меняет поведение.
Я что-то упустил в потребительской конфигурации?
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENT_ID");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP_ID");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,CustomDeserializer.class.getName());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");