Идея состоит в том, чтобы найти способ добавить новый столбец с информацией «это новый x?», А затем выполнить скользящую сумму, чтобы «добавить» эту информацию.
Для этого нам нужна функция window
и метод lag
.
// Some imports
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
// inputs
val df = Seq(("tes", 45, 34), ("tes", 43, 67), ("tes", 56, 43), ("raj", 45, 43), ("raj", 44, 67)).toDF("x", "y", "z")
// adding new information
val windowSpec = Window.orderBy(F.monotonically_increasing_id())
val dfWithNewNameInfo = df
.withColumn("n", (F.lag($"x", 1).over(windowSpec) =!= $"x").cast("bigint"))
.na.fill(1, Seq("n"))
dfWithNewNameInfo.show
/*
+---+---+---+---+
| x| y| z| n|
+---+---+---+---+
|tes| 45| 34| 1|
|tes| 43| 67| 0|
|tes| 56| 43| 0|
|raj| 45| 43| 1|
|raj| 44| 67| 0|
+---+---+---+---+
*/
// We can see the "1" in the last column indicates whenever this is a new x
// Adding these 1
val resultDf = dfWithNewNameInfo.withColumn("n", F.sum("n").over(windowSpec))
resultDf.show
/*
+---+---+---+---+
| x| y| z| n|
+---+---+---+---+
|tes| 45| 34| 1|
|tes| 43| 67| 1|
|tes| 56| 43| 1|
|raj| 45| 43| 2|
|raj| 44| 67| 2|
+---+---+---+---+
*/
Этот метод дает желаемый результат для небольшого кадра данных, который может быть полностью загружен в память.
Обратите внимание на порядок windowSpec
по строке, но это не сработало бы для следующего кадра данных:
//+---+---+---+
//| x| y| z|
//+---+---+---+
//|tes| 45| 34|
//|tes| 43| 67|
//|raj| 45| 43|
//|raj| 44| 67|
//|tes| 56| 43|
//+---+---+---+
Предоставление результата:
//+---+---+---+---+
//| x| y| z| n|
//+---+---+---+---+
//|tes| 45| 34| 1|
//|tes| 43| 67| 1|
//|raj| 45| 43| 2|
//|raj| 44| 67| 2|
//|tes| 56| 43| 3|
//+---+---+---+---+
Именно поэтому я настоятельно рекомендую заказывать по "x"
в windowSpec
.