Spark применяет пользовательскую схему к DataFrame - PullRequest
0 голосов
/ 26 октября 2018

У меня есть данные в файле Parquet и я хочу применить к нему пользовательскую схему.

Мои исходные данные в Parquet такие, как показано ниже,

root
 |-- CUST_ID: decimal(9,0) (nullable = true)
 |-- INACTV_DT: string (nullable = true)
 |-- UPDT_DT: string (nullable = true)
 |-- ACTV_DT: string (nullable = true)
 |-- PMT_AMT: decimal(9,4) (nullable = true)
 |-- CMT_ID: decimal(38,14) (nullable = true)

Моя пользовательская схема приведена ниже,

root
 |-- CUST_ID: decimal(38,0) (nullable = false)
 |-- INACTV_DT: timestamp (nullable = false)
 |-- UPDT_DT: timestamp (nullable = false)
 |-- ACTV_DT: timestamp (nullable = true)
 |-- PMT_AMT: decimal(19,4) (nullable = true)
 |-- CMT_ID: decimal(38,14) (nullable = false)

Ниже приведен мой код для применения к нему нового фрейма данных

val customSchema = getOracleDBSchema(sparkSession, QUERY).schema
val DF_frmOldParkquet = sqlContext_par.read.parquet("src/main/resources/data_0_0_0.parquet")
val rows: RDD[Row] = DF_frmOldParkquet.rdd
val newDataFrame = sparkSession.sqlContext.createDataFrame(rows, tblSchema)
newDataFrame.printSchema()
newDataFrame.show()

При выполнении этой операции я получаю ошибку ниже.

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of timestamp
staticinvoke(class org.apache.spark.sql.types.Decimal$, DecimalType(38,0), fromDecimal, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, CUST_ID), DecimalType(38,0)), true) AS CUST_ID#27

1 Ответ

0 голосов
/ 26 октября 2018

Существует два основных применения схемы в Spark SQL

  • schema аргумент, переданный в schema метод из DataFrameReader, которыйиспользуется для преобразования данных в некоторых форматах (в основном, в текстовые файлы).В этом случае схема может использоваться для автоматического приведения входных записей.
  • schema аргумент, передаваемый createDataFrame (варианты, которые принимают RDD или List из Rows) SparkSession.В этом случае схема должна соответствовать данным и не используется для приведения.

Ничто из перечисленного не применимо в вашем случае:

  • Вводстрого типизирован, поэтому читатель игнорирует schema.

  • Схема не соответствует данным, поэтому ее нельзя использовать для createDataFrame.

В этом сценарии вы должны castкаждый столбец до нужного типа.Предполагая, что типы совместимы, что-то вроде этого должно работать

val newDataFrame = df.schema.fields.foldLeft(df){ 
  (df, s) => df.withColumn(s.name, df(s.name).cast(s.dataType))     
}

В зависимости от формата данных этого может быть достаточно или нет.Например, если поля, которые должны быть преобразованы в метки времени, не используют стандартное форматирование, приведение не будет работать, и вам придется использовать утилиты обработки даты и времени Spark.

...