У меня есть данные JSON с отметкой времени, как это
a = {'key1':'value','timestamp':'123344453'}
Я использую kafka python версии 1.3.4.1
Код моего производителя:
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
for a in list_of_dicts:
producer.send('topic', a, timestamp_ms=int(a['timestamp']))
Мой потребительский код:
consumer = KafkaConsumer('topic')
for msg in consumer:
mj = (msg.value.decode('utf8'))
data = json.loads(mj)
print(data)
print(msg.timestamp)
Я хочу, чтобы данные отправлялись согласно отметке времени. Но данные сразу отправляются потребителю, и потребитель печатает все словари из списка сразу. Мои данные имеют метку времени с разрывом в 120 секунд. Поэтому, если моя временная метка, например, равна 120, то следующая временная метка будет 240. Мне нужно, чтобы потребитель потреблял таким образом, а не все сразу, или производитель отправлял по одному из двух.
Также я не могу понять использование timestamp_ms
из документов.
Просто начинаю с кафки, так что я понятия не имею об этом. Просьба помочь.