Spart Streaming с кодом Кафки внутри foreachRdd не выполняется, если у меня есть функция, выполненная ЛОКАЛЬНО - PullRequest
0 голосов
/ 13 сентября 2018

У меня локально настроен спарк 2.2, и я работаю со scala

Конфигурация сеанса спарка ниже

val sparkSession = SparkSession
  .builder()
  .appName("My application")
  .config("es.nodes", "localhost:9200")
  .config("es.index.auto.create", true)
  .config("spark.streaming.backpressure.initialRate", "1")
  .config("spark.streaming.kafka.maxRatePerPartition", "7")
  .master("local[2]")
  .enableHiveSupport()
  .getOrCreate()

Когда я запускаю искру на моей локальной машине

  kafkaStream.foreachRDD(rdd => {
   calledFunction(rdd)
 })


def calledFunction(rdd: RDD[ConsumerRecord[String, String]]): Unit ={

 rdd.foreach(r=>{
 print("hello")})
}

для приведенного выше кода на моем локальном компьютере «привет» не печатается, но все задания выстраиваются в очередь.

, если я изменю свой код на

kafkaStream.foreachRDD(rdd => { rdd.foreach(r=>{ print("hello")}) })

тогда он печатает "привет" на консоли.

не могли бы вы мне помочь, в чем проблема?

1 Ответ

0 голосов
/ 13 сентября 2018

При работе с spark 1.6 его печать привет в консоли.Для справки вот пример кода

val message = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc,
  kafkaConf,
  Map("test" ->1),
  StorageLevel.MEMORY_ONLY
)
val lines = message.map(_._2)
lines.foreachRDD(rdd => {calledFunction(rdd)})


def calledFunction(rdd: RDD[String]): Unit ={
  rdd.foreach(r=>{
    print("hello")})
}

Надеюсь, это поможет.Поскольку сейчас я не могу восстановить ту же проблему с помощью spark 2.0 из-за несоответствия зависимостей.

...