Как проверить, что производитель sprng kafka успешно отправил сообщение или нет? - PullRequest
0 голосов
/ 17 марта 2020

Я пытаюсь протестировать пружинную кафку (2.2.5.RELEASE), где, когда производитель отправляет сообщение с помощью kafkatemplate, я хочу знать, было ли это сообщение отправлено успешно или нет. Исходя из этого, я хотел бы обновить запись БД для этого идентификатора сообщения. Как лучше всего справиться с этим сценарием?

Вот пример кода, который проверяет успех или неудачу

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test_topic", key, userMsg);
    SendResult<String, String> result = null;
    try {
        result = future.get(10000, TimeUnit.MILLISECONDS);
        LOGGER.info("Send Result : {}", result);
        LOGGER.info("Saving entry in db");
        messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
    } catch (Exception e) {
        LOGGER.error("Error publishing message ", e);
        messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
    }

1 Ответ

1 голос
/ 17 марта 2020

Операции send() возвращают ListenableFuture<>, который завершается асинхронно.

Если вы хотите заблокировать вызывающий поток для получения результата, используйте

future.get(timeout, TimeUnit.MILLISECONDS);
...