Apache Kafka получит список потребителей по конкретной теме - PullRequest
1 голос
/ 01 мая 2019

Как видно из названия, есть ли способ получить список потребителей по определенной теме в Java?До сих пор я не мог получить список тем, подобных этой

    final ListTopicsResult listTopicsResult = adminClient.listTopics();
    KafkaFuture<Set<String>> kafkaFuture = listTopicsResult.names();
    Set<String> map = kafkaFuture.get();

, но я не нашел способа получить список потребителей по каждой теме

Ответы [ 2 ]

3 голосов
/ 01 мая 2019

Недавно я решал ту же проблему для моего клиента kafka.Это не просто, но единственный способ, который я нашел из кода, заключается в следующем:

Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);

//Here you get all the consumer groups
List<String> groupIds = kafkaClient.listConsumerGroups().all().get().
                       stream().map(s -> s.groupId()).collect(Collectors.toList()); 

//Here you get all the descriptions for the groups
Map<String, ConsumerGroupDescription> groups = kafkaClient.
                                               describeConsumerGroups(groupIds).all().get();
for (final String groupId : groupIds) {
    ConsumerGroupDescription descr = groups.get(groupId);
    //find if any description is connected to the topic with topicName
    Optional<TopicPartition> tp = descr.members().stream().
                                  map(s -> s.assignment().topicPartitions()).
                                  flatMap(coll -> coll.stream()).
                                  filter(s -> s.topic().equals(topicName)).findAny();
            if (tp.isPresent()) {
                //you found the consumer, so collect the group id somewhere
            }
} 

Этот API доступен с версии 2.0.Возможно, есть лучший способ, но я не смог его найти.Вы также можете найти код на моем битбакете

2 голосов
/ 01 мая 2019

Потребители не привязаны к теме. Они привязаны к группам потребителей

Чтобы получить всех потребителей темы, вы должны сначала перечислить все группы, а затем отфильтровать вашу тему в каждой группе

Что начинается с AdminClient.listConsumerGroups, за которым следует AdminClient.describeConsumerGroups

Это дает вам список описаний, которые содержат «участников», где вы можете найти темы

https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/admin/ConsumerGroupDescription.html

Примечание: есть внешние инструменты, которые делают это намного проще, такие как Hortonworks SMM https://docs.hortonworks.com/HDPDocuments/SMM/SMM-1.2.0/monitoring-kafka-clusters/content/smm-monitoring-consumers.html

...