У меня есть потребитель spring-kafka, который читает записи и передает их в кеш. Запланированное задание будет периодически очищать записи в кэше. Я хочу обновить COMMIT OFFSET только после успешного сохранения партии в базе данных. Я попытался передать объект подтверждения в службу кэширования, чтобы вызвать метод подтверждения, как показано ниже.
public class KafkaConsumer {
@KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
cacheService.add( record.value(), acknowledgment );
}
}
public class CacheService {
// concurrency handling has been left out in favor of readability
public void add( String record, Acknowledgment acknowledgment ) {
this.records.add(record);
this.lastAcknowledgment = acknowledgment;
}
public void saveBatch() { //called by scheduled task
if( records.size() == BATCH_SIZE ) {
// perform batch insert into database
this.lastAcknowledgment.acknowledge();
this.records.clear();
}
}
}
AckMode был установлен следующим образом:
factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
И автоматическая фиксация ложна:
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
Даже несмотря на то, что метод подтверждения вызывается, смещение фиксации не обновляется. Каков наилучший способ обновления смещения фиксации после сохранения записей?
Я использую spring-kafka 2.1.7. RELEASE.
РЕДАКТИРОВАТЬ : После того, как @GaryRussell подтвердил , что подтверждения, сделанные внешними потоками, выполняются потребительским потоком во время следующего опроса, я перепроверил свой код и обнаружил ошибку в том, как последний объект подтверждения установлен. После исправления смещения фиксации ОБНОВЛЯЕТСЯ, как и ожидалось . Так что эта проблема была решена. Однако я не могу пометить этот вопрос как ответивший.