Scala: collect_list () над окном с сохранением нулевых значений - PullRequest
0 голосов
/ 26 сентября 2018

У меня есть фрейм данных, подобный приведенному ниже:

+----+----+----+
|colA|colB|colC|
+----+----+----+
|1   |1   |23  |
|1   |2   |63  |
|1   |3   |null|
|1   |4   |32  |
|2   |2   |56  |
+----+----+----+

Я применяю приведенные ниже инструкции, чтобы создать последовательность значений в столбце C:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
df.withColumn("colD", 
collect_list("colC").over(Window.partitionBy("colA").orderBy("colB")))

Результаттаким образом, что столбец D создается и включает значения столбца C в виде последовательности, в то время как он удалил null значение:

+----+----+----+------------+
|colA|colB|colC|colD        |
+----+----+----+------------+
|1   |1   |23  |[23]        |
|1   |2   |63  |[23, 63]    |
|1   |3   |null|[23, 63]    |
|1   |4   |32  |[23,63,32]  |
|2   |2   |56  |[56]        |
+----+----+----+------------+

Однако я хотел бы сохранить нулевые значения в новом столбце и иметьследующий результат:

+----+----+----+-----------------+
|colA|colB|colC|colD             |
+----+----+----+-----------------+
|1   |1   |23  |[23]             |
|1   |2   |63  |[23, 63]         |
|1   |3   |null|[23, 63, null]   |
|1   |4   |32  |[23,63,null, 32] |
|2   |2   |56  |[56]             |
+----+----+----+-----------------+

Как вы видите, у меня все еще есть null значения в результате.Вы знаете, как я могу это сделать?

Ответы [ 2 ]

0 голосов
/ 11 апреля 2019

Как упомянул LeoC, collect_list будет сбрасывать нулевые значения.Кажется, есть обходной путь для этого поведения.Оборачивая каждый скаляр в массив, следуя collect_list, вы получите [[23], [63], [], [32]], тогда когда вы сделаете flatten, вы получите [23, 63,, 32].Эти пропущенные значения в массивах являются нулевыми.

collect_list и flatten встроенные функции sql. Я полагаю, они были введены в Spark 2.4 .Я не изучал реализацию, чтобы убедиться, что это ожидаемое поведение, поэтому я не знаю, насколько надежно это решение.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(
  (Some(1), Some(1), Some(23)),
  (Some(1), Some(2), Some(63)),
  (Some(1), Some(3), None),
  (Some(1), Some(4), Some(32)),
  (Some(2), Some(2), Some(56))
).toDF("colA", "colB", "colC")

val newDf = df.withColumn("colD", flatten(collect_list(array("colC"))
    .over(Window.partitionBy("colA").orderBy("colB"))))


+----+----+----+-------------+
|colA|colB|colC|         colD|
+----+----+----+-------------+
|   1|   1|  23|         [23]|
|   1|   2|  63|     [23, 63]|
|   1|   3|null|    [23, 63,]|
|   1|   4|  32|[23, 63,, 32]|
|   2|   2|  56|         [56]|
+----+----+----+-------------+
0 голосов
/ 26 сентября 2018

Поскольку collect_list автоматически удаляет все null с, одним из подходов было бы временно заменить null на указанное число, скажем Int.MinValue, перед применением метода, и использовать UDF для восстановления этих чисел доnull потом:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(
  (Some(1), Some(1), Some(23)),
  (Some(1), Some(2), Some(63)),
  (Some(1), Some(3), None),
  (Some(1), Some(4), Some(32)),
  (Some(2), Some(2), Some(56))
).toDF("colA", "colB", "colC")

def replaceWithNull(n: Int) = udf( (arr: Seq[Int]) =>
  arr.map( i => if (i != n) Some(i) else None )
)

df.withColumn( "colD", replaceWithNull(Int.MinValue)(
    collect_list(when($"colC".isNull, Int.MinValue).otherwise($"colC")).
      over(Window.partitionBy("colA").orderBy("colB"))
  )
).show
// +----+----+----+------------------+
// |colA|colB|colC|              colD|
// +----+----+----+------------------+
// |   1|   1|  23|              [23]|
// |   1|   2|  63|          [23, 63]|
// |   1|   3|null|    [23, 63, null]|
// |   1|   4|  32|[23, 63, null, 32]|
// |   2|   2|  56|              [56]|
// +----+----+----+------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...