Я пытаюсь прочитать json и разобрать два значения valueStr1
и valueStr2
из темы Кафки KafkaStreamTestTopic1
с использованием потоковой передачи искры.И преобразовать его в фрейм данных для дальнейшей обработки.
Я выполняю код в оболочке spark, поэтому контекст spark sc
доступен.
Но когда я запускаю этот скрипт, онвыдает мне следующую ошибку:
ошибка: карта объектов не является членом пакета org.apache.spark.streaming.rdd val dfa = rdd.map (record => {
Ниже приведен скрипт:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.libs.json._
import org.apache.spark.sql._
val ssc = new StreamingContext(sc, Seconds(5))
val sparkSession = SparkSession.builder().appName("myApp").getOrCreate()
val sqlContext = new SQLContext(sc)
// Create direct kafka stream with brokers and topics
val topicsSet = Array("KafkaStreamTestTopic1").toSet
// Set kafka Parameters
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> "my_group",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> "false"
)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)
val lines = stream.map(_.value)
lines.print()
case class MyObj(val one: JsValue)
lines.foreachRDD(rdd => {
println("Debug Entered")
import sparkSession.implicits._
import sqlContext.implicits._
val dfa = rdd.map(record => {
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val json: JsValue = Json.parse(record)
val value1 = (json \ "root" \ "child1" \ "child2" \ "valueStr1").getOrElse(null)
val value2 = (json \ "root" \ "child1" \ "child2" \ "valueStr2").getOrElse(null)
(new MyObj(value1), new MyObj(value2))
}).toDF()
dfa.show()
println("Dfa Size is: " + dfa.count())
})
ssc.start()