Spark Структурированные потоковые блоки данных Схема концентратора событий Определение проблемы - PullRequest
0 голосов
/ 26 июня 2019

У меня проблема с определением структуры документа json. enter image description here

Теперь я пытаюсь сделать ту же схему на streamread.

val jsonSchema = StructType([ StructField("associatedEntities", struct<driver:StringType,truck:StringType>, True), 
                          StructField("heading", StringType, True), 
                          StructField("location", struct<accuracyType:StringType,captureDateTime:StringType,cityStateCode:StringType,description:StringType,latitude:DoubleType,longitude:DoubleType,quality:StringType,transmitDateTime:StringType>, True), 
                          StructField("measurements", array<struct<type:StringType,uom:StringType,value:StringType>>, True), 
                          StructField("source", struct<entityType:StringType,key:StringType,vendor:StringType>, True), 
                          StructField("speed", DoubleType, True)])

val df = spark
 .readStream
 .format("eventhubs")
 //.schema(jsonSchema) 
 .options(ehConf.toMap)
 .load()

Когда я запускаю эту ячейку в записной книжке ": 15: ошибка: недопустимое начало простого выражения val jsonSchema = StructType ([StructField ("relatedEntities", struct, True), "

Редактировать: цель состоит в том, чтобы поместить данные в фрейм данных. Я могу получить строку json из тела сообщения концентратора событий, но я не уверен, что делать оттуда, если я не могу заставить работать схему.

1 Ответ

0 голосов
/ 07 июля 2019

Вы получаете сообщение об ошибке из-за определения вашей схемы.Определение схемы должно выглядеть примерно так:

import org.apache.spark.sql.types._

val jsonSchema = StructType(
                        Seq(StructField("associatedEntities", 
                                        StructType(Seq(
                                          StructField("driver", StringType), 
                                          StructField ("truck", StringType)
                                        ))),
                            StructField("heading", StringType),
                            StructField("measurements", ArrayType(StructType(Seq(StructField ("type", StringType), StructField ("uom", StringType), StructField("value", StringType)))))
                           )
                         )

Вы можете дважды проверить схему с помощью:

jsonSchema.printTreeString

Возвращая вам схему:

root
 |-- associatedEntities: struct (nullable = true)
 |    |-- driver: string (nullable = true)
 |    |-- truck: string (nullable = true)
 |-- heading: string (nullable = true)
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- uom: string (nullable = true)
 |    |    |-- value: string (nullable = true)

Какупоминается в комментариях вы получаете двоичные данные.Итак, сначала вы получите необработанный кадр данных:

val rawData = spark.readStream
  .format("eventhubs")
  .option(...)
  .load()

Вам необходимо:

  • преобразовать данные в строку
  • разобрать вложенный json
  • и сгладить его

Определить кадр данных с проанализированными данными:

val parsedData = rawData
   .selectExpr("cast (Body as string) as json")
   .select(from_json($"json", jsonSchema).as("data"))
   .select("data.*")
...