Spark: Оптимизация левого внешнего соединения RDD для дублирующих ключей - PullRequest
0 голосов
/ 15 ноября 2018

СЦЕНАРИЙ

Я пытаюсь написать программу Spark, которая эффективно выполняет левое внешнее соединение между двумя RDD. Одно предостережение: эти RDD могут иметь дублирующиеся ключи , что, по-видимому, приводит к неэффективности всей программы.

То, чего я пытаюсь достичь, просто:

  • С учетом двух СДР: rdd1 и rdd2 (оба имеют одинаковую структуру: (k, v))
  • Используя rdd1 и rdd2, сгенерировать еще один СДР rdd3, имеющий структуру: (k1, v1, List(v2..))
  • k1 и v1 взяты из rdd1 (при одинаковых значениях rdd1 и rdd3 имеют одинаковую длину)
  • List(v2..) - список, значения которого берутся из значений rdd2
  • Чтобы добавить rdd2 v в список в кортеже rdd3, его k (ключ от rdd2) должен совпадать с k из rdd1

МОЯ ПОПЫТКА

Мой подход состоял в том, чтобы использовать левое внешнее соединение. Итак, я придумал что-то вроде этого:

rdd1.leftOuterJoin(rdd2).map{case(k, (v1, v2)) => ((k, v1), Array(v2))}
                        .reduceByKey(_ ++ _)

Это на самом деле дает результат, который я пытаюсь достичь. Но когда я использую огромные данные, программа становится очень медленной.

ПРИМЕР

На всякий случай, если моя идея еще не ясна, у меня есть следующий пример:

Учитывая два RDD, которые имеют следующие данные:

rdd1

key | value
-----------
 1  |  a
 1  |  b
 1  |  c
 2  |  a
 2  |  b
 3  |  c

rdd2

key | value
-----------
 1  |  v
 1  |  w
 1  |  x
 1  |  y
 1  |  z
 2  |  v
 2  |  w
 2  |  x
 3  |  y
 4  |  z

Полученный rdd3 должен быть

key | value | list
------------------------
1   |   a   |  v,w,x,y,z
1   |   b   |  v,w,x,y,z
1   |   c   |  v,w,x,y,z
2   |   a   |  v,w,x
2   |   b   |  v,w,x
3   |   c   |  y

1 Ответ

0 голосов
/ 15 ноября 2018

Прежде всего не используйте:

map { ... => (..., Array(...)) }.reduceByKey(_ ++ _)

Это так же неэффективно, как и получается. Чтобы сгруппировать такие значения, используя RDD, вы должны использовать groupByKey.

Дополнительно только к groupByKey впоследствии довольно расточительно. Вы делаете ту же самую работу (группирование по ключу) с правой стороны дважды. Более разумно использовать cogroup напрямую (так работают соединения RDD) и flatMap

val rdd1 = sc.parallelize(Seq(
  (1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"),(3, "c")
))

val rdd2 = sc.parallelize(Seq(
  (1, "v"), (1, "w"), (1, "x"), (1, "y"), (1, "z"), (2, "v"),
  (2, "w"), (2, "x"), (3, "y"),(4, "z")
))

val rdd = rdd1
  .cogroup(rdd2)
  .flatMapValues { case (left, right) => left.map((_, right)) }
  .map { case (k1, (k2, vs)) => ((k1, k2), vs) }

Вы также можете использовать DataSet API, который в таких случаях эффективнее

import org.apache.spark.sql.functions.collect_list

val df1 = rdd1.toDF("k", "v")
val df2 = rdd2.toDF("k", "v")


df2.groupBy("k")
 .agg(collect_list("v").as("list"))
 .join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
 .show

Результат:

+---+---------------+---+                 
|  k|           list|  v|
+---+---------------+---+
|  1|[v, w, x, y, z]|  a|
|  1|[v, w, x, y, z]|  b|
|  1|[v, w, x, y, z]|  c|
|  3|            [y]|  c|
|  2|      [v, w, x]|  a|
|  2|      [v, w, x]|  b|
+---+---------------+---+

Если пересечение наборов ключей невелико, вы можете попытаться оптимизировать процесс, сначала применив фильтр

val should_keep = {
  val f = df1.stat.bloomFilter("k", df1.count, 0.005)
  udf((x: Any) => f.mightContain(x))
}


df2.where(should_keep($"k")).groupBy("k")
 .agg(collect_list("v").as("list"))
 .join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
 .show
+---+---------------+---+
|  k|           list|  v|
+---+---------------+---+
|  1|[v, w, x, y, z]|  a|
|  1|[v, w, x, y, z]|  b|
|  1|[v, w, x, y, z]|  c|
|  3|            [y]|  c|
|  2|      [v, w, x]|  a|
|  2|      [v, w, x]|  b|
+---+---------------+---+

При использовании Dataset API обязательно настройте spark.sql.shuffle.partitions, чтобы отразить объем обрабатываемых данных.

Примечание

Ничто из этого не поможет вам, если количество дубликатов в rdd2 велико. В таком случае невозможно сформулировать общую формулировку проблемы, и вы должны попытаться переформулировать ее с учетом требований последующего процесса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...