Кафка Python отправка данных с пользовательской отметкой - PullRequest
0 голосов
/ 04 июля 2018

У меня есть данные JSON с отметкой времени, как это

a = {'key1':'value','timestamp':'123344453'}

Я использую kafka python версии 1.3.4.1

Код моего производителя:

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))
for a in list_of_dicts:
    producer.send('topic', a, timestamp_ms=int(a['timestamp']))

Мой потребительский код:

consumer = KafkaConsumer('topic')
for msg in consumer:

    mj = (msg.value.decode('utf8'))
    data = json.loads(mj)
    print(data)
    print(msg.timestamp)

Я хочу, чтобы данные отправлялись согласно отметке времени. Но данные сразу отправляются потребителю, и потребитель печатает все словари из списка сразу. Мои данные имеют метку времени с разрывом в 120 секунд. Поэтому, если моя временная метка, например, равна 120, то следующая временная метка будет 240. Мне нужно, чтобы потребитель потреблял таким образом, а не все сразу, или производитель отправлял по одному из двух.

Также я не могу понять использование timestamp_ms из документов. Просто начинаю с кафки, так что я понятия не имею об этом. Просьба помочь.

...