Я пытаюсь использовать данные из потоковой передачи Kafka Thorgh Spark. После использования я получаю следующий поток данных.
scala> val strmk = stream.map(record => (record.value,record.timestamp))
strmk: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
org.apache.spark.streaming.dstream.MappedDStream@7ad7cdad
Теперь я хочу преобразовать это как фрейм данных.my (record.value) содержит 5 данных, разделенных символами "," и (record.timestamp), имеющих метку времени от kafka.например:
record.value содержит такие данные, как
ton,2018,34,ford,GERMANY
record.timestamp содержит
2019-02-07 21:52:43
Я хочу преобразовать их в DF с отметкой времени в качестве последнего столбца.
Может кто-нибудь помочь, как это сделать.
Я пытался с последующим, но не уверен, как включить поле отметки времени также
val requestsDataFrame = strmk.map(line => line._1.split(',')).map(s => (s(0).toString, s(1).toString,s(2).toString,s(3).toString,s(4).toString))
requestsDataFrame.foreachRDD((rdd, time) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val requestsDataFrame = rdd.map(w => Record(w._1, w._2, w._3,w._4, w._5)).toDF()
requestsDataFrame.createOrReplaceTempView("requests")
val word_df =sqlContext.sql("select * from requests ")
println(s"========= $time =========")
word_df.show()
})
Может кто-то помочь, как включить поле отметки временитакже в датафрейме.