mapWithState с ошибкой отчета initialState при восстановлении с контрольной точки - PullRequest
0 голосов
/ 11 июня 2019

Я кодирую искровое потоковое задание о базе "RecoverableStatefulNetworkWordCount" на StatefulNetworkWordCount.scala ! и RecoverableNetworkWordCount.scala !

при перезапуске драйвера сообщает об исключении: "(2) Когда задание Spark Streaming восстанавливается из контрольной точки, это исключение будет срабатывать, если в операциях DStream используется ссылка на СДР, не определенная заданием потоковой передачи. Для получения дополнительной информации см. SPARK-13758"

если я удаляю .initialState(initialRDD), он запускается и перезапускается правильно.

import org.apache.spark._
import org.apache.spark.streaming._

object SimpleApp {
    def main(args: Array[String]) {
        val checkpointDir = if(args.length >= 1) "hdfs://10.2.35.117:9000/spark-cp" else "./spark-cp"

        def functionToCreateContext(): StreamingContext = {
            val conf = new SparkConf().setMaster(if(args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")

            val ssc = new StreamingContext(conf, Seconds(2))   // new context
            ssc.checkpoint(checkpointDir)

            val lines = ssc.socketTextStream("10.2.35.117", 9999) // create DStreams     
            val words = lines.flatMap(_.split(" "))
            val pairs = words.map((_, 1))

            val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

            val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
                val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
                val output = (word, sum)
                state.update(sum)
                output
            }
            val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc))

            stateCounter.print

            ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")

        ssc.start()             
        ssc.awaitTermination()   
    }
}
./spark-submit --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar

Мой вопрос: как правильно создать начальное состояние?

...