получить смещение в теме кафки на основе метки времени - PullRequest
0 голосов
/ 27 июня 2019

Я пытался получить смещение, основанное на временной шкале, но когда я запускал свой код, он выдавал ошибку нулевого указателя здесь "Long seekOffset = outOffsets.get (partition) .offset ();"потому что в это конкретное время нет смещения Мой вопрос заключается в том, как получить ближайшее смещение на конкретном таймсампе

Ниже кода, который я пробовал,


Long startTimestamp=Instant.now().minus (10, ChronoUnit.MINUTES ).toEpochMilli();

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
              for (TopicPartition partition : partitions) {
                timestampsToSearch.put(partition,  startTimestamp);
              }
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
              for (TopicPartition partition : partitions) {
                Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset); 

Любая помощь будет признательна!Заранее спасибо

1 Ответ

0 голосов
/ 27 июня 2019

Метод offsetsForTimes возвращает смещение первого сообщения с отметкой времени, большей или равной целевой отметке времени. Если оно пустое, то такого сообщения нет. В этом случае просто поместите потребителя до конца.

for (TopicPartition partition : partitions) {
          OffsetAndTimestamp seekOffset = outOffsets.get(partition);
          if(seekOffset!=null){
             consumer.seek(partition, seekOffset.offset()); 
          }else{
             consumer.seekToEnd(Collections.singleton(partition));
          }
}
...