Интеграция загрузки Spark и Spring - PullRequest
0 голосов
/ 03 мая 2018

Мы пытаемся интегрировать загрузку Spark и Spring, к сожалению, каждый раз сталкиваемся с множеством проблем. После разрешения большинства из них мы застряли на исключении ниже

Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 14, xxxxx.ax.internal.cloudapp.net, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
        at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:638)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:597)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:606)
        at com.xxx.xxx.spark.Execute.run(Execute.java:46)
        at com.xxx.xxx.spark.Loader.process(Loader.java:505)
        at com.xxx.xxx.spark.Loader.run(Loader.java:122)
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:732)
        at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:716)
        at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:703)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:304)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
        at com.xxx.xxx.spark.AnalyseFec.main(AnalyseFec.java:11)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

Это исключение возникает при попытке манипулировать преобразованным набором данных (созданным с помощью карты). Методы подсчета и сбора отлично работают,

В приведенном ниже примере создается исключение для dataf22.show ();

StructType schemata = DataTypes.createStructType(
                new StructField[]{
                        DataTypes.createStructField("Column1", DataTypes.StringType, false),
                        DataTypes.createStructField("Column2", DataTypes.DoubleType, false),
                        DataTypes.createStructField("Column3", DataTypes.DoubleType, false),
                });

        ExpressionEncoder<Row> encoder = RowEncoder.apply(schemata);

        Dataset<Row> dataf2 = session.read()
                .option("header", "true")
                .option("delimiter",separateur)
//              .schema(schemata)
                .csv(csvPath);


        dataf2.write().mode(SaveMode.Overwrite).parquet("xxx.parquet");
        Dataset<Row> parquetFileDF = session.read().parquet("xxx.parquet");

        Dataset<Row> dataf22 = parquetFileDF.map(row -> {
            return RowFactory.create(row.getAs("Column1"), 
                    Double.parseDouble(row.getAs("Column2").toString().replace(",", ".")),
                    Double.parseDouble(row.getAs("Column3").toString().replace(",", ".")));
        }, encoder);

        dataf22.printSchema();
        dataf22.show();

        dataf22.groupBy("Column1");
        Dataset<Row> ds1 = dataf22.groupBy("Column1").sum("Column2");
        ds1.show();
        Dataset<Row> ds2 = dataf22.groupBy("Column1").sum("Column3");
        ds2.show();

Изначально мы упаковывали с помощью Spring-boot-maven-plugin, spark-submit вызывал org.springframework.boot.loader.JarLauncher, который запускает наш начальный класс.

Когда мы перешли на maven-shade-plugin с некоторыми изменениями для поддержки весенней загрузки, вышеприведенное исключение исчезло, и мы смогли выполнить нашу программу, но только в режиме клиента. В кластерном режиме приложение никогда не запускается в Yarn, после нескольких попыток приложение завершится ошибкой без каких-либо ошибок, которые могут помочь решить проблему.

Я чувствую, что после выполнения программы на исполнителях появится проблема, связанная с classpath или загрузчиком классов

Удалось ли вам сделать эту интеграцию работающей? Если да, какой плагин Maven вы использовали? какие дополнительные параметры команды spark-submit вы использовали (extraclasspath…)

Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...