Scala-Spark: фильтрация производительности и оптимизации DataFrame - PullRequest
1 голос
/ 18 апреля 2019

То, чего я пытаюсь достичь, довольно просто: я хочу проверить все идентификаторы (uuid), если они испытывают определенный «статус» (поведенческий статус). Если они это сделают, то верните мне все записи, связанные с этим идентификатором. Например, если один из идентификаторов ниже имеет статус «три», я хочу сохранить все эти записи, связанные с этим идентификатором. Пока что я могу достичь этого двумя способами:

// first method
val dfList = df.filter($"status" === "three").select($"id").distinct.map(_.getString(0)).collect.toList
val dfTransformedOne = df.filter($"id".isin(dfList:_*))

// second method
val dfIds = df.filter($"status" === "three").select($"id").distinct
val dfTransformedTwo = df.join(broadcast(dfIds), Seq("id"))

Приведенные выше два метода отлично работают с образцами данных, с которыми я работаю, однако у меня возникают некоторые проблемы с производительностью, когда я начинаю увеличивать объем обрабатываемых данных, поскольку у меня могут быть миллионы до сотен миллионов идентификаторов, которые Мне нужно отфильтровать для. Есть ли более эффективный способ сделать вышеупомянутое, или это просто случай увеличения аппаратного обеспечения, которое я использую?

Ниже приведен пример данных и ожидаемый результат.

val df = Seq(
  ("1234", "one"), 
  ("1234", "two"), 
  ("1234", "three"), 
  ("234", "one"), 
  ("234", "one"), 
  ("234", "two")
  ).toDF("id", "status")

df.show
+----+------+
|  id|status|
+----+------+
|1234|   one|
|1234|   two|
|1234| three|
| 234|   one|
| 234|   one|
| 234|   two|
+----+------+

dfTransformed.show()
+----+------+
|  id|status|
+----+------+
|1234|   one|
|1234|   two|
|1234| three|
+----+------+

1 Ответ

1 голос
/ 18 апреля 2019

Группировка и агрегирование перед фильтрацией приведут к случайному перемешиванию, исключая необходимость собирать большой список для драйвера. Скорость работы зависит от вашего распределения данных, размера кластера и сетевого подключения. Это, вероятно, стоит проверить, хотя:

val df = Seq(
  ("1234", "one"), 
  ("1234", "two"), 
  ("1234", "three"), 
  ("234", "one"), 
  ("234", "one"), 
  ("234", "two")
  ).toDF("id", "status")

df.groupBy("id")
  .agg(collect_list("status").as("statuses"))
  .filter(array_contains($"statuses", "three"))
  .withColumn("status", explode($"statuses"))
  .select("id", "status")
  .show(false)

Дает ожидаемый результат:

+----+------+
|id  |status|
+----+------+
|1234|one   |
|1234|two   |
|1234|three |
+----+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...