отсутствует тип параметра в foreachRDD при потоковой передаче искры - PullRequest
0 голосов
/ 11 февраля 2019

У меня проблема с программой потокового воспроизведения.Я использую spark 2.3.0 для своей программы.Вот мой код

    case class Record(name: String, trQ: String, traW: String,traNS: String, traned: String, tranS: String,transwer: String, trABN: String,kafkatime: Long)
    val checkPointLocation = "/user/oo/sparkCheckpoint_old/" 
    def creatingFunc():StreamingContext = { 
     val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
       val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> "ffff.dl.uk.fff.com:8002",
       "security.protocol" -> "SASL_PLAINTEXT",
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "group.id" -> "1",
       "auto.offset.reset" -> "latest",
       "enable.auto.commit" -> (false: java.lang.Boolean)
       )

       val topics = Array("mytopic")
       val from_kafkastream = KafkaUtils.createDirectStream[String, 
       String](
       ssc,
       PreferConsistent,
       Subscribe[String, String](topics, kafkaParams)
       )
       val strmk = from_kafkastream.map(record => 
      (record.value,record.timestamp))
      val splitup2 = strmk.map{ case (line1, line2) => 
     (line1.split(","),line2)}

      object SQLContextSingleton {
        @transient  private var instance: SQLContext = _

        def getInstance(sparkContext: SparkContext): SQLContext = {
          if (instance == null) {
            instance = new SQLContext(sparkContext)
          }
          instance
        }
      }
      splitup2.foreachRDD((rdd) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      spark.sparkContext.setLogLevel("ERROR")
      import sqlContext.implicits._
      val requestsDataFrame = rdd.map(w => Record(w(0).toString, 
    w(1).toString, w(2).toString,w(3).toString, w(4).toString, 
    w(5).toString,w(6).toString, w(7).toString,w(8).toString)).toDF()
      // am getting issue here
      requestsDataFrame.show()
      })

      }
      val context = StreamingContext.getOrCreate(checkPointLocation,creatingFunc)

Проблема в том, что я получаю следующую ошибку после запуска метода " creationFunc "

enter image description here

Примечание: если я запускаю программу без использования createFunc, она работает нормально ... Я не уверен, почему эта проблема возникает .. Я новичок в спарке.может кто-нибудь помочь мне

...