Реализовать микропакет в Java - PullRequest
0 голосов
/ 05 мая 2018

Я разрабатываю приложение на основе kafka, в котором слушатель kafka будет прослушивать запись; как только kafka получит запись, мне может понадобиться записать запись в файл. Здесь, чтобы записать запись в файл, мы хотим использовать микропакет с настройками размера пакета и времени ожидания. Например, размер пакета равен 10, а время ожидания равно 1000 мс, что означает ожидание 10 записей перед записью в файл с временем ожидания 1000 мс. Если в любом случае Кафка получил только 5 записей за 1000 мс, то записать только 5 записей в этом пакете.

Насколько эффективно я могу сделать это на Java.

Ответы [ 2 ]

0 голосов
/ 05 мая 2018

Похоже, вы должны использовать Kafka Connect API . Это часть Apache Kafka и предназначена для поддержки описанного вами процесса.

Здесь есть руководство разработчика .

0 голосов
/ 05 мая 2018

Один из распространенных подходов в этом случае - поместить все ваши записи в очередь. И иметь один поток, который возьмет эти записи, когда ваша очередь достигнет размера 10 или после 1000 мс, в зависимости от того, что будет первым.

Код потребителя:

 CountDownLatch countDownLatch = new CountDownLatch(10);
 countDownLatch.await(1000, TimeUnit.MILLISECONDS);
 int queueSize = queue.size();
 for(int i = 0; i < queueSize; ++i) {
     ... do your work here or put in a batch a do it right after loop
 }

Код производителя:

 Record record = ...receive new record...
 queue.put(record);
 consumer.getCountDownLatch().countDown();

В качестве очереди я рекомендую использовать несвязанную, например LinkedTransferQueue, потому что вы не хотите останавливать своего продюсера при достижении 10 задач, вам все равно нужно использовать результаты из kafka.

Также другой вариант реактивные потоки .

...