Предположим, у меня есть DataFrame
в Spark, состоящий из столбцов для id, даты и ряда свойств (скажем, x, y, z).К сожалению, DataFrame очень большой.К счастью, большинство записей являются записями «без изменений», в которых id, x, y и z одинаковы, и изменяется только дата.Например,
| date | id | x |
| -------- | -- | - |
| 20150101 | 1 | 1 |
| 20150102 | 1 | 1 |
| 20150103 | 1 | 1 |
| 20150104 | 1 | 1 |
| 20150105 | 1 | 2 |
| 20150106 | 1 | 2 |
| 20150107 | 1 | 2 |
| 20150108 | 1 | 2 |
может быть уменьшено до
| date | id | x |
| -------- | -- | - |
| 20150101 | 1 | 1 |
| 20150105 | 1 | 2 |
. Сначала я думал, что эта функция будет делать то, что я хотел
def filterToUpdates (df : DataFrame) = {
val colsData = df.column.filter(x => (x != "id" && x != "date"))
val window = Window.partitionBy(colsData).orderBy($"date".asc)
df.withColumn("row_num", row_number.over(window)).
select($"row_num" === 1).drop("row_num")
Но в случае неудачикогда мои столбцы данных изменятся, а затем вернутся назад.
например,
| date | id | x |
| -------- | -- | - |
| 20150101 | 1 | 1 |
| 20150102 | 1 | 1 |
| 20150103 | 1 | 1 |
| 20150104 | 1 | 1 |
| 20150105 | 1 | 2 |
| 20150106 | 1 | 2 |
| 20150107 | 1 | 1 |
| 20150108 | 1 | 1 |
будет преобразовано в
| date | id | x |
| -------- | -- | - |
| 20150101 | 1 | 1 |
| 20150105 | 1 | 2 |
вместо того, что я хочу:
| date | id | x |
| -------- | -- | - |
| 20150101 | 1 | 1 |
| 20150105 | 1 | 2 |
| 20150107 | 1 | 1 |
Это не было бы трудной задачей в процедурном коде, который передавал записи для строки по порядку (разделенный по id и упорядоченный по дате), но я просто не вижу, как сформулировать это каквычисление искры.
Примечание: это отличается от Оконная функция Spark SQL со сложным условием .Я пытаюсь отфильтровать строки, которые отличаются от предыдущих строк, что может быть сделано дополнительно после построения столбца became_inactive
в этом вопросе.