Процесс потоковой передачи PySpark завершился неудачно с ожиданием завершения - PullRequest
0 голосов
/ 24 декабря 2018

Вот код потоковой передачи, который я запускаю, после запуска в течение двух дней он автоматически останавливается, я что-то пропустил?

def streaming_setup():
    stream = StreamingContext(sc.sparkContext, 10)
    stream.checkpoint(config['checkpointPath'])
    lines_data = stream.textFileStream(monitor_directory)
    lines_data.foreachRDD(persist_file)
    return stream

Сессия потоковой передачи Spark началась здесь,

ssc = StreamingContext.getOrCreate(config['checkpointPath'], lambda: streaming_setup())
ssc = streaming_setup()
ssc.start()
ssc.awaitTermination()

Следы стека.

INFO:py4j.java_gateway:Received command c on object id p2
ERROR:root:Exception Caught ========>An error occurred while calling o91.awaitTermination.
: java.lang.NullPointerException
    at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:120)
    at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1796)
    at org.apache.spark.rdd.RDD.unpersist(RDD.scala:216)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$3.apply(DStream.scala:458)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$3.apply(DStream.scala:457)
    at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:108)
    at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:108)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:108)
    at org.apache.spark.streaming.dstream.DStream.clearMetadata(DStream.scala:457)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$5.apply(DStream.scala:470)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$5.apply(DStream.scala:470)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.streaming.dstream.DStream.clearMetadata(DStream.scala:470)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$clearMetadata$2.apply(DStreamGraph.scala:134)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$clearMetadata$2.apply(DStreamGraph.scala:134)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.streaming.DStreamGraph.clearMetadata(DStreamGraph.scala:134)
    at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:263)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:184)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...