If you are familiar with SQL, you can just create a temporary view and add all the columns in one shot. Check this out:
scala> val df = Seq((1,1,0,3),(1,2,3,2),(1,5,6,1),(2,1,3,7),(2,2,1,9),(3,1,7,5),(3,2,9,3),(3,7,2,5),(3,8,4,7),(4,1,7,9),(4,2,9,0)).toDF("id","time","x","y")
df: org.apache.spark.sql.DataFrame = [id: int, time: int ... 2 more fields]
scala> df.createOrReplaceTempView("m2008")
scala> spark.sql(""" select *, lead(time) over(partition by id order by time) timep1,lead(x) over(partition by id order by time) xp1, lead(y) over(partition by id order by time) yp1 from m2008 """).show(false)
+---+----+---+---+------+----+----+
|id |time|x |y |timep1|xp1 |yp1 |
+---+----+---+---+------+----+----+
|1 |1 |0 |3 |2 |3 |2 |
|1 |2 |3 |2 |5 |6 |1 |
|1 |5 |6 |1 |null |null|null|
|3 |1 |7 |5 |2 |9 |3 |
|3 |2 |9 |3 |7 |2 |5 |
|3 |7 |2 |5 |8 |4 |7 |
|3 |8 |4 |7 |null |null|null|
|4 |1 |7 |9 |2 |9 |0 |
|4 |2 |9 |0 |null |null|null|
|2 |1 |3 |7 |2 |1 |9 |
|2 |2 |1 |9 |null |null|null|
+---+----+---+---+------+----+----+
scala>
You can get the result as another DataFrame by simply assigning the output of spark.sql to a val:
scala> val df2 = spark.sql(""" select *, lead(time) over(partition by id order by time) timep1,lead(x) over(partition by id order by time) xp1, lead(y) over(partition by id order by time) yp1 from m2008 """)
df2: org.apache.spark.sql.DataFrame = [id: int, time: int ... 5 more fields]
scala> df2.printSchema
root
|-- id: integer (nullable = false)
|-- time: integer (nullable = false)
|-- x: integer (nullable = false)
|-- y: integer (nullable = false)
|-- timep1: integer (nullable = true)
|-- xp1: integer (nullable = true)
|-- yp1: integer (nullable = true)
scala>
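If you prefer to avoid the temp view, the same lead() logic can be expressed directly with the DataFrame API. A minimal sketch, assuming the same `df` from above (as in the SQL version, the lead columns are null on the last row of each partition):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead

// Same window as in the SQL: partition by id, order by time
val w = Window.partitionBy("id").orderBy("time")

// lead(col, 1) returns the value from the next row within the window
val df3 = df
  .withColumn("timep1", lead("time", 1).over(w))
  .withColumn("xp1", lead("x", 1).over(w))
  .withColumn("yp1", lead("y", 1).over(w))

df3.show(false)
```

This produces the same schema as df2, with the three lead columns nullable since the last row of each id group has no next row.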