Я использую Kafka для потоковой передачи файла JSON, отправляя каждую строку в виде сообщения.Один из ключей - это email
.
пользователя. Затем я использую PySpark для подсчета количества уникальных пользователей в каждом окне, используя их электронную почту для их идентификации.Команда
def print_users_count(count):
print 'The number of unique users is:', count
print_users_count((lambda message: message['email']).distinct().count())
дает мне ошибку ниже.Как я могу это исправить?
AttributeError Traceback (most recent call last)
<ipython-input-19-311ba744b41f> in <module>()
2 print 'The number of unique users is:', count
3
----> 4 print_users_count((lambda message: message['email']).distinct().count())
AttributeError: 'function' object has no attribute 'distinct'
Вот мой код PySpark:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
try:
sc.stop()
except:
pass
sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
# Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})
# Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
# Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()