Мне было интересно, есть ли способ перехватить исключение / throwable при создании сообщения kafka с помощью шаблона Kafka.
Я не вижу ничего, где бы он генерировал исключение KafkaException в случае сбоя. Мне нужно выяснить, было ли сообщение отправлено в Kafka, прежде чем я смогу продолжить выполнение моего приложения. Я знаю, что ListenableFuture будет регистрировать ошибку, но я не знаю, как перехватить это onFailure.
public void sendToKafkaTopic(KafkaMessage data) {
ListenableFuture<SendResult<String, KafkaMessage>> future = kafkaTemplate.send(primaryKafkaTopic, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaMessage>>() {
@Override
public void onSuccess(SendResult<String, KafkaMessage> result) {
log.info("sent message='{}' with offset={}", data,
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("unable to send message='{}'", data, ex);
}
});
}
Я пытался найти что-то вроде этого:
public void sendToKafkaTopic(KafkaMessage data) {
try {
kafkaTemplate.send(primaryKafkaTopic, data);
} catch (RuntimeException err) {
log.error("unable to send message='{}'", data, ex);
throw new CustomException(err);
}
}
}