Подтвердить вручную слушателя Kafka Batch. Что я делаю не так для многопоточности? - PullRequest
0 голосов
/ 18 марта 2020

Я создаю приложение, которое использует данные из kafka и выполняет некоторую обработку на нем. Поскольку масштаб работы этого приложения огромен, я использую слушатель Concurrent Batch (max.poll.records = 20) со службой Java 8 Executor. Ниже приведен фрагмент кода с комментариями, где я застрял:

    //Below method is invoked by spring kafka concurrent batch listener
public void onMessage(List<ConsumerRecord<String,String>> records , Acknowledgement ack)
{
    CompletableFuture.runAsync(() -> records.stream().forEach(record -> 
    doSomeProcessingAsynchronously()),new ThreadPoolExecutor(10,10,0,TimeUnit.MILLISECONDS. new 
    LinkedBlockingQueue<>(20)));
    //I want to execute all consumed records first only then flow should come after this line
    //and acknowledge complete batch
    ack.acknowledge();
}

Есть предложения о том, как это сделать? Заранее спасибо

1 Ответ

0 голосов
/ 18 марта 2020

Добавьте

CountDownLatch latch = new CountDownLatch(records.size());

, затем в лямбда-счетчике просчитайте его для каждого выполненного задания.

Перед подтверждением boolean ok = latch.await(60, TimeUnit.SECONDS);.

(проверьте время ожидания и действуйте соответственно).

records.stream().forEach(...

Тем не менее, вы не выполняете обработку параллельно; вы просто запускаете все 20 последовательно в другом потоке.

Вам нужно переместить forEach наружу.

...