Я думаю, то, что вы ищете, называется максимальный пробег (после текущий итог ).Это всегда заставляет меня использовать оконную агрегацию .
// I made the input a bit more tricky
val input = Seq(
(1,4), (2,2), (3,3), (4,5), (5, 1), (6, 9), (7, 6)
).toDF("timestamp", "value")
scala> input.show
+---------+-----+
|timestamp|value|
+---------+-----+
| 1| 4|
| 2| 2|
| 3| 3|
| 4| 5|
| 5| 1|
| 6| 9|
| 7| 6|
+---------+-----+
Я стремлюсь к следующему ожидаемому результату.Поправьте меня, если я ошибаюсь.
val expected = Seq((1,4), (4,5), (6, 9)).toDF("timestamp", "value")
scala> expected.show
+---------+-----+
|timestamp|value|
+---------+-----+
| 1| 4|
| 4| 5|
| 6| 9|
+---------+-----+
Хитрость для использования "бегущих" проблем заключается в использовании rangeBetween
при определении спецификации окна.
import org.apache.spark.sql.expressions.Window
val ts = Window
.orderBy("timestamp")
.rangeBetween(Window.unboundedPreceding, Window.currentRow)
Используя спецификацию окна, вы отфильтровываете то, от чего вы хотите избавиться, от результата, и все готово.
val result = input
.withColumn("running_max", max("value") over ts)
.where($"running_max" === $"value")
.select("timestamp", "value")
scala> result.show
18/05/29 22:09:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---------+-----+
|timestamp|value|
+---------+-----+
| 1| 4|
| 4| 5|
| 6| 9|
+---------+-----+
Как вы можете видеть, он не очень эффективен, поскольку использует толькоодин раздел (который приводит к плохому однопоточному выполнению и, таким образом, не сильно отличается от запуска эксперимента на одной машине).
Я думаю, мы могли бы разделить входные данные, частично вычислить рабочий максимум и затем объединитьчастичные результаты и заново запустите расчет рабочего максимума.Просто мысль, которую я сам не испытал.