Один из распространенных подходов в этом случае - поместить все ваши записи в очередь. И иметь один поток, который возьмет эти записи, когда ваша очередь достигнет размера 10 или после 1000 мс, в зависимости от того, что будет первым.
Код потребителя:
CountDownLatch countDownLatch = new CountDownLatch(10);
countDownLatch.await(1000, TimeUnit.MILLISECONDS);
int queueSize = queue.size();
for(int i = 0; i < queueSize; ++i) {
... do your work here or put in a batch a do it right after loop
}
Код производителя:
Record record = ...receive new record...
queue.put(record);
consumer.getCountDownLatch().countDown();
В качестве очереди я рекомендую использовать несвязанную, например LinkedTransferQueue
, потому что вы не хотите останавливать своего продюсера при достижении 10 задач, вам все равно нужно использовать результаты из kafka.
Также другой вариант реактивные потоки .