Лучшие практики для отслеживания значений для каждого раздела - PullRequest
2 голосов
/ 07 октября 2019

Я пытаюсь прочитать запрошенное количество сообщений kafka в обратном порядке. Поскольку существует несколько разделов, мне нужно отслеживать все считанные смещения, чтобы я мог остановиться, когда все будет сделано. Есть два шага, первый шаг, чтобы получить начальное смещение (конечное смещение - запрошенный номер сообщений) и конечное смещение. (смещения не непрерывны, есть пробелы). Второй шаг заключается в опросе сообщений и подсчете сообщений в каждом разделе, и если мы не встретим запрошенное количество сообщений, повторите первый и второй шаг снова, пока не встретим количество сообщений для каждого раздела.

ПервыйШаг:

В настоящее время у меня есть Map<Integer,OffsetValue>, где ключ - это номер раздела, а значение - это объект Java (OffsetValue) - startOffset, endOffset. Я бы искал раздел, чтобы начать потреблять с начала смещения.

Второй шаг:

Теперь возникает проблема, когда я начинаю опрашивать запрошенные сообщения. Допустим, пользователь хочет 10 сообщений для каждого раздела. У меня есть хэш-карта для отслеживания сообщений, прочитанных каждым разделом.

Условия

Первоначальный опрос может не вернуть никаких записей, поэтому продолжайте опрос. Прекратите опрос, когда вы достигли конечного смещения для каждого раздела, или опрос не даст результатов. Проверьте каждый раздел на наличие сообщений, прочитанных так же, как и запрошенные сообщенияЕсли да, отметьте как выполненное, если нет, отметьте как продолжить и повторите шаги

Вопрос:

Как бы я улучшил приведенную ниже логику, чтобы знать, что все сообщения были прочитаны для каждого раздела и разбитыпетли? Сообщения в каждом разделе будут приходить по порядку, если это будет полезно. Мы знаем верхний предел для сообщений в каждом разделе (в нашем случае это 10). Поэтому мне потребуется некоторая логика для использования offsetMap, чтобы узнать, что я достиг конечного смещения, отследить и повторить это для всех разделов и выйти из цикла.

 List<KafkaRecord> messages = new ArrayList<>();
 Map<Integer,Integer> messagesReadMap= new HashMap<>();
 Map<Integer,OffsetValue> offsetMap = getOffsetMap();
 while(true){
    List<KafkaRecord> kafkaRecords = pollKafka();// will poll from all partitions from the set start offset
    for(KafkaRecord kafkaRecord: kafkaRecords) {// read the messages and process
      OffsetValue offset = offsetMap.get(kafkaRecord.partition);
      if(kafkaRecord.offset < offset.endOffset) {// this is needed to make sure we are not processing messages past the set end offset
       //Add code here to track the messages read in each partition. Once we have have 10 messages for each partition or reached the end offset for that partition we can exit the loop and return the messages.
         messagesReadMap.merge(kafkaRecord.partition,1,(a,b)->a+b);
         messages.add(kafkaRecord);
      }
 }

Пожалуйста, спросите меня, нужна ли вам дополнительная информация. Дайте мне знать, если есть лучший способ решить проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...