Как удалить дубликаты из PySpark Dataframe и изменить значение оставшегося столбца на null - PullRequest
1 голос
/ 08 января 2020

Я новичок в Pyspark. У меня есть фрейм данных Pyspark, и я хочу удалить дубликаты на основе столбца id и timestamp. Затем я хочу заменить значение чтения идентификатора дубликата на ноль. Я не хочу использовать Pandas. Пожалуйста, смотрите ниже:

Фрейм данных:

id       reading      timestamp
1        13015        2018-03-22 08:00:00.000        
1        14550        2018-03-22 09:00:00.000
1        14570        2018-03-22 09:00:00.000
2        15700        2018-03-22 08:00:00.000
2        16700        2018-03-22 09:00:00.000
2        18000        2018-03-22 10:00:00.000

Желаемый вывод:

id       reading      timestamp
1        13015        2018-03-22 08:00:00.000        
1        Null         2018-03-22 09:00:00.000
2        15700        2018-03-22 08:00:00.000
2        16700        2018-03-22 09:00:00.000
2        18000        2018-03-22 10:00:00.000

Как мне добавить этот код:

df.dropDuplicates(['id','timestamp'])

Любая помощь будет высоко ценится. Большое спасибо

Ответы [ 2 ]

1 голос
/ 08 января 2020

В одну сторону с помощью оконной функции для подсчета дубликатов по разделу id, timestamp и последующего обновления reading в зависимости от количества:

from pyspark.sql import Window

w = Window.partitionBy("id", "timestamp").orderBy("timestamp")

df.select(col("id"),
          when(count("*").over(w) > lit(1), lit(None)).otherwise(col("reading")).alias("reading"),
          col("timestamp")
          ) \
  .dropDuplicates(["id", "reading", "timestamp"]).show(truncate=False)

Или с помощью группировки по:

df.groupBy("id", "timestamp").agg(first("reading").alias("reading"), count("*").alias("cn")) \
  .withColumn("reading", when(col("cn") > lit(1), lit(None)).otherwise(col("reading"))) \
  .select(*df.columns) \
  .show(truncate=False)

Дает:

+---+-------+-----------------------+
|id |reading|timestamp              |
+---+-------+-----------------------+
|1  |null   |2018-03-22 09:00:00.000|
|1  |13015  |2018-03-22 08:00:00.000|
|2  |18000  |2018-03-22 10:00:00.000|
|2  |15700  |2018-03-22 08:00:00.000|
|2  |16700  |2018-03-22 09:00:00.000|
+---+-------+-----------------------+
1 голос
/ 08 января 2020

Вкл. Scala может быть выполнено с группированием и заменой «чтения» значений на ноль, где число больше одного:

val df = Seq(
  (1, 13015, "2018-03-22 08:00:00.000"),
  (1, 14550, "2018-03-22 09:00:00.000"),
  (1, 14570, "2018-03-22 09:00:00.000"),
  (2, 15700, "2018-03-22 08:00:00.000"),
  (2, 16700, "2018-03-22 09:00:00.000"),
  (2, 18000, "2018-03-22 10:00:00.000")
).toDF("id", "reading", "timestamp")

// action
df
  .groupBy("id", "timestamp")
  .agg(
    min("reading").alias("reading"),
    count("reading").alias("readingCount")
  )
  .withColumn("reading", when($"readingCount" > 1, null).otherwise($"reading"))
  .drop("readingCount")

Вывод:

+---+-----------------------+-------+
|id |timestamp              |reading|
+---+-----------------------+-------+
|2  |2018-03-22 09:00:00.000|16700  |
|1  |2018-03-22 08:00:00.000|13015  |
|1  |2018-03-22 09:00:00.000|null   |
|2  |2018-03-22 10:00:00.000|18000  |
|2  |2018-03-22 08:00:00.000|15700  |
+---+-----------------------+-------+

Угадай , можно легко перевести в Python.

...