Многие ко многим присоединяются к большим наборам данных в Spark - PullRequest
1 голос
/ 19 марта 2020

У меня есть два больших набора данных, A и B, к которым я должен присоединиться sh по ключу K.

Каждый набор данных содержит много строк с одинаковым значением K, так что это множество для -мани объединение.

Это объединение завершается ошибкой с ошибками, связанными с памятью, если я просто наивно пытаюсь это сделать.

Давайте также скажем, группирование обоих наборов данных по K, выполнение объединения, а затем повторение с некоторыми хитростями получить правильный результат нереально, опять же из-за проблем с памятью

Были ли найдены какие-нибудь хитрые уловки, которые повышают вероятность этой работы?


Обновление:

Добавление очень, очень надуманного конкретного примера:

spark-shell --master local[4] --driver-memory 5G --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=10000 --conf spark.default.parallelism=10000

val numbersA = (1 to 100000).toList.toDS
val numbersWithDataA = numbersA.repartition(10000).map(n => (n, 1, Array.fill[Byte](1000*1000)(0)))
numbersWithDataA.write.mode("overwrite").parquet("numbersWithDataA.parquet")

val numbersB = (1 to 100).toList.toDS
val numbersWithDataB = numbersB.repartition(100).map(n => (n, 1, Array.fill[Byte](1000*1000)(0)))
numbersWithDataB.write.mode("overwrite").parquet("numbersWithDataB.parquet")


val numbersWithDataInA = spark.read.parquet("numbersWithDataA.parquet").toDF("numberA", "one", "dataA")
val numbersWithDataInB = spark.read.parquet("numbersWithDataB.parquet").toDF("numberB", "one", "dataB")

numbersWithDataInA.join(numbersWithDataInB, Seq("one")).write.mode("overwrite").parquet("joined.parquet")

Неудача с Caused by: java.lang.OutOfMemoryError: Java heap space

1 Ответ

2 голосов
/ 20 марта 2020
--conf spark.sql.autoBroadcastJoinThreshold=-1

означает, что вы отключаете функцию вещания.

Вы можете изменить ее на любое подходящее значение <2 ГБ (<a href="https://stackoverflow.com/a/41046320/647053">, так как ограничение 2 ГБ существует ). spark.sql.autoBroadcastJoinThreshold по умолчанию 10 МБ в соответствии с документация искры . Я не знаю причину, по которой вы его отключили. если вы отменяете его, SparkStregies переключит путь к объединению с сортировкой или к перемешиванию с sh соединением. подробности см. В моей статье

Остальное Я не думаю, что есть необходимость что-либо менять в качестве общей схемы объединения двух наборов данных.

Дальнейшее чтение Оптимизация объединения в DataFrame - Broadcast Ha sh Присоединиться

ОБНОВЛЕНИЕ: Альтернативно В вашем реальном примере (не выдумано :-)) вы можете сделать эти шаги

Шаги:

1) Каждый набор данных находит ключ соединения (может быть, например, получение категории уникальный / отличный или страна или поле штата) и собирать их как массив, так как его небольшие данные вы можете собирать.

2) Для каждого элемента категории в массиве объедините 2 набора данных (играя с небольшими объединениями наборов данных) с категорией, в которой условие добавляется к последовательности фреймов данных.

3) уменьшает и объединяет эти dataframes. scala пример:

val dfCatgories = Seq(df1Category1, df2Category2, df3Category3)
dfCatgories.reduce(_ union _)

Примечание: для каждого соединения я по-прежнему предпочитаю BHJ, так как он будет меньше / без перемешивания

...