фрейм данных искры заменяет значение значением из другой строки - PullRequest
0 голосов
/ 11 июня 2019

У меня есть df с большим количеством столбцов, но моя проблема с двумя столбцами:

val df = Seq(("id1","unknown"),("id1","type1"),("id1","unknown"),("id2","typeX"),
             ("id2","typeX"),("id2","unknown"),("id5","typeY"),("id2","unknown"))
    .toDF("ID","TYPE")
+---+-------+
| ID|   TYPE|
+---+-------+
|id1|unknown|
|id1|  type1|
|id1|unknown|
|id2|  typeX|
|id2|  typeX|
|id2|unknown|
|id5|  typeY|
|id2|unknown|
+---+-------+

Я хочу заменить тип «unknown» типом, который соответствует идентификатору.Результаты должны выглядеть следующим образом:

+---+-----+
| ID| TYPE|
+---+-----+
|id1|type1|
|id1|type1|
|id1|type1|
|id2|typeX|
|id2|typeX|
|id2|typeX|
|id5|typeY|
|id2|typeX|
+---+-----+

Он не может быть жестко запрограммирован (с when id1 -> type1 и т. Д.), Потому что у меня 300 000 идентификаторов, которые меняются каждую неделю ...

Вотчто я уже пробовал:

val w = Window.partitionBy("ID")

df.withColumn("TYPE",collect_list("TYPE").over(w))

+---+--------------------------------+
|ID |TYPE                            |
+---+--------------------------------+
|id5|[typeY]                         |
|id1|[unknown, type1, unknown]       |
|id1|[unknown, type1, unknown]       |
|id1|[unknown, type1, unknown]       |
|id2|[typeX, typeX, unknown, unknown]|
|id2|[typeX, typeX, unknown, unknown]|
|id2|[typeX, typeX, unknown, unknown]|
|id2|[typeX, typeX, unknown, unknown]|
+---+--------------------------------+

df.withColumn("TYPE",typeProcessingUDF(col("TYPE")))

+---+-----+
| ID| TYPE|
+---+-----+
|id5|typeY|
|id1|type1|
|id1|type1|
|id1|type1|
|id2|typeX|
|id2|typeX|
|id2|typeX|
|id2|typeX|
+---+-----+

def dtypeProcessing(dtypeList : mutable.WrappedArray[String]) : String = {
    val dtype = dtypeList
        .filter(element => element!= "unknown" && element!="")
        .distinct
    dtype.length match {
        case 0 => "Unknown"
        case x if x >1 => "Unknown"
        case x if x ==1 => dtype(0)
    }
}
val typeProcessingUDF = udf(dtypeProcessing _)

Это работает,

Но это не обрабатывает все ситуации, рассматривающие случаи:

  • if [type1,type2] => return "Unknown"
  • if [type1,type2,type2] => return type2

1 Ответ

0 голосов
/ 11 июня 2019

С окном "ID" и функцией "first" с игнорируемыми нулями:

val idWindow = Window.partitionBy("ID")
val unknownToNull = when($"TYPE" === "unknown", null).otherwise($"TYPE")
val result = df.withColumn("TYPE",
  coalesce(unknownToNull,
    first(unknownToNull, ignoreNulls = true).over(idWindow)
  )
)

Выход:

+---+-----+
|ID |TYPE |
+---+-----+
|id1|type1|
|id1|type1|
|id1|type1|
|id2|typeX|
|id2|typeX|
|id2|typeX|
|id2|typeX|
|id5|typeY|
+---+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...