Вопрос больше не имеет смысла, поскольку dStreams устарели / прекращены.
В коде нужно учесть несколько вещей, поэтому точный вопрос трудно подобрать. Тем не менее, я должен был подумать так же, как я не эксперт по сериализации.
Вы можете найти несколько сообщений о попытках записи в таблицу Hive напрямую, а не о пути, в моем ответе я использую подход, но вы можете использовать свой подход Spark SQL для написания для TempView, это все возможно .
Я имитировал ввод из QueueStream, поэтому мне не нужно применять разбиение. Вы можете адаптировать это к вашей собственной ситуации, если вы будете следовать тому же «глобальному» подходу. Я решил написать в файл паркета, который создается при необходимости. Вы можете создать свой tempView и затем использовать spark.sql в соответствии с вашим первоначальным подходом.
Операции вывода на DStreams:
- print ()
- saveAsTextFiles (префикс, [суффикс])
- saveAsObjectFiles (префикс, [суффикс])
- saveAsHadoopFiles (префикс, [суффикс])
- foreachRDD (FUNC)
foreachRDD
Наиболее общий оператор вывода, который применяет функцию func к
каждый RDD генерируется из потока. Эта функция должна выдвигать данные
в каждом СДР на внешнюю систему, такую как сохранение СДР в файлы, или
запись его по сети в базу данных. Обратите внимание, что функция func
выполняется в процессе драйвера, выполняющего потоковое приложение,
и, как правило, будут иметь действия RDD, которые заставят
вычисление потоковых RDD.
В нем говорится о сохранении в файлы, но он может делать то, что вы хотите через foreachRDD, хотя я
Предполагается, что идея была для внешних систем. Сохранение в файлы быстрее
на мой взгляд, в отличие от прохождения шагов, чтобы написать таблицу
непосредственно. Вы хотите как можно быстрее разгрузить данные с помощью потоковой передачи, поскольку объемы обычно высоки.
Два шага:
В отдельном классе для Streaming Class - запустить под Spark 2.4:
case class Person(name: String, age: Int)
Тогда вам нужно применить логику потоковой передачи - вам может потребоваться импорт
что у меня в записной книжке, иначе я запустил это под DataBricks:
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import org.apache.spark.sql.SaveMode
val spark = SparkSession
.builder
.master("local[4]")
.config("spark.driver.cores", 2)
.appName("forEachRDD")
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
val QS = ssc.queueStream(rddQueue)
QS.foreachRDD(q => {
if(!q.isEmpty) {
val q_flatMap = q.flatMap{x=>x}
val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
val df = q_withPerson.toDF()
df.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("SO_Quest_BigD")
}
}
)
ssc.start()
for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
rddQueue += ssc.sparkContext.parallelize(List(c))
}
ssc.awaitTermination()