Я новичок в kafka, у меня есть следующий пример кода:
KafkaConsumer<String,String> kc = new KafkaConsumer<String, String>(props);
while(true) {
List<String> topicNames = Arrays.asList(topics.split(","));
if (!kc.assignment().isEmpty()) {
kc.unsubscribe();
}
kc.subscribe(topicNames);
ConsumerRecords<String, String> recv = kc.poll(1000L);
if (!recv.isEmpty()) {
System.out.println("NOT EMPTY");
}
}
Recv всегда пуст, но если я пытаюсь увеличить тайм-аут пула, записи возвращаются, даже если я отключаю часть отказа от подписки.
Я взял этот фрагмент кода у проприетарного программного обеспечения и не могу его изменить.
Итак, мой вопрос: это только проблема синхронизации или есть еще?