Я использую Springs kafkaTemplate для публикации сообщения в теме Kafka. Теперь, чтобы реализовать некоторый отказоустойчивый механизм, я хочу повторить попытку до некоторого переменного количества попыток (в случае сбоя Kafka или сбоя сети) и если после "н" нет. повторных попыток, должен быть запущен метод восстановления, помеченный @Recover
.
Вот мое определение интерфейса для того же:
@Retryable(
value = {PartnerPortalException.class},
maxAttempts = 2,
backoff = @Backoff(delay = 100)
)
void sendMessage(BizWebKafkaTopicMessage message) throws PartnerPortalException;
@Recover
void recoverFromSendMessageFailure(PartnerPortalException ex, BizWebKafkaTopicMessage message);
Также метод org.springframework.kafka.core.KafkaTemplate # send () возвращает ListenableFuture, где я могу получить свои обратные вызовы (успех и неудача).
Реализация моего интерфейса
@Override
public void sendMessage(BizWebKafkaTopicMessage message) {
LOGGER.debug("Publishing Message : {} to Kafka Topic : {} from {}#sendMessage ", message, topicName, CLASS_NAME);
ListenableFuture<SendResult<String, BizWebKafkaTopicMessage>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
LOGGER.error("Unable to publish message : {} to topic : [{}] due to : {} from {}#sendMessage() ", message, topicName, ex.getMessage(), CLASS_NAME);
// TODO Implement Retrying publishing of messages to kafka-topic.
throw new PartnerPortalException(HttpStatus.INTERNAL_SERVER_ERROR.toString(), ERROR_PUBLISHING_MESSAGE_TO_KAFKA, (Exception) ex);
}
@Override
public void onSuccess(SendResult<String, BizWebKafkaTopicMessage> sendResult) {
LOGGER.info("Message : {} published to topic : [{}] with offset = [{}] from {}#sendMessage() ", message, topicName, sendResult.getRecordMetadata().offset(), CLASS_NAME);
}
});
}
Проблема заключается даже в том, что onFailure()
обратный вызов выдает исключение, @Retryable
не работает и @Recover
.
Я подозреваю, что это потому, что вложенный внутренний класс. Есть ли способ, которым я могу распространить это исключение, вызванное обратным вызовом onFailure (), на аннотацию @Retryable.
** Примечание **: Почему запускается onFailure, я отключил мой zookeeper и процессы Kafka, работающие в фоновом режиме, чтобы проверить мой механизм отказоустойчивости @Retryable и @Recover.