Обеспечение регистрации потребителей Kafka в неназначенном разделе, где потребители работают на нескольких машинах - PullRequest
0 голосов
/ 02 ноября 2018

У меня есть сценарий использования, в котором я хочу назначить потребителя разделу, и у меня нет другого потребителя, потребляющего этот раздел, я знаю, что это звучит легко при использовании группы потребителей и метода подписки, но в конечном итоге я хочу получить что никакой перебалансировки не произойдет, если один из потребителей выйдет из строя. Другая проблема заключается в том, что эти потребители будут работать на нескольких машинах.

Позвольте мне привести полный сценарий:

У меня есть тема T, а T имеет 4 раздела. Теперь есть две машины M1 и M2, каждая из которых работает с двумя потоками, которые используют 4 раздела, теперь я могу назначить каждый раздел с помощью функции assign в KAFKA.

Задачи Когда потребители воспитаны, я не хочу жестко кодировать назначение раздела в коде, вместо этого я использую KafkaAdminClient, чтобы найти информацию о теме и группе потребителей, к которой принадлежат эти потребители, и программно узнать, какой раздел назначено, а что нет, это очень сложно в режиме с несколькими компьютерами, потому что приложение, работающее на обоих компьютерах, запрашивает одну и ту же группу потребителей и получает одну и ту же информацию, теперь, скажем, машина M1 назначает раздел 0 потоку 0 и этим машина времени M2 также назначает раздел 0 потоку 0. У меня проблема: оба потока потребляют данные из одного раздела. Это сценарий, который я не могу рассмотреть. Ниже приведен код KafkaAdminClient, который я использую для сбора информации о группе потребителей и теме. Любые идеи высоко ценятся.

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class KafkaConsumerAdmin {
    private final AdminClient adminClient;

    public KafkaConsumerAdmin(Properties adminProperties) {
        this.adminClient = AdminClient.create(adminProperties);
    }

    public List<TopicPartitionInfo> getTopicPartitionInfo(String topicName)
            throws ExecutionException, InterruptedException {
        DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(
                Collections.singletonList(topicName)
        );
        KafkaFuture<Map<String, TopicDescription>> futureOfDescribeTopicResult = describeTopicsResult.all();
        Map<String, TopicDescription> topicDescriptionMap = futureOfDescribeTopicResult.get();
        TopicDescription topicDescription = topicDescriptionMap.get(topicName);
        return topicDescription.partitions();
    }

    public ConsumerGroupDescription getConsumerGroupInfo(String consumerGroupID)
            throws ExecutionException, InterruptedException {
        DescribeConsumerGroupsResult describeConsumerGroupsResult =
                this.adminClient.describeConsumerGroups(Collections.singletonList(consumerGroupID));
        KafkaFuture<Map<String, ConsumerGroupDescription>> futureOfDescribeConsumerGroupsResult =
                describeConsumerGroupsResult.all();
        Map<String, ConsumerGroupDescription> consumerGroupDescriptionMap = futureOfDescribeConsumerGroupsResult.get();
        return consumerGroupDescriptionMap.get(consumerGroupID);
    }

    public Map<String, Set<TopicPartition>> getConsumerGroupMemberInfo(ConsumerGroupDescription consumerGroupDescription) {
        Map<String, Set<TopicPartition>> memberToTopicPartitionMap = new HashMap<>();
        for (MemberDescription memberDescription : consumerGroupDescription.members()) {
            MemberAssignment memberAssignment = memberDescription.assignment();
            Set<TopicPartition> topicPartitions = memberAssignment.topicPartitions();
            memberToTopicPartitionMap.put(memberDescription.consumerId(), topicPartitions);
        }
        return memberToTopicPartitionMap;
    }

    public List<Integer> getAvailablePartitions(List<TopicPartitionInfo> topicPartitionInfoList,
                                                Map<String, Set<TopicPartition>> memberToTopicPartitionMap) {
        Map<Integer, List<String>> partitionToMemberMap = new HashMap<>();
        topicPartitionInfoList.forEach(x -> partitionToMemberMap.put(x.partition(), new LinkedList<>()));
        for (Map.Entry<String, Set<TopicPartition>> entry : memberToTopicPartitionMap.entrySet()) {
            String memberID = entry.getKey();
            for (TopicPartition topicPartition : entry.getValue()) {
                partitionToMemberMap.get(topicPartition.partition()).add(memberID);
            }
        }
        return partitionToMemberMap.entrySet().stream().
                filter(x -> x.getValue().isEmpty()).map(Map.Entry::getKey).collect(Collectors.toList());
    }

    Map<TopicPartition, Long> getConsumerGroupOffsetInfo(String consumerGroupID)
            throws ExecutionException, InterruptedException {
        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult =
                this.adminClient.listConsumerGroupOffsets(consumerGroupID);
        KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> futureOfConsumerGroupOffsetResult =
                listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata();
        Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsetInfo = futureOfConsumerGroupOffsetResult.get();
        return consumerGroupOffsetInfo.entrySet().stream().
                collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().offset()));
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...