SparkAppHandle not working after upgrading from Spark 1.6 to Spark 2.3

While running Spark 1.6, my SparkAppHandle worked fine. But after upgrading to Spark 2.3, the Spark job is no longer launched.

handle.getState() comes back as FAILED, and handle.getAppId() is null.

Do I need to add something to my pom.xml, or do I need to set some additional properties for Spark 2.3?
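For reference, I assume the spark-launcher dependency for Spark 2.3 should look roughly like this (the Scala 2.11 artifact suffix and the exact 2.3.x patch version here are assumptions):

    <dependency>
        <groupId>org.apache.spark</groupId>
        <!-- Spark 2.x artifacts are published for Scala 2.11 by default -->
        <artifactId>spark-launcher_2.11</artifactId>
        <!-- exact 2.3.x patch version shown only as an example -->
        <version>2.3.0</version>
    </dependency>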

Below is the code that I was running with Spark 1.6:

SparkAppHandle handle = new SparkLauncher().setSparkHome(sparkProp.getProperty("spark.home"))
                .setAppResource(sparkProp.getProperty("spark.resource_jar"))
                .setMainClass("com.mycom.bdec.spark.validator.SparkDBToDBComparison")
                .setConf("spark.serializer", sparkProp.getProperty("spark.serializer"))
                .setConf(SparkLauncher.DRIVER_MEMORY, "16g")//sparkProp.getProperty("spark.driver_memory"))
                .setConf(SparkLauncher.EXECUTOR_MEMORY, sparkProp.getProperty("spark.executor_memory"))
                .setConf(SparkLauncher.EXECUTOR_CORES, sparkProp.getProperty("spark.executor_cores"))
                .setConf("spark.submit.deployMode", sparkProp.getProperty("spark.deploy_mode"))
                .setConf(SparkLauncher.SPARK_MASTER, sparkProp.getProperty("spark.master"))
                .setConf("spark.shuffle.service.enabled", "true")
                .setConf("spark.dynamicAllocation.enabled", "false")
                .setConf("spark.executor.instances", "300")
                .setConf("spark.sql.shuffle.partitions", "2001")
                .setConf("spark.default.parallelism", "2001")
                .setConf("spark.yarn.executor.memoryOverhead", "7168")
                .setConf("spark.task.cpus", "2")
                .setConf("spark.scheduler.mode", "FAIR")
                .setConf("spark.yarn.queue", queueName)
                .setConf("spark.yarn.jar", "maprfs://"+sparkProp.getProperty("spark.cloak.jar"))
                .setConf("spark.sql.parquet.binaryAsString","true") 
                .setConf(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "-XX:MaxDirectMemorySize=1024m -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=30 -XX:+ScavengeBeforeFullGC -XX:+CMSScavengeBeforeRemark")
                .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,"-XX:MaxDirectMemorySize=1024m -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=30 -XX:+ScavengeBeforeFullGC -XX:+CMSScavengeBeforeRemark")
                .setConf("spark.network.timeout", "5000s")
                .setConf("spark.akka.frameSize", "1024")
                .setConf("spark.rpc.numRetries", "5")
                .setConf("spark.speculation", "true")
                .setConf("spark.locality.wait", "1s")
                .setConf("spark.akka.threads", "5")
                .setConf("spark.sql.parquet.filterPushdown", "true")
                .addJar(sparkProp.getProperty("spark.cloak.jar"))
                .addJar(sparkProp.getProperty("spark.csv.jar")) 
                .addJar(sparkProp.getProperty("spark.commons.csv.jar"))
                .addJar(sparkProp.getProperty("spark.nz.jdbc.jar"))
                .addJar(sparkProp.getProperty("spark.nz.connector.jar"))
                .addJar(sparkProp.getProperty("spark.gmdaisvcrypt.jar"))
                .setConf("spark.db1Type", metaDataMap.get("db1Type"))
                .setConf("spark.db2Type", metaDataMap.get("db2Type"))
                .setConf("spark.sqoopHiveTable1", sqoopHiveTable1)
                .setConf("spark.sqoopHiveTable2", sqoopHiveTable2)
                .setConf("spark.jdbcDbUrl1", metaDataMap.get("jdbcDbUrl1"))
                .setConf("spark.jdbcDriver1", metaDataMap.get("jdbcDriver1"))
                .setConf("spark.jdbcUsername1", metaDataMap.get("jdbcUsername1"))
                .setConf("spark.jdbcPassword1", metaDataMap.get("jdbcPassword1"))
                .setConf("spark.hiveDatabaseName1", metaDataMap.get("dbName"))
                .setConf("spark.jdbcDbUrl2", metaDataMap.get("jdbcDbUrl2"))
                .setConf("spark.jdbcDriver2", metaDataMap.get("jdbcDriver2"))
                .setConf("spark.jdbcUsername2", metaDataMap.get("jdbcUsername2"))
                .setConf("spark.jdbcPassword2", metaDataMap.get("jdbcPassword2"))
                .setConf("spark.hiveDatabaseName2", metaDataMap.get("dbName_2"))
                .setConf("spark.query1", metaDataMap.get("query_1"))
                .setConf("spark.query2", metaDataMap.get("query_2"))
                .setConf("spark.recordCount", metaDataMap.get("recordCount"))
                .setConf("spark.distinctValue", metaDataMap.get("distinctValues"))
                .setConf("spark.colNames", metaDataMap.get("tableKey"))
                .setConf("spark.colNames2", metaDataMap.get("tableKey_2"))
                .setConf("spark.dataTypeCheck", metaDataMap.get("dataTypeCheck"))
                .setConf("spark.dataTypeColNames", dataTypeColNames)
                .setConf("spark.dataLengthCheck", metaDataMap.get("dataLengthCheck"))
                .setConf("spark.dataLengthColNames", metaDataMap.get("dataLengthColNames"))
                .setConf("spark.multiColumnKeyValueCheck", metaDataMap.get("multiColumnKeyValueCheck"))
                .setConf("spark.keyCols", metaDataMap.get("keyCols"))
                .setConf("spark.userValuesCheck", metaDataMap.get("userValuesCheck"))
                .setConf("spark.valueCols", metaDataMap.get("valueCols"))
                .setConf("spark.testExecutionId", metaDataMap.get("testExecutionId"))
                .setConf("spark.ouputDataPath", ouputDataPath)
                .setConf("spark.resultStatDataPath", resultStatDataPath) 
                .setConf("spark.matchDataPath", matchDataPath)
                .setConf("spark.writeDelimeter", writeDelimeter)
                .setConf("spark.voltageConfigFile", metaDataMap.get("voltageConfigFile"))
                .setConf("spark.pIColNm_1", metaDataMap.get("pIColNm_1"))
                .setConf("spark.pIColNm_2", metaDataMap.get("pIColNm_2"))
                .setConf("spark.yarn.log4jpath", ExecutionUtil.getLogFilePath())
                .setConf("spark.yarn.logFilePath", ExecutionUtil.makeLog4jProperties(logDir)) 
                .setAppName("SparkDbToDbComparison")
                .startApplication();

        CountDownLatch countDownLatch = new CountDownLatch(1);
        handle.addListener(new SparkAppHandle.Listener() {
            boolean sparkJobIdSaved = false;
            @Override
            public void stateChanged(SparkAppHandle handle) {

                if(!sparkJobIdSaved && handle.getAppId()!=null){
                    logger.info("Getting Spark App Id -State " + handle.getState());
                    logger.info("Getting Spark App Id -getAppId " + handle.getAppId());
                    //Store SparkJobId in DB
                    ExecutionUtil.updateExecution(exec,handle.getAppId());
                    sparkJobIdSaved=true;
                }

                if (handle.getState().isFinal()) {
                    countDownLatch.countDown();
                }
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {
            }
        });

        countDownLatch.await();
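
In case it helps narrow this down, here is a minimal sketch of how I could surface the launcher's spark-submit output to see why the handle goes straight to FAILED. setVerbose, redirectOutput(File) and redirectError(File) are standard SparkLauncher methods; the log file paths below are just placeholders:

    import java.io.File;
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;

    // Minimal launcher that writes the child spark-submit output to local files,
    // so the actual failure reason is visible even when getAppId() stays null.
    SparkAppHandle debugHandle = new SparkLauncher()
            .setSparkHome(sparkProp.getProperty("spark.home"))
            .setAppResource(sparkProp.getProperty("spark.resource_jar"))
            .setMainClass("com.mycom.bdec.spark.validator.SparkDBToDBComparison")
            .setVerbose(true)                                      // print the resolved spark-submit command
            .redirectOutput(new File("/tmp/launcher-stdout.log"))  // placeholder path
            .redirectError(new File("/tmp/launcher-stderr.log"))   // placeholder path
            .startApplication();                                   // throws IOException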