Я пытаюсь протестировать пружинную кафку (2.2.5.RELEASE), где, когда производитель отправляет сообщение с помощью kafkatemplate, я хочу знать, было ли это сообщение отправлено успешно или нет. Исходя из этого, я хотел бы обновить запись БД для этого идентификатора сообщения. Как лучше всего справиться с этим сценарием?
Вот пример кода, который проверяет успех или неудачу
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test_topic", key, userMsg);
SendResult<String, String> result = null;
try {
result = future.get(10000, TimeUnit.MILLISECONDS);
LOGGER.info("Send Result : {}", result);
LOGGER.info("Saving entry in db");
messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
} catch (Exception e) {
LOGGER.error("Error publishing message ", e);
messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
}