Прежде всего не используйте:
map { ... => (..., Array(...)) }.reduceByKey(_ ++ _)
Это так же неэффективно, как и получается. Чтобы сгруппировать такие значения, используя RDD, вы должны использовать groupByKey
.
Дополнительно только к groupByKey
впоследствии довольно расточительно. Вы делаете ту же самую работу (группирование по ключу) с правой стороны дважды. Более разумно использовать cogroup
напрямую (так работают соединения RDD) и flatMap
val rdd1 = sc.parallelize(Seq(
(1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"),(3, "c")
))
val rdd2 = sc.parallelize(Seq(
(1, "v"), (1, "w"), (1, "x"), (1, "y"), (1, "z"), (2, "v"),
(2, "w"), (2, "x"), (3, "y"),(4, "z")
))
val rdd = rdd1
.cogroup(rdd2)
.flatMapValues { case (left, right) => left.map((_, right)) }
.map { case (k1, (k2, vs)) => ((k1, k2), vs) }
Вы также можете использовать DataSet
API, который в таких случаях эффективнее
import org.apache.spark.sql.functions.collect_list
val df1 = rdd1.toDF("k", "v")
val df2 = rdd2.toDF("k", "v")
df2.groupBy("k")
.agg(collect_list("v").as("list"))
.join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
.show
Результат:
+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+
Если пересечение наборов ключей невелико, вы можете попытаться оптимизировать процесс, сначала применив фильтр
val should_keep = {
val f = df1.stat.bloomFilter("k", df1.count, 0.005)
udf((x: Any) => f.mightContain(x))
}
df2.where(should_keep($"k")).groupBy("k")
.agg(collect_list("v").as("list"))
.join(rdd1.toDF("k", "v"), Seq("k"), "rightouter")
.show
+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+
При использовании Dataset
API обязательно настройте spark.sql.shuffle.partitions
, чтобы отразить объем обрабатываемых данных.
Примечание
Ничто из этого не поможет вам, если количество дубликатов в rdd2
велико. В таком случае невозможно сформулировать общую формулировку проблемы, и вы должны попытаться переформулировать ее с учетом требований последующего процесса.