Исключение в потоке "broadcast-exchange-0" java.lang.OutOfMemoryError: Недостаточно памяти для построения и трансляции таблицы всем рабочим узлам - PullRequest
0 голосов
/ 27 августа 2018

Я использую приложение spark в конфигурации ниже:

1 мастер, 2 рабочих узла.

  • Каждый работник имеет 88 ядер, следовательно, общее количество нет. ядер 176

  • Каждый рабочий имеет 502 ГБ памяти, поэтому общий объем доступной памяти составляет 1004 ГБ

Я получаю исключение при запуске приложения:

Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:115)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
        at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

В этой ошибке упоминается два решения:

  1. В качестве обходного пути вы можете отключить трансляцию, установив spark.sql.autoBroadcastJoinThreshold to -1.

    ИЛИ

  2. Увеличьте память искрового драйвера, установив для параметра spark.driver.memory значение более высокое значение.

Я пытаюсь запустить с настройкой дополнительной памяти драйвера, однако я хочу понять причину этой проблемы. Может кто-нибудь объяснить, пожалуйста.

Я использовал Java в своем коде.

РЕДАКТИРОВАТЬ 1

Я использую широковещательные переменные в моем коде.

РЕДАКТИРОВАТЬ 2

Добавление кода, содержащего переменные широковещания.

//1.
        Dataset<Row> currencySet1 = sparkSession.read().format("jdbc").option("url",connection ).option("dbtable", CI_CURRENCY_CD).load();
        currencySetCache = currencySet1.select(CURRENCY_CD, DECIMAL_POSITIONS).persist(StorageLevel.MEMORY_ONLY());
        Dataset<Row> currencyCodes = currencySetCache.select(CURRENCY_CD);
        currencySet = currencyCodes.as(Encoders.STRING()).collectAsList();

        //2.
        Dataset<Row>  divisionSet = sparkSession.read().format("jdbc").option("url",connection ).option("dbtable", CI_CIS_DIVISION).load();
        divisionSetCache = divisionSet.select(CIS_DIVISION).persist(StorageLevel.MEMORY_ONLY());
        divisionList = divisionSetCache.as(Encoders.STRING()).collectAsList();

        //3.
        Dataset<Row> userIdSet =  sparkSession.read().format("jdbc").option("url",connection ).option("dbtable", SC_USER).load();
        userIdSetCache = userIdSet.select(USER_ID).persist(StorageLevel.MEMORY_ONLY());
        userIdList = userIdSetCache.as(Encoders.STRING()).collectAsList();

ClassTag<List<String>> evidenceForDivision = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForDiv = context.broadcast(divisionList, evidenceForDivision);

        ClassTag<List<String>> evidenceForCurrency = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForCurrency = context.broadcast(currencySet, evidenceForCurrency);

        ClassTag<List<String>> evidenceForUserID = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForUserID = context.broadcast(userIdList, evidenceForUserID);


        //Validation -- Start
        Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
        Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);


        Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),
                broadcastVarForUserID.value()),encoder);
        validateDataset.persist(StorageLevel.MEMORY_ONLY());

1 Ответ

0 голосов
/ 28 августа 2018

Возможная основная причина: Значение по умолчанию «spark.driver.memory» только 1 Гб (зависит от дистрибутива), это очень небольшое число. Если вы читаете значительные объемы данных в драйвере, OutOfMemory может возникнуть легко, совет от исключения правильный.

Решение: Увеличьте "spark.driver.memory" и "spark.executor.memory" как минимум до 16 Гб.

...