Кафка: как использовать данные на основе метки времени - PullRequest
0 голосов
/ 18 мая 2018

Я хочу знать, есть ли другой способ, кроме смещения, для извлечения данных относительно временного интервала?Скажем, я хочу использовать все вчерашние даты, как мне это сделать?

Ответы [ 2 ]

0 голосов
/ 09 января 2019

Используйте offsetsForTimes , чтобы получить правое смещение, связанное с требуемой отметкой времени.В Python это будет выглядеть следующим образом:

from datetime import datetime
from kafka import KafkaConsumer, TopicPartition

topic  = "www.kilskil.com" 
broker = "localhost:9092"

# lets check messages of the first day in New Year
date_in  = datetime(2019,1,1)
date_out = datetime(2019,1,2)

consumer = KafkaConsumer(topic, bootstrap_servers=broker, enable_auto_commit=True)
consumer.poll()  # we need to read message or call dumb poll before seeking the right position

tp      = TopicPartition(topic, 0) # partition n. 0
# in simple case without any special kafka configuration there is only one partition for each topic channel
# and it's number is 0

# in fact you asked about how to use 2 methods: offsets_for_times() and seek()
rec_in  = consumer.offsets_for_times({tp:date_in.timestamp() * 1000})
rec_out = consumer.offsets_for_times({tp:date_out.timestamp() * 1000})

consumer.seek(tp, rec_in[tp].offset) # lets go to the first message in New Year!

c = 0
for msg in consumer:
  if msg.offset >= rec_out[tp].offset:
    break

  c += 1
  # message also has .timestamp field

print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))

Не забывайте, что Kafka измеряет метку времени в миллисекундах и имеет тип long .Python lib datetime возвращает метки времени в секундах, поэтому нам нужно умножить его на 1000. Метод offsets_for_times возвращает dict с TopicPartition ключами и OffsetAndTimestamp значениями.

0 голосов
/ 18 мая 2018

Вы можете найти самое раннее смещение для начала указанного временного интервала и перемотать назад к этому смещению.Однако трудно понять, где находится конец интервала, поскольку записи с самыми ранними временными метками могут поступить позже.Таким образом, вы можете использовать записи с начала интервала, пока не найдете записи с метками времени позже, чем endTime плюс еще несколько записей, чтобы перехватить поздние сообщения.

Код для перемоткидля началаВремя это:

public void rewind(DateTime time) {
    Set<TopicPartition> assignments = consumer.assignment();
    Map<TopicPartition, Long> query = new HashMap<>();
    for (TopicPartition topicPartition : assignments) {
        query.put(topicPartition, time.getMillis());
    }
    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

    result.entrySet().stream().forEach(entry -> consumer.seek(entry.getKey(),
            Optional.ofNullable(entry.getValue()).map(OffsetAndTimestamp::offset).orElse(new Long(0))));
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...