Spark SQL Windows: создание фрейма на основе столбца массива - PullRequest
0 голосов
/ 10 февраля 2020

Я хочу использовать оконную функцию Spark SQL, но с пользовательским условием в спецификации фрейма.

Используемый кадр данных выглядит следующим образом:

+--------------------+--------------------+--------------------+-----+
|              userid|           elementid|       prerequisites|score|
+--------------------+--------------------+--------------------+-----+
|a                   |1                   |[]                  |  1  |
|a                   |2                   |[]                  |  1  |
|a                   |3                   |[]                  |  1  |
|b                   |1                   |[]                  |  1  |
|a                   |4                   |[1, 2]              |  1  |
+--------------------+--------------------+--------------------+-----+

Каждый элемент в столбце prerequisites является значением в столбце elementid другой строки.

I хотел бы создать окно, где я делю на userid, а затем захватить все предыдущие строки, где elementid содержится в столбце prerequisites данной строки.

Как только я достигну этого окна, я хочу выполните sum для столбца score.

Требуемый вывод для вышеприведенного примера:

+--------------------+--------------------+--------------------+-----+
|              userid|           elementid|       prerequisites|sum  |
+--------------------+--------------------+--------------------+-----+
|a                   |1                   |[]                  |  0  |
|a                   |2                   |[]                  |  0  |
|a                   |3                   |[]                  |  0  |
|b                   |1                   |[]                  |  0  |
|a                   |4                   |[1, 2]              |  2  |
+--------------------+--------------------+--------------------+-----+

Обратите внимание, что, поскольку пользователь a является единственным пользователем с предварительными условиями его элемент, предшествующий этому, единственный с> 0 sum.

Ближайший вопрос, который я видел, был этот вопрос, который использует collect_list.

Однако это не столько строит окно, сколько собирает потенциальный список идентификаторов. У кого-нибудь есть идеи, как сконструировать вышеупомянутое окно?

1 Ответ

0 голосов
/ 10 февраля 2020
scala> import org.apache.spark.sql.expressions.{Window,UserDefinedFunction}

scala> df.show()
+------+---------+-------------+-----+
|userid|elementid|prerequisites|score|
+------+---------+-------------+-----+
|     a|        1|           []|    1|
|     a|        2|           []|    1|
|     a|        3|           []|    1|
|     b|        1|           []|    1|
|     a|        4|       [1, 2]|    1|
+------+---------+-------------+-----+

scala> df.printSchema
root
 |-- userid: string (nullable = true)
 |-- elementid: string (nullable = true)
 |-- prerequisites: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: string (nullable = true)

scala> val W = Window.partitionBy("userid")

scala> val df1 = df.withColumn("elementidList", collect_set(col("elementid")).over(W))
                   .withColumn("elementidScoreMap", map_from_arrays(col("elementidList"), collect_list(col("score").cast("long")).over(W)))
                   .withColumn("common", array_intersect(col("prerequisites"), col("elementidList")))
                   .drop("elementidList", "score") 

scala> def getSumUDF:UserDefinedFunction = udf((Score:Map[String,Long], Id:String) => {
     | var out:Long =  0
     | Id.split(",").foreach{ x => out = Score(x.toString) + out}
     | out})

scala> df1.withColumn("sum", when(size(col("common")) =!= 0  ,getSumUDF(col("elementidScoreMap"), concat_ws(",",col("prerequisites")))).otherwise(lit(0)))
          .drop("elementidScoreMap", "common")
          .show()
+------+---------+-------------+---+
|userid|elementid|prerequisites|sum|
+------+---------+-------------+---+
|     b|        1|           []|  0|
|     a|        1|           []|  0|
|     a|        2|           []|  0|
|     a|        3|           []|  0|
|     a|        4|       [1, 2]|  2|
+------+---------+-------------+---+
...