Заполнить ноль или пустое значение следующей строки искрой - PullRequest
1 голос
/ 21 мая 2019

Есть ли способ заменить нулевые значения в кадре данных искры следующей строкой, а не нулевым значением.Для разделения и упорядочивания окон добавлен дополнительный столбец row_count.Более конкретно, я хотел бы добиться следующего результата:

      +---------+-----------+      +---------+--------+
      | row_count |       id|      |row_count |     id|
      +---------+-----------+      +------+-----------+
      |        1|       null|      |     1|        109|
      |        2|        109|      |     2|        109|
      |        3|       null|      |     3|        108|
      |        4|       null|      |     4|        108|
      |        5|        108| =>   |     5|        108|
      |        6|       null|      |     6|        110|
      |        7|        110|      |     7|        110|
      |        8|       null|      |     8|       null|
      |        9|       null|      |     9|       null|
      |       10|       null|      |    10|       null|
      +---------+-----------+      +---------+--------+

Я пробовал с приведенным ниже кодом, он не дает должного результата.

      val ss = dataframe.select($"*", sum(when(dataframe("id").isNull||dataframe("id") === "", 1).otherwise(0)).over(Window.orderBy($"row_count")) as "value")
      val window1=Window.partitionBy($"value").orderBy("id").rowsBetween(0, Long.MaxValue)
      val selectList=ss.withColumn("id_fill_from_below",last("id").over(window1)).drop($"row_count").drop($"value")

1 Ответ

0 голосов
/ 22 мая 2019

Вот подход

  1. Фильтровать ненулевые значения (dfNonNulls)
  2. Фильтровать нулевые значения (dfNulls)
  3. Найдите правильное значение для нулевого идентификатора, используяФункция соединения и окна
  4. Заполнить нулевой фрейм данных (dfNullFills)
  5. union dfNonNulls и dfNullFills

data.csv

row_count,id
1,
2,109
3,
4,
5,108
6,
7,110
8,
9,
10,
var df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv")

var dfNulls = df.filter(
  $"id".isNull
).withColumnRenamed(
  "row_count","row_count_nulls"
).withColumnRenamed(
  "id","id_nulls"
)

val dfNonNulls = df.filter(
  $"id".isNotNull
).withColumnRenamed(
  "row_count","row_count_values"
).withColumnRenamed(
  "id","id_values"
)

dfNulls = dfNulls.join(
  dfNonNulls, $"row_count_nulls" lt $"row_count_values","left"
).select(
  $"id_nulls",$"id_values",$"row_count_nulls",$"row_count_values"
)

val window = Window.partitionBy("row_count_nulls").orderBy("row_count_values")

val dfNullFills = dfNulls.withColumn(
  "rn", row_number.over(window)
).where($"rn" === 1).drop("rn").select(
  $"row_count_nulls".alias("row_count"),$"id_values".alias("id"))

dfNullFills .union(dfNonNulls).orderBy($"row_count").show()

что приводит к

+---------+----+
|row_count|  id|
+---------+----+
|        1| 109|
|        2| 109|
|        3| 108|
|        4| 108|
|        5| 108|
|        6| 110|
|        7| 110|
|        8|null|
|        9|null|
|       10|null|
+---------+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...