PySpark: использование оконных функций для свертывания данных - PullRequest
0 голосов
/ 07 ноября 2019

У меня есть фрейм данных my_df, который содержит 4 столбца:

+----------------+---------------+--------+---------+
|         user_id|         domain|isp_flag|frequency|
+----------------+---------------+--------+---------+
|            josh|     wanadoo.fr|       1|       15|
|            josh|      random.it|       0|       12|
|        samantha|     wanadoo.fr|       1|       16|
|             bob|    eidsiva.net|       1|        5|
|             bob|      media.net|       0|        1|
|           dylan|    vodafone.it|       1|      448|
|           dylan|   somesite.net|       0|       20|
|           dylan|   yolosite.net|       0|       49|
|           dylan|      random.it|       0|        3|
|             don|    vodafone.it|       1|       39|
|             don|   popsugar.com|       0|       10|
|             don|      fabio.com|       1|       49|
+----------------+---------------+--------+---------+

Это то, что я планирую сделать -

Найти все user_id, гдемаксимальная частота domain с isp_flag=0 имеет частоту, которая составляет менее 25% от максимальной частоты domain с isp_flag=1.

Итак, в приведенном выше примере,мой output_df будет выглядеть так:

+----------------+---------------+--------+---------+
|         user_id|         domain|isp_flag|frequency|
+----------------+---------------+--------+---------+
|             bob|    eidsiva.net|       1|        5|
|             bob|      media.net|       0|        1|
|           dylan|    vodafone.it|       1|      448|
|           dylan|   yolosite.net|       0|       49|
|             don|      fabio.com|       1|       49|
|             don|   popsugar.com|       0|       10|
+----------------+---------------+--------+---------+

Я считаю, что для этого мне нужны оконные функции, и поэтому я попробовал следующее, чтобы сначала найти максимальные частотные области для isp_flag=0 и isp_flag=1 соответственно,для каждого из user_id -

>>> win_1 = Window().partitionBy("user_id", "domain", "isp_flag").orderBy((col("frequency").desc()))
>>> final_df = my_df.select("*", rank().over(win_1).alias("rank")).filter(col("rank")==1)
>>> final_df.show(5)   # this just gives me the original dataframe back

Что я здесь не так делаю? Как мне добраться до финала output_df, который я напечатал выше?

1 Ответ

1 голос
/ 08 ноября 2019

IIUC, вы можете попробовать следующее: вычислить max_frequencies (max_0, max_1) для каждого пользователя, имеющего isp_flag == 0 или 1 соответственно. а затем отфильтруйте по условию max_0 < 0.25*max_1 и плюс frequency in (max_1, max_0), чтобы выбрать только записи с максимальной частотой.

from pyspark.sql import Window, functions as F

# set up the Window to calculate max_0 and max_1 for each user
# having isp_flag = 0 and 1 respectively
w1 = Window.partitionBy('user_id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn('max_1', F.max(F.expr("IF(isp_flag==1, frequency, NULL)")).over(w1))\ 
  .withColumn('max_0', F.max(F.expr("IF(isp_flag==0, frequency, NULL)")).over(w1))\ 
  .where('max_0 < 0.25*max_1 AND frequency in (max_1, max_0)') \ 
  .show() 
+-------+------------+--------+---------+-----+-----+                           
|user_id|      domain|isp_flag|frequency|max_1|max_0|
+-------+------------+--------+---------+-----+-----+
|    don|popsugar.com|       0|       10|   49|   10|
|    don|   fabio.com|       1|       49|   49|   10|
|  dylan| vodafone.it|       1|      448|  448|   49|
|  dylan|yolosite.net|       0|       49|  448|   49|
|    bob| eidsiva.net|       1|        5|    5|    1|
|    bob|   media.net|       0|        1|    5|    1|
+-------+------------+--------+---------+-----+-----+

Некоторые объяснения для запроса:

  • WindowSpec w1 настроен на проверку всех записей для одного и того же пользователя (partitionBy), чтобы функция F.max () сравнивала все строки на основе одного и того же пользователя.

  • мы используем IF(isp_flag==1, frequency, NULL), чтобы найти частоту для строк, имеющих isp_flag== 1, он возвращает NULL, когда isp_flag не равен 1 и, таким образом, пропускается в функции F.max (). это выражение SQL, поэтому для его запуска нам нужна функция F.expr().

  • F.max(...).over(w1) примет максимальное значение результата от выполнения вышеупомянутого выражения SQL. этот расчет основан на окне w1.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...