Подсчитать количество записей, которые соответствуют разнесенному столбцу в pyspark.sql? - PullRequest
0 голосов
/ 21 июня 2019

У меня есть задание, использующее Spark 2.4 и часть набора данных Yelp.Часть схемы, которую мы будем использовать из бизнес-данных, приведена ниже и используется в том же самом DataFrame:

"business_id": string
"categories": comma delimited list of strings
"stars": double

Мы должны создать новый DataFrame, который группирует предприятия по категориям, со следующимистолбцы:

"category": string exploded from "categories"
"businessCount": integer; number of businesses in that category
"averageStarRating": double; average rating of businesses in the category
"minStarRating": double; lowest rating of any restaurant in that category
"maxStarRating": double; highest rating of any restaurant in that category

До сих пор мне удавалось выяснить, как использовать команду взрыва, чтобы разбить столбец «категории» на отдельные записи и показать «business_id», «category» и «stars ":

import from pyspark.sql functions as F
businessdf.select("business_id", F.explode(F.split("categories", ",")).alias("category"), "stars").show(5)

Приведенная выше команда дает мне следующее:

+--------------------+--------------+-----+
|         business_id|      category|stars|
+--------------------+--------------+-----+
|1SWheh84yJXfytovI...|          Golf|  3.0|
|1SWheh84yJXfytovI...|   Active Life|  3.0|
|QXAEGFB4oINsVuTFx...|Specialty Food|  2.5|
|QXAEGFB4oINsVuTFx...|   Restaurants|  2.5|
|QXAEGFB4oINsVuTFx...|       Dim Sum|  2.5|
+--------------------+--------------+-----+
only showing top 5 rows

Что я не могу понять, как это сделать, это использовать агрегатные функции для создания других столбцов,Мой профессор говорит, что все должно быть сделано в одном утверждении.Все мои попытки до сих пор приводили к ошибкам.

В моем задании говорится, что мне также нужно будет удалить любые пробелы в начале / конце вновь созданного столбца "категории" перед выполнением каких-либо агрегаций, но все мои попытки привели кк ошибкам.

Я чувствую, что это самое близкое, что я пришел, но понятия не имею, что делать дальше:

businessdf.select(F.explode(F.split("categories", ",")).alias("category")).groupBy("category").agg(F.count("category").alias("businessCount"), F.avg("stars").alias("averageStarRating"), F.min("stars").alias("minStarRating"), F.max("stars").alias("maxStarRating"))

Вот ошибка, которая сопровождает эту команду:

`pyspark.sql.utils.AnalysisException: "cannot resolve '`stars`' given input columns: [category];;\n'Aggregate [category#337], [category#337, count(category#337) AS businessCount#342L, avg('stars) AS averageStarRating#344, min('stars) AS minStarRating#346, max('stars) AS maxStarRating#348]\n+- Project [category#337]\n   +- Generate explode(split(categories#33, ,)), false, [category#337]\n      +- Relation[address#30,attributes#31,business_id#32,categories#33,city#34,hours#35,is_open#36L,latitude#37,lo`ngitude#38,name#39,postal_code#40,review_count#41L,stars#42,state#43] json\n"

1 Ответ

0 голосов
/ 21 июня 2019

Неважно, сообщения, должно быть, помогли мне пройти через это самостоятельно.Команда, которую я разместил выше, очень близка, но я забыл добавить столбец «звездочки» в выбранную отметку.Правильная команда здесь:

businessdf.select(F.explode(F.split("categories", ",")).alias("category"), "stars").groupBy("category").agg(F.count("category").alias("businessCount"), F.avg("stars").alias("averageStarRating"), F.min("stars").alias("minStarRating"), F.max("stars").alias("maxStarRating")).show()
...