Я использую приложение spark в конфигурации ниже:
1 мастер, 2 рабочих узла.
Каждый работник имеет 88 ядер, следовательно, общее количество нет. ядер 176
Каждый рабочий имеет 502 ГБ памяти, поэтому общий объем доступной памяти составляет 1004 ГБ
Я получаю исключение при запуске приложения:
Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
В этой ошибке упоминается два решения:
В качестве обходного пути вы можете отключить трансляцию, установив
spark.sql.autoBroadcastJoinThreshold to -1.
ИЛИ
Увеличьте память искрового драйвера, установив для параметра spark.driver.memory значение
более высокое значение.
Я пытаюсь запустить с настройкой дополнительной памяти драйвера, однако я хочу понять причину этой проблемы. Может кто-нибудь объяснить, пожалуйста.
Я использовал Java в своем коде.
РЕДАКТИРОВАТЬ 1
Я использую широковещательные переменные в моем коде.
РЕДАКТИРОВАТЬ 2
Добавление кода, содержащего переменные широковещания.
//1.
Dataset<Row> currencySet1 = sparkSession.read().format("jdbc").option("url",connection ).option("dbtable", CI_CURRENCY_CD).load();
currencySetCache = currencySet1.select(CURRENCY_CD, DECIMAL_POSITIONS).persist(StorageLevel.MEMORY_ONLY());
Dataset<Row> currencyCodes = currencySetCache.select(CURRENCY_CD);
currencySet = currencyCodes.as(Encoders.STRING()).collectAsList();
//2.
Dataset<Row> divisionSet = sparkSession.read().format("jdbc").option("url",connection ).option("dbtable", CI_CIS_DIVISION).load();
divisionSetCache = divisionSet.select(CIS_DIVISION).persist(StorageLevel.MEMORY_ONLY());
divisionList = divisionSetCache.as(Encoders.STRING()).collectAsList();
//3.
Dataset<Row> userIdSet = sparkSession.read().format("jdbc").option("url",connection ).option("dbtable", SC_USER).load();
userIdSetCache = userIdSet.select(USER_ID).persist(StorageLevel.MEMORY_ONLY());
userIdList = userIdSetCache.as(Encoders.STRING()).collectAsList();
ClassTag<List<String>> evidenceForDivision = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForDiv = context.broadcast(divisionList, evidenceForDivision);
ClassTag<List<String>> evidenceForCurrency = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForCurrency = context.broadcast(currencySet, evidenceForCurrency);
ClassTag<List<String>> evidenceForUserID = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForUserID = context.broadcast(userIdList, evidenceForUserID);
//Validation -- Start
Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);
Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),
broadcastVarForUserID.value()),encoder);
validateDataset.persist(StorageLevel.MEMORY_ONLY());