Как преобразовать мои данные в dstream из kafka в Dataframe - PullRequest
0 голосов
/ 08 февраля 2019

Я пытаюсь использовать данные из потоковой передачи Kafka Thorgh Spark. После использования я получаю следующий поток данных.

  scala> val strmk = stream.map(record => (record.value,record.timestamp))
   strmk: org.apache.spark.streaming.dstream.DStream[(String, Long)] = 
   org.apache.spark.streaming.dstream.MappedDStream@7ad7cdad

Теперь я хочу преобразовать это как фрейм данных.my (record.value) содержит 5 данных, разделенных символами "," и (record.timestamp), имеющих метку времени от kafka.например:

record.value содержит такие данные, как

   ton,2018,34,ford,GERMANY

record.timestamp содержит

2019-02-07 21:52:43

Я хочу преобразовать их в DF с отметкой времени в качестве последнего столбца.

Может кто-нибудь помочь, как это сделать.

Я пытался с последующим, но не уверен, как включить поле отметки времени также

  val requestsDataFrame = strmk.map(line => line._1.split(',')).map(s => (s(0).toString, s(1).toString,s(2).toString,s(3).toString,s(4).toString))

          requestsDataFrame.foreachRDD((rdd, time) => {
     val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
     import sqlContext.implicits._
     val requestsDataFrame = rdd.map(w => Record(w._1, w._2, w._3,w._4, w._5)).toDF()

     requestsDataFrame.createOrReplaceTempView("requests")
     val word_df =sqlContext.sql("select * from  requests ")


     println(s"========= $time =========")
     word_df.show()

     })

Может кто-то помочь, как включить поле отметки временитакже в датафрейме.

...