Искры Scala: удалить последовательные (по дате) дубликаты записей из кадра данных - PullRequest
0 голосов
/ 01 октября 2018

Вопрос касается работы с фреймами данных, я хочу удалить полностью дублированные записи, исключая некоторые поля (даты).Я попытался использовать windowFunction (WindowSpec) как:

val wFromDupl: WindowSpec = Window
  .partitionBy(comparateFields: _*)
  .orderBy(asc(orderField))

В переменной сравненияteFields я храню все поля, которые мне нужно проверить (в примере это были бы DESC1 и DESC2), чтобы исключить дубликаты, следующие заЛогика заключается в том, что при наличии дублирующейся записи мы отбрасываем записи с более высокой датой.

В переменной orderField я просто сохраняю полеffective_date.

Поэтому, применяя оконную функцию, чтоЯ делаю, вычисляю временный столбец, назначая наименьшую дату всем записям, которые являются дубликатами, и затем фильтрую dataFrame как:

 val dfFinal: DataFrame = dfInicial
    .withColumn("w_eff_date", min(col("effective_date")).over(wFromDupl))
  .filter(col("effective_date") === col("w_eff_date")) 
  .drop("w_eff_date")
  .distinct()
  .withColumn("effective_end_date", lead(orderField, 1, "9999-12-31").over(w))

Для следующего случая это работает правильно:

KEY EFFECTIVE_DATE  DESC 1  DESC 2  W_EFF_DATE (tmp)
E2  2000            A       B       2000
E2  2001            A       B       2000
E2  2002            AA      B       2002

Код сбрасывает вторую запись:

E2  2001            A       B       2000

Но для последовательных записей (в дате) должна применяться логика, например, для следующего случая, когда код реализован, мыудаление третьей записи (DESC1 и DESC2 одинаковы, минимальная дата - 2000), но мы этого не хотим, потому что имеем (по eff_datд) посередине (2001 AA B), поэтому мы хотим сохранить 3 записи

KEY EFFECTIVE_DATE  DESC1   DESC2   W_EFF_DATE (tmp)
E1     2000         A       B       2000
E1     2001         AA      B       2001
E1     2002         A       B       2000

Какой-нибудь совет по этому поводу?Спасибо всем!

1 Ответ

0 голосов
/ 01 октября 2018

Один из подходов заключается в использовании when/otherwise вместе с оконной функцией lag для определения того, какие строки сохранить, как показано ниже:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("E1", "2000", "A",  "B"),
  ("E1", "2001", "AA", "B"),
  ("E1", "2002", "A",  "B"),
  ("E1", "2003", "A",  "B"),
  ("E1", "2004", "A",  "B"),
  ("E2", "2000", "C",  "D"),
  ("E2", "2001", "C",  "D"),
  ("E2", "2002", "CC", "D"),
  ("E2", "2003", "C",  "D")
).toDF("key", "effective_date", "desc1", "desc2")

val compareCols = List("desc1", "desc2")

val win1 = Window.partitionBy("key").orderBy("effective_date")

val df2 = df.
  withColumn("compCols", struct(compareCols.map(col): _*)).
  withColumn("rowNum", row_number.over(win1)).
  withColumn("toKeep",
    when($"rowNum" === 1 || $"compCols" =!= lag($"compCols", 1).over(win1), true).
      otherwise(false)
  )

// +---+--------------+-----+-----+--------+------+------+
// |key|effective_date|desc1|desc2|compCols|rowNum|toKeep|
// +---+--------------+-----+-----+--------+------+------+
// | E1|          2000|    A|    B|   [A,B]|     1|  true|
// | E1|          2001|   AA|    B|  [AA,B]|     2|  true|
// | E1|          2002|    A|    B|   [A,B]|     3|  true|
// | E1|          2003|    A|    B|   [A,B]|     4| false|
// | E1|          2004|    A|    B|   [A,B]|     5| false|
// | E2|          2000|    C|    D|   [C,D]|     1|  true|
// | E2|          2001|    C|    D|   [C,D]|     2| false|
// | E2|          2002|   CC|    D|  [CC,D]|     3|  true|
// | E2|          2003|    C|    D|   [C,D]|     4|  true|
// +---+--------------+-----+-----+--------+------+------+

df2.where($"toKeep").select(df.columns.map(col): _*).
  show
// +---+--------------+-----+-----+
// |key|effective_date|desc1|desc2|
// +---+--------------+-----+-----+
// | E1|          2000|    A|    B|
// | E1|          2001|   AA|    B|
// | E1|          2002|    A|    B|
// | E2|          2000|    C|    D|
// | E2|          2002|   CC|    D|
// | E2|          2003|    C|    D|
// +---+--------------+-----+-----+
...