Вывести схему из rdd в Dataframe в Spark Scala - PullRequest
0 голосов
/ 12 апреля 2020

Этот вопрос является ссылкой из ( Spark - программным путем создания схемы с различными типами данных )

Я пытаюсь вывести схему из rdd в Dataframe, ниже приведен мой код

 def inferType(field: String) = field.split(":")(1) match {
    case "Integer" => IntegerType
    case "Double" => DoubleType
    case "String" => StringType
    case "Timestamp" => TimestampType
    case "Date" => DateType
    case "Long" => LongType
    case _ => StringType
 }


val header = c1:String|c2:String|c3:Double|c4:Integer|c5:String|c6:Timestamp|c7:Long|c8:Date

val df1 = Seq(("a|b|44.44|5|c|2018-01-01 01:00:00|456|2018-01-01")).toDF("data")
val rdd1 = df1.rdd.map(x => Row(x.getString(0).split("\\|"): _*))

val schema = StructType(header.split("\\|").map(column => StructField(column.split(":")(0), inferType(column), true)))
val df = spark.createDataFrame(rdd1, schema)
df.show()

Когда я делаю шоу, выдает ошибку ниже. Я должен выполнить эту операцию с данными более крупного масштаба, и у меня возникли проблемы с поиском правильного решения. Помогите ли вы кому-нибудь, пожалуйста, помогите мне найти решение для того или иного способа, где я могу этого достичь.

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int

Спасибо заранее

1 Ответ

0 голосов
/ 12 апреля 2020

Краткий ответ: Строка / Текст не может быть указан с пользовательскими типами / форматами.

То, что вы пытаетесь сделать, - это проанализировать строку как sql столбцы. Отличие от другого примера в том, что загрузку из csv вы пытаетесь просто. Рабочая версия может быть достигнута следующим образом:

// skipped other details such as schematype, spark session...

val header = "c1:String|c2:String|c3:Double|c4:Integer"

// Create `Row` from `Seq`
val row = Row.fromSeq(Seq("a|b|44.44|12|"))

// Create `RDD` from `Row`
val rdd: RDD[Row] = spark.sparkContext
  .makeRDD(List(row))
  .map { row =>
    row.getString(0).split("\\|") match {
      case Array(col1, col2, col3, col4) =>
        Row.fromTuple(col1, col2, col3.toDouble, col4.toInt)
    }
  }
val stt: StructType = StructType(
  header
    .split("\\|")
    .map(column => StructField(column, inferType(column), true))
)

val dataFrame = spark.createDataFrame(rdd, stt)
dataFrame.show()

Причина создания строки из типов Scala заключается в том, что здесь вводятся совместимые типы или Row уважаемые типы.
Примечание. Я пропустил дату и время связанные поля, преобразования даты сложны. Вы можете проверить мой другой ответ, как использовать отформатированные дату и время здесь

...