Фильтрация искрового DataFrame до обновлений - PullRequest
0 голосов
/ 11 июня 2018

Предположим, у меня есть DataFrame в Spark, состоящий из столбцов для id, даты и ряда свойств (скажем, x, y, z).К сожалению, DataFrame очень большой.К счастью, большинство записей являются записями «без изменений», в которых id, x, y и z одинаковы, и изменяется только дата.Например,

| date     | id | x |
| -------- | -- | - |
| 20150101 | 1  | 1 |
| 20150102 | 1  | 1 |
| 20150103 | 1  | 1 |
| 20150104 | 1  | 1 |
| 20150105 | 1  | 2 |
| 20150106 | 1  | 2 |
| 20150107 | 1  | 2 |
| 20150108 | 1  | 2 |

может быть уменьшено до

| date     | id | x |
| -------- | -- | - |
| 20150101 | 1  | 1 |
| 20150105 | 1  | 2 |

. Сначала я думал, что эта функция будет делать то, что я хотел

def filterToUpdates (df : DataFrame) = {
  val colsData = df.column.filter(x => (x != "id" && x != "date"))
  val window = Window.partitionBy(colsData).orderBy($"date".asc)
  df.withColumn("row_num", row_number.over(window)).
    select($"row_num" === 1).drop("row_num")

Но в случае неудачикогда мои столбцы данных изменятся, а затем вернутся назад.

например,

| date     | id | x |
| -------- | -- | - |
| 20150101 | 1  | 1 |
| 20150102 | 1  | 1 |
| 20150103 | 1  | 1 |
| 20150104 | 1  | 1 |
| 20150105 | 1  | 2 |
| 20150106 | 1  | 2 |
| 20150107 | 1  | 1 |
| 20150108 | 1  | 1 |

будет преобразовано в

| date     | id | x |
| -------- | -- | - |
| 20150101 | 1  | 1 |
| 20150105 | 1  | 2 |

вместо того, что я хочу:

| date     | id | x |
| -------- | -- | - |
| 20150101 | 1  | 1 |
| 20150105 | 1  | 2 |
| 20150107 | 1  | 1 |

Это не было бы трудной задачей в процедурном коде, который передавал записи для строки по порядку (разделенный по id и упорядоченный по дате), но я просто не вижу, как сформулировать это каквычисление искры.

Примечание: это отличается от Оконная функция Spark SQL со сложным условием .Я пытаюсь отфильтровать строки, которые отличаются от предыдущих строк, что может быть сделано дополнительно после построения столбца became_inactive в этом вопросе.

1 Ответ

0 голосов
/ 11 июня 2018

Вы можете легко использовать lag.Окно

val window = Window.partitionBy($"id").orderBy($"date".asc)

Колонка

import org.apache.spark.sql.functions.{coalesce, lag, lit}

val keep = coalesce(lag($"x", 1).over(window) =!= $"x", lit(true))

df.withColumn("keep", keep).where($"keep").drop("keep").show

// +--------+---+---+
// |    date| id|  x|
// +--------+---+---+
// |20150101|  1|  1|
// |20150105|  1|  2|
// |20150107|  1|  1|
// +--------+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...