Я использую Java 11 и kafka-client 2.0.0.
Я использую следующий код для создания потребителя:
public Consumer createConsumer(Properties properties,String regex) {
log.info("Creating consumer and listener..");
Consumer consumer = new KafkaConsumer<>(properties);
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.info("The following partitions were revoked from consumer : {}", Arrays.toString(partitions.toArray()));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("The following partitions were assigned to consumer : {}", Arrays.toString(partitions.toArray()));
}
};
consumer.subscribe(Pattern.compile(regex), listener);
log.info("consumer subscribed");
return consumer;
}
}
Мой опрос l oop находится в другом месте кода:
public <K, V> void startWorking(Consumer<K, V> consumer) {
try {
while (true) {
ConsumerRecords<K, V> records = consumer.poll(600);
if (records.count() > 0) {
log.info("Polled {} records", records.count());
} else {
log.info("polled 0 records.. going to sleep..");
Thread.sleep(200);
}
}
} catch (WakeupException | InterruptedException e) {
log.error("Consumer is shutting down", e);
} finally {
consumer.close();
}
}
Когда я запускаю код и использую эту функцию, создается потребитель, и журнал содержит следующие сообщения:
Creating consumer and listener..
consumer subscribed
polled 0 records.. going to sleep..
polled 0 records.. going to sleep..
polled 0 records.. going to sleep..
Журнал не содержит информации о назначении / отзыве раздела.
Кроме того, я могу видеть в журнале свойства, которые использует потребитель (установлен group.id):
2020-07-09 14:31:07.959 DEBUG 7342 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [server1:9092]
check.crcs = true
client.id =
group.id=mygroupid
key.deserializer=..
value.deserializer=..
Итак, я попытался использовать kafka-console-consumer с той же конфигурацией, чтобы использовать одну из тем, которые регулярное выражение (mytopi c. *) Должно улавливать (в этом случае я использовал topi c mytopi c -1):
/usr/bin/kafka-console-consumer.sh --bootstrap-server server1:9092 --topic mytopic-1 --property print.timestamp=true --consumer.config /data/scripts/kafka-consumer.properties --from-begining
У меня есть опрос l oop в другой части моего кода, который отключается каждые 10 м. Итак, нижняя строка - проблема в эти разделы не назначены потребителю Java. Отпечатков внутри слушателя никогда не происходит, и у потребителя нет разделов для прослушивания.