Если скользящие diff
s должны быть вычислены для каждого раздела с помощью name
, я бы использовал lag()
оконную функцию:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
("a", 100), ("a", 120),
("b", 200), ("b", 240), ("b", 270)
).toDF("name", "value")
val window = Window.partitionBy($"name").orderBy("value")
df.
withColumn("diff", $"value" - lag($"value", 1).over(window)).
na.fill(0).
orderBy("name", "value").
show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// | a| 100| 0|
// | a| 120| 20|
// | b| 200| 0|
// | b| 240| 40|
// | b| 270| 30|
// +----+-----+----+
С другой стороны, если скользящая diff
s должны быть вычислены по всему набору данных, функция Window без разделения не будет масштабироваться, поэтому я прибегну к использованию функции sliding()
RDD:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd = df.rdd
val diffRDD = rdd.sliding(2).
map{ case Array(x, y) => Row(y.getString(0), y.getInt(1), y.getInt(1) - x.getInt(1)) }
val headRDD = sc.parallelize(Seq(Row.fromSeq(rdd.first.toSeq :+ 0)))
val headDF = spark.createDataFrame(headRDD, df.schema.add("diff", IntegerType))
val diffDF = spark.createDataFrame(diffRDD, df.schema.add("diff", IntegerType))
val resultDF = headDF union diffDF
resultDF.show
// +----+-----+----+
// |name|value|diff|
// +----+-----+----+
// | a| 100| 0|
// | a| 120| 20|
// | b| 200| 80|
// | b| 240| 40|
// | b| 270| 30|
// +----+-----+----+