Является ли метод consumer.createMessageStreams (map) читать последовательно или читать в некотором пакете - PullRequest
0 голосов
/ 27 марта 2019

Я очень новичок в кафке.Мы пишем потребителю в нашем текущем приложении, которое использует тему и имеет некоторую обработку потребляемых данных.Я хочу понять, что происходит внутри, когда я пишу ниже кусок кода.

Он работает, как ожидалось, потребляет данные и обрабатывается, но просто любопытно узнать, как данные читаются из темы.

Будет ли метод createMessageStreams последовательно считывать данные из темы или считывать их в определенном количестве партий и обрабатывать их?

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

1 Ответ

1 голос
/ 27 марта 2019

Прежде всего, отметим, что классы ConsumerConnector или kafka.consumer.KafkaStream устарели в версии kafka v # 0.11.0.В случае, если вы используете старую версию, вы должны запланировать обновление до более новой версии по крайней мере v # 1.0 или более.пакета и обработать их?

.createMessageStreams возвращает карту темы и список пары KafkaStream.(topic,list#stream) Каждый поток поддерживает итератор для сообщений или пары метаданных для темы.Он читает данные последовательно только внутри раздела.Если у вас больше разделов, чем количество потоковых потоков, один поток может читать из нескольких разделов.Но только внутри разделов порядок последовательности гарантирован.

  for (final KafkaStream<byte[], byte[]> stream : streamList) 
    {
       ConsumerIterator<byte[], byte[]> it= stream.iterator();
       while (it.hasNext()) 
       {
          String message = new String(it.next().message());
          System.out.println(message);
        }
      }
}

Эквивалентная функциональность в v # 0.11 и далее - это метод .poll().Вы можете установить max.poll.records или max.poll.interval.ms, чтобы установить количество записей на запрос опроса и длительность интервала соответственно.

Вы можете найти нового потребителя здесь: https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

...