Схема преобразования в Scala - PullRequest
0 голосов
/ 13 марта 2020

Я новичок в Scala, и мне нужен ваш совет!

У меня есть следующий DataFrame:

someDF.show()
+----+-----+--------+
|Col1| Col2|    Col3|
+----+-----+--------+
|   1|  Cat|01012020|
|   2|  Dog|15032018|
|   3|Mouse|22022019|
+----+-----+--------+

someDF.schema
Col1:string
Col2:string
Col3:string

Теперь я хотел бы преобразовать фрейм данных - изменить имена столбцов и типы данных примерно такие:

AnimalID:Integer
Animal:String
PurchaseDate:Date

Я планирую сохранить целевой DataFrame в виде файла Parquet. Я пытался сделать следующее. Сначала я создал класс:

case class animalSchema(AnimalID: Integer, Animal: String, PurchaseDate: DateTime)

Затем я использовал карту для изменения схемы:

val animals = someDF
  .map(p => animalSchema(p(0).toInteger, p(1).toString, p(2)))

К сожалению, возникли две проблемы:

a) Как я Не удалось найти тип Date или DateTime, доступный в Scala. Как правильно хранить Date в DataFrame? Позже я хотел бы иметь возможность сравнить ее с текущей датой.

b) Похоже, я не могу привести строку, хранящуюся в Col1, к Integer в AnimalID. Как это сделать?

Большое спасибо!

1 Ответ

2 голосов
/ 13 марта 2020
{    package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws, lit, substring}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, TimestampType, DateType}

object SchemaTransf extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("Spark SQL")
    .getOrCreate()

  import spark.implicits._


  case class source(
   Col1:   String,
   Col2:   String,
   Col3:   String
 )

  val sourceDF = Seq(
    source("1", "Car", "01012020"),
    source("2", "Dog", "15032018"),
    source("3", "Mouse", "22022019"))
    .toDF()

  sourceDF.show(false)
//  +----+-----+--------+
//  |Col1|Col2 |Col3    |
//  +----+-----+--------+
//  |1   |Car  |01012020|
//  |2   |Dog  |15032018|
//  |3   |Mouse|22022019|
//  +----+-----+--------+




  val r1 = sourceDF.withColumn("c3",
    concat_ws("-",
      substring(col("Col3"), 5,4),
      substring(col("Col3"), 3,2),
      substring(col("Col3"), 1,2))
    )
      .select(
        'Col1.alias("AnimalID").cast(IntegerType),
        'Col2.alias("Animal").cast(StringType),
        'c3.alias("PurchaseDate").cast(TimestampType)
      )


  r1.show(false)
//  +--------+------+-------------------+
//  |AnimalID|Animal|PurchaseDate       |
//  +--------+------+-------------------+
//  |1       |Car   |2020-01-01 00:00:00|
//  |2       |Dog   |2018-03-15 00:00:00|
//  |3       |Mouse |2019-02-22 00:00:00|
//  +--------+------+-------------------+

  r1.printSchema()
//  root
//  |-- AnimalID: integer (nullable = true)
//  |-- Animal: string (nullable = true)
//  |-- PurchaseDate: timestamp (nullable = true)




}

}

...