Kafka AvroConsumer использует временную метку с помощью offsets_for_times - PullRequest
0 голосов
/ 30 ноября 2018

Попытка использовать confluent_kafka.AvroConsumer для получения сообщений с заданной отметкой времени.

if flag:

    # creating a list
    topic_partitons_to_search = list(
        map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))

    print("Searching for offsets with %s" % topic_partitons_to_search)
    offsets = c.offsets_for_times(topic_partitons_to_search, timeout=1.0)
    print("offsets_for_times results: %s" % offsets)

    for x in offsets:
        c.seek(x)
    flag=False 

консоль возвращает это

Searching for offsets with [TopicPartition{topic=my_topic2,partition=0,offset=1543584425,error=None}]
offsets_for_times results: [TopicPartition{topic=my_topic2,partition=0,offset=0,error=None}]
{'name': 'Hello'}
{'name': 'Hello'}
{'name': 'Hello1'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Offset 8'}
{'name': 'Offset 9'}
{'name': 'Offset 10'}
{'name': 'Offset 11'}
{'name': 'New'} 

Это все мои сообщения в разделе 0 my_topic2 (ничего не иметь в разделе 1), мы ничего не должны возвращать, потому что у нас нет сообщений, созданных за текущее время (time.time ()).Затем я хотел бы иметь возможность использовать что-то вроде time.time() - 60000 для получения всех сообщений за последние 60000 миллисекунд

1 Ответ

0 голосов
/ 30 ноября 2018

Pythons time.time () возвращает количество секунд с начала эпохи, offsets_for_times использует количество миллисекунд с начала эпохи, поэтому, когда я отправлял количество секунд, он вычислял дату намного раньше, чем сегодня, что означало, что мыдолжен включать все мои смещения.

...