Использование sparksession. sql, приводящее к не сериализуемой ошибке задачи - PullRequest
0 голосов
/ 07 апреля 2020

Я пытаюсь выполнить следующий код: -

SparkSession sparkSession = SparkSession
                .builder()
                .appName("test")
//                .master("local")
                .enableHiveSupport()
                .getOrCreate();


        Dataset<Row> df = sparkSession.sql("## My SQL query to HIVE here ##").toDF();
        Dataset<Row> rulesDataset = sparkSession.sql("select * from rules.md");
        String code = rulesDataset.collectAsList().get(0).get(0).toString();
        df.show(false);
        Script script=new GroovyShell().parse(code);

        UDF3 rulesExecutingUDF = new UDF3<Double,String,String, String>() {
            @Override
            public String call(Double val1,String val2,String val3) throws Exception {
                Binding binding = new Binding();
                binding.setVariable("VAL1",val1);
                binding.setVariable("VAL2", val2);
                binding.setVariable("VAL3", val3);
                script.setBinding(binding);
                Object value = script.run();
                return value.toString();
            }
        };
        sparkSession.udf().register("rulesExecutingUDF",rulesExecutingUDF, DataTypes.StringType);
        df=df.withColumn("NEW_COL",callUDF("rulesExecutingUDF",col("VAL1"),col("VAL2"),col("VAL3")));
        df.show();

Проблема в том, что я получаю ошибку сериализации, говоря, что задача не сериализуема. Я сделал много проб и ошибок и обнаружил, что утверждение

Dataset<Row> df = sparkSession.sql("## My SQL query to HIVE here ##").toDF();

может иметь какое-то отношение к этому. Я получаю этот набор данных из таблицы улья с сервера.

Я подготовил аналогичный набор данных с похожей схемой, и вместо этого запроса, если я использую жестко закодированную переменную, такую ​​как

StructField[] structFields = new StructField[]{
                new StructField("VAL1", DataTypes.DoubleType, true, Metadata.empty()),
                new StructField("VAL2", DataTypes.StringType, true, Metadata.empty()),
                new StructField("VAL3", DataTypes.StringType, true, Metadata.empty())
        };

        StructType structType = new StructType(structFields);

        List<Row> rows = new ArrayList<>();
        rows.add(RowFactory.create(160.0,"X","I"));
        rows.add(RowFactory.create(200.0,"D","C"));
        Dataset<Row> df = sparkSession.createDataFrame(rows, structType);

, я не получаю сериализованную ошибку и искро задание успешно выполняется.

Схема наборов данных, созданных в обоих направлениях, одинакова, а значения также получены из таблицы кустов. Я не могу понять, почему это происходит. Кто-нибудь может мне помочь?

Сообщение отладчика

