Как сравнить два структурных типа в Scala и изменить тип данных столбцов в Scala? - PullRequest
0 голосов
/ 03 февраля 2019

Я пытаюсь переместить данные из GP в HDFS, используя Scala & Spark.

val execQuery    = "select * from schema.tablename"
val yearDF       = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 19919927).option("upperBound", 28684058).option("numPartitions",30).load()
val yearDFSchema = yearDF.schema

Схема для yearDF:

root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
 |-- release_number: decimal(38,30) (nullable = true)
 |-- change_number: decimal(38,30) (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: decimal(15,0) (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

Схема той же таблицы в улье, котораязадается нашим проектом:

val hiveColumns = source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String

Итак, я взял hiveColumns и создал новый StructType, как показано ниже:

def convertDatatype(datatype: String): DataType = {
  val convert = datatype match {
    case "string"     => StringType
    case "bigint"     => LongType
    case "int"        => IntegerType
    case "double"     => DoubleType
    case "date"       => TimestampType
    case "boolean"    => BooleanType
    case "timestamp"  => TimestampType
  }
  convert
}


val schemaList = hiveColumns.split("\\|")
val newSchema  = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
newSchema.printTreeString()
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: double (nullable = true)
 |-- release_number: double (nullable = true)
 |-- change_number: double (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: long (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

Когда я пытаюсь применить мою новую схему: schemaStructTypeна yearDF, как показано ниже, я получаю исключение:

 Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double

Исключение возникает из-за преобразования десятичного числа в двойное.Что я не понимаю, так это как я могу преобразовать тип данных столбцов: table_refresh_delay_min, release_number, change_number, fdm_application_id в StructType: newSchema из DoubleType в соответствующие им типы данных, присутствующие в схеме yearDF.т.е.

Если столбец в yearDFSchema имеет десятичный тип данных с точностью больше нуля, в данном случае десятичный (38,30), мне нужно преобразовать тип данных этого же столбца в newSchema в DecimalType(38,30)

Может кто-нибудь дать мне знать, как мне этого добиться?

1 Ответ

0 голосов
/ 03 февраля 2019

Подобные ошибки возникают, когда вы пытаетесь применить схему к RDD[Row], используя API разработчика функции:

def createDataFrame(rows: List[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

В таких случаях хранимые типы данных должны соответствовать внешним (т.е. Тип значения в Scala ) типы данных , перечисленные в официальном SQL , и не применяется приведение типов или принуждение.

Поэтому вы, как пользователь, обязаны обеспечитьчто дата и схема совместимы.

Описание предоставленной вами проблемы указывает на довольно другой сценарий, который запрашивает CAST.Давайте создадим набор данных с точно такой же схемой, как в вашем примере:

val yearDF = spark.createDataFrame(
  sc.parallelize(Seq[Row]()),
  StructType(Seq(
    StructField("source_system_name", StringType),
    StructField("table_refresh_delay_min", DecimalType(38, 30)),
    StructField("release_number", DecimalType(38, 30)),
    StructField("change_number", DecimalType(38, 30)),
    StructField("interface_queue_enabled_flag", StringType),
    StructField("rework_enabled_flag", StringType),
    StructField("fdm_application_id", DecimalType(15, 0)),
    StructField("history_enabled_flag", StringType)
)))

yearDF.printSchema
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
 |-- release_number: decimal(38,30) (nullable = true)
 |-- change_number: decimal(38,30) (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: decimal(15,0) (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

и желаемыми типами, такими как

val dtypes = Seq(
  "source_system_name" -> "string",
  "table_refresh_delay_min" -> "double",
  "release_number" -> "double",
  "change_number" -> "double",
  "interface_queue_enabled_flag" -> "string",
  "rework_enabled_flag" -> "string",
  "fdm_application_id" -> "long",
  "history_enabled_flag" -> "string"
)

, тогда вы можете просто отобразить:

val mapping = dtypes.toMap

yearDF.select(yearDF.columns.map { c => col(c).cast(mapping(c)) }: _*).printSchema
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: double (nullable = true)
 |-- release_number: double (nullable = true)
 |-- change_number: double (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: long (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

Это, конечно, предполагает, что фактический и требуемый типы совместимы, и CAST разрешено .

Если у вас все еще возникают проблемы из-за особенностей конкретного драйвера JDBC, вам следует рассмотреть возможность размещения приведения непосредственно в запросе, либо вручную ( В Apache Spark 2.0.0, возможно ли получить запрос из внешней базы данных (а не получить всю таблицу)? )

val externalDtypes = Seq(
  "source_system_name" -> "text",
  "table_refresh_delay_min" -> "double precision",
  "release_number" -> "float8",
  "change_number" -> "float8",
  "interface_queue_enabled_flag" -> "string",
  "rework_enabled_flag" -> "string",
  "fdm_application_id" -> "bigint",
  "history_enabled_flag" -> "string"
)

val externalDtypes = dtypes.map { 
  case (c, t) => s"CAST(`$c` AS $t)" 
} .mkString(", ")

val dbTable = s"""(select $fields from schema.tablename) as tmp"""

или через пользовательскую схему:

spark.read
  .format("jdbc")
  .option(
    "customSchema",
    dtypes.map { case (c, t) => s"`$c` $t" } .mkString(", "))
  ...
  .load()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...