Создайте CountDownLatch
, например new CountDownLatch(50)
, добавьте слушателя в каждое прослушиваемое будущее и посчитайте защелку в каждом. Вы можете использовать один и тот же слушатель для всех фьючерсов, вместо того, чтобы каждый раз создавать новый.
Затем, после отправки 50 записей, используйте latch.await(10, TimeUnit.SECONDS)
. Если время истекло, вы можете перебрать свое будущее, чтобы выяснить, какие из них не завершены.
РЕДАКТИРОВАТЬ
@Component
class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
public void sendThem(KafkaTemplate<String, String> template, List<String> toSend) throws InterruptedException {
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(toSend.size());
ListenableFutureCallback<SendResult<String, String>> callback =
new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOG.info(result.getRecordMetadata().toString());
latch.countDown();
}
@Override
public void onFailure(Throwable ex) {
ProducerRecord<?, ?> producerRecord = ((KafkaProducerException) ex).getProducerRecord();
LOG.error("Failed; " + producerRecord, ex);
latch.countDown();
}
};
toSend.forEach(str -> {
ListenableFuture<SendResult<String, String>> future = template.send("so61490633", str);
future.addCallback(callback);
});
if (latch.await(10, TimeUnit.SECONDS)) {
LOG.info("All sent ok");
}
else {
for (int i = 0; i < toSend.size(); i++) {
if (!futures.get(i).isDone()) {
LOG.error("No send result for " + toSend.get(i));
}
}
}
}
}