Диагностика: Исключение класса пользователя: org. apache .spark.SparkException: Задача не сериализуема в организации. apache .spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner. scala: 345) в организации. apache .spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean (ClosureCleaner. scala: 335) в орг. apache .spark.util.ClosureCleaner $ .clean (ClosureCleaner. scala: 159) в орг. apache .spark.SparkContext.clean (SparkContext. scala: 2304) в орг. apache .spark.rdd.RDD $$ anonfun $ mapPartitionsWithIndex $ 1.apply (СДР. scala: 850) в орг. apache .spark.rdd.RDD $$ anonfun $ mapPartitionsWithIndex $ 1.apply (RDD. scala: 849) в орг. apache .spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope. scala: 151) в орг. apache .spark.rdd.RDDOperationScope $. withScope (RDDOperationScope. scala: 112) в орг. apache .spark.rdd.RDD.withScope (RDD. scala: 363) в орг. apache .spark.rdd.RDD.mapPartitionsWithIndex (RDD. scala: 849) в орг. apache .spark. sql .execution.WholeStageCodegenExe c .doExecute (WholeStageCodegenExe c. scala: 608) в org. apache .spark. sql .execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan. scala: 131) в орг. apache .spark. sql .execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan. scala: 127) в орг. apache .spark. sql .execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan. scala: 155) в org. apache .spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope. scala: 151) в org. apache .spark. sql .execution.SparkPlan.executeQuery (SparkPlan. scala: 152) в орг. apache .spark. sql .execution.SparkPlan.execute (SparkPlan. scala: 127) в орг. apache .spark. sql .execution.SparkPlan.getByteArrayRdd (SparkPlan. scala: 247) в орг. apache .spark. sql .execution.SparkPlan.executeTake (SparkPlan. scala: 337) в орг. apache .spark. sql .execution.CollectLimitExe c .executeCollect (limit. scala: 38) в орг. apache .spark. sql .Dataset.org $ apache $ spark $ sql $ Набор данных $$ collectFromPlan (Набор данных. scala: 3278) в орг. apache .spark. sql .Dataset $$ anonfun $ head $ 1.apply (Набор данных. scala: 2489) в org. apache .spark. sql .Dataset $$ anonfun $ head $ 1.apply (Набор данных. scala : 2489) at org. apache .spark. sql .Dataset $$ anonfun $ 52.apply (Набор данных. scala: 3259) в org. apache .spark. sql .execution.SQLExecution $. withNewExecutionId (SQLExecution. scala: 77) в орг. apache .spark. sql .Dataset.withAction (Набор данных. scala: 3258) в орг. apache .spark. sql .Dataset. голова (набор данных. scala: 2489) в орг. apache .spark. sql .Dataset.take (набор данных. scala: 2703) в орг. apache .spark. sql .Dataset. showString (Набор данных. scala: 254) в орг. apache .spark. sql .Dataset.show (Набор данных. scala: 725) на com.test2.FeefactMD.main (FeefactMD. java: 65) at sun.reflect. Родной метод. 1113 * .lang.reflect.Mehod.invoke (Метод. java: 498) в орг. apache .spark.deploy.yarn. ApplicationMaster $$ anon $ 4.run (ApplicationMaster. scala: 721) Вызывается: java .io.NotSerializableException: Script1 Стек сериализации: - объект не сериализуем (класс: Script1, значение: Script1@121b5eab) - элемент массива (индекс: 0) - массив (класс [L java .lang.Object; размер 1) - поле (класс: java .lang.invoke.SerializedLambda, имя: capturedArgs, тип: класс [L java .lang.Object;) - объект (класс java .lang.invoke.SerializedLambda, SerializedLambda [capturingClass = класс com.test2.FeefactMD, functionsInterfaceMethod = org / apache / spark / sql / api / java / UDF3.call :( Ljava / lang / Object; Ljava / lang / Object; Ljava / lang / Object;) Ljava / lang / Object ;, реализация = invokeStati c com / test2 / FeefactMD.lambda $ main $ c068ede9 $ 1: (Lgroovy / lang / Скрипт; Ljava / lang / Double; Ljava / lang / String; Ljava / lang / String;) Ljava / lang / String ;, instantiatedMethodType = (Ljava / lang / Double; Ljava / lang / String; Ljava / lang / String;) Ljava / lang / String ;, numCaptured = 1]) - данные writeReplace (класс: java .lang.invoke.SerializedLambda) - объект (класс com.test2.FeefactMD $$ Lambda $ 61/1143680308, com.test2.FeefactMD $$ Lambda $ 61 / 1143680308@1fe9c374) - поле (класс: org. apache .spark. sql .UDFRegistration $$ anonfun $ 30, имя : f $ 20, тип: interface org. apache .spark. sql .api. java .UDF3) - объект (класс org. apache .spark. sql .UDFRegistration $$ anonfun $ 30,) - элемент массива (индекс: 5) - массив (класс [L java .lang.Object ;, размер 6) - поле (класс: org. apache .spark. sql .execution.WholeStageCodegenExec $$ anonfun $ 10, имя: ссылки $ 1, тип: класс [L java .lang.Object;) - объект (класс org. apache .spark. sql .execution.WholeStageCodegenExec $$ anonfun $ 10,) в org. apache .spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger. scala: 40) в орг. apache .spark.serializer.JavaSerializationStream.writeObject (JavaSerializer. scala: 46) в орг. apache. spark.serializer.JavaSerializerInstance.serialize (JavaSerializer. scala: 100) в орг. apache .spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner. scala: 342)

...