Как использовать foreachRDD в устаревшей потоковой передаче Spark - PullRequest
0 голосов
/ 03 января 2019

Я получаю исключение при использовании foreachRDD для обработки данных в формате CSV. Вот мой код

  case class Person(name: String, age: Long)
  val conf = new SparkConf()
  conf.setMaster("local[*]")
  conf.setAppName("CassandraExample").set("spark.driver.allowMultipleContexts", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  val smDstream=ssc.textFileStream("file:///home/sa/testFiles")

  smDstream.foreachRDD((rdd,time) => {
  val peopleDF = rdd.map(_.split(",")).map(attributes => 
  Person(attributes(0), attributes(1).trim.toInt)).toDF()
  peopleDF.createOrReplaceTempView("people")
  val teenagersDF = spark.sql("insert into table devDB.stam SELECT name, age 
  FROM people WHERE age BETWEEN 13 AND 29")
  //teenagersDF.show  
    })
  ssc.checkpoint("hdfs://go/hive/warehouse/devDB.db")
  ssc.start()

я получаю следующую ошибку java.io.NotSerializableException: контрольная точка DStream была включена, но DStreams с их функциями не сериализуемы org.apache.spark.streaming.StreamingContext Стек сериализации: - объект не сериализуем (класс: org.apache.spark.streaming.StreamingContext, значение: org.apache.spark.streaming.StreamingContext@1263422a) - поле (класс: $ iw, имя: ssc, тип: класс org.apache.spark.streaming.StreamingContext)

пожалуйста, помогите

1 Ответ

0 голосов
/ 03 января 2019

Вопрос больше не имеет смысла, поскольку dStreams устарели / прекращены.

В коде нужно учесть несколько вещей, поэтому точный вопрос трудно подобрать. Тем не менее, я должен был подумать так же, как я не эксперт по сериализации.

Вы можете найти несколько сообщений о попытках записи в таблицу Hive напрямую, а не о пути, в моем ответе я использую подход, но вы можете использовать свой подход Spark SQL для написания для TempView, это все возможно .

Я имитировал ввод из QueueStream, поэтому мне не нужно применять разбиение. Вы можете адаптировать это к вашей собственной ситуации, если вы будете следовать тому же «глобальному» подходу. Я решил написать в файл паркета, который создается при необходимости. Вы можете создать свой tempView и затем использовать spark.sql в соответствии с вашим первоначальным подходом.

Операции вывода на DStreams:

  • print ()
  • saveAsTextFiles (префикс, [суффикс])
  • saveAsObjectFiles (префикс, [суффикс])
  • saveAsHadoopFiles (префикс, [суффикс])
  • foreachRDD (FUNC)

foreachRDD

Наиболее общий оператор вывода, который применяет функцию func к каждый RDD генерируется из потока. Эта функция должна выдвигать данные в каждом СДР на внешнюю систему, такую ​​как сохранение СДР в файлы, или запись его по сети в базу данных. Обратите внимание, что функция func выполняется в процессе драйвера, выполняющего потоковое приложение, и, как правило, будут иметь действия RDD, которые заставят вычисление потоковых RDD.

В нем говорится о сохранении в файлы, но он может делать то, что вы хотите через foreachRDD, хотя я Предполагается, что идея была для внешних систем. Сохранение в файлы быстрее на мой взгляд, в отличие от прохождения шагов, чтобы написать таблицу непосредственно. Вы хотите как можно быстрее разгрузить данные с помощью потоковой передачи, поскольку объемы обычно высоки.

Два шага:

В отдельном классе для Streaming Class - запустить под Spark 2.4:

case class Person(name: String, age: Int)

Тогда вам нужно применить логику потоковой передачи - вам может потребоваться импорт что у меня в записной книжке, иначе я запустил это под DataBricks:

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import org.apache.spark.sql.SaveMode

val spark = SparkSession
           .builder
           .master("local[4]")
           .config("spark.driver.cores", 2)
           .appName("forEachRDD")
           .getOrCreate()

val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1)) 

val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
val QS = ssc.queueStream(rddQueue) 

QS.foreachRDD(q => {
   if(!q.isEmpty) {   
      val q_flatMap = q.flatMap{x=>x}
      val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
      val df = q_withPerson.toDF()      

      df.write
        .format("parquet")
        .mode(SaveMode.Append)
        .saveAsTable("SO_Quest_BigD")
   }
 }
)

ssc.start()
for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
   rddQueue += ssc.sparkContext.parallelize(List(c))
} 
ssc.awaitTermination()    
...