, поэтому у меня есть контроллер конечной точки SpringBoot, который запускается так:
@RequestMapping(value = "/post", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public Response post(@Valid @RequestBody Message message) throws FailedToPostException {
message.setRecieveTime(System.currentTimeMillis());
return this.service.post(message);
}
и функция post:
public Response post(Message message) throws FailedToPostException{
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("topicName", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onSuccess(SendResult<String, Message> result) {
LOGGER.info("Post Finished. '{}' with offset: {}", message,
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error("Message Post Failed. '{}'", message, ex);
long nowMillis = System.currentTimeMillis();
int diffSeconds = (int) ((nowMillis - message.getRecieveTime()) / 1000);
if (diffSeconds >= 10) {
LOGGER.debug("timeout sending message to Kafka, aborting.");
return;
}
else {
post(message);
}
}
});
LOGGER.debug("D: " + Utils.getMetricValue("buffer-available-bytes", kafkaTemplate));
return new Response("Message Posted");
}
Теперь вы можете видеть, что мы пытаемся сделатьконечно, если kafkaTemplate.send
потерпел неудачу, мы будем рекурсивно вызывать post(message)
снова на срок до 10 секунд, пока буфер памяти производителя не очистится и сообщение не пройдет.
Проблемы:
- Мы хотим иметь возможность вернуть ответ об ошибке клиенту конечной точки (например: «Не удалось подтвердить сообщение»).
- Есть ли лучший способ обработки исключений из будущего вкусок кода, подобный приведенному выше?
- Есть ли способ избежать использования рекурсивной функции здесь?Мы сделали это, потому что мы хотели попытаться доставить сообщение в Kafka примерно на 10 секунд, прежде чем отправлять его как электронное письмо для просмотра.
Примечание: я все еще не использовал атрибут buffer-available-bytes
от kafkaTemplate.metrics (), я намереваюсь использовать его, чтобы минимизировать вероятность этой проблемы, но все же необходимо обработать вышеупомянутое только в случае некоторых условий гонки