удалить дубликаты записей на основе других столбцов pyspark - PullRequest
0 голосов
/ 03 мая 2018

У меня есть data frame в pyspark, как показано ниже.

df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  1|   N|
|  2|   Y|
|  3|   N|
+---+----+

Я хочу удалить запись, если есть дубликат id и test равен N

Теперь, когда я запрашиваю new_df

new_df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  2|   Y|
|  3|   N|
+---+----+

Я не могу определить вариант использования.

Я сделал сгруппировку на счетчике id, но он дает только столбец id и count.

Я сделал, как показано ниже.

grouped_df = new_df.groupBy("id").count()

Как мне достичь желаемого результата

редактировать

У меня есть фрейм данных, как показано ниже.

+-------------+--------------------+--------------------+
|           sn|              device|           attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   Y|
|4VT1735J00337|                  TV|                   N|
|4VT1735J00337|                  TV|                   Y|
|4VT47B52003EE|              Router|                   N|
|4VT47C5N00A10|               Other|                   N|
+-------------+--------------------+--------------------+

Когда я закончу, как показано ниже

new_df = df.groupBy("sn").agg(max("attribute").alias("attribute"))

Я получаю str has no attribute alias Ошибка

Ожидаемый результат должен быть как ниже

+-------------+--------------------+--------------------+
|           sn|              device|           attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   Y|
|4VT1735J00337|                  TV|                   Y|
|4VT47B52003EE|              Router|                   N|
|4VT47C5N00A10|               Other|                   N|
+-------------+--------------------+--------------------+

Ответы [ 4 ]

0 голосов
/ 14 января 2019

Вы можете использовать следующий код:

#register as temp table
df.registerTempTable("df")

#create single rows
newDF = sqlc.sql(WITH dfCte AS 
(
    select *,row_number() over (partition by id order by test desc) as RowNumber
    from df
)
select * from dfCte where RowNumber =1)

#drop row numbers and show the newdf
newDF.drop('RowNumber').show()
0 голосов
/ 03 мая 2018

Другая опция, использующая row_number:

df.selectExpr(
    '*', 
    'row_number() over (partition by id order by test desc) as rn'
).filter('rn=1 or test="Y"').drop('rn').show()

+---+----+
| id|test|
+---+----+
|  1|   Y|
|  3|   N|
|  2|   Y|
+---+----+

Этот метод не агрегирует, а только удаляет дублированные идентификаторы, когда тест равен N

0 голосов
/ 04 мая 2018

Используя временные таблицы Spark SQL, я использовал Блокнот данных

case class T(id:Int,test:String)
val df=spark.createDataset(Seq(T(1, "Y"), T(1, "N"), T(2, "Y"), T(3, "N")))
df.createOrReplaceTempView("df")
%sql select id, max(test) from df group by id

enter image description here

0 голосов
/ 03 мая 2018

Не самое универсальное решение, но должно подходить здесь:

from pyspark.sql.functions import max

df = spark.createDataFrame(
  [(1, "Y"), (1, "N"), (2, "Y"), (3, "N")], ("id", "test")
)

df.groupBy("id").agg(max("test").alias("test")).show()
# +---+----+         
# | id|test|
# +---+----+
# |  1|   Y|
# |  3|   N|
# |  2|   Y|
# +---+----+

Более общий:

from pyspark.sql.functions import col, count, when

test = when(count(when(col("test") == "Y", "Y")) > 0, "Y").otherwise("N")

df.groupBy("id").agg(test.alias("test")).show()
# +---+----+
# | id|test|
# +---+----+
# |  1|   Y|
# |  3|   N|
# |  2|   Y|
# +---+----+

, который можно обобщить для размещения большего количества классов и нетривиального упорядочения, например, если у вас есть три класса Y, ?, N, вычисленные в этом порядке, вы можете:

(when(count(when(col("test") == "Y", True)) > 0, "Y")
     .when(count(when(col("test") == "?", True)) > 0, "?")
     .otherwise("N"))

Если есть другие столбцы, которые вам нужно сохранить, эти методы не будут работать, и вам нужно что-то вроде показанного в Найти максимальный ряд на группу в Spark DataFrame

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...