Получить значение объекта в вызове метода Onfailure () метода Kafka Send () - PullRequest
0 голосов
/ 02 мая 2020

Я хочу получить те объекты Person, которые не были отправлены в Kafka, т.е. метод onFailure ().

1) Я создал временный массив типа person и передал его в onFailure (). Но это не работает . Он всегда показывает одну и ту же работу temp [0]. Он всегда печатает последнее значение в списке. 2) Я попытался создать локальный объект Person для решения этой проблемы. Но я получил ошибку компиляции

"Локальная переменная pp, определенная в прилагаемой области видимости, должна быть окончательной или фактически конечной"

Как я могу решить эту проблему?

class Sender {

        @Autowired
        private KafkaTemplate<String, Person> template;

        private static final Logger LOG = LoggerFactory.getLogger(Sender.class);



        Person temp= null;

        // @Transactional("ktm")
        public void sendThem(List<Person> toSend) throws InterruptedException {
            List<ListenableFuture<SendResult<String, Person>>> futures = new ArrayList<>();


            ListenableFutureCallback<SendResult<String, Person>> callback = new ListenableFutureCallback<SendResult<String, Person>>() {

                @Override
                public void onSuccess(SendResult<String, Person> result) {
                    //LOG.info(" message success 1: " + result.getProducerRecord().value());
                    LOG.info(" message success 2: " + temp);
                }

                @Override
                public void onFailure(Throwable ex) {
                    LOG.info(" message failed : "  + temp);

                }
            };

            for (Person p : toSend) {

                temp=p;
                ListenableFuture<SendResult<String, Person>> future = template.send("t_101", temp);


                future.addCallback(callback);
            }



        }
    }

1 Ответ

1 голос
/ 02 мая 2020

Значение неудачной отправки доступно в Throwable - приведите его к KafkaProducerException. Ваш Person находится в producerRecord.value().

Я обновил свой ответ до ваш другой вопрос .

Вы также можете создать новый обратный вызов в l oop и тогда вам будет доступен p; но более эффективно использовать один обратный вызов и получить Person из исключения.

...