искровой поток - PullRequest
       18

искровой поток

0 голосов
/ 10 марта 2020

Я пытаюсь создать динамическую c схему из JSON записей из текстового файла как , каждая запись будет иметь другую схему . Вот мой код:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json, col}

object streamingexample {
  def main(args: Array[String]): Unit = {
    val spark:SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("SparkByExamples")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    val df1 = spark.readStream.textFile("C:\\Users\\sheol\\Desktop\\streaming")
    val newdf11=df1
    val json_schema = newdf11.select("value").collect().map(x => x.get(0)).mkString(",")
    val df2 = df1.select(from_json($"value", schema_of_json(json_schema)).alias("value_new"))
    val df3 = df2.select($"value_new.*")
    df3.printSchema()
    df3.writeStream
      .option("truncate", "false")
      .format("console")
      .start()
      .awaitTermination()
     }
}

Я получаю следующую ошибку. Пожалуйста, помогите, как исправить код. Я много пробовал. невозможно выяснить.

Error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Пример данных:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Ответы [ 2 ]

1 голос
/ 06 апреля 2020

Это утверждение в вашем коде вызывает проблему из вашего кода, как вы уже знаете.

val json_schema = newdf11.select("value").collect().map(x => x.get(0)).mkString(",")

Вы можете получить схему json другим способом, как показано ниже ...

val dd: DataFrame =    spark.read.json("C:\\Users\\sheol\\Desktop\\streaming")
      dd.show()
/** you can use  val df1 = spark.readStream.textFile(yourfile) also **/

      val json_schema = dd.schema.json;
      println(json_schema)

Результат:

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}

Вы можете уточнить ваши требования, я оставлю это вам

0 голосов
/ 10 марта 2020

Это исключение произошло, потому что вы пытаетесь получить доступ к данным из потока до его запуска. Проблемы с df3.printSchema () . Обязательно вызывайте эту функцию после запуска потока.

...