Мы пытаемся интегрировать загрузку Spark и Spring, к сожалению, каждый раз сталкиваемся с множеством проблем. После разрешения большинства из них мы застряли на исключении ниже
Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 14, xxxxx.ax.internal.cloudapp.net, executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at org.apache.spark.sql.Dataset.show(Dataset.scala:638)
at org.apache.spark.sql.Dataset.show(Dataset.scala:597)
at org.apache.spark.sql.Dataset.show(Dataset.scala:606)
at com.xxx.xxx.spark.Execute.run(Execute.java:46)
at com.xxx.xxx.spark.Loader.process(Loader.java:505)
at com.xxx.xxx.spark.Loader.run(Loader.java:122)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:732)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:716)
at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:703)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:304)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
at com.xxx.xxx.spark.AnalyseFec.main(AnalyseFec.java:11)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Это исключение возникает при попытке манипулировать преобразованным набором данных (созданным с помощью карты). Методы подсчета и сбора отлично работают,
В приведенном ниже примере создается исключение для dataf22.show ();
StructType schemata = DataTypes.createStructType(
new StructField[]{
DataTypes.createStructField("Column1", DataTypes.StringType, false),
DataTypes.createStructField("Column2", DataTypes.DoubleType, false),
DataTypes.createStructField("Column3", DataTypes.DoubleType, false),
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schemata);
Dataset<Row> dataf2 = session.read()
.option("header", "true")
.option("delimiter",separateur)
// .schema(schemata)
.csv(csvPath);
dataf2.write().mode(SaveMode.Overwrite).parquet("xxx.parquet");
Dataset<Row> parquetFileDF = session.read().parquet("xxx.parquet");
Dataset<Row> dataf22 = parquetFileDF.map(row -> {
return RowFactory.create(row.getAs("Column1"),
Double.parseDouble(row.getAs("Column2").toString().replace(",", ".")),
Double.parseDouble(row.getAs("Column3").toString().replace(",", ".")));
}, encoder);
dataf22.printSchema();
dataf22.show();
dataf22.groupBy("Column1");
Dataset<Row> ds1 = dataf22.groupBy("Column1").sum("Column2");
ds1.show();
Dataset<Row> ds2 = dataf22.groupBy("Column1").sum("Column3");
ds2.show();
Изначально мы упаковывали с помощью Spring-boot-maven-plugin, spark-submit вызывал org.springframework.boot.loader.JarLauncher, который запускает наш начальный класс.
Когда мы перешли на maven-shade-plugin с некоторыми изменениями для поддержки весенней загрузки, вышеприведенное исключение исчезло, и мы смогли выполнить нашу программу, но только в режиме клиента. В кластерном режиме приложение никогда не запускается в Yarn, после нескольких попыток приложение завершится ошибкой без каких-либо ошибок, которые могут помочь решить проблему.
Я чувствую, что после выполнения программы на исполнителях появится проблема, связанная с classpath или загрузчиком классов
Удалось ли вам сделать эту интеграцию работающей? Если да, какой плагин Maven вы использовали? какие дополнительные параметры команды spark-submit вы использовали (extraclasspath…)
Спасибо