Проблема сериализации при обновлении файла JSON в Dataframe - PullRequest
1 голос
/ 09 мая 2019

Я читаю в JSON файле и сохраняю его в Dataframe .

val df1 = spark.read.option("multiline", "true")
            .json("dbfs:/something.json")

Схема этого файла выглядитследующее:

Connections:array
    element:struct
           Name:string
           Properties:struct
                   database:string
                   driver:string
                   hostname:string
                   password.encrypted:string
                   password.encrypted.keyARN:string
                   port:string
                   username:string
           Type:string

Я хотел бы создать функцию, которую можно было бы использовать повторно, когда я хочу добавить новое соединение.

Я не был уверен, каков наилучший способ сделать этоДолжен ли я создать новую схему, заполнить ее данными и добавить в исходный массив Connections, а затем просто записать обратно в файл?

Вот как я пытаюсь заставить его работать, но естьошибка с сериализацией.

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, ArrayType, FloatType}

val zipsSchema3 = StructType(List(
  StructField("Name", StringType, true), 
  StructField("Properties", StructType(List(
      StructField("driver", StringType, true), 
      StructField("hostname", StringType, true), 
      StructField("password.encrypted", StringType, true), 
      StructField("password.encrypted.keyARN", StringType, true), 
      StructField("port", StringType, true), 
      StructField("username", StringType, true)
 ))),
  StructField("Type", StringType, true)
))

val data2 = Seq(
  Row("db2", struct("test","testHost","encpwd","keyTest","testPort","testUser"), "typeTest"))

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data2),
  zipsSchema3
)

Или есть какие-то встроенные функции, которые можно использовать в этом случае?

Заранее спасибо за все ваши предложения!:)

1 Ответ

1 голос
/ 09 мая 2019

Я не совсем уверен, почему, но ошибка сериализации исчезла, когда я запустил его так.

 val zipsSchema3 = StructType(List(
      StructField("Name", StringType, true), 
      StructField("Properties", StructType(List(
          StructField("driver", StringType, true), 
          StructField("hostname", StringType, true), 
          StructField("password.encrypted", StringType, true), 
          StructField("password.encrypted.keyARN", StringType, true), 
          StructField("port", StringType, true), 
          StructField("username", StringType, true)
     ))),
      StructField("Type", StringType, true)
    ))

val data2 = Seq(("db2", Seq("test","testHost","encpwd","keyTest","testPort","testUser"), "typeTest"))

val rdd = spark.sparkContext.parallelize(data2)
  .map{ case (name, props, sType) => Row(name, props, sType ) }

val df = spark.createDataFrame(
  rdd,
  zipsSchema3  
)
...