Я создаю приложение, которое использует данные из kafka и выполняет некоторую обработку на нем. Поскольку масштаб работы этого приложения огромен, я использую слушатель Concurrent Batch (max.poll.records = 20) со службой Java 8 Executor. Ниже приведен фрагмент кода с комментариями, где я застрял:
//Below method is invoked by spring kafka concurrent batch listener
public void onMessage(List<ConsumerRecord<String,String>> records , Acknowledgement ack)
{
CompletableFuture.runAsync(() -> records.stream().forEach(record ->
doSomeProcessingAsynchronously()),new ThreadPoolExecutor(10,10,0,TimeUnit.MILLISECONDS. new
LinkedBlockingQueue<>(20)));
//I want to execute all consumed records first only then flow should come after this line
//and acknowledge complete batch
ack.acknowledge();
}
Есть предложения о том, как это сделать? Заранее спасибо