Spark: фильтр строк на основе значения столбца - PullRequest
1 голос
/ 26 июня 2019

У меня миллионы строк в виде фрейма данных, например:

val df = Seq(("id1", "ACTIVE"), ("id1", "INACTIVE"), ("id1", "INACTIVE"), ("id2", "ACTIVE"), ("id3", "INACTIVE"), ("id3", "INACTIVE")).toDF("id", "status")

scala> df.show(false)
+---+--------+
|id |status  |
+---+--------+
|id1|ACTIVE  |
|id1|INACTIVE|
|id1|INACTIVE|
|id2|ACTIVE  |
|id3|INACTIVE|
|id3|INACTIVE|
+---+--------+

Теперь я хочу разделить эти данные на три отдельных фрейма данных, например:

  1. Только идентификаторы ACTIVE (например,id2), скажем activeDF
  2. Только неактивные идентификаторы (например, id3), скажем, inactiveDF
  3. Имея как ACTIVE, так и INACTIVE в качестве статуса, скажем обаDF

Как рассчитать activeDF и inactiveDF ?

Я знаю, что bothDF можно рассчитать как

df.select("id").distinct.except(activeDF).except(inactiveDF)

, но это потребуеттасование (так как «отдельная» операция требует того же самого).Есть ли лучший способ рассчитать оба DF

Версии:

Spark : 2.2.1
Scala : 2.11

Ответы [ 2 ]

2 голосов
/ 26 июня 2019

Самым элегантным решением является поворот на status

val counts = df
  .groupBy("id")
  .pivot("status", Seq("ACTIVE", "INACTIVE"))
  .count

или эквивалентный прямой agg

val counts = df
  .groupBy("id")
  .agg(
    count(when($"status" === "ACTIVE", true)) as "ACTIVE",
    count(when($"status" === "INACTIVE", true)) as "INACTIVE"
  )

с последующим простым CASE ... WHEN:

val result = counts.withColumn(
  "status",
  when($"ACTIVE" === 0, "INACTIVE")
    .when($"inactive" === 0, "ACTIVE")
    .otherwise("BOTH")
)

result.show
+---+------+--------+--------+                                                  
| id|ACTIVE|INACTIVE|  status|
+---+------+--------+--------+
|id3|     0|       2|INACTIVE|
|id1|     1|       2|    BOTH|
|id2|     1|       0|  ACTIVE|
+---+------+--------+--------+

Позже вы можете разделить result с помощью filters или создать дамп на диск с источником, который поддерживает partitionBy ( Как разбить информационный кадр на информационные кадры с такими же значениями столбцов? ).

1 голос
/ 26 июня 2019

просто по-другому - groupBy, собирать как установлено, а затем, если размер набора равен 1, он либо активен, либо неактивен, либо оба

scala> val df = Seq(("id1", "ACTIVE"), ("id1", "INACTIVE"), ("id1", "INACTIVE"), ("id2", "ACTIVE"), ("id3", "INACTIVE"), ("id3", "INACTIVE"), ("id4", "ACTIVE"), ("id5", "ACTIVE"), ("id6", "INACTIVE"), ("id7", "ACTIVE"), ("id7", "INACTIVE")).toDF("id", "status")
df: org.apache.spark.sql.DataFrame = [id: string, status: string]

scala> df.show(false)
+---+--------+
|id |status  |
+---+--------+
|id1|ACTIVE  |
|id1|INACTIVE|
|id1|INACTIVE|
|id2|ACTIVE  |
|id3|INACTIVE|
|id3|INACTIVE|
|id4|ACTIVE  |
|id5|ACTIVE  |
|id6|INACTIVE|
|id7|ACTIVE  |
|id7|INACTIVE|
+---+--------+


scala> val allstatusDF = df.groupBy("id").agg(collect_set("status") as "allstatus")
allstatusDF: org.apache.spark.sql.DataFrame = [id: string, allstatus: array<string>]

scala> allstatusDF.show(false)
+---+------------------+
|id |allstatus         |
+---+------------------+
|id7|[ACTIVE, INACTIVE]|
|id3|[INACTIVE]        |
|id5|[ACTIVE]          |
|id6|[INACTIVE]        |
|id1|[ACTIVE, INACTIVE]|
|id2|[ACTIVE]          |
|id4|[ACTIVE]          |
+---+------------------+


scala> allstatusDF.withColumn("status", when(size($"allstatus") === 1, $"allstatus".getItem(0)).otherwise("BOTH")).show(false)
+---+------------------+--------+
|id |allstatus         |status  |
+---+------------------+--------+
|id7|[ACTIVE, INACTIVE]|BOTH    |
|id3|[INACTIVE]        |INACTIVE|
|id5|[ACTIVE]          |ACTIVE  |
|id6|[INACTIVE]        |INACTIVE|
|id1|[ACTIVE, INACTIVE]|BOTH    |
|id2|[ACTIVE]          |ACTIVE  |
|id4|[ACTIVE]          |ACTIVE  |
+---+------------------+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...