Искра Исключение «Невозможно передать таблицу, размер которой превышает 8 ГБ», «spark. sql .autoBroadcastJoinThreshold»: «-1» не работает - PullRequest
0 голосов
/ 22 апреля 2020

В одном из наших заданий Pyspark у нас есть сценарий, в котором мы выполняем соединение между большим фреймом данных и относительно меньшим фреймом данных. Я полагаю, что spark использует широковещательное соединение, и мы столкнулись со следующей ошибкой

org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 8 GB
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:103)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withExecutionId$1.apply(SQLExecution.scala:101)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:98)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Я попытался отключить широковещательное соединение, установив 'spark. sql .autoBroadcastJoinThreshold': '-1' в составе сообщения об отправке

/usr/bin/spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1 /home/hadoop/scripts/job.py 

Я попытался напечатать значение spark. sql .autoBroadcastJoinThreshold с использованием

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

и возвращает -1. Однако даже после этого изменения я получаю сообщение об ошибке

   org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 8 GB

Версия Spark Spark 2.3.0

Любая помощь приветствуется.

Ответы [ 2 ]

0 голосов
/ 23 апреля 2020

Возможно, вы явно используете функцию широковещания. Даже если вы установите spark. sql .autoBroadcastJoinThreshold = -1 и явно используете функцию широковещания, она выполнит широковещательное соединение.

Другая причина может заключаться в том, что вы выполняете декартово соединение / неравное соединение, которое заканчивающийся в Broadcastted Nested l oop join (BNLJ join). Как уже упоминалось, вам лучше использовать объяснение и понимать, что происходит.

Чтобы преобразовать оптимизированный логический план в физический план, Spark использует некоторый набор стратегий. Для объединений Spark использует JoinSelection.

Способ работы описан здесь - https://github.com/apache/spark/blob/aefb2e7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L326

Требования к выбору физического оператора объединения для BroadcastNestedLoopJoinExe c -

Нет ключей объединения, и выполняется одно из следующих действий:

1) Тип соединения: CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI или ExistenceJoin (т. Е. CanBuildRight для входного joinType положителен) и правая сторона соединения может быть передана

2) Тип соединения: CROSS, INNER или RIGHT OUTER (т. е. canBuildLeft для входного joinType положителен), а левая сторона соединения может быть передана

ИЛИ

Ни один другой оператор соединения уже не соответствует

0 голосов
/ 22 апреля 2020

Почему бы вам не объяснить соединение и не увидеть физический план? По умолчанию он присоединяется с использованием широковещательной рассылки, и если вы отключите его, он будет использовать сортировку join

print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
# should give 10Mb as default

, а если вы отключите его

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
#-1

, лучше использовать объяснение и понять что происходит.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...