Я настроил систему-потребителя Kafka, и мне нужно обрабатывать передаваемые сообщения.Это строки из файла JSON, такие как
ConsumerRecord(topic=u'json_data103052', partition=0, offset=676, timestamp=1542710197257, timestamp_type=0, key=None, value='{"Name": "Simone", "Surname": "Zimbolli", "gender": "Other", "email": "zzz@uiuc.edu", "country": "Nigeria", "date": "11/07/2018"}', checksum=354265828, serialized_key_size=-1, serialized_value_size=189)
Я ищу простое в реализации решение для
- Определение окна потоковой передачи
- Анализ сообщений вокно (количество уникальных пользователей и тому подобное)
У кого-нибудь есть предложения, как поступить?Благодарю.
У меня проблемы с использованием Spark, поэтому я бы предпочел избегать его.Я пишу сценарии на Python, используя Jupyter.
Вот мой код:
from kafka import KafkaConsumer
from random import randint
from time import sleep
bootstrap_servers = ['localhost:9092']
%store -r topicName # Get the topic name from the kafka producer
print topicName
consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers,
auto_offset_reset='earliest'
)
consumer.subscribe([topicName])
for message in consumer:
print (message)