Как сделать ранжирование скользящего окна в Spark с помощью Scala? - PullRequest
2 голосов
/ 14 мая 2019

У меня есть набор данных:

+-----+-------------------+---------------------+------------------+
|query|similar_queries    |model_score          |count             |
+-----+-------------------+---------------------+------------------+
|shirt|funny shirt        |0.0034038130658784866|189.0             |
|shirt|shirt womens       |0.0019435265241921438|136.0             |
|shirt|watch              |0.001097496453284101 |212.0             |
|shirt|necklace           |6.694577024597908E-4 |151.0             |
|shirt|white shirt        |0.0037413097560623485|217.0             |
|shirt|shoes              |0.0022062579255572733|575.0             |
|shirt|crop top           |9.065831060804897E-4 |173.0             |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |
|shirt|shorts             |0.002669621942466027 |200.0             |
|shirt|black shirt        |0.03264296242546658  |114.0             |
+-----+-------------------+---------------------+------------------+

Я ранжирую набор данных на основе «подсчета» первым.

lazy val countWindowByFreq = Window.partitionBy(col(QUERY)).orderBy(col(COUNT).desc)
val ranked_data = data.withColumn("count_rank", row_number over countWindowByFreq)

+-----+-------------------+---------------------+------------------+----------+
|query|similar_queries    |model_score          |count             |count_rank|
+-----+-------------------+---------------------+------------------+----------+
|shirt|shoes              |0.0022062579255572733|575.0             |1         |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |2         |
|shirt|white shirt        |0.0037413097560623485|217.0             |3         |
|shirt|watch              |0.001097496453284101 |212.0             |4         |
|shirt|shorts             |0.002669621942466027 |200.0             |5         |
|shirt|funny shirt        |0.0034038130658784866|189.0             |6         |
|shirt|crop top           |9.065831060804897E-4 |173.0             |7         |
|shirt|necklace           |6.694577024597908E-4 |151.0             |8         |
|shirt|shirt womens       |0.0019435265241921438|136.0             |9         |
|shirt|black shirt        |0.03264296242546658  |114.0             |10        |
+-----+-------------------+---------------------+------------------+----------+

Сейчас я пытаюсь ранжировать контент с помощью скользящего окна по row_number (4 строки) и ранжировать в пределах окна на основе model_score.Например:

В первом окне, row_number 1 до 4, новый ранг (новый столбец) будет

1. polo shirts for men
2. white shirt
3. shoes
4. watch

В первом окне, row_number 5 до 8, новый ранг(новый столбец) будет

5. funny shirt
6. shorts
7. shirt womens 
8. crop top

В первом окне, номер строки 9 для отдыха, новый ранг (новый столбец) будет

9. black shirt 
10. shirt womens

Может кто-нибудь сказать мне, если естьКак я могу достичь с этой искрой и Scala?Могу ли я использовать какие-либо предопределенные функции?

Я пытался:

lazy val MODEL_RANK = Window.partitionBy (col (QUERY)) .orderBy (col (MODEL_SCORE) .desc).rowBetween (0, 3)

но это дает мне:

sql.AnalysisException: Window Frame ROWS BETWEEN CURRENT ROW AND 3 FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;

Кроме того, пробовал с .rowsBetween (-3, 0), но это также дает мне ошибку:

org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN 3 PRECEDING AND CURRENT ROW must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;

1 Ответ

2 голосов
/ 14 мая 2019

Поскольку вы вычислили count_rank, следующий шаг - найти способ сгруппировать строку по четверке.Это можно сделать следующим образом:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val ranked_data_grouped = ranked_data
  .withColumn("bucket", (($"count_rank" -1)/4).cast(IntegerType))

ranked_data_grouped будет выглядеть так:

+-----+-------------------+---------------------+------------------+----------+-------+
|query|similar_queries    |model_score          |count             |count_rank|bucket |
+-----+-------------------+---------------------+------------------+----------+-------+
|shirt|shoes              |0.0022062579255572733|575.0             |1         |0      |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |2         |0      |      
|shirt|white shirt        |0.0037413097560623485|217.0             |3         |0      |
|shirt|watch              |0.001097496453284101 |212.0             |4         |0      |
|shirt|shorts             |0.002669621942466027 |200.0             |5         |1      |
|shirt|funny shirt        |0.0034038130658784866|189.0             |6         |1      |
|shirt|crop top           |9.065831060804897E-4 |173.0             |7         |1      |
|shirt|necklace           |6.694577024597908E-4 |151.0             |8         |1      |
|shirt|shirt womens       |0.0019435265241921438|136.0             |9         |2      |
|shirt|black shirt        |0.03264296242546658  |114.0             |10        |2      |
+-----+-------------------+---------------------+------------------+----------+-------+

Теперь все, что вам нужно сделать, это разделить на bucket и упорядочить на model_score:

val output = ranked_data_grouped
  .withColumn("finalRank", row_number().over(Window.partitionBy($"bucket").orderBy($"model_score".desc)))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...