java.io.NotSerializableException: график неожиданно нулевой, когда DStream сериализуется - PullRequest
0 голосов
/ 30 апреля 2018

Я новичок в Искра Потоковое программирование, пожалуйста, кто-нибудь объясните мне, в чем проблема Я имею в виду, что я повторяю нулевую структуру, но у меня есть класс производителя, который работает нормально мой исходный код:

public class Main3 implements java.io.Serializable {
     public static JavaDStream<Double> pr;
    public  void consumer() throws Exception{


    // Configure Spark to connect to Kafka running on local machine
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);



    Collection<String> topics = Arrays.asList("testing");
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka10WordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
    final JavaInputDStream<ConsumerRecord<String, String>> receiver=
            KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String,String>Subscribe(topics,kafkaParams));
 JavaDStream<String> stream = receiver.map(new Function<ConsumerRecord<String,String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
            return kafkaRecord.value();
        }
    });


stream.foreachRDD( x->x.saveAsTextFile("/home/khouloud/Desktop/exemple/b")); //that does no do any thing 
   stream.foreachRDD( x-> {
        x.collect().stream().forEach(n-> System.out.println("item of list: "+n));
    }); // also this i see any thing in the console

    stream.foreachRDD( rdd -> {
        if (rdd.isEmpty()) System.out.println("its empty"); }); //nothing`


JavaPairDStream<Integer, List<Double>> points= stream.mapToPair(new PairFunction<String, Integer, List<Double>>(){
        @Override
        public Tuple2<Integer, List<Double>> call(String x) throws Exception {
            String[]  item = x.split(" ");
            List<Double> l = new ArrayList<Double>();
            for (int i= 1 ; i < item.length ; i++)
            {

                l.add(new Double(item[i]));
            }
            return new Tuple2<>(new Integer(item[0]), l);
        }}
    );`

Ошибка -

`org.apache.spark.SparkException: задача не сериализуется в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 340) в org.apache.spark.util.ClosureCleaner $ .org $ апач $ искра $ Util $ ClosureCleaner $$ чистый (ClosureCleaner.scala: 330) в org.apache.spark.util.ClosureCleaner $ .Почистить (ClosureCleaner.scala: 156) в org.apache.spark.SparkContext.clean (SparkContext.scala: 2294) в org.apache.spark.streaming.dstream.DStream $$ anonfun $ Карта $ 1.Apply (DStream.scala: 547) в org.apache.spark.streaming.dstream.DStream $$ anonfun $ Карта $ 1.Apply (DStream.scala: 547) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 112) в org.apache.spark.SparkContext.withScope (SparkContext.scala: 701) в org.apache.spark.streaming.StreamingContext.withScope (StreamingContext.scala: 265) в org.apache.spark.streaming.dstream.DStream.map (DStream.scala: 546) в org.apache.spark.streaming.api.java.JavaDStreamLike $ class.mapToPair (JavaDStreamLike.scala: 163) в org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapToPair (JavaDStreamLike.scala: 42) в Min.calculDegSim (Min.java:43) в SkyRule.execute (SkyRule.java:34) на Main3.consumer (Main3.java:159) на Исполнитель $ 2.run (Executer.java:27) в java.lang.Thread.run (Thread.java:748) Причина: java.io.NotSerializableException: график неожиданно обнуляется, когда DStream сериализуется. Стек сериализации:

в org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40) в org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 46) в org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 100) в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 337)

...