Карта объекта ошибки Spark-shell не является членом пакета org.apache.spark.streaming.rdd - PullRequest
0 голосов
/ 09 июня 2018

Я пытаюсь прочитать json и разобрать два значения valueStr1 и valueStr2 из темы Кафки KafkaStreamTestTopic1 с использованием потоковой передачи искры.И преобразовать его в фрейм данных для дальнейшей обработки.

Я выполняю код в оболочке spark, поэтому контекст spark sc доступен.

Но когда я запускаю этот скрипт, онвыдает мне следующую ошибку:

ошибка: карта объектов не является членом пакета org.apache.spark.streaming.rdd val dfa = rdd.map (record => {

Ниже приведен скрипт:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.libs.json._
import org.apache.spark.sql._

val ssc = new StreamingContext(sc, Seconds(5))

val sparkSession = SparkSession.builder().appName("myApp").getOrCreate()
val sqlContext = new SQLContext(sc)

// Create direct kafka stream with brokers and topics
val topicsSet = Array("KafkaStreamTestTopic1").toSet

// Set kafka Parameters
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "my_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> "false"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)

val lines = stream.map(_.value)

lines.print()

case class MyObj(val one: JsValue)

lines.foreachRDD(rdd => {
  println("Debug Entered")

  import sparkSession.implicits._
  import sqlContext.implicits._


  val dfa = rdd.map(record => {

    implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]

    val json: JsValue = Json.parse(record)
    val value1 = (json \ "root" \ "child1" \ "child2" \ "valueStr1").getOrElse(null)
    val value2 = (json \ "root" \ "child1" \ "child2" \ "valueStr2").getOrElse(null)

    (new MyObj(value1), new MyObj(value2))

  }).toDF()

  dfa.show()
  println("Dfa Size is: " + dfa.count())


})

ssc.start()

Ответы [ 2 ]

0 голосов
/ 09 июня 2018

Добавьте зависимость spark-streaming в ваш менеджер сборки.

     "org.apache.spark" %% "spark-mllib" % SparkVersion,
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % 
     "2.0.1"

Вы можете использовать maven или SBT для добавления во время сборки.

0 голосов
/ 09 июня 2018

Полагаю, проблема в том, что rdd также является пакетом (org.apache.spark.streaming.rdd), который вы автоматически импортировали со строкой:

import org.apache.spark.streaming._

Чтобы избежать подобных конфликтов, переименуйте вашу переменную во что-нибудьиначе, например myRdd:

lines.foreachRDD(myRdd => { /* ... */ })
...