Как выбрать отдельные строки в разделе Spark Window - PullRequest
0 голосов
/ 08 января 2020

У меня есть образец DF с дублирующимися строками, подобными этому:

+-------------------+--------------------+----+-----------+-------+----------+
|ID                  |CL_ID               |NBR |DT         |TYP    |KEY       |
+--------------------+--------------------+----+-----------+-------+----------+
|1000031075_20190422 |10017157594301072477|10  |2019-04-24 |N      |0000000000|
|1000031075_20190422 |10017157594301072477|10  |2019-04-24 |N      |0000000000|
|1006473016_20190421 |10577157412800147475|11  |2019-04-21 |N      |0000000000|
|1006473016_20190421 |10577157412800147475|11  |2019-04-21 |N      |0000000000|
+--------------------+--------------------+----+-----------+-------+----------+

val w = Window.partitionBy($"ENCOUNTER_ID")

Можно ли выбрать отдельные строки с помощью приведенного выше раздела Spark Window? Я ожидаю, что выходной DF будет:

+-------------------+--------------------+----+-----------+-------+----------+
|ID                  |CL_ID               |NBR |DT         |TYP    |KEY       |
+--------------------+--------------------+----+-----------+-------+----------+
|1000031075_20190422 |10017157594301072477|10  |2019-04-24 |N      |0000000000|
|1006473016_20190421 |10577157412800147475|11  |2019-04-21 |N      |0000000000|
+--------------------+--------------------+----+-----------+-------+----------+

Я не хочу использовать DF.DISTINCT или DF.DROPDUPLICATES, так как это потребует тасования. Я предпочитаю не использовать задержку или опережение, потому что в реальном времени порядок строк не может быть гарантирован.

Ответы [ 2 ]

0 голосов
/ 01 марта 2020

Точный ответ на ваш вопрос, который масштабируется:

df.dropDuplicates(include your key cols here = ID in this case). 

Оконная функция перемешивает данные, но если у вас есть повторяющиеся записи и вы хотите выбрать, какой из них оставить, например, или хотите суммировать значение из дубликатов, тогда оконная функция - это путь к go

w = Window.PartitionBy('id')
df.agg(first( value col ).over(w)) #you can use max, min, sum, first, last depending on how you want to treat duplicates

Интересная третья возможность, если вы хотите сохранить значения дубликатов (для записи), приведена ниже до

df.withColumn('dup_values',  collect(value_col).over(w)) 

это создаст дополнительный столбец с массивом на строку, чтобы сохранить повторяющиеся значения после того, как вы избавились от строк

0 голосов
/ 09 января 2020

Функция окна также перетасовывает данные. Так что, если все ваши столбцы дублируются, тогда лучше использовать df.dropDuplicates. Если ваш вариант использования хочет использовать функцию Window, то вы можете использовать следующий подход.

scala> df.show()
+-------------------+--------------------+---+----------+---+----------+
|                 ID|               CL_ID|NBR|        DT|TYP|       KEY|
+-------------------+--------------------+---+----------+---+----------+
|1000031075_20190422|10017157594301072477| 10|2019-04-24|  N|0000000000|
|1000031075_20190422|10017157594301072477| 10|2019-04-24|  N|0000000000|
|1006473016_20190421|10577157412800147475| 11|2019-04-21|  N|0000000000|
|1006473016_20190421|10577157412800147475| 11|2019-04-21|  N|0000000000|
+-------------------+--------------------+---+----------+---+----------+

//You can use column in partitionBy  that need to check for duplicate and also use respective orderBy also as of now I have use sample Window

scala> val W  = Window.partitionBy(col("ID"),col("CL_ID"),col("NBR"),col("DT"), col("TYP"), col("KEY")).orderBy(lit(1))

scala> df.withColumn("duplicate", when(row_number.over(W) === lit(1), lit("Y")).otherwise(lit("N")))
              .filter(col("duplicate") === lit("Y"))
              .drop("duplicate")
              .show()
+-------------------+--------------------+---+----------+---+----------+
|                 ID|               CL_ID|NBR|        DT|TYP|       KEY|
+-------------------+--------------------+---+----------+---+----------+
|1000031075_20190422|10017157594301072477| 10|2019-04-24|  N|0000000000|
|1006473016_20190421|10577157412800147475| 11|2019-04-21|  N|0000000000|
+-------------------+--------------------+---+----------+---+----------+
...