Как работает Consumer.endOffsets в Кафке? - PullRequest
0 голосов
/ 18 октября 2018

Предположим, что у меня задание таймера выполняется бесконечно долго, которое перебирает все группы потребителей в кластере kafka и выдает задержку, зафиксированное смещение и конечное смещение для всех разделов для каждой группы.Аналогично тому, как работает консольный сценарий группы потребителей Kafka, за исключением того, что он предназначен для всех групп.

Что-то вроде

Одиночный потребитель - не работает - не возвращает смещения для некоторых из предоставленных разделов темы (например, 10 предусмотрено - возвращено 5 смещений)

Consumer consumer;

static {
  consumer = createConsumer();
}

run() { 
  List<String> groupIds = getConsumerGroups();
  for(String groupId: groupIds) {
       List<TopicParition> topicParitions =  getTopicParitions(groupId);
       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
   }
}

Несколько потребителей - работает

run() { 
   List<String> groupIds = getConsumerGroups();
   for(String groupId: groupIds) {
        List<TopicParition> topicParitions =  getTopicParitions(groupId);
        Consumer consumer = createConsumer();
        consumer.endOffsets(topicParitions); This works!!!
   }
 }

Версии: Kafka-Client 2.0.0

Я неправильно использую API-интерфейс потребителя?В идеале я хотел бы использовать одного потребителя.

Дайте мне знать, если вам нужно больше деталей.

Ответы [ 2 ]

0 голосов
/ 23 октября 2018

Это ошибка в Fetcher.fetchOffsetsByTimes(), в частности, в методе groupListOffsetRequests, в котором логика не добавляла разделы для повторной попытки, когда лидер для запроса смещения для раздела был неизвестен или недоступен.

Это было более заметнокогда вы используете одного потребителя во всех разделах групп потребителей, где в некоторых группах уже есть информация о лидере раздела тем, когда мы запросили endoffsets, а для разделов тем, где нет информации о лидере, неизвестные или недоступные из-за ошибки исключаются.

Позже я понял, что было бы неуместно извлекать разделы тем из каждой группы потребителей, вместо этого сделал изменение, чтобы читать разделы тем из AdminClient.listTopics & AdminClient.describeTopics и сразу передать все в Consumer.endOffsets.

Хотя это полностью не решает проблему, поскольку темы / разделы могут все еще быть недоступными или неизвестными между несколькими запусками.

Можно найти дополнительную информацию - KAFKA-7044 & pull request.Это было исправлено и запланировано на выпуск 2.1.0.

0 голосов
/ 18 октября 2018

Я думаю, ты почти у цели.Сначала соберите все интересующие вас разделы темы, а затем , затем введите команду consumer.endOffsets.

Помните, что я не пытался запуститьэто, но что-то вроде этого должно работать:

run() { 
   Consumer consumer = createConsumer();
   List<String> groupIds = getConsumerGroups();
   List<TopicPartition> topicPartitions = new ArrayList<>();

   for (String groupId: groupIds) {
        topicPartitions.addAll(getTopicPartitions(groupId));
   }

   consumer.endOffsets(topicPartitions); 
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...