Как обрабатывать исключения в SpringBoot ListenableFuture - PullRequest
0 голосов
/ 25 мая 2018

, поэтому у меня есть контроллер конечной точки SpringBoot, который запускается так:

@RequestMapping(value = "/post", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public Response post(@Valid @RequestBody Message message) throws FailedToPostException {
    message.setRecieveTime(System.currentTimeMillis());

    return this.service.post(message);
}

и функция post:

public Response post(Message message) throws FailedToPostException{


    ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("topicName", message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {

        @Override
        public void onSuccess(SendResult<String, Message> result) {
            LOGGER.info("Post Finished. '{}' with offset: {}", message,
                        result.getRecordMetadata().offset());
        }

        @Override
        public void onFailure(Throwable ex) {
            LOGGER.error("Message Post Failed. '{}'", message, ex);

            long nowMillis = System.currentTimeMillis();
            int diffSeconds =  (int) ((nowMillis - message.getRecieveTime()) / 1000);

            if (diffSeconds >= 10) {
                LOGGER.debug("timeout sending message to Kafka, aborting.");
                return;
            }
            else {
                post(message);
            }
        }
    });

    LOGGER.debug("D: " + Utils.getMetricValue("buffer-available-bytes", kafkaTemplate));

    return new Response("Message Posted");
}

Теперь вы можете видеть, что мы пытаемся сделатьконечно, если kafkaTemplate.send потерпел неудачу, мы будем рекурсивно вызывать post(message) снова на срок до 10 секунд, пока буфер памяти производителя не очистится и сообщение не пройдет.

Проблемы:

  1. Мы хотим иметь возможность вернуть ответ об ошибке клиенту конечной точки (например: «Не удалось подтвердить сообщение»).
  2. Есть ли лучший способ обработки исключений из будущего вкусок кода, подобный приведенному выше?
  3. Есть ли способ избежать использования рекурсивной функции здесь?Мы сделали это, потому что мы хотели попытаться доставить сообщение в Kafka примерно на 10 секунд, прежде чем отправлять его как электронное письмо для просмотра.

Примечание: я все еще не использовал атрибут buffer-available-bytesот kafkaTemplate.metrics (), я намереваюсь использовать его, чтобы минимизировать вероятность этой проблемы, но все же необходимо обработать вышеупомянутое только в случае некоторых условий гонки

1 Ответ

0 голосов
/ 25 мая 2018

Есть несколько способов сделать это, но мне действительно нравится Spring Retry как способ решения этой проблемы.Здесь немного псевдокода, но если вам нужно больше подробностей о том, как это сделать, я мог бы сделать вещи более явными:

@Retryable(maxAttempts = 10, value = KafkaSendException.class)
public Response post(Message message) throws FailedToPostException{
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("topicName", message);

    try {
        future.get(1. TimeUnit.SECONDS);
    } catch(SomeException ex) {
        LOGGER.error("Message Post Failed. '{}'", ex.getCause().getMessage(), ex);
        throw ex;
    }

    LOGGER.info("Post Finished. '{}' with offset: {}", message,
                        result.getRecordMetadata().offset());

}

Эффективно делает то же самое без повторения.Я бы не советовал использовать повторяющийся код для обработки ошибок.

Контроллер должен иметь возможность обрабатывать фактические значения KafkaSendException хорошими @ExceptionHandler.

...