Ключ данных в KafkaStream является нулевым - PullRequest
0 голосов
/ 19 октября 2018

При использовании Spark Streaming для использования темы Apache Kafka с прямым потоком ключ данных в KafkaStream равен null

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparktest").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val util = new PropertiesUtil("/common.properties")
    val offsetreset = util.getProperty("Dataclean_offsetRest")
    val brokerlist = util.getProperty("brokerlist")
    val zookeeperCon = util.getProperty("zookeeperCon")
    val groupid: String = util.getProperty("Dataclean_groupid")
    val sparkinterval = util.getProperty("Dataclean_sparkinterval").toInt
    val topicStr: String = util.getProperty("Dataclean_topic")
    val ssc = new StreamingContext(sc, Seconds(sparkinterval))
    val topic = topicStr.split(",").toSet
    val kafkaParams = Map("serializer.class" -> "kafka.serializer.StringEncoder", "metadata.broker.list" -> brokerlist, "zookeeper.connect" -> zookeeperCon,
      "auto.offset.reset" -> "smallest", "group.id" -> groupid,
      "zookeeper.session.timeout.ms" -> "40000")
    val kakfaStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    kakfaStream.transform(rdd=> {
      rdd.mapPartitions(records=> {
        records.map(json => {
          println("i am here")
          json._1
        })
      })
    }).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

enter image description here

но когда я печатаю json._2, я могу получить следующий результат:

enter image description here

...