Недавно мы начали обновление стека Kafka с версии 0.8.2 до 2.2.0. Ранее мы также использовали Apache Kafka, но теперь мы пытаемся перейти на Confluent Platform. После обновления нам не нужно было ничего менять в коде (мы использовали kafka_2.9.2 версии 0.8.1.1 и kafka-клиенты версии 0.8.1.1).
Весь код для записи в Kafka с использованием KafkaProducer работал нормально, кроме KafkaStream. Кажется, что KafkaStreams блокируется навсегда, когда я пытаюсь использовать его для чтения данных из тем.
Как только я создаю ConsumerIterator из KafkaStreams, он просто блокируется до истечения времени ожидания. Куда я иду не так?
Я попытался установить для свойства auto.offset.reset значение «наименьшее» и «наибольшее» и одновременно запустил KafkaProducer в другом процессе для записи сообщений на эту тему. Но KafkaStreams никогда не работал.
public static void main(String...args) {
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "test-group");
properties.put("auto.offset.reset", "smallest");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test_topic", new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get("jobs_google_api").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
List<String> messages = new ArrayList<>();
while (it.hasNext())
messages.add(new String(it.next().message()));
System.out.println(messages);
}```