Как использовать многопоточность в Кафке - PullRequest
0 голосов
/ 18 мая 2018

Я пытаюсь прочитать сообщения kafka от производителя, используя Многопоточность Java .

Предположим, Kafka Producer отправить несколько сообщений на Kafka Consumer ,тогда как читать эти несколько сообщений отдельно, используя ExecutorService в JAVA

1 Ответ

0 голосов
/ 18 мая 2018

Я реализовал ваш упомянутый случай и собираюсь поделиться шагами, которые вы должны выполнить.

Создайте потребительский класс, который должен реализовать интерфейс Runnable, он должен иметь экземпляр Kafkaconsumer в качестве члена класса,Вы можете настроить потребительские свойства в методе конструктора.

public LogConsumer(List<String> topics, String group, String brokerList) {

    Properties propsConsumer = new Properties();
    propsConsumer.put("bootstrap.servers", brokerList);
    propsConsumer.put("group.id", group);
    propsConsumer.put("enable.auto.commit", "false");
    propsConsumer.put("key.deserializer", StringDeserializer.class);
    propsConsumer.put("value.deserializer", ByteArrayDeserializer.class);
    propsConsumer.put("auto.offset.reset", "latest");

    this.consumer = new KafkaConsumer(propsConsumer);
    this.consumer.subscribe(topics);
}

Затем в методе run вы можете использовать каждое сообщение kafka, как показано ниже.

public void run() {
    try {
        while (!flagOfThread) {
            ConsumerRecords<String, byte []> records = consumer.poll(10000);
            for (ConsumerRecord<String, byte []> record : records) {
                handleRecord(record);
            }
            // At least one
            consumer.commitSync();
        }
    } catch (WakeupException e) {
        // Ignore exception if closing
        if (!flagOfThread){
            LOG.error("Log Consumer is shutting down.",e);
        }

    } finally {
        consumer.close();
    }
}

В классе вашего приложения Runner, вы должны создать пул потоков, этот счетчик пула потоков должен быть равен количеству разделов темы .

 ExecutorService executorService = Executors.newFixedThreadPool(parallelismCount);
    for (int i = 0; i < parallelismCount; i++) {
        ExecutionLogConsumer bean = new LogConsumer(/*parameters*/);
        executorService.execute(bean);
    }

Теперь вы можете начать использовать ваши сообщения от kafka:)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...