Я пытаюсь перехватить события Кафки (которые я получаю в сериализованной форме), используя sparkStreaming в Scala.
Вот мой фрагмент кода:
val spark = SparkSession.builder().master("local[*]").appName("Spark-Kafka-Integration").getOrCreate()
spark.conf.set("spark.driver.allowMultipleContexts", "true")
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val topics=Set("<topic-name>")
val brokers="<some-list>"
val groupId="spark-streaming-test"
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"auto.offset.reset" -> "earliest",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> groupId,
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val messages: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
messages.foreachRDD { rdd =>
println(rdd.toDF())
}
ssc.start()
ssc.awaitTermination()
Я получаю ошибкусообщение как: Ошибка: (59, 19) значение toDF не является членом org.apache.spark.rdd.RDD [org.apache.kafka.clients.consumer.ConsumerRecord [String, String]] println (rdd.toDF ())