Как получить данные смещения кафки, указанные на временной отметке - PullRequest
1 голос
/ 25 июня 2019

Я пытался получить смещение из темы Kafka, основываясь на метке времени, когда я пытался запустить его, выдавая ошибку нулевого указателя,

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
              for (TopicPartition partition : partitions) {
                timestampsToSearch.put(partition,  startTimestamp);
              }
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
              for (TopicPartition partition : partitions) {
                Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);

Любая помощь будет оценена.

1 Ответ

1 голос
/ 25 июня 2019

Чтобы найти смещения, соответствующие отметке времени, вам необходимо использовать метод offsetsForTimes().

Например, при этом будут смещены смещения для раздела 0 из mytopicкоторые соответствуют 1 секунде назад:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
    Map<TopicPartition, Long> timestamps = new HashMap<>();
    timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
    Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
    System.err.println(offsets);
}

Это будет отображать что-то вроде:

{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}
...