pyspark - находит максимальные и минимальные значения в json-поточных данных с использованием createDataFrame - PullRequest
0 голосов
/ 20 ноября 2018

У меня есть набор json-сообщений от Kafka, каждый из которых описывает пользователя веб-сайта.Используя pyspark, мне нужно подсчитать количество пользователей в каждой стране на окно потоковой передачи и вернуть страны с максимальным и минимальным количеством пользователей.

Вот пример потоковых сообщений json:

{"id":1,"first_name":"Barthel","last_name":"Kittel","email":"bkittel0@printfriendly.com","gender":"Male","ip_address":"130.187.82.195","date":"06/05/2018","country":"France"}

Вот мой код:

from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark import SparkContext
from pyspark.sql import SQLContext

fields = ['id', 'first_name', 'last_name', 'email', 'gender', 'ip_address', 'date', 'country']
schema =  StructType([
  StructField(field, StringType(), True) for field in fields
])

def parse(s, fields):
    try:
        d = json.loads(s[0])
        return [tuple(d.get(field) for field in fields)]
    except:
        return []

array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema)

rdd = sc.parallelize(array_of_users)

# group by country and then substitute the list of messages for each country by its length, resulting into a rdd of (country, length) tuples
country_count = rdd.groupBy(lambda user: user['country']).mapValues(len)

# identify the min and max using as comparison key the second element of the (country, length) tuple
country_min = country_count.min(key = lambda grp: grp[1])
country_max = country_count.max(key = lambda grp: grp[1])

Когда я его запускаю, я получаю сообщение

AttributeError                            Traceback (most recent call last)
<ipython-input-24-6e6b83935bc3> in <module>()
     16         return []
     17 
---> 18 array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema)
     19 
     20 rdd = sc.parallelize(array_of_users)

AttributeError: 'TransformedDStream' object has no attribute 'SQLContext'

Как я могу это исправить?

1 Ответ

0 голосов
/ 20 ноября 2018

Если я правильно понял, вам нужно сгруппировать список сообщений по странам, затем подсчитать количество сообщений в каждой группе и затем выбрать группы с минимальным и максимальным количеством сообщений.

Измоя голова, код будет что-то вроде:

# assuming the array_of_users is your array of messages
rdd = sc.parallelize(array_of_users)

# group by country and then substitute the list of messages for each country by its length, resulting into a rdd of (country, length) tuples
country_count = rdd.groupBy(lambda user: user['country']).mapValues(len)

# identify the min and max using as comparison key the second element of the (country, length) tuple
country_min = country_count.min(key = lambda grp: grp[1])
country_max = country_count.max(key = lambda grp: grp[1])
...