Как сделать коммит после каждого сообщения кафки в spring-kafka? - PullRequest
2 голосов
/ 26 октября 2019

В нашем приложении мы используем потребителя kafka для определения отправки электронной почты.

На днях у нас была проблема, когда раздел kafka истекал до того, как он смог прочитать и обработать все свои записи. ,В результате он вернулся к началу раздела и не смог завершить набор полученных записей, а новые данные, сгенерированные после начала цикла, так и не были обработаны.

Мои команды предположили, что мымог бы сказать Kafka о коммите после прочтения каждого сообщения, однако я не могу понять, как это сделать из Spring-kakfa.

Приложение использует spring-kafka 2.1.6, и код потребителя вроде как.

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, @Header("kafka_offset") int offSet) {
    try{
        EmailData data = objectMapper.readValue(message, EmailData.class);
        if(isEligableForEmail(data)){
            emailHandler.sendEmail(data)
        }
    } catch (Exception e) {
        log.error("Error: "+e.getMessage(), e);
    }
}

Примечание: функция sendEmail использует CompletableFutures, поскольку она должна вызывать другой API перед отправкой электронного письма.

Конфигурация: (фрагмент файла yaml для потребителя ичасть производителя)

consumer:
      max.poll.interval.ms: 3600000
producer: 
      retries: 0
      batch-size: 100000
      acks: 0
      buffer-memory: 33554432
      request.timeout.ms: 60000
      linger.ms: 10
      max.block.ms: 5000

Ответы [ 2 ]

1 голос
/ 26 октября 2019

Если вы хотите ручное подтверждение, вы можете предоставить Acknowledgment в аргументах метода

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void consume(String message, Acknowledgment ack, @Header("kafka_offset") int offSet) {

При использовании ручного AckMode вы также можете предоставить слушателю подтверждение. В следующем примере также показано, как использовать другую фабрику контейнеров.

Пример из документации @ KafkaListener Аннотация

@KafkaListener(id = "cat", topics = "myTopic",
      containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();

}
0 голосов
/ 26 октября 2019

Установите для свойства контейнера ackMode значение AckMode.RECORD, чтобы зафиксировать смещение после каждой записи.

Также следует рассмотреть возможность уменьшения max.poll.records или увеличения max.poll.interval.ms потребительских свойств Kafka.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...