Как отфильтровать строки на основе предыдущих последовательных строк? - PullRequest
0 голосов
/ 29 мая 2018

У меня есть требование, когда кадр данных сортируется по col1 (метка времени), и мне нужно фильтровать по col2.

Любая строка, где значение col2 меньше значения col2 предыдущей строки, мне нужно отфильтроватьиз этого ряда.Результат должен монотонно увеличивать значение col2.

Обратите внимание, что это не просто две строки.

Например, допустим, значение col2 для 4 строк равно 4,2,3,5,Результат должен быть равен 4,5, так как 2-я и 3-я строка меньше 4 (значение в первой строке).

val input = Seq(
  (1,4), (2,2), (3,3), (4,5), (5, 1), (6, 9), (7, 6)
).toDF("timestamp", "value")
scala> input.show
+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        2|    2|
|        3|    3|
|        4|    5|
|        5|    1|
|        6|    9|
|        7|    6|
+---------+-----+

val expected = Seq((1,4), (4,5), (6, 9)).toDF("timestamp", "value")
scala> expected.show
+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        4|    5|
|        6|    9|
+---------+-----+

Обратите внимание:

  • строки 2 и 3отфильтровано, так как его значение меньше значения в строке 1, т.е. 4
  • строка 5 отфильтровано, так как его значение меньше значения в строке 4, т.е. 6

в общем, есть ли способ фильтрации строк на основе сравнения значения одной строки со значением в предыдущих строках?

Ответы [ 2 ]

0 голосов
/ 29 мая 2018

проверка равенства с максимальным пробегом должна помочь:

val input = Seq((1,4), (2,2), (3,3), (4,5), (5, 1), (6, 9), (7, 6)).toDF("timestamp", "value")

input.show()

+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        2|    2|
|        3|    3|
|        4|    5|
|        5|    1|
|        6|    9|
|        7|    6|
+---------+-----+


input
  .withColumn("max",max($"value").over(Window.orderBy($"timestamp")))
  .where($"value"===$"max").drop($"max")
  .show()

+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        4|    5|
|        6|    9|
+---------+-----+
0 голосов
/ 29 мая 2018

Я думаю, то, что вы ищете, называется максимальный пробег (после текущий итог ).Это всегда заставляет меня использовать оконную агрегацию .

// I made the input a bit more tricky
val input = Seq(
  (1,4), (2,2), (3,3), (4,5), (5, 1), (6, 9), (7, 6)
).toDF("timestamp", "value")
scala> input.show
+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        2|    2|
|        3|    3|
|        4|    5|
|        5|    1|
|        6|    9|
|        7|    6|
+---------+-----+

Я стремлюсь к следующему ожидаемому результату.Поправьте меня, если я ошибаюсь.

val expected = Seq((1,4), (4,5), (6, 9)).toDF("timestamp", "value")
scala> expected.show
+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        4|    5|
|        6|    9|
+---------+-----+

Хитрость для использования "бегущих" проблем заключается в использовании rangeBetween при определении спецификации окна.

import org.apache.spark.sql.expressions.Window
val ts = Window
  .orderBy("timestamp")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

Используя спецификацию окна, вы отфильтровываете то, от чего вы хотите избавиться, от результата, и все готово.

val result = input
  .withColumn("running_max", max("value") over ts)
  .where($"running_max" === $"value")
  .select("timestamp", "value")

scala> result.show
18/05/29 22:09:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        4|    5|
|        6|    9|
+---------+-----+

Как вы можете видеть, он не очень эффективен, поскольку использует толькоодин раздел (который приводит к плохому однопоточному выполнению и, таким образом, не сильно отличается от запуска эксперимента на одной машине).

Я думаю, мы могли бы разделить входные данные, частично вычислить рабочий максимум и затем объединитьчастичные результаты и заново запустите расчет рабочего максимума.Просто мысль, которую я сам не испытал.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...