Прежде всего, отметим, что классы ConsumerConnector
или kafka.consumer.KafkaStream
устарели в версии kafka v # 0.11.0.В случае, если вы используете старую версию, вы должны запланировать обновление до более новой версии по крайней мере v # 1.0 или более.пакета и обработать их?
.createMessageStreams
возвращает карту темы и список пары KafkaStream.(topic,list#stream)
Каждый поток поддерживает итератор для сообщений или пары метаданных для темы.Он читает данные последовательно только внутри раздела.Если у вас больше разделов, чем количество потоковых потоков, один поток может читать из нескольких разделов.Но только внутри разделов порядок последовательности гарантирован.
for (final KafkaStream<byte[], byte[]> stream : streamList)
{
ConsumerIterator<byte[], byte[]> it= stream.iterator();
while (it.hasNext())
{
String message = new String(it.next().message());
System.out.println(message);
}
}
}
Эквивалентная функциональность в v # 0.11 и далее - это метод .poll()
.Вы можете установить max.poll.records
или max.poll.interval.ms
, чтобы установить количество записей на запрос опроса и длительность интервала соответственно.
Вы можете найти нового потребителя здесь: https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html