Я делаю потоковую искру от Кафки.Я хочу преобразовать мой rdd из kafka в dataframe.Я использую следующий подход.val ssc = new StreamingContext ("local [*]", "KafkaExample", Seconds (4))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "dofff2.dl.uk.feefr.com:8002",
"security.protocol" -> "SASL_PLAINTEXT",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("csv")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val strmk = stream.map(record => (record.value))
val rdd1 = strmk.map(line => line.split(',')).map(s => (s(0).toString, s(1).toString,s(2).toString,s(3).toString,s(4).toString, s(5).toString,s(6).toString,s(7).toString))
rdd1.foreachRDD((rdd, time) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val requestsDataFrame = rdd.map(w => Record(w._1, w._2, w._3,w._4, w._5, w._6,w._7, w._8)).toDF()
requestsDataFrame.createOrReplaceTempView("requests")
val word_df =sqlContext.sql("select * from requests ")
println(s"========= $time =========")
word_df.show()
})
Но в фрейме данных я хочу также включить метку времени из kafka.Может кто-нибудь помочь, как это сделать?