У меня есть сценарий использования, в котором я хочу назначить потребителя разделу, и у меня нет другого потребителя, потребляющего этот раздел, я знаю, что это звучит легко при использовании группы потребителей и метода подписки, но в конечном итоге я хочу получить что никакой перебалансировки не произойдет, если один из потребителей выйдет из строя. Другая проблема заключается в том, что эти потребители будут работать на нескольких машинах.
Позвольте мне привести полный сценарий:
У меня есть тема T
, а T
имеет 4 раздела. Теперь есть две машины M1
и M2
, каждая из которых работает с двумя потоками, которые используют 4 раздела, теперь я могу назначить каждый раздел с помощью функции assign
в KAFKA.
Задачи
Когда потребители воспитаны, я не хочу жестко кодировать назначение раздела в коде, вместо этого я использую KafkaAdminClient
, чтобы найти информацию о теме и группе потребителей, к которой принадлежат эти потребители, и программно узнать, какой раздел назначено, а что нет, это очень сложно в режиме с несколькими компьютерами, потому что приложение, работающее на обоих компьютерах, запрашивает одну и ту же группу потребителей и получает одну и ту же информацию, теперь, скажем, машина M1
назначает раздел 0 потоку 0 и этим машина времени M2
также назначает раздел 0 потоку 0. У меня проблема: оба потока потребляют данные из одного раздела. Это сценарий, который я не могу рассмотреть. Ниже приведен код KafkaAdminClient
, который я использую для сбора информации о группе потребителей и теме. Любые идеи высоко ценятся.
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class KafkaConsumerAdmin {
private final AdminClient adminClient;
public KafkaConsumerAdmin(Properties adminProperties) {
this.adminClient = AdminClient.create(adminProperties);
}
public List<TopicPartitionInfo> getTopicPartitionInfo(String topicName)
throws ExecutionException, InterruptedException {
DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(
Collections.singletonList(topicName)
);
KafkaFuture<Map<String, TopicDescription>> futureOfDescribeTopicResult = describeTopicsResult.all();
Map<String, TopicDescription> topicDescriptionMap = futureOfDescribeTopicResult.get();
TopicDescription topicDescription = topicDescriptionMap.get(topicName);
return topicDescription.partitions();
}
public ConsumerGroupDescription getConsumerGroupInfo(String consumerGroupID)
throws ExecutionException, InterruptedException {
DescribeConsumerGroupsResult describeConsumerGroupsResult =
this.adminClient.describeConsumerGroups(Collections.singletonList(consumerGroupID));
KafkaFuture<Map<String, ConsumerGroupDescription>> futureOfDescribeConsumerGroupsResult =
describeConsumerGroupsResult.all();
Map<String, ConsumerGroupDescription> consumerGroupDescriptionMap = futureOfDescribeConsumerGroupsResult.get();
return consumerGroupDescriptionMap.get(consumerGroupID);
}
public Map<String, Set<TopicPartition>> getConsumerGroupMemberInfo(ConsumerGroupDescription consumerGroupDescription) {
Map<String, Set<TopicPartition>> memberToTopicPartitionMap = new HashMap<>();
for (MemberDescription memberDescription : consumerGroupDescription.members()) {
MemberAssignment memberAssignment = memberDescription.assignment();
Set<TopicPartition> topicPartitions = memberAssignment.topicPartitions();
memberToTopicPartitionMap.put(memberDescription.consumerId(), topicPartitions);
}
return memberToTopicPartitionMap;
}
public List<Integer> getAvailablePartitions(List<TopicPartitionInfo> topicPartitionInfoList,
Map<String, Set<TopicPartition>> memberToTopicPartitionMap) {
Map<Integer, List<String>> partitionToMemberMap = new HashMap<>();
topicPartitionInfoList.forEach(x -> partitionToMemberMap.put(x.partition(), new LinkedList<>()));
for (Map.Entry<String, Set<TopicPartition>> entry : memberToTopicPartitionMap.entrySet()) {
String memberID = entry.getKey();
for (TopicPartition topicPartition : entry.getValue()) {
partitionToMemberMap.get(topicPartition.partition()).add(memberID);
}
}
return partitionToMemberMap.entrySet().stream().
filter(x -> x.getValue().isEmpty()).map(Map.Entry::getKey).collect(Collectors.toList());
}
Map<TopicPartition, Long> getConsumerGroupOffsetInfo(String consumerGroupID)
throws ExecutionException, InterruptedException {
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult =
this.adminClient.listConsumerGroupOffsets(consumerGroupID);
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> futureOfConsumerGroupOffsetResult =
listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata();
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsetInfo = futureOfConsumerGroupOffsetResult.get();
return consumerGroupOffsetInfo.entrySet().stream().
collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().offset()));
}
}