I have a DataFrame on which I am trying to apply a window function over an array column.
The logic is as follows: partition (or group) the window by the id and filtered columns. Compute the maximum score over the rows whose types column is null; for any other row just take that row's own score. If a row's score is not equal to that maximum for its group, add "NA" to its types column.
val data = spark.createDataFrame(Seq(
(1, "shirt for women", Seq("shirt", "women"), 19.1, "ST"),
(1, "shirt for women", Seq("shirt", "women"), 10.1, null),
(1, "shirt for women", Seq("shirt", "women"), 12.1, null),
(0, "shirt group women", Seq("group", "women"), 15.1, null),
(0, "shirt group women", Seq("group", "women"), 12.1, null),
(3, "shirt nmn women", Seq("shirt", "women"), 16.1, "ST"),
(3, "shirt were women", Seq("shirt", "women"), 13.1, "ST")
)).toDF("id", "raw", "filtered", "score", "types")
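For reference, this is how I print it (show with truncation disabled so the arrays are not cut off):

data.show(false)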
+---+-----------------+--------------+-----+-----+
|id |raw |filtered |score|types|
+---+-----------------+--------------+-----+-----+
|1 |shirt for women |[shirt, women]|19.1 |ST |
|1 |shirt for women |[shirt, women]|10.1 |null |
|1 |shirt for women |[shirt, women]|12.1 |null |
|0 |shirt group women|[group, women]|15.1 |null |
|0 |shirt group women|[group, women]|12.1 |null |
|3 |shirt nmn women |[shirt, women]|16.1 |ST |
|3 |shirt were women |[shirt, women]|13.1 |ST |
+---+-----------------+--------------+-----+-----+
Expected output:
+---+-----------------+--------------+-----+-----+
|id |raw              |filtered      |score|types|
+---+-----------------+--------------+-----+-----+
|1  |shirt for women  |[shirt, women]|19.1 |ST   |
|1  |shirt for women  |[shirt, women]|10.1 |NA   |
|1  |shirt for women  |[shirt, women]|12.1 |null |
|0  |shirt group women|[group, women]|15.1 |null |
|0  |shirt group women|[group, women]|12.1 |NA   |
|3  |shirt nmn women  |[shirt, women]|16.1 |ST   |
|3  |shirt were women |[shirt, women]|13.1 |ST   |
+---+-----------------+--------------+-----+-----+
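So, for example, for id = 1 the reference value is 12.1 (the larger of the two null-typed scores), which is why the 10.1 row gets "NA" and the 12.1 row keeps its null. Per group, the reference value I mean is roughly this (just a sketch; nullMax and max_null_score are names I am making up here):

import org.apache.spark.sql.functions._

// For each (id, filtered) group: the highest score among rows whose types is null.
val nullMax = data
  .filter(col("types").isNull)
  .groupBy("id", "filtered")
  .agg(max("score").as("max_null_score"))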
This is what I tried:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

data.withColumn("max_score",
    when(col("types").isNull,
      max("score").over(Window.partitionBy("id", "filtered")))
      .otherwise($"score"))
  .withColumn("type_temp",
    when(col("score") =!= col("max_score"),
      addReasonsUDF(col("types"), lit("NA")))
      .otherwise(col("types")))
  .drop("types", "max_score")
  .withColumnRenamed("type_temp", "types")
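(addReasonsUDF is my own helper. A simplified stand-in, only so the snippet is self-contained and not the real implementation, would be something like:)

import org.apache.spark.sql.functions.udf

// Illustrative stand-in: append the reason to an existing type, otherwise just use the reason.
val addReasonsUDF = udf((types: String, reason: String) =>
  if (types == null) reason else s"$types,$reason")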
But this does not work. It gives me:
+---+-----------------+--------------+-----+---------+-----+
|id |raw              |filtered      |score|max_score|types|
+---+-----------------+--------------+-----+---------+-----+
|1  |shirt for women  |[shirt, women]|19.1 |19.1     |ST   |
|1  |shirt for women  |[shirt, women]|10.1 |19.1     |NA   |
|1  |shirt for women  |[shirt, women]|12.1 |19.1     |NA   |
|0  |shirt group women|[group, women]|15.1 |15.1     |null |
|0  |shirt group women|[group, women]|12.1 |15.1     |NA   |
|3  |shirt nmn women  |[shirt, women]|16.1 |16.1     |ST   |
|3  |shirt were women |[shirt, women]|13.1 |13.1     |ST   |
+---+-----------------+--------------+-----+---------+-----+
Can someone tell me what I am doing wrong here?
I think something is wrong with my window function: when I tried partitioning by id and raw instead, it did not work either, so neither the string nor the array partition seems to work.
data.withColumn("max_score",
    when(col("types").isNull,
      max("score").over(Window.partitionBy("id", "raw")))
      .otherwise($"score"))
  .show(false)
+---+-----------------+--------------+-----+-----+---------+
|id |raw |filtered |score|types|max_score|
+---+-----------------+--------------+-----+-----+---------+
|3 |shirt nmn women |[shirt, women]|16.1 |ST |16.1 |
|0 |shirt group women|[group, women]|15.1 |null |15.1 |
|0 |shirt group women|[group, women]|12.1 |null |15.1 |
|3 |shirt were women |[shirt, women]|13.1 |ST |13.1 |
|1 |shirt for women |[shirt, women]|19.1 |ST |19.1 |
|1 |shirt for women |[shirt, women]|10.1 |null |19.1 |
|1 |shirt for women |[shirt, women]|12.1 |null |19.1 |
+---+-----------------+--------------+-----+-----+---------+
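One sanity check I can think of (just a sketch, I have not drawn conclusions from it) is to count rows over the same window; if partitioning on the array column were really broken, the group sizes should come out wrong as well:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Group size per (id, filtered) window; "group_size" is just a throwaway column name.
data
  .withColumn("group_size", count(lit(1)).over(Window.partitionBy("id", "filtered")))
  .show(false)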