Исключение scala.collection.immutable.List возникает при развертывании баночки пружинной загрузки на автономном кластере искры. - PullRequest
0 голосов
/ 30 ноября 2018

"невозможно назначить экземпляр scala.collection.immutable.List $ SerializationProxy полю org.apache.spark.rdd.RDD.org", возникающий при развертывании исполняемого файла jar, созданного с помощью springboot и упакованного maven, на кластере искр.Эта же программа успешно работает при работе в локальной среде Linux.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
....

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.12.RELEASE</version>
    <relativePath /> 
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>com.thoughtworks.paranamer</groupId>
        <artifactId>paranamer</artifactId>
        <version>2.8</version>
    </dependency>
</dependencies>

<build>
    <finalName>spring-spark</finalName>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>

    <resources>
        <resource>
            <filtering>true</filtering>
            <directory>src/main/resources</directory>
            <excludes>
                <exclude>configuration/local/*</exclude>
                <exclude>configuration/dev/*</exclude>
            </excludes>
        </resource>
        <resource>
            <directory>src/main/resources/configuration/${env}</directory>
        </resource>
    </resources>
</build>

@SpringBootApplicationpublic 
class Application implements CommandLineRunner {

    @Autowired private SparkTestService service;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        service.run();
    }
}

config is

@Bean
@ConditionalOnMissingBean(SparkConf.class)
public SparkConf sparkConf() throws Exception {
    SparkConf conf = new SparkConf()
            .setAppName(appName)
            .setMaster("spark://192.168.43.20:7077");
    return conf;
}

@Bean
@ConditionalOnMissingBean(JavaSparkContext.class)
public JavaSparkContext javaSparkContext() throws Exception {
   return new JavaSparkContext(sparkConf());
}

aaaaa

@Service  public class SparkTestService implements Serializable {
  private static final long serialVersionUID = 1L;
  private final static Logger logger = 
            LoggerFactory.getLogger(SparkTestService.class);
  private static final Pattern SPACE = Pattern.compile(" ");

  @Autowired
  private transient JavaSparkContext sc;

  public Map<String, Integer> run() {
    Map<String, Integer> result = new HashMap<>();
    JavaRDD<String> lines = sc.textFile("/softwares/test").cache();
    System.out.println(lines.count());

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<String> call(String line) throws Exception {
            return Arrays.asList(line.split(" ")).iterator();
        }
    });

    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<String, Integer>(word, 1);
        }
    });

    JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;

        }
    });

    wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<String, Integer> wordCount) throws Exception {
            System.out.println(wordCount._1 + "------" + wordCount._2 + "times.");
        }
    });

    return result;
}

}

./spark-2.4.0-bin-hadoop2.7/bin/spark-submit --master spark://s0:7077 --driver-memory 600m  --executor-memory 600m --executor-cores 1  --class org.springframework.boot.loader.JarLauncher /dkd/softwares/spring-spark.jar

Исключение составляет:

Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
...