Обработка исключений при отправке сообщения через шаблон Kafka - PullRequest
0 голосов
/ 28 февраля 2020

Мне было интересно, есть ли способ перехватить исключение / throwable при создании сообщения kafka с помощью шаблона Kafka.

Я не вижу ничего, где бы он генерировал исключение KafkaException в случае сбоя. Мне нужно выяснить, было ли сообщение отправлено в Kafka, прежде чем я смогу продолжить выполнение моего приложения. Я знаю, что ListenableFuture будет регистрировать ошибку, но я не знаю, как перехватить это onFailure.

public void sendToKafkaTopic(KafkaMessage data) {

    ListenableFuture<SendResult<String, KafkaMessage>> future = kafkaTemplate.send(primaryKafkaTopic, data);
    future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaMessage>>() {

      @Override
      public void onSuccess(SendResult<String, KafkaMessage> result) {
        log.info("sent message='{}' with offset={}", data,
            result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        log.error("unable to send message='{}'", data, ex);
      }
    });
  }

Я пытался найти что-то вроде этого:

  public void sendToKafkaTopic(KafkaMessage data) {

    try {
      kafkaTemplate.send(primaryKafkaTopic, data);
    } catch (RuntimeException err) {
      log.error("unable to send message='{}'", data, ex);
      throw new CustomException(err);
    }
  }
}

1 Ответ

1 голос
/ 28 февраля 2020

прежде чем я смогу продолжить работу с моим приложением

Нет ничего плохого, если вы просто сделаете Future.get(). Когда исключение случится вниз по течению, оно будет выброшено вам из этой блокировки get().

...