Я кодирую искровое потоковое задание о базе "RecoverableStatefulNetworkWordCount" на StatefulNetworkWordCount.scala ! и RecoverableNetworkWordCount.scala !
при перезапуске драйвера сообщает об исключении:
"(2) Когда задание Spark Streaming восстанавливается из контрольной точки, это исключение будет срабатывать, если в операциях DStream используется ссылка на СДР, не определенная заданием потоковой передачи. Для получения дополнительной информации см. SPARK-13758"
если я удаляю .initialState(initialRDD)
, он запускается и перезапускается правильно.
import org.apache.spark._
import org.apache.spark.streaming._
object SimpleApp {
def main(args: Array[String]) {
val checkpointDir = if(args.length >= 1) "hdfs://10.2.35.117:9000/spark-cp" else "./spark-cp"
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setMaster(if(args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(2)) // new context
ssc.checkpoint(checkpointDir)
val lines = ssc.socketTextStream("10.2.35.117", 9999) // create DStreams
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc))
stateCounter.print
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
val sc = ssc.sparkContext
sc.setLogLevel("WARN")
ssc.start()
ssc.awaitTermination()
}
}
./spark-submit --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar
Мой вопрос: как правильно создать начальное состояние?