Как заменить нулевые значения выше / ниже ненулевым значением в том же столбце в кадре данных, используя спарк? - PullRequest
0 голосов
/ 11 апреля 2019

Я пытаюсь заменить пустые или недействительные значения, присутствующие в столбце, на выше или ниже ненулевое значение того же столбца. Например: -

Name|Place|row_count
a   |a1   |1
a   |a2   |2
a   |a2   |3
    |d1   |4
b   |a2   |5
c   |a2   |6
    |     |7
    |     |8
d   |c1   |9

В этом случае я пытаюсь заменить все значения NULL в столбце «Имя». 1-й NULL заменит «a», а 2-й NULL заменит «c», а в столбце «Поместить» NULL заменит «a2». , Когда мы пытаемся заменить NULL 8-й ячейки столбца «Place», также заменить его редким ненулевым значением «a2». Требуемый результат: Если мы выберем восьмую ячейку NULL из столбца «Поместить», то результат будет

Name|Place|row_count
a   |a1   |1
a   |a2   |2
a   |a2   |3
    |d1   |4
b   |a2   |5
c   |a2   |6
    |     |7
    |a2   |8
d   |c1   |9

если мы выберем 4-ю ячейку NULL столбца 'Name' для замены, тогда результат будет

Name|Place|row_count
a   |a1   |1
a   |a2   |2
a   |a2   |3
a   |d1   |4
b   |a2   |5
c   |a2   |6
    |     |7
    |     |8
d   |c1   |9

1 Ответ

0 голосов
/ 12 апреля 2019

Windows функции пригодятся для решения этой проблемы.Для простоты я сосредоточусь только на столбце name.Если предыдущая строка имеет null, я использую значение следующей строки.Вы можете изменить этот порядок в соответствии с вашими потребностями. Такой же подход необходимо использовать и для других столбцов.

import spark.implicits._
import org.apache.spark.sql.functions._

val df = Seq(("a", "a1", "1"),
  ("a", "a2", "2"),
  ("a", "a2", "3"),
  ("d1", null, "4"),
  ("b", "a2", "5"),
  ("c", "a2", "6"),
  (null, null, "7"),
  (null, null, "8"),
  ("d", "c1", "9")).toDF("name", "place", "row_count")

val window = Window.orderBy("row_count")
val lagNameWindowExpression = lag('name, 1).over(window)
val leadNameWindowExpression = lead('name, 1).over(window)

val nameConditionExpression = when($"name".isNull.and('previous_name_col.isNull), 'next_name_col)
  .when($"name".isNull.and('previous_name_col.isNotNull), 'previous_name_col).otherwise($"name")

df.select($"*", lagNameWindowExpression as 'previous_name_col, leadNameWindowExpression as 'next_name_col)
  .withColumn("name", nameConditionExpression).drop("previous_name_col", "next_name_col")
  .show(false)

Выход

+----+-----+---------+
|name|place|row_count|
+----+-----+---------+
|a   |a1   |1        |
|a   |a2   |2        |
|a   |a2   |3        |
|d1  |null |4        |
|b   |a2   |5        |
|c   |a2   |6        |
|c   |null |7        |
|d   |null |8        |
|d   |c1   |9        |
+----+-----+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...