Я пытаюсь создать динамическую c схему из JSON записей из текстового файла как , каждая запись будет иметь другую схему . Вот мой код:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json, col}
object streamingexample {
def main(args: Array[String]): Unit = {
val spark:SparkSession = SparkSession.builder()
.master("local[*]")
.appName("SparkByExamples")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val df1 = spark.readStream.textFile("C:\\Users\\sheol\\Desktop\\streaming")
val newdf11=df1
val json_schema = newdf11.select("value").collect().map(x => x.get(0)).mkString(",")
val df2 = df1.select(from_json($"value", schema_of_json(json_schema)).alias("value_new"))
val df3 = df2.select($"value_new.*")
df3.printSchema()
df3.writeStream
.option("truncate", "false")
.format("console")
.start()
.awaitTermination()
}
}
Я получаю следующую ошибку. Пожалуйста, помогите, как исправить код. Я много пробовал. невозможно выяснить.
Error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Пример данных:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}