Apache Spark task throws a NullPointerException
0 votes
/ 01 October 2019

In my Apache Spark RDD job, the task does not complete and fails with a NullPointerException:

Lost task 22.3 in stage 8.0 (TID 19700, 10.64.109.70): java.lang.NullPointerException
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:210)
        at com.google.common.base.Optional.of(Optional.java:85)
        at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        at com.my.SparkRDDHelper$18.call(SparkRDDHelper.java:457)
        at com.my.SparkRDDHelper$18.call(SparkRDDHelper.java:1)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at scala.Option.foreach(Option.scala:236) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37) ~[spark-cassandra-connector_2.10-1.6.3.jar:1.6.3]
        at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:61) ~[spark-cassandra-connector_2.10-1.6.3.jar:1.6.3]
        at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:486) ~[spark-cassandra-connector_2.10-1.6.3.jar:1.6.3]
        at com.my.VucCleanUpSparkService.processFinal(VucCleanUpSparkService.java:312) [myutility-0.0.1.jar:?]
        at com.my.Starter.main(Starter.java:109) [myutility-0.0.1.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) [spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) [spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) [spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) [spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) [spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
Caused by: java.lang.NullPointerException
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:210) ~[guava-18.0.jar:?]
        at com.google.common.base.Optional.of(Optional.java:85) ~[guava-18.0.jar:?]
        at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at com.my.SparkRDDHelper$18.call(SparkRDDHelper.java:457) ~[myutility-0.0.1.jar:?]
        at com.my.SparkRDDHelper$18.call(SparkRDDHelper.java:1) ~[myutility-0.0.1.jar:?]
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.scheduler.Task.run(Task.scala:89) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_221]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]

Here is the code:

public static JavaRDD&lt;SchemaLess&gt; mapPartitionForDeletedCode(
        JavaPairRDD&lt;String, Tuple2&lt;Long, Optional&lt;Long&gt;&gt;&gt; parJavaRdd, final int rDays) {
    // parJavaRdd comes from a leftOuterJoin, so the value side is
    // (Long, Optional<Long>); Optional here is com.google.common.base.Optional,
    // the Guava type used by the Spark 1.6 Java API.
    return parJavaRdd.mapPartitions(new FlatMapFunction&lt;Iterator&lt;Tuple2&lt;String, Tuple2&lt;Long, Optional&lt;Long&gt;&gt;&gt;&gt;, SchemaLess&gt;() {
        private static final long serialVersionUID = 5501482349294705299L;

        @Override
        public Iterable&lt;SchemaLess&gt; call(Iterator&lt;Tuple2&lt;String, Tuple2&lt;Long, Optional&lt;Long&gt;&gt;&gt;&gt; parTuple) throws Exception {
            List&lt;SchemaLess&gt; theSchemaLess = new ArrayList&lt;SchemaLess&gt;();
            while (parTuple != null &amp;&amp; parTuple.hasNext()) {
                Tuple2&lt;String, Tuple2&lt;Long, Optional&lt;Long&gt;&gt;&gt; theTuple = parTuple.next();
                SchemaLess theData = new SchemaLess();
                theData.setKey(theTuple._1);
                theData.setAcode(Constants.ACODE);
                // unconditionally unwraps the Optional produced by the left outer join
                theData.setTtl(compareTtl(theTuple._2._2.get().intValue(), rDays));
                theSchemaLess.add(theData);
            }
            return theSchemaLess;
        }
    });
}
...
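
The only place in my own code that dereferences the Optional is theTuple._2._2.get(). For reference, a guarded variant of the same method would look like the sketch below (skipping keys with no right-side match is my assumption, not part of the original job; SchemaLess, Constants and compareTtl are from my code):

// Sketch only: same method, but the Optional is checked before get().
public static JavaRDD&lt;SchemaLess&gt; mapPartitionForDeletedCodeGuarded(
        JavaPairRDD&lt;String, Tuple2&lt;Long, Optional&lt;Long&gt;&gt;&gt; parJavaRdd, final int rDays) {
    return parJavaRdd.mapPartitions(new FlatMapFunction&lt;Iterator&lt;Tuple2&lt;String, Tuple2&lt;Long, Optional&lt;Long&gt;&gt;&gt;&gt;, SchemaLess&gt;() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterable&lt;SchemaLess&gt; call(Iterator&lt;Tuple2&lt;String, Tuple2&lt;Long, Optional&lt;Long&gt;&gt;&gt;&gt; parTuple) throws Exception {
            List&lt;SchemaLess&gt; theSchemaLess = new ArrayList&lt;SchemaLess&gt;();
            while (parTuple != null &amp;&amp; parTuple.hasNext()) {
                Tuple2&lt;String, Tuple2&lt;Long, Optional&lt;Long&gt;&gt;&gt; theTuple = parTuple.next();
                Optional&lt;Long&gt; maybeTtl = theTuple._2._2;
                if (!maybeTtl.isPresent()) {
                    continue; // no right-side match for this key (assumption: safe to skip)
                }
                SchemaLess theData = new SchemaLess();
                theData.setKey(theTuple._1);
                theData.setAcode(Constants.ACODE);
                theData.setTtl(compareTtl(maybeTtl.get().intValue(), rDays));
                theSchemaLess.add(theData);
            }
            return theSchemaLess;
        }
    });
}

What puzzles me is that, judging by the trace, the exception is thrown before my function even sees the record: it comes from JavaUtils.optionToOptional, which calls Guava's Optional.of, and Optional.of rejects null. So it may be that the right-hand RDD of the leftOuterJoin contains null values, in which case the guard above would not help. Is that the correct reading of this stack trace?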