Я использую Kafka слушатель сообщений потребителей.Я включил подтверждение вручную.Я использовал метод Acknowledgment.acknowledge () для фиксации смещения в случае успеха.В случае исключения из-за сбоя вызова API, я не зафиксировал его.
Я немного сбит с толку, поскольку незафиксированное сообщение не будет снова использоваться, пока я не перезапущу свое приложение.Я новичок в kafka, поэтому я уверен, что упускаю что-то действительно элементарное, но поиск был бесполезен для этого.Может кто-нибудь объяснить или указать мне правильный источник, где я могу знать, что здесь не так?
@KafkaListener(topics = "temp_topic", groupId = "temp-consumer", containerFactory = "tempListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> consumerRecord,
Acknowledgment acknowledgment) {
try {
log.info("Consuming message : {} ", consumerRecord.value().toString());
String message = consumerRecord.value().toString();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Payload payload = objectMapper.readValue(message,Payload.class);
tempService.processEvent(payload);
log.info("Message: {} successfully consumed",message);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Error while listening to kafka event: ", e);
}
}
Заранее спасибо.