Я использую @KafkaListener для потребления из темы kafka, у меня есть логика приложения для обработки каждой записи несколькими потребителями в разных группах потребителей.
Теперь моя проблема в том, что -Я должен отправитьизрасходованное сообщение для сторонней точки отдыха.Если отправка сообщения завершается неудачно, мне не следует фиксировать смещение, и мне нужно повторить попытку отправки сообщения на основе настраиваемого числа раз.После настраиваемых нет повторных попыток, если не удается отправить сообщение.Мне нужно записать причину исключения и зафиксировать смещение.
Я использую пружинную кафку 2.2.0.Ниже моя конфигурация для подтверждения вручную
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual-immediate
logging.level.org.springframework.kafka=debug
До сих пор я могу использовать сообщение, опубликованное в теме другим потребителем.а также в состоянии вручную подтвердить смещение от другого потребителя.например,
@KafkaListener(topics = "testTopic",groupId="group-1")
public void consumerOne(ConsumerRecord<String, String> record,Acknowledgment ack) {
logger.info("OF PayLoad:{}", record);
logger.info("OF value:"+record.value());
logger.info("OF Offset:{}",record.offset());
ack.acknowledge();
}
@KafkaListener(topics = "testTopic",groupId="group-2")
public void consumerTwo(ConsumerRecord<String, String> record,Acknowledgment ack) {
logger.info("MF PayLoad_2:{}", record);
logger.info("MF value_2:"+record.value());
logger.info("MF Offset_2:{}",record.offset());
//ack.acknowledge();
}
здесь, consumerTwo не подтверждает сообщение.Я хочу повторить попытку отправить все сообщение потребителю снова в зависимости от настраиваемого промежутка времени.
Любой пример или предложение будут очень полезны.