Потребитель Kafka читает последнее зафиксированное смещение при повторном запуске (Java) - PullRequest
1 голос
/ 26 марта 2019

У меня есть потребитель kakfa, для которого enable.auto.commit имеет значение false . Всякий раз, когда я перезапускаю свое потребительское приложение, оно всегда снова читает последнее зафиксированное смещение, а затем следующие смещения.

Например Последнее зафиксированное смещение равно 50. Когда я перезапускаю потребителя, сначала снова читается смещение 50, а затем следующие смещения.

Я выполняю commitsync, как показано ниже.

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("sometopic", partition), new OffsetAndMetadata(offset));
kafkaconsumer.commitSync(offsets);

Я пытался установить auto.offset.reset до самое раннее и самое последнее но это не меняет поведение.

Я что-то упустил в потребительской конфигурации?

config.put(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENT_ID");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP_ID");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,CustomDeserializer.class.getName());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

Ответы [ 2 ]

2 голосов
/ 26 марта 2019

Если вы хотите использовать commitSync(offset), вы должны быть осторожны и прочитать его Javadoc :

Фиксированное смещение должно быть следующим сообщением, которое ваше приложение будет использовать, т.е. lastProcessedMessageOffset + 1.

Если вы не добавите + 1 к смещению, ожидается, что при следующем перезапуске потребитель снова получит последнее сообщение. Как упоминалось в другом ответе, если вы используете commitSync() без каких-либо аргументов, вам не нужно беспокоиться об этом

1 голос
/ 26 марта 2019

Похоже, вы пытаетесь зафиксировать, используя new OffsetAndMetadta(offset). Это не типичное использование.

Вот пример из документации под Ручное управление смещением :

 List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
 while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         buffer.add(record);
     }
     if (buffer.size() >= minBatchSize) {
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     }
 }

https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Обратите внимание, что вызов consumer.commitSync() выполняется без каких-либо параметров. Он просто потребляет, и он совершит все, что было израсходовано до этого момента.

...