В нашем приложении мы используем потребителя kafka для определения отправки электронной почты.
На днях у нас была проблема, когда раздел kafka истекал до того, как он смог прочитать и обработать все свои записи. ,В результате он вернулся к началу раздела и не смог завершить набор полученных записей, а новые данные, сгенерированные после начала цикла, так и не были обработаны.
Мои команды предположили, что мымог бы сказать Kafka о коммите после прочтения каждого сообщения, однако я не могу понять, как это сделать из Spring-kakfa.
Приложение использует spring-kafka 2.1.6, и код потребителя вроде как.
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, @Header("kafka_offset") int offSet) {
try{
EmailData data = objectMapper.readValue(message, EmailData.class);
if(isEligableForEmail(data)){
emailHandler.sendEmail(data)
}
} catch (Exception e) {
log.error("Error: "+e.getMessage(), e);
}
}
Примечание: функция sendEmail использует CompletableFutures, поскольку она должна вызывать другой API перед отправкой электронного письма.
Конфигурация: (фрагмент файла yaml для потребителя ичасть производителя)
consumer:
max.poll.interval.ms: 3600000
producer:
retries: 0
batch-size: 100000
acks: 0
buffer-memory: 33554432
request.timeout.ms: 60000
linger.ms: 10
max.block.ms: 5000