pyspark с использованием оконной функции - PullRequest
0 голосов
/ 26 марта 2019

У меня есть фрейм данных, который содержит строки, которые представляют собой экземпляр рейтинга конкретного фильма пользователем. Каждый фильм может быть оценен в нескольких категориях несколькими пользователями. Это результирующий кадр данных, который я создал, используя данные movie_lens.

|movie_id|year|categories|
+--------+----+----------+
|     122|1990|    Comedy|
|     122|1990|   Romance|
|     185|1990|    Action|
|     185|1990|     Crime|
|     185|1990|  Thriller|
|     231|1990|    Comedy|
|     292|1990|    Action|
|     292|1990|     Drama|
|     292|1990|    Sci-Fi|
|     292|1990|  Thriller|
|     316|1990|    Action|
|     316|1990| Adventure|
|     316|1990|    Sci-Fi|
|     329|1990|    Action|
|     329|1990| Adventure|
|     329|1990|     Drama|
.
.
.

movie_id - это уникальный идентификатор фильма, год - это год, в который пользователь оценил фильм, категория является одной из 12 категорий фильма. Частичный файл здесь

Я хочу найти фильм с наибольшим рейтингом за каждое десятилетие в каждой категории (с учетом частоты каждого фильма за каждое десятилетие в каждой категории)

что-то вроде

+-----------------------------------+
| year | category | movie_id | rank |
+-----------------------------------+
| 1990 | Comedy   | 1273     | 1    |
| 1990 | Comedy   | 6547     | 2    |
| 1990 | Comedy   | 8973     | 3    |
.
.
| 1990 | Comedy   | 7483     | 10   |
.
.
| 1990 | Drama    | 1273     | 1    |
| 1990 | Drama    | 6547     | 2    |
| 1990 | Drama    | 8973     | 3    |
.
.
| 1990 | Comedy   | 7483     | 10   |  
.
.
| 2000 | Comedy   | 1273     | 1    |
| 2000 | Comedy   | 6547     | 2    |
.
.

for every decade, top 10 movies in each category 

Я понимаю, что нужно использовать функцию окна pyspark. Это то, что я пытался

windowSpec = Window.partitionBy(res_agg['year']).orderBy(res_agg['categories'].desc())

final = res_agg.select(res_agg['year'], res_agg['movie_id'], res_agg['categories']).withColumn('rank', func.rank().over(windowSpec))

но он возвращает что-то вроде ниже:

+----+--------+------------------+----+
|year|movie_id|        categories|rank|
+----+--------+------------------+----+
|2000|    8606|(no genres listed)|   1|
|2000|    1587|            Action|   1|
|2000|    1518|            Action|   1|
|2000|    2582|            Action|   1|
|2000|    5460|            Action|   1|
|2000|   27611|            Action|   1|
|2000|   48304|            Action|   1|
|2000|   54995|            Action|   1|
|2000|    4629|            Action|   1|
|2000|   26606|            Action|   1|
|2000|   56775|            Action|   1|
|2000|   62008|            Action|   1|

Я довольно новичок в pyspark и застрял здесь. Кто-нибудь может подсказать мне, что я делаю неправильно.

1 Ответ

3 голосов
/ 26 марта 2019

Вы правы, вам нужно использовать окно, но сначала вам нужно выполнить первое агрегирование для вычисления частот.

Сначала давайте вычислим десятилетие.

df_decade = df.withColumn("decade", concat(substring(col("year"), 0, 3), lit("0")))

Затем мы вычисляем частоту по декаде, category и movie_id:

agg_df = df_decade\
      .groupBy("decade", "category", "movie_id")\
      .agg(count(col("*")).alias("freq"))

И, наконец, мы определяем окно, разделенное на декаду и категорию, и выбираем первые 10, используя функцию ранга:

w = Window.partitionBy("decade", "category").orderBy(desc("freq"))
top10 = agg_df.withColumn("r", rank().over(w)).where(col("r") <= 10)
...