Spark найти блоки значений NULL в Серии значений - PullRequest
0 голосов
/ 09 июня 2018

Предположим, это мои данные:

date         value
2016-01-01   1
2016-01-02   NULL
2016-01-03   NULL
2016-01-04   2
2016-01-05   3
2016-01-06   NULL
2016-01-07   NULL
2016-01-08   NULL
2016-01-09   1

Я пытаюсь найти даты начала и окончания, которые окружают группы значений NULL.Пример вывода может быть следующим:

start        end
2016-01-01   2016-01-04
2016-01-05   2016-01-09

Моя первая попытка решить проблему привела к следующему:

df.filter($"value".isNull)\
    .agg(to_date(date_add(max("date"), 1)) as "max", 
         to_date(date_sub(min("date"),1)) as "min"
        )

, но при этом будет найдено только общее минимальное и максимальное значение.Я думал об использовании groupBy, но не знаю, как создать столбец для каждого из блоков с нулевым значением.

Ответы [ 2 ]

0 голосов
/ 10 июня 2018

Сложная задача - получить границы групп, поэтому вам нужно выполнить несколько шагов.

  • сначала для построения групп нулей / не нулей (с использованием оконных функций)
  • , затем группирование по блокам, чтобы получить границы внутри блоков
  • , затем сноваоконная функция для расширения границ

Вот рабочий пример:

import ss.implicits._

val df = Seq(
  ("2016-01-01", Some(1)),
  ("2016-01-02", None),
  ("2016-01-03", None),
  ("2016-01-04", Some(2)),
  ("2016-01-05", Some(3)),
  ("2016-01-06", None),
  ("2016-01-07", None),
  ("2016-01-08", None),
  ("2016-01-09", Some(1))
).toDF("date", "value")


df
  // build blocks
  .withColumn("isnull", when($"value".isNull, true).otherwise(false))
  .withColumn("lag_isnull", lag($"isnull",1).over(Window.orderBy($"date")))
  .withColumn("change", coalesce($"isnull"=!=$"lag_isnull",lit(false)))
  .withColumn("block", sum($"change".cast("int")).over(Window.orderBy($"date")))
  // now calculate min/max within groups
  .groupBy($"block")
  .agg(
    min($"date").as("tmp_min"),
    max($"date").as("tmp_max"),
    (count($"value")===0).as("null_block")
  )
  // now extend groups to include borders
  .withColumn("min", lag($"tmp_max", 1).over(Window.orderBy($"tmp_min")))
  .withColumn("max", lead($"tmp_min", 1).over(Window.orderBy($"tmp_max")))
  // only select null-groups
  .where($"null_block")
  .select($"min", $"max")
  .orderBy($"min")
  .show()

дает

+----------+----------+
|       min|       max|
+----------+----------+
|2016-01-01|2016-01-04|
|2016-01-05|2016-01-09|
+----------+----------+
0 голосов
/ 09 июня 2018

У меня нет рабочего решения, но у меня есть несколько рекомендаций.

Посмотрите на использование лага ;вам также придется немного изменить этот код, чтобы получить ведущий столбец.

Теперь предположим, что у вас есть колонка отставания и опережения.Ваш результирующий кадр данных теперь будет выглядеть следующим образом:

date         value     lag_value     lead_value
2016-01-01   1         NULL          1 
2016-01-02   NULL      NULL          1
2016-01-03   NULL      2             NULL
2016-01-04   2         3             NULL
2016-01-05   3         NULL          2
2016-01-06   NULL      NULL          3
2016-01-07   NULL      NULL          NULL
2016-01-08   NULL      1             NULL
2016-01-09   1         1             NULL

Теперь вам нужно просто отфильтровать по следующим условиям:

min date:
df.filter("value IS NOT NULL AND lag_value IS NULL")

max date:
df.filter("value IS NULL AND lead_value IS NOT NULL")

Если вы хотите быть немного более продвинутымВы также можете использовать команду when для создания нового столбца, в котором указано, является ли дата начальной или конечной датой для нулевой группы:

date         value     lag_value     lead_value   group_date_type
2016-01-01   1         NULL          1            start
2016-01-02   NULL      NULL          1            NULL
2016-01-03   NULL      2             NULL         NULL   
2016-01-04   2         3             NULL         end
2016-01-05   3         NULL          2            start
2016-01-06   NULL      NULL          3            NULL
2016-01-07   NULL      NULL          NULL         NULL
2016-01-08   NULL      1             NULL         NULL
2016-01-09   1         1             NULL         end 

. Это можно создать с помощью чего-то, похожего на это:

from pyspark.sql import functions as F
df_2.withColumn('group_date_type', 
                F.when("value IS NOT NULL AND lag_value IS NULL", start)\
                  .when("value IS NULL AND lead_value IS NOT NULL", end)\
                  .otherwise(None)
                 )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...