Ожидание списка ListenAbleFuture, возвращаемого Kafka Send API - PullRequest
0 голосов
/ 29 апреля 2020

У меня есть список ListenAbleFuture.Я хочу подождать этого списка ListenableFuture<SendResult<Integer, String>> не более 15 минут, если они еще не закончились. Как мне этого добиться.

В настоящее время я делаю это, но это ожидание 15 минут для каждого ListenAbleFuture, чего я не хочу.

 for (ListenableFuture<SendResult<Integer, String>> m : myFutureList) {

                    m.get(15, TimeUnit.MINUTES) ;
    }

ListenableFuture<SendResult<Integer, String>> is from import org.springframework.util.concurrent.ListenableFuture;

Я прошел через Ожидание в списке Future , но это решение для завершаемого будущего

1 Ответ

1 голос
/ 29 апреля 2020

Создайте CountDownLatch, например new CountDownLatch(50), добавьте слушателя в каждое прослушиваемое будущее и посчитайте защелку в каждом. Вы можете использовать один и тот же слушатель для всех фьючерсов, вместо того, чтобы каждый раз создавать новый.

Затем, после отправки 50 записей, используйте latch.await(10, TimeUnit.SECONDS). Если время истекло, вы можете перебрать свое будущее, чтобы выяснить, какие из них не завершены.

РЕДАКТИРОВАТЬ

@Component
class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    public void sendThem(KafkaTemplate<String, String> template, List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback =
                new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(result.getRecordMetadata().toString());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                ProducerRecord<?, ?> producerRecord = ((KafkaProducerException) ex).getProducerRecord();
                LOG.error("Failed; " + producerRecord, ex);
                latch.countDown();
            }
        };
        toSend.forEach(str -> {
             ListenableFuture<SendResult<String, String>> future = template.send("so61490633", str);
             future.addCallback(callback);
        });
        if (latch.await(10, TimeUnit.SECONDS)) {
            LOG.info("All sent ok");
        }
        else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }
    }

}
...