Заполните нулевые значения в столбце данных следующего значения - PullRequest
3 голосов
/ 27 марта 2019

Я должен заполнить первые нулевые значения непосредственным значением того же столбца в кадре данных. Эта логика применяется только к первым последовательным нулевым значениям только столбца.

У меня есть датафрейм с похожим ниже

 //I replaced null to 0 in value column
 val df = Seq( (0,"exA",30), (0,"exB",22), (0,"exC",19), (16,"exD",13),
               (5,"exE",28), (6,"exF",26), (0,"exG",12), (13,"exH",53))
               .toDF("value", "col2", "col3")

scala> df.show(false)
+-----+----+----+
|value|col2|col3|
+-----+----+----+
|0    |exA |30  |
|0    |exB |22  |
|0    |exC |19  |
|16   |exD |13  |
|5    |exE |28  |
|6    |exF |26  |
|0    |exG |12  |
|13   |exH |53  |
+-----+----+----+

От этого кадра данных я ожидаю, как показано ниже

scala> df.show(false)
+-----+----+----+
|value|col2|col3|
+-----+----+----+
|16   |exA |30  |    // Change the value 0 to 16 at value column
|16   |exB |22  |    // Change the value 0 to 16 at value column
|16   |exC |19  |    // Change the value 0 to 16 at value column
|16   |exD |13  |
|5    |exE |28  |
|6    |exF |26  |
|0    |exG |12  |    // value should not be change here
|13   |exH |53  |
+-----+----+----+

Пожалуйста, помогите мне решить эту проблему.

Ответы [ 3 ]

1 голос
/ 27 марта 2019

Для этой цели вы можете использовать оконную функцию

 val df = Seq( (0,"exA",30), (0,"exB",22), (0,"exC",19), (16,"exD",13),
           (5,"exE",28), (6,"exF",26), (0,"exG",12), (13,"exH",53))
           .toDF("value", "col2", "col3")
 val w = Window.orderBy($"col2".desc)
 df.withColumn("Result", last(when($"value" === 0, null).otherwise($"value"), ignoreNulls = true).over(w))
  .orderBy($"col2")
  .show(10)

В результате вы получите

+-----+----+----+------+
|value|col2|col3|Result|
+-----+----+----+------+
|    0| exA|  30|    16|
|    0| exB|  22|    16|
|    0| exC|  19|    16|
|   16| exD|  13|    16|
|    5| exE|  28|     5|
|    6| exF|  26|     6|
|    0| exG|  12|    13|
|   13| exH|  53|    13|
+-----+----+----+------+

Выражение df.orderBy($"col2") необходимо только для отображения окончательных результатов в правильном порядке.Вы можете пропустить его, если вам не важен окончательный заказ.

ОБНОВЛЕНИЕ Чтобы получить именно то, что вам нужно, вам нужно немного более сложный код

val w = Window.orderBy($"col2")
val w2 = Window.orderBy($"col2".desc)
df.withColumn("IntermediateResult", first(when($"value" === 0, null).otherwise($"value"), ignoreNulls = true).over(w))
  .withColumn("Result", when($"IntermediateResult".isNull, last($"IntermediateResult", ignoreNulls = true).over(w2)).otherwise($"value"))
  .orderBy($"col2")
    .show(10)

+-----+----+----+------------------+------+
|value|col2|col3|IntermediateResult|Result|
+-----+----+----+------------------+------+
|    0| exA|  30|              null|    16|
|    0| exB|  22|              null|    16|
|    0| exC|  19|              null|    16|
|   16| exD|  13|                16|    16|
|    5| exE|  28|                16|     5|
|    6| exF|  26|                16|     6|
|    0| exG|  12|                16|     0|
|   13| exH|  53|                16|    13|
+-----+----+----+------------------+------+
0 голосов
/ 27 марта 2019

Я добавил новый столбец с инкрементным идентификатором к вашему DF

import org.apache.spark.sql.functions._    
val df_1 = Seq((0,"exA",30),
    (0,"exB",22), 
    (0,"exC",19), 
    (16,"exD",13),  
    (5,"exE",28), 
    (6,"exF",26), 
    (0,"exG",12), 
    (13,"exH",53))
    .toDF("value", "col2", "col3")
    .withColumn("UniqueID", monotonically_increasing_id)

фильтр DF имеет ненулевые значения

val df_2 = df_1.filter("value != 0")

создать переменную «limit» для ограничения первой N-й строки, которая нам нужна, и переменную Nvar для первого ненулевого значения

val limit = df_2.agg(min("UniqueID")).collect().map(_(0)).mkString("").toInt + 1
val nVal = df_1.limit(limit).agg(max("value")).collect().map(_(0)).mkString("").toInt

создать DF со столбцом с тем же именем («значением») с условием

val df_4 = df_1.withColumn("value", when(($"UniqueID" < limit), nVal).otherwise($"value"))
0 голосов
/ 27 марта 2019

Я думаю, вам нужно принять 1-е ненулевое или ненулевое значение в зависимости от порядка col2.Пожалуйста, найдите сценарий ниже.Я создал таблицу в памяти спарк для записи sql.

val df = Seq( (0,"exA",30), (0,"exB",22), (0,"exC",19), (16,"exD",13),
               (5,"exE",28), (6,"exF",26), (0,"exG",12), (13,"exH",53))
               .toDF("value", "col2", "col3")
df.registerTempTable("table_df")
spark.sql("with cte as(select *,row_number() over(order by col2) rno from table_df) select case when value = 0 and rno<(select min(rno) from cte where value != 0) then (select value from cte where rno=(select min(rno) from cte where value != 0)) else value end value,col2,col3 from cte").show(df.count.toInt,false)

enter image description here

Пожалуйста, дайте мне знать, если у вас есть какие-либо вопросы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...