фильтровать список по первым 2 классам наблюдений на значение параметра в наборе данных scala - PullRequest
0 голосов
/ 28 февраля 2019

У меня есть такой набор искровых данных:

+--------+--------------------+
|     uid|     recommendations|
+--------+--------------------+
|41344966|[[2133, red]...|
|41345063|[[11353, red...|
|41346177|[[2996, yellow]...|
|41349171|[[8477, green]...|

res98: org.apache.spark.sql.Dataset[userItems] = [uid: int, recommendations: array<struct<iid:int,color:string>>]

Я хочу отфильтровать каждый массив рекомендаций, чтобы он содержал первые два каждого цвета.Псевдо пример:

[(13,'red'), (4,'green'), (8,'red'), (2,'red'), (10, 'yellow')]

станет

[(13,'red'), (4,'green'), (8,'red'), (10, 'yellow')]

Как я могу эффективно сделать это в Scala с наборами данных?Есть ли элегантное решение, использующее что-то вроде reduceGroups?

Что у меня есть до сих пор:

case class itemData (iid: Int, color: String)

val filterList = (recs: Array[itemData], filterAttribute, maxCount) => {
  // filter the list somehow... using the max count and attribute
  })

dataset.map(d => filterList(d.recommendations, "color", 2))

1 Ответ

0 голосов
/ 28 февраля 2019

Вы можете разбить рекомендации, затем создать раздел номера строк по uid и цвету и, наконец, отфильтровать номера строк больше 2. Код должен выглядеть следующим образом.Надеюсь, это поможет.

//Creating Test Data
val df = Seq((13,"red"), (4,"green"), (8,"red"), (2,"red"), (10, "yellow")).toDF("iid", "color")
  .withColumn("uid", lit(41344966))
  .groupBy("uid").agg(collect_list(struct("iid", "color")).as("recommendations"))

df.show(false)
+--------+----------------------------------------------------+
|uid     |recommendations                                     |
+--------+----------------------------------------------------+
|41344966|[[13,red], [4,green], [8,red], [2,red], [10,yellow]]|
+--------+----------------------------------------------------+

val filterDF = df.withColumn("rec", explode(col("recommendations")))
    .withColumn("iid", col("rec.iid"))
    .withColumn("color", col("rec.color"))
    .drop("recommendations", "rec")
    .withColumn("rownum",
      row_number().over(Window.partitionBy("uid", "color").orderBy(col("iid").desc)))
    .filter(col("rownum") <= 2)
    .groupBy("uid").agg(collect_list(struct("iid", "color")).as("recommendations"))

filterDF.show(false)
+--------+-------------------------------------------+
|uid     |recommendations                            |
+--------+-------------------------------------------+
|41344966|[[4,green], [13,red], [8,red], [10,yellow]]|
+--------+-------------------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...