Вот решение в Scala, использующее lag(Key, 1)
для объединения предыдущих / текущих ключей для подсчета пары ключей:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val df = Seq(
("1/1/2018", "A", "XY"),
("1/2/2018", "A", "GT"),
("1/6/2018", "A", "WE"),
("1/9/2018", "A", "PO"),
("1/2/2018", "B", "XY"),
("1/4/2018", "B", "GT")
).toDF("Date", "ID", "Key")
val win = Window.partitionBy("ID").orderBy("Date", "Key")
df.
withColumn("Date", to_date($"Date", "M/d/yyyy")).
withColumn("FirstKey", lag($"Key", 1).over(win)).
groupBy($"FirstKey", $"Key".as("SecondKey")).agg(count("*").as("Count")).
where($"firstKey".isNotNull).
show
// +--------+---------+-----+
// |FirstKey|SecondKey|Count|
// +--------+---------+-----+
// | WE| PO| 1|
// | GT| WE| 1|
// | XY| GT| 2|
// +--------+---------+-----+
Обратите внимание, что преобразование to_date
предназначено для обеспечения правильного хронологического упорядочения.