Вам нужно join
два кадра данных вместе, чтобы сделать любое сравнение их столбцов.
Что вы можете сделать, это сначала присоединиться к фреймам данных, а затем выполнить всю фильтрацию, чтобы получить новый фрейм данных со всеми строками, которые должны быть обновлены:
val diffDf = df1.as("a").join(df2.as("b"), Seq("id"))
.filter($"b.dt" > $"a.dt")
.filter(...) // Any other filter required
.select($"id", $"b.dt", $"b.speed", $"b.stats")
Примечание: В некоторых ситуациях требуется выполнить groupBy(id)
или использовать оконную функцию, поскольку в id
в кадре данных diffDf
должна быть только одна последняя строка. Это можно сделать следующим образом (в данном примере будет выбран ряд с максимальной скоростью, но это зависит от фактических требований):
val w = Window.partitionBy($"id").orderBy($"speed".desc)
val diffDf2 = diffDf.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
Более подробную информацию о различных подходах можно найти здесь: Как увеличить значение и сохранить все столбцы (для максимального количества записей в группе)? .
Чтобы заменить старые строки тем же id
в кадре данных df1
, объедините кадры данных с внешним объединением и coalesce
:
val df = df1.as("a").join(diffDf.as("b"), Seq("id"), "outer")
.select(
$"id",
coalesce($"b.dt", $"a.dt").as("dt"),
coalesce($"b.speed", $"a.speed").as("speed"),
coalesce($"b.stats", $"a.stats").as("stats")
)
coalesce
работает, сначала пытаясь получить значение из diffDf
(b
) фрейма данных. Если это значение равно нулю, оно будет принимать значение от df1
(a
).
Результат при использовании только временного фильтра с предоставленными примерами входных данных:
+---------------+-------------------+-----+-----------------+
| id| dt|speed| stats|
+---------------+-------------------+-----+-----------------+
|358899055773504|2018-07-31 18:58:34| 0|[9,0,-1,22,0,1,0]|
|358899055773505|2018-07-31 18:54:23| 4| [9,0,0,22,1,1,1]|
+---------------+-------------------+-----+-----------------+