Анализ сообщений от потребителя Kafka - PullRequest
0 голосов
/ 20 ноября 2018

Я настроил систему-потребителя Kafka, и мне нужно обрабатывать передаваемые сообщения.Это строки из файла JSON, такие как

ConsumerRecord(topic=u'json_data103052', partition=0, offset=676, timestamp=1542710197257, timestamp_type=0, key=None, value='{"Name": "Simone", "Surname": "Zimbolli", "gender": "Other", "email": "zzz@uiuc.edu", "country": "Nigeria", "date": "11/07/2018"}', checksum=354265828, serialized_key_size=-1, serialized_value_size=189)

Я ищу простое в реализации решение для

  • Определение окна потоковой передачи
  • Анализ сообщений вокно (количество уникальных пользователей и тому подобное)

У кого-нибудь есть предложения, как поступить?Благодарю.

У меня проблемы с использованием Spark, поэтому я бы предпочел избегать его.Я пишу сценарии на Python, используя Jupyter.

Вот мой код:

from kafka import KafkaConsumer
from random import randint
from time import sleep

bootstrap_servers = ['localhost:9092']

%store -r topicName    # Get the topic name from the kafka producer
print topicName

consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers,
                         auto_offset_reset='earliest'
                        )
consumer.subscribe([topicName])

for message in consumer:
    print (message)

Ответы [ 2 ]

0 голосов
/ 20 ноября 2018

Для вашего сценария Kafka Streams кажется подходящим.Он поддерживает windowing со следующими 4 типами:

Tumbling time window - Time-based   Fixed-size, non-overlapping, gap-less windows
Hopping time window- Time-based Fixed-size, overlapping windows
Sliding time window- Time-based Fixed-size, overlapping windows that work on differences between record timestamps
Session window

Для python есть библиотека: https://github.com/wintoncode/winton-kafka-streams

, которая может быть полезна для вас.

0 голосов
/ 20 ноября 2018

Использование Kafka Streams API - это то, что вам нужно, я думаю.У вас есть все функции, необходимые для работы с окнами.Вы можете найти больше информации о Kafka Streams здесь:

https://kafka.apache.org/documentation/streams/

...