Вы можете присоединить свой DataFrame или DataSet к себе, объединяя строки, в которых идентификатор одинаков в обеих строках, а версия левой строки равна i
, а версия правой строки - i+1
. Вот пример
case class T(id: String, version: Int, data: String)
val data = Seq(T("1", 1, "d1-1"), T("1", 2, "d1-2"), T("2", 1, "d2-1"), T("2", 2, "d2-2"), T("2", 3, "d2-3"), T("3", 1, "d3-1"))
data: Seq[T] = List(T(1,1,d1-1), T(1,2,d1-2), T(2,1,d2-1), T(2,2,d2-2), T(2,3,d2-3), T(3,1,d3-1))
val ds = data.toDS
val joined = ds.as("ds1").join(ds.as("ds2"), $"ds1.id" === $"ds2.id" && (($"ds1.version"+1) === $"ds2.version"))
И затем вы можете ссылаться на столбцы в новом DataFrame / DataSet, например $"ds1.data
, $"ds2.data
и т. Д.
Чтобы найти строки, в которых данные изменились с одной версии на другую, вы можете сделать
joined.filter($"ds1.data" !== $"ds2.data")