Предположим, что у меня задание таймера выполняется бесконечно долго, которое перебирает все группы потребителей в кластере kafka и выдает задержку, зафиксированное смещение и конечное смещение для всех разделов для каждой группы.Аналогично тому, как работает консольный сценарий группы потребителей Kafka, за исключением того, что он предназначен для всех групп.
Что-то вроде
Одиночный потребитель - не работает - не возвращает смещения для некоторых из предоставленных разделов темы (например, 10 предусмотрено - возвращено 5 смещений)
Consumer consumer;
static {
consumer = createConsumer();
}
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
}
}
Несколько потребителей - работает
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
}
}
Версии: Kafka-Client 2.0.0
Я неправильно использую API-интерфейс потребителя?В идеале я хотел бы использовать одного потребителя.
Дайте мне знать, если вам нужно больше деталей.