Я использую storm-kafka-client 1.2.1 и создаю конфигурацию spout для KafkaTridentSpoutOpaque, как показано ниже
kafkaSpoutConfig = KafkaSpoutConfig.builder(brokerURL, kafkaTopic)
.setProp(ConsumerConfig.GROUP_ID_CONFIG,"storm-kafka-group")
.setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
.setProp(ConsumerConfig.CLIENT_ID_CONFIG,InetAddress.getLocalHost().getHostName())
Я не могу найти ни идентификатор группы, ни смещение ни в Kafka, ни в Zookeeper,Через Zookeeper я попробовал с zkCli.sh и попробовал ls /consumers
, но их не было, так как я думаю, что сама Кафка теперь поддерживает смещения, а не zookeeper.
Я тоже пытался с Kafka с помощью команды ниже
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-20130
console-consumer-82696
console-consumer-6106
console-consumer-67393
console-consumer-14333
console-consumer-21174
console-consumer-64550
Может ли кто-нибудь помочь мне, как я могу найти свое смещение, и будет ли он снова воспроизводить мои события в Kafka, если я перезапущу топологию?