Подсчет количества пользователей на окно с использованием PySpark - PullRequest
0 голосов
/ 20 ноября 2018

Я использую Kafka для потоковой передачи файла JSON, отправляя каждую строку в виде сообщения.Один из ключей - это email.

пользователя. Затем я использую PySpark для подсчета количества уникальных пользователей в каждом окне, используя их электронную почту для их идентификации.Команда

def print_users_count(count):
    print 'The number of unique users is:', count

print_users_count((lambda message: message['email']).distinct().count())

дает мне ошибку ниже.Как я могу это исправить?

AttributeError                            Traceback (most recent call last)
<ipython-input-19-311ba744b41f> in <module>()
      2     print 'The number of unique users is:', count
      3 
----> 4 print_users_count((lambda message: message['email']).distinct().count())

AttributeError: 'function' object has no attribute 'distinct'

Вот мой код PySpark:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

try:
    sc.stop()
except:
    pass  

sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 60)

# Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})

# Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

# Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()

1 Ответ

0 голосов
/ 20 ноября 2018

Вы не применяете лямбда-функцию ни к чему.Что такое message ссылка?Право, не лямбда-функция, это просто функция.Вот почему вы получаете AttributeError: 'function' object has no attribute 'distinct'.Он не применяется к каким-либо данным, поэтому он не возвращает никаких данных.Вам нужно сослаться на фрейм данных, в котором находится ключ email.

См. Документы pyspark для pyspark.sql.functions.countDistinct(col, *cols) и pyspark.sql.functions.approx_count_distinct документы pyspark .Это должно быть более простым решением для получения уникального количества.

...