Spring @Recover и @Retry не работают, когда org.springframework.util.concurrent.ListenableFuture выдает исключение - PullRequest
0 голосов
/ 27 мая 2019

Я использую Springs kafkaTemplate для публикации сообщения в теме Kafka. Теперь, чтобы реализовать некоторый отказоустойчивый механизм, я хочу повторить попытку до некоторого переменного количества попыток (в случае сбоя Kafka или сбоя сети) и если после "н" нет. повторных попыток, должен быть запущен метод восстановления, помеченный @Recover.

Вот мое определение интерфейса для того же:

@Retryable(
   value = {PartnerPortalException.class},
   maxAttempts = 2,
   backoff = @Backoff(delay = 100)
)
void sendMessage(BizWebKafkaTopicMessage message) throws PartnerPortalException;

@Recover
void recoverFromSendMessageFailure(PartnerPortalException ex, BizWebKafkaTopicMessage message);

Также метод org.springframework.kafka.core.KafkaTemplate # send () возвращает ListenableFuture, где я могу получить свои обратные вызовы (успех и неудача).

Реализация моего интерфейса

@Override
    public void sendMessage(BizWebKafkaTopicMessage message) {
        LOGGER.debug("Publishing Message : {} to Kafka Topic : {} from {}#sendMessage ", message, topicName, CLASS_NAME);

        ListenableFuture<SendResult<String, BizWebKafkaTopicMessage>> future = kafkaTemplate.send(topicName, message);

        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error("Unable to publish message : {} to topic : [{}] due to : {} from {}#sendMessage() ", message, topicName, ex.getMessage(), CLASS_NAME);
                // TODO Implement Retrying publishing of messages to kafka-topic.
                throw new PartnerPortalException(HttpStatus.INTERNAL_SERVER_ERROR.toString(), ERROR_PUBLISHING_MESSAGE_TO_KAFKA, (Exception) ex);
            }

            @Override
            public void onSuccess(SendResult<String, BizWebKafkaTopicMessage> sendResult) {
                LOGGER.info("Message : {} published to topic : [{}] with offset = [{}] from {}#sendMessage() ", message, topicName, sendResult.getRecordMetadata().offset(), CLASS_NAME);
            }
        });
    }

Проблема заключается даже в том, что onFailure() обратный вызов выдает исключение, @Retryable не работает и @Recover.

Я подозреваю, что это потому, что вложенный внутренний класс. Есть ли способ, которым я могу распространить это исключение, вызванное обратным вызовом onFailure (), на аннотацию @Retryable.

** Примечание **: Почему запускается onFailure, я отключил мой zookeeper и процессы Kafka, работающие в фоновом режиме, чтобы проверить мой механизм отказоустойчивости @Retryable и @Recover.

...