Проверка смещения темы Кафки для штормового потребителя - PullRequest
0 голосов
/ 17 мая 2018

Я использую storm-kafka-client 1.2.1 и создаю конфигурацию spout для KafkaTridentSpoutOpaque, как показано ниже

            kafkaSpoutConfig = KafkaSpoutConfig.builder(brokerURL, kafkaTopic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG,"storm-kafka-group")
                .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
                .setProp(ConsumerConfig.CLIENT_ID_CONFIG,InetAddress.getLocalHost().getHostName())

Я не могу найти ни идентификатор группы, ни смещение ни в Kafka, ни в Zookeeper,Через Zookeeper я попробовал с zkCli.sh и попробовал ls /consumers, но их не было, так как я думаю, что сама Кафка теперь поддерживает смещения, а не zookeeper.

Я тоже пытался с Kafka с помощью команды ниже

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand  --list  --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-20130
console-consumer-82696
console-consumer-6106
console-consumer-67393
console-consumer-14333
console-consumer-21174
console-consumer-64550

Может ли кто-нибудь помочь мне, как я могу найти свое смещение, и будет ли он снова воспроизводить мои события в Kafka, если я перезапущу топологию?

1 Ответ

0 голосов
/ 17 мая 2018

Trident хранит не смещения в Кафке, а в Storm's Zookeeper.Если вы работаете с настройками по умолчанию для конфигурации Storm Zookeeper, путь в Storm Zookeeper будет выглядеть примерно так: /coordinator/<your-topology-id>/meta.

Объекты под этим путем будут содержать первое и последнее смещение, а также раздел темы длякаждая партия.Так, например, /coordinator/<your-topology-id>/meta/15 будет содержать первое и последнее смещение, испускаемое в пакете с номером 15.

Независимо от того, воспроизводит ли излив смещения после перезапуска, управляется параметром FirstPollOffsetStrategy, установленным в KafkaSpoutConfig.Значение по умолчанию UNCOMMITTED_EARLIEST, которое не начинается заново при перезапуске.См. Javadoc в https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L126.

...