У меня есть набор json-сообщений от Kafka, каждый из которых описывает пользователя веб-сайта.Используя pyspark, мне нужно подсчитать количество пользователей в каждой стране на окно потоковой передачи и вернуть страны с максимальным и минимальным количеством пользователей.
Вот пример потоковых сообщений json:
{"id":1,"first_name":"Barthel","last_name":"Kittel","email":"bkittel0@printfriendly.com","gender":"Male","ip_address":"130.187.82.195","date":"06/05/2018","country":"France"}
Вот мой код:
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark import SparkContext
from pyspark.sql import SQLContext
fields = ['id', 'first_name', 'last_name', 'email', 'gender', 'ip_address', 'date', 'country']
schema = StructType([
StructField(field, StringType(), True) for field in fields
])
def parse(s, fields):
try:
d = json.loads(s[0])
return [tuple(d.get(field) for field in fields)]
except:
return []
array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema)
rdd = sc.parallelize(array_of_users)
# group by country and then substitute the list of messages for each country by its length, resulting into a rdd of (country, length) tuples
country_count = rdd.groupBy(lambda user: user['country']).mapValues(len)
# identify the min and max using as comparison key the second element of the (country, length) tuple
country_min = country_count.min(key = lambda grp: grp[1])
country_max = country_count.max(key = lambda grp: grp[1])
Когда я его запускаю, я получаю сообщение
AttributeError Traceback (most recent call last)
<ipython-input-24-6e6b83935bc3> in <module>()
16 return []
17
---> 18 array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema)
19
20 rdd = sc.parallelize(array_of_users)
AttributeError: 'TransformedDStream' object has no attribute 'SQLContext'
Как я могу это исправить